]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
242fcfe3de8838867d4f7a8fe59ccdaa95eaa0ea
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryCacheBase;
94 import org.simantics.db.impl.query.QueryProcessor;
95 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
96 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
97 import org.simantics.db.impl.service.QueryDebug;
98 import org.simantics.db.impl.support.VirtualGraphServerSupport;
99 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
100 import org.simantics.db.procedure.AsyncListener;
101 import org.simantics.db.procedure.AsyncMultiListener;
102 import org.simantics.db.procedure.AsyncMultiProcedure;
103 import org.simantics.db.procedure.AsyncProcedure;
104 import org.simantics.db.procedure.Listener;
105 import org.simantics.db.procedure.ListenerBase;
106 import org.simantics.db.procedure.MultiListener;
107 import org.simantics.db.procedure.MultiProcedure;
108 import org.simantics.db.procedure.Procedure;
109 import org.simantics.db.procedure.SyncListener;
110 import org.simantics.db.procedure.SyncMultiListener;
111 import org.simantics.db.procedure.SyncMultiProcedure;
112 import org.simantics.db.procedure.SyncProcedure;
113 import org.simantics.db.procore.cluster.ClusterImpl;
114 import org.simantics.db.procore.cluster.ClusterTraits;
115 import org.simantics.db.procore.protocol.Constants;
116 import org.simantics.db.procore.protocol.DebugPolicy;
117 import org.simantics.db.request.AsyncMultiRead;
118 import org.simantics.db.request.AsyncRead;
119 import org.simantics.db.request.DelayedWrite;
120 import org.simantics.db.request.DelayedWriteResult;
121 import org.simantics.db.request.ExternalRead;
122 import org.simantics.db.request.MultiRead;
123 import org.simantics.db.request.Read;
124 import org.simantics.db.request.ReadInterface;
125 import org.simantics.db.request.Write;
126 import org.simantics.db.request.WriteInterface;
127 import org.simantics.db.request.WriteOnly;
128 import org.simantics.db.request.WriteOnlyResult;
129 import org.simantics.db.request.WriteResult;
130 import org.simantics.db.request.WriteTraits;
131 import org.simantics.db.service.ByteReader;
132 import org.simantics.db.service.ClusterBuilder;
133 import org.simantics.db.service.ClusterBuilderFactory;
134 import org.simantics.db.service.ClusterControl;
135 import org.simantics.db.service.ClusterSetsSupport;
136 import org.simantics.db.service.ClusterUID;
137 import org.simantics.db.service.ClusteringSupport;
138 import org.simantics.db.service.CollectionSupport;
139 import org.simantics.db.service.DebugSupport;
140 import org.simantics.db.service.DirectQuerySupport;
141 import org.simantics.db.service.GraphChangeListenerSupport;
142 import org.simantics.db.service.InitSupport;
143 import org.simantics.db.service.LifecycleSupport;
144 import org.simantics.db.service.ManagementSupport;
145 import org.simantics.db.service.QueryControl;
146 import org.simantics.db.service.SerialisationSupport;
147 import org.simantics.db.service.ServerInformation;
148 import org.simantics.db.service.ServiceActivityMonitor;
149 import org.simantics.db.service.SessionEventSupport;
150 import org.simantics.db.service.SessionMonitorSupport;
151 import org.simantics.db.service.SessionUserSupport;
152 import org.simantics.db.service.StatementSupport;
153 import org.simantics.db.service.TransactionPolicySupport;
154 import org.simantics.db.service.TransactionSupport;
155 import org.simantics.db.service.TransferableGraphSupport;
156 import org.simantics.db.service.UndoRedoSupport;
157 import org.simantics.db.service.VirtualGraphSupport;
158 import org.simantics.db.service.XSupport;
159 import org.simantics.layer0.Layer0;
160 import org.simantics.utils.DataContainer;
161 import org.simantics.utils.Development;
162 import org.simantics.utils.threads.logger.ITask;
163 import org.simantics.utils.threads.logger.ThreadLogger;
164
165 import gnu.trove.procedure.TLongProcedure;
166 import gnu.trove.set.hash.TLongHashSet;
167
168
169 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
170
171     protected static final boolean DEBUG        = false;
172
173     private static final boolean DIAGNOSTICS       = false;
174
175     private TransactionPolicySupport transactionPolicy;
176
177     final private ClusterControl clusterControl;
178
179     final protected BuiltinSupportImpl builtinSupport;
180     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
181     final protected ClusterSetsSupport clusterSetsSupport;
182     final protected LifecycleSupportImpl lifecycleSupport;
183
184     protected QuerySupportImpl querySupport;
185     protected ResourceSupportImpl resourceSupport;
186     protected WriteSupport writeSupport;
187     public ClusterTranslator clusterTranslator;
188
189     boolean dirtyPrimitives = false;
190
191     public static final int SERVICE_MODE_CREATE = 2;
192     public static final int SERVICE_MODE_ALLOW = 1;
193     public int serviceMode = 0;
194     public boolean createdImmutableClusters = false;
195     public TLongHashSet createdClusters = new TLongHashSet();
196
197     private Layer0 L0;
198
199     /**
200      * The service locator maintained by the workbench. These services are
201      * initialized during workbench during the <code>init</code> method.
202      */
203     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
204
205     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
206
207     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
210
211     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
212
213     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
214
215     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
216
217     final protected State                                        state              = new State();
218
219     protected GraphSession                                       graphSession       = null;
220
221     protected SessionManager                                     sessionManagerImpl = null;
222
223     protected UserAuthenticationAgent                            authAgent          = null;
224
225     protected UserAuthenticator                                  authenticator      = null;
226
227     protected Resource                                           user               = null;
228
229     protected ClusterStream                                      clusterStream      = null;
230
231     protected SessionRequestManager                         requestManager = null;
232
233     public ClusterTable                                       clusterTable       = null;
234
235     public QueryProcessor                                     queryProvider2 = null;
236
237     ClientChangesImpl                                  clientChanges      = null;
238
239     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
240
241 //    protected long                                               newClusterId       = Constants.NullClusterId;
242
243     protected int                                                flushCounter       = 0;
244
245     protected boolean                                            writeOnly          = false;
246
247     WriteState<?>                                                writeState = null;
248     WriteStateBase<?>                                            delayedWriteState = null;
249     protected Resource defaultClusterSet = null; // If not null then used for newResource().
250
251     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
252
253 //        if (authAgent == null)
254 //            throw new IllegalArgumentException("null authentication agent");
255
256         File t = StaticSessionProperties.virtualGraphStoragePath;
257         if (null == t)
258             t = new File(".");
259         this.clusterTable = new ClusterTable(this, t);
260         this.builtinSupport = new BuiltinSupportImpl(this);
261         this.sessionManagerImpl = sessionManagerImpl;
262         this.user = null;
263 //        this.authAgent = authAgent;
264
265         serviceLocator.registerService(Session.class, this);
266         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
267         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
268         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
269         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
270         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
271         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
272         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
273         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
274         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
275         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
276         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
277         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
278         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
279         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
280         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
281         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
282         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
283         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
284         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
285         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
286         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
287         ServiceActivityUpdaterForWriteTransactions.register(this);
288
289         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
290         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
291         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
292         this.lifecycleSupport = new LifecycleSupportImpl(this);
293         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
294         this.transactionPolicy = new TransactionPolicyRelease();
295         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
296         this.clusterControl = new ClusterControlImpl(this);
297         serviceLocator.registerService(ClusterControl.class, clusterControl);
298         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
299         this.clusterSetsSupport.setReadDirectory(t.toPath());
300         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
301         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
302     }
303
304     /*
305      *
306      * Session interface
307      */
308
309
310     @Override
311     public Resource getRootLibrary() {
312         return queryProvider2.getRootLibraryResource();
313     }
314
315     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
316         if (!graphSession.dbSession.refreshEnabled())
317             return;
318         try {
319             getClusterTable().refresh(csid, this, clusterUID);
320         } catch (Throwable t) {
321             Logger.defaultLogError("Refesh failed.", t);
322         }
323     }
324
325     static final class TaskHelper {
326         private final String name;
327         private Object result;
328         final Semaphore sema = new Semaphore(0);
329         private Throwable throwable = null;
330         final Consumer<DatabaseException> callback = e -> {
331             synchronized (TaskHelper.this) {
332                 throwable = e;
333             }
334         };
335         final Procedure<Object> proc = new Procedure<Object>() {
336             @Override
337             public void execute(Object result) {
338                 callback.accept(null);
339             }
340             @Override
341             public void exception(Throwable t) {
342                 if (t instanceof DatabaseException)
343                     callback.accept((DatabaseException)t);
344                 else
345                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
346             }
347         };
348         final WriteTraits writeTraits = new WriteTraits() {};
349         TaskHelper(String name) {
350             this.name = name;
351         }
352         @SuppressWarnings("unchecked")
353         <T> T getResult() {
354             return (T)result;
355         }
356         void setResult(Object result) {
357             this.result = result;
358         }
359 //        Throwable throwableGet() {
360 //            return throwable;
361 //        }
362         synchronized void throwableSet(Throwable t) {
363             throwable = t;
364         }
365 //        void throwableSet(String t) {
366 //            throwable = new InternalException("" + name + " operation failed. " + t);
367 //        }
368         synchronized void throwableCheck()
369         throws DatabaseException {
370             if (null != throwable)
371                 if (throwable instanceof DatabaseException)
372                     throw (DatabaseException)throwable;
373                 else
374                     throw new DatabaseException("Undo operation failed.", throwable);
375         }
376         void throw_(String message)
377         throws DatabaseException {
378             throw new DatabaseException("" + name + " operation failed. " + message);
379         }
380     }
381
382
383
384     private ListenerBase getListenerBase(Object procedure) {
385         if (procedure instanceof ListenerBase)
386             return (ListenerBase) procedure;
387         else
388             return null;
389     }
390
391     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
392         scheduleRequest(request, callback, notify, null);
393     }
394
395     /* (non-Javadoc)
396      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
397      */
398     @Override
399     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400
401         assert (request != null);
402
403         if(Development.DEVELOPMENT) {
404             try {
405             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
406                 System.err.println("schedule write '" + request + "'");
407             } catch (Throwable t) {
408                 Logger.defaultLogError(t);
409             }
410         }
411
412         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
413
414         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
415
416             @Override
417             public void run(int thread) {
418
419                 if(Development.DEVELOPMENT) {
420                     try {
421                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
422                         System.err.println("perform write '" + request + "'");
423                     } catch (Throwable t) {
424                         Logger.defaultLogError(t);
425                     }
426                 }
427
428                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429
430                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
431
432                 try {
433
434                     flushCounter = 0;
435                     Disposable.safeDispose(clientChanges);
436                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
437
438                     VirtualGraph vg = getProvider(request.getProvider());
439
440                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
441                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442
443                         @Override
444                         public void execute(Object result) {
445                             if(callback != null) callback.accept(null);
446                         }
447
448                         @Override
449                         public void exception(Throwable t) {
450                             if(callback != null) callback.accept((DatabaseException)t);
451                         }
452
453                     });
454
455                     assert (null != writer);
456 //                    writer.state.barrier.inc();
457
458                     try {
459                         request.perform(writer);
460                         assert (null != writer);
461                     } catch (Throwable t) {
462                         if (!(t instanceof CancelTransactionException))
463                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
464                         writeState.except(t);
465                     } finally {
466 //                        writer.state.barrier.dec();
467 //                        writer.waitAsync(request);
468                     }
469
470
471                     assert(!queryProvider2.cache.dirty);
472
473                 } catch (Throwable e) {
474
475                     // Log it first, just to be safe that the error is always logged.
476                     if (!(e instanceof CancelTransactionException))
477                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
478
479 //                    writeState.getGraph().state.barrier.dec();
480 //                    writeState.getGraph().waitAsync(request);
481
482                     writeState.except(e);
483
484 //                    try {
485 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
486 //                        // All we can do here is to log those, can't really pass them anywhere.
487 //                        if (callback != null) {
488 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
489 //                            else callback.run(new DatabaseException(e));
490 //                        }
491 //                    } catch (Throwable e2) {
492 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
493 //                    }
494 //                    boolean empty = clusterStream.reallyFlush();
495 //                    int callerThread = -1;
496 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
497 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
498 //                    try {
499 //                        // Send all local changes to server so it can calculate correct reverse change set.
500 //                        // This will call clusterStream.accept().
501 //                        state.cancelCommit(context, clusterStream);
502 //                        if (!empty) {
503 //                            if (!context.isOk()) // this is a blocking operation
504 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
505 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
506 //                        }
507 //                        state.cancelCommit2(context, clusterStream);
508 //                    } catch (DatabaseException e3) {
509 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
510 //                    } finally {
511 //                        clusterStream.setOff(false);
512 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
513 //                    }
514
515                 } finally {
516
517                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
518
519                 }
520
521                 task.finish();
522
523                 if(Development.DEVELOPMENT) {
524                     try {
525                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
526                         System.err.println("finish write '" + request + "'");
527                     } catch (Throwable t) {
528                         Logger.defaultLogError(t);
529                     }
530                 }
531
532             }
533
534         }, combine);
535
536     }
537
538     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
539         scheduleRequest(request, procedure, notify, null);
540     }
541
542     /* (non-Javadoc)
543      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
544      */
545     @Override
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
547
548         assert (request != null);
549
550         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
551
552         requestManager.scheduleWrite(new SessionTask(request, thread) {
553
554             @Override
555             public void run(int thread) {
556
557                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
558
559                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
560
561                 flushCounter = 0;
562                 Disposable.safeDispose(clientChanges);
563                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
564
565                 VirtualGraph vg = getProvider(request.getProvider());
566                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
567
568                 try {
569                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
570                     writeState = writeStateT;
571
572                     assert (null != writer);
573 //                    writer.state.barrier.inc();
574                     writeStateT.setResult(request.perform(writer));
575                     assert (null != writer);
576
577 //                    writer.state.barrier.dec();
578 //                    writer.waitAsync(null);
579
580                 } catch (Throwable e) {
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                     writeState.except(e);
586
587 //                  state.stopWriteTransaction(clusterStream);
588 //
589 //              } catch (Throwable e) {
590 //                  // Log it first, just to be safe that the error is always logged.
591 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
592 //
593 //                  try {
594 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
595 //                      // All we can do here is to log those, can't really pass them anywhere.
596 //                      if (procedure != null) {
597 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
598 //                          else procedure.exception(new DatabaseException(e));
599 //                      }
600 //                  } catch (Throwable e2) {
601 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
602 //                  }
603 //
604 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
605 //
606 //                  state.stopWriteTransaction(clusterStream);
607
608                 } finally {
609                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
610                 }
611
612 //              if(notify != null) notify.release();
613
614                 task.finish();
615
616             }
617
618         }, combine);
619
620     }
621
622     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
623         scheduleRequest(request, callback, notify, null);
624     }
625
626     /* (non-Javadoc)
627      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
628      */
629     @Override
630     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
631
632         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
633
634         assert (request != null);
635
636         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
637
638         requestManager.scheduleWrite(new SessionTask(request, thread) {
639
640             @Override
641             public void run(int thread) {
642                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
643
644                 Procedure<Object> stateProcedure = new Procedure<Object>() {
645                     @Override
646                     public void execute(Object result) {
647                         if (callback != null)
648                             callback.accept(null);
649                     }
650                     @Override
651                     public void exception(Throwable t) {
652                         if (callback != null) {
653                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
654                             else callback.accept(new DatabaseException(t));
655                         } else
656                             Logger.defaultLogError("Unhandled exception", t);
657                     }
658                 };
659
660                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
661                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
662                 DelayedWriteGraph dwg = null;
663 //                newGraph.state.barrier.inc();
664
665                 try {
666                     dwg = new DelayedWriteGraph(newGraph);
667                     request.perform(dwg);
668                 } catch (Throwable e) {
669                     delayedWriteState.except(e);
670                     total.finish();
671                     dwg.close();
672                     return;
673                 } finally {
674 //                    newGraph.state.barrier.dec();
675 //                    newGraph.waitAsync(request);
676                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
677                 }
678
679                 delayedWriteState = null;
680
681                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
682                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
683
684                 flushCounter = 0;
685                 Disposable.safeDispose(clientChanges);
686                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
687
688                 acquireWriteOnly();
689
690                 VirtualGraph vg = getProvider(request.getProvider());
691                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
692
693                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
694
695                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
696
697                 assert (null != writer);
698 //                writer.state.barrier.inc();
699
700                 try {
701                     // Cannot cancel
702                     dwg.commit(writer, request);
703
704                         if(defaultClusterSet != null) {
705                                 XSupport xs = getService(XSupport.class);
706                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
707                                 ClusteringSupport cs = getService(ClusteringSupport.class);
708                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
709                         }
710
711                     // This makes clusters available from server
712                     clusterStream.reallyFlush();
713
714                     releaseWriteOnly(writer);
715                     handleUpdatesAndMetadata(writer);
716
717                 } catch (ServiceException e) {
718 //                    writer.state.barrier.dec();
719 //                    writer.waitAsync(null);
720
721                     // These shall be requested from server
722                     clusterTable.removeWriteOnlyClusters();
723                     // This makes clusters available from server
724                     clusterStream.reallyFlush();
725
726                     releaseWriteOnly(writer);
727                     writeState.except(e);
728                 } finally {
729                     // Debugging & Profiling
730                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
731                     task2.finish();
732                     total.finish();
733                 }
734             }
735
736         }, combine);
737
738     }
739
740     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
741         scheduleRequest(request, procedure, notify, null);
742     }
743
744     /* (non-Javadoc)
745      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
746      */
747     @Override
748     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
749         throw new Error("Not implemented");
750     }
751
752     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
753         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
754         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
755             createdClusters.add(cluster.clusterId);
756         }
757         return cluster;
758     }
759
760     class WriteOnlySupport implements WriteSupport {
761
762         ClusterStream stream;
763         ClusterImpl currentCluster;
764
765         public WriteOnlySupport() {
766             this.stream = clusterStream;
767         }
768
769         @Override
770         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
771             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
776
777             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
778
779             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
780                 if(s != queryProvider2.getRootLibrary())
781                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
782
783             try {
784                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
785             } catch (DatabaseException e) {
786                 Logger.defaultLogError(e);
787                 //throw e;
788             }
789
790             clientChanges.invalidate(s);
791
792             if (cluster.isWriteOnly())
793                 return;
794             queryProvider2.updateStatements(s, p);
795
796         }
797
798         @Override
799         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
800             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
805
806             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
807             try {
808                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
809             } catch (DatabaseException e) {
810                 Logger.defaultLogError(e);
811             }
812
813             clientChanges.invalidate(rid);
814
815             if (cluster.isWriteOnly())
816                 return;
817             queryProvider2.updateValue(rid);
818
819         }
820
821         @Override
822         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
823
824             if(amount < 65536) {
825                 claimValue(provider,resource, reader.readBytes(null, amount));
826                 return;
827             }
828
829             byte[] bytes = new byte[65536];
830
831             int rid = ((ResourceImpl)resource).id;
832             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
833             try {
834                 int left = amount;
835                 while(left > 0) {
836                     int block = Math.min(left, 65536);
837                     reader.readBytes(bytes, block);
838                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
839                     left -= block;
840                 }
841             } catch (DatabaseException e) {
842                 Logger.defaultLogError(e);
843             }
844
845             clientChanges.invalidate(rid);
846
847             if (cluster.isWriteOnly())
848                 return;
849             queryProvider2.updateValue(rid);
850
851         }
852
853         private void maintainCluster(ClusterImpl before, ClusterI after_) {
854             if(after_ != null && after_ != before) {
855                 ClusterImpl after = (ClusterImpl)after_;
856                 if(currentCluster == before) {
857                     currentCluster = after;
858                 }
859                 clusterTable.replaceCluster(after);
860             }
861         }
862
863         public int createResourceKey(int foreignCounter) throws DatabaseException {
864             if(currentCluster == null) {
865                 currentCluster = getNewResourceCluster();
866             }
867             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
868                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
869                 newCluster.foreignLookup = new byte[foreignCounter];
870                 currentCluster = newCluster;
871                 if (DEBUG)
872                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
873             }
874             return currentCluster.createResource(clusterTranslator);
875         }
876
877         @Override
878         public Resource createResource(VirtualGraph provider) throws DatabaseException {
879             if(currentCluster == null) {
880                 if (null != defaultClusterSet) {
881                         ResourceImpl result = getNewResource(defaultClusterSet);
882                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
883                     return result;
884                 } else {
885                     currentCluster = getNewResourceCluster();
886                 }
887             }
888             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
889                 if (null != defaultClusterSet) {
890                         ResourceImpl result = getNewResource(defaultClusterSet);
891                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
892                     return result;
893                 } else {
894                     currentCluster = getNewResourceCluster();
895                 }
896             }
897             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
898         }
899
900         @Override
901         public Resource createResource(VirtualGraph provider, long clusterId)
902         throws DatabaseException {
903             return getNewResource(clusterId);
904         }
905
906         @Override
907         public Resource createResource(VirtualGraph provider, Resource clusterSet)
908         throws DatabaseException {
909             return getNewResource(clusterSet);
910         }
911
912         @Override
913         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
914                 throws DatabaseException {
915             getNewClusterSet(clusterSet);
916         }
917
918         @Override
919         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
920         throws ServiceException {
921             return containsClusterSet(clusterSet);
922         }
923
924         public void selectCluster(long cluster) {
925                 currentCluster = clusterTable.getClusterByClusterId(cluster);
926                 long setResourceId = clusterSetsSupport.getSet(cluster);
927                 clusterSetsSupport.put(setResourceId, cluster);
928         }
929
930         @Override
931         public Resource setDefaultClusterSet(Resource clusterSet)
932         throws ServiceException {
933                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
934                 if(clusterSet != null) {
935                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
936                         currentCluster = clusterTable.getClusterByClusterId(id);
937                         return result;
938                 } else {
939                         currentCluster = null;
940                         return null;
941                 }
942         }
943
944         @Override
945         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
946
947                  provider = getProvider(provider);
948              if (null == provider) {
949                  int key = ((ResourceImpl)resource).id;
950
951
952
953                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
954 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
955 //                      if(key != queryProvider2.getRootLibrary())
956 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
957
958 //                 try {
959 //                     cluster.removeValue(key, clusterTranslator);
960 //                 } catch (DatabaseException e) {
961 //                     Logger.defaultLogError(e);
962 //                     return;
963 //                 }
964
965                  clusterTable.writeOnlyInvalidate(cluster);
966
967                  try {
968                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
969                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
970                          clusterTranslator.removeValue(cluster);
971                  } catch (DatabaseException e) {
972                          Logger.defaultLogError(e);
973                  }
974
975                  queryProvider2.invalidateResource(key);
976                  clientChanges.invalidate(key);
977
978              } else {
979                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
980                  queryProvider2.updateValue(querySupport.getId(resource));
981                  clientChanges.claimValue(resource);
982              }
983
984
985         }
986
987         @Override
988         public void flush(boolean intermediate) {
989             throw new UnsupportedOperationException();
990         }
991
992         @Override
993         public void flushCluster() {
994             clusterTable.flushCluster(graphSession);
995             if(defaultClusterSet != null) {
996                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
997             }
998             currentCluster = null;
999         }
1000
1001         @Override
1002         public void flushCluster(Resource r) {
1003             throw new UnsupportedOperationException("flushCluster resource " + r);
1004         }
1005
1006         @Override
1007         public void gc() {
1008         }
1009
1010         @Override
1011         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1012                 Resource object) {
1013
1014                 int s = ((ResourceImpl)subject).id;
1015                 int p = ((ResourceImpl)predicate).id;
1016                 int o = ((ResourceImpl)object).id;
1017
1018                 provider = getProvider(provider);
1019                 if (null == provider) {
1020
1021                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1022                         clusterTable.writeOnlyInvalidate(cluster);
1023
1024                         try {
1025
1026                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1027                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1028                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1029
1030                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1031                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1032
1033                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1034                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1036                                 clusterTranslator.removeStatement(cluster);
1037
1038                                 queryProvider2.invalidateResource(s);
1039                                 clientChanges.invalidate(s);
1040
1041                         } catch (DatabaseException e) {
1042
1043                                 Logger.defaultLogError(e);
1044
1045                         }
1046
1047                         return true;
1048
1049                 } else {
1050                         
1051                         ((VirtualGraphImpl)provider).deny(s, p, o);
1052                         queryProvider2.invalidateResource(s);
1053                         clientChanges.invalidate(s);
1054
1055                         return true;
1056
1057                 }
1058
1059         }
1060
1061         @Override
1062         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1063             throw new UnsupportedOperationException();
1064         }
1065
1066         @Override
1067         public boolean writeOnly() {
1068             return true;
1069         }
1070
1071         @Override
1072         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1073             writeSupport.performWriteRequest(graph, request);
1074         }
1075
1076         @Override
1077         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1078             throw new UnsupportedOperationException();
1079         }
1080
1081         @Override
1082         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1083             throw new UnsupportedOperationException();
1084         }
1085
1086         @Override
1087         public <T> void addMetadata(Metadata data) throws ServiceException {
1088             writeSupport.addMetadata(data);
1089         }
1090
1091         @Override
1092         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1093             return writeSupport.getMetadata(clazz);
1094         }
1095
1096         @Override
1097         public TreeMap<String, byte[]> getMetadata() {
1098             return writeSupport.getMetadata();
1099         }
1100
1101         @Override
1102         public void commitDone(WriteTraits writeTraits, long csid) {
1103             writeSupport.commitDone(writeTraits, csid);
1104         }
1105
1106         @Override
1107         public void clearUndoList(WriteTraits writeTraits) {
1108             writeSupport.clearUndoList(writeTraits);
1109         }
1110         @Override
1111         public int clearMetadata() {
1112             return writeSupport.clearMetadata();
1113         }
1114
1115                 @Override
1116                 public void startUndo() {
1117                         writeSupport.startUndo();
1118                 }
1119     }
1120
1121     class VirtualWriteOnlySupport implements WriteSupport {
1122
1123 //        @Override
1124 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1125 //                Resource object) {
1126 //            throw new UnsupportedOperationException();
1127 //        }
1128
1129         @Override
1130         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1131
1132             TransientGraph impl = (TransientGraph)provider;
1133             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1134             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1135             clientChanges.claim(subject, predicate, object);
1136
1137         }
1138
1139         @Override
1140         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1141
1142             TransientGraph impl = (TransientGraph)provider;
1143             impl.claim(subject, predicate, object);
1144             getQueryProvider2().updateStatements(subject, predicate);
1145             clientChanges.claim(subject, predicate, object);
1146
1147         }
1148
1149         @Override
1150         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1151             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1152         }
1153
1154         @Override
1155         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1156             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1157             getQueryProvider2().updateValue(resource);
1158             clientChanges.claimValue(resource);
1159         }
1160
1161         @Override
1162         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1163             byte[] value = reader.readBytes(null, amount);
1164             claimValue(provider, resource, value);
1165         }
1166
1167         @Override
1168         public Resource createResource(VirtualGraph provider) {
1169             TransientGraph impl = (TransientGraph)provider;
1170             return impl.getResource(impl.newResource(false));
1171         }
1172
1173         @Override
1174         public Resource createResource(VirtualGraph provider, long clusterId) {
1175             throw new UnsupportedOperationException();
1176         }
1177
1178         @Override
1179         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1180         throws DatabaseException {
1181             throw new UnsupportedOperationException();
1182         }
1183
1184         @Override
1185         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1186         throws DatabaseException {
1187             throw new UnsupportedOperationException();
1188         }
1189
1190         @Override
1191         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1192         throws ServiceException {
1193             throw new UnsupportedOperationException();
1194         }
1195
1196         @Override
1197         public Resource setDefaultClusterSet(Resource clusterSet)
1198         throws ServiceException {
1199                 return null;
1200         }
1201
1202         @Override
1203         public void denyValue(VirtualGraph provider, Resource resource) {
1204             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1205             getQueryProvider2().updateValue(querySupport.getId(resource));
1206             // NOTE: this only keeps track of value changes by-resource.
1207             clientChanges.claimValue(resource);
1208         }
1209
1210         @Override
1211         public void flush(boolean intermediate) {
1212             throw new UnsupportedOperationException();
1213         }
1214
1215         @Override
1216         public void flushCluster() {
1217             throw new UnsupportedOperationException();
1218         }
1219
1220         @Override
1221         public void flushCluster(Resource r) {
1222             throw new UnsupportedOperationException("Resource " + r);
1223         }
1224
1225         @Override
1226         public void gc() {
1227         }
1228
1229         @Override
1230         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1231             TransientGraph impl = (TransientGraph) provider;
1232             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1233             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1234             clientChanges.deny(subject, predicate, object);
1235             return true;
1236         }
1237
1238         @Override
1239         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1240             throw new UnsupportedOperationException();
1241         }
1242
1243         @Override
1244         public boolean writeOnly() {
1245             return true;
1246         }
1247
1248         @Override
1249         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1250             throw new UnsupportedOperationException();
1251         }
1252
1253         @Override
1254         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1255             throw new UnsupportedOperationException();
1256         }
1257
1258         @Override
1259         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1260             throw new UnsupportedOperationException();
1261         }
1262
1263         @Override
1264         public <T> void addMetadata(Metadata data) throws ServiceException {
1265             throw new UnsupportedOperationException();
1266         }
1267
1268         @Override
1269         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1270             throw new UnsupportedOperationException();
1271         }
1272
1273         @Override
1274         public TreeMap<String, byte[]> getMetadata() {
1275             throw new UnsupportedOperationException();
1276         }
1277
1278         @Override
1279         public void commitDone(WriteTraits writeTraits, long csid) {
1280         }
1281
1282         @Override
1283         public void clearUndoList(WriteTraits writeTraits) {
1284         }
1285
1286         @Override
1287         public int clearMetadata() {
1288             return 0;
1289         }
1290
1291                 @Override
1292                 public void startUndo() {
1293                 }
1294     }
1295
1296     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1297
1298         try {
1299
1300             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1301
1302             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1303
1304             flushCounter = 0;
1305             Disposable.safeDispose(clientChanges);
1306             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1307
1308             acquireWriteOnly();
1309
1310             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1311
1312             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1313
1314             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1315             writeState = writeStateT;
1316
1317             assert (null != writer);
1318 //            writer.state.barrier.inc();
1319             long start = System.nanoTime();
1320             T result = request.perform(writer);
1321             long duration = System.nanoTime() - start;
1322             if (DEBUG) {
1323                 System.err.println("################");
1324                 System.err.println("WriteOnly duration " + 1e-9*duration);
1325             }
1326             writeStateT.setResult(result);
1327
1328             // This makes clusters available from server
1329             clusterStream.reallyFlush();
1330
1331             // This will trigger query updates
1332             releaseWriteOnly(writer);
1333             assert (null != writer);
1334
1335             handleUpdatesAndMetadata(writer);
1336
1337         } catch (CancelTransactionException e) {
1338
1339             releaseWriteOnly(writeState.getGraph());
1340
1341             clusterTable.removeWriteOnlyClusters();
1342             state.stopWriteTransaction(clusterStream);
1343
1344         } catch (Throwable e) {
1345
1346             e.printStackTrace();
1347
1348             releaseWriteOnly(writeState.getGraph());
1349
1350             clusterTable.removeWriteOnlyClusters();
1351
1352             if (callback != null)
1353                 callback.exception(new DatabaseException(e));
1354
1355             state.stopWriteTransaction(clusterStream);
1356             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1357
1358         } finally  {
1359
1360             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1361
1362         }
1363
1364     }
1365
1366     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1367         scheduleRequest(request, callback, notify, null);
1368     }
1369
1370     @Override
1371     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1372
1373         assertAlive();
1374
1375         assert (request != null);
1376
1377         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1378
1379         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1380
1381             @Override
1382             public void run(int thread) {
1383
1384                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1385
1386                     try {
1387
1388                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1389
1390                         flushCounter = 0;
1391                         Disposable.safeDispose(clientChanges);
1392                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1393
1394                         acquireWriteOnly();
1395
1396                         VirtualGraph vg = getProvider(request.getProvider());
1397                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1398
1399                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1400
1401                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1402
1403                             @Override
1404                             public void execute(Object result) {
1405                                 if(callback != null) callback.accept(null);
1406                             }
1407
1408                             @Override
1409                             public void exception(Throwable t) {
1410                                 if(callback != null) callback.accept((DatabaseException)t);
1411                             }
1412
1413                         });
1414
1415                         assert (null != writer);
1416 //                        writer.state.barrier.inc();
1417
1418                         try {
1419
1420                             request.perform(writer);
1421
1422                         } catch (Throwable e) {
1423
1424 //                            writer.state.barrier.dec();
1425 //                            writer.waitAsync(null);
1426
1427                             releaseWriteOnly(writer);
1428
1429                             clusterTable.removeWriteOnlyClusters();
1430
1431                             if(!(e instanceof CancelTransactionException)) {
1432                                 if (callback != null)
1433                                     callback.accept(new DatabaseException(e));
1434                             }
1435
1436                             writeState.except(e);
1437
1438                             return;
1439
1440                         }
1441
1442                         // This makes clusters available from server
1443                         boolean empty = clusterStream.reallyFlush();
1444                         // This was needed to make WO requests call metadata listeners.
1445                         // NOTE: the calling event does not contain clientChanges information.
1446                         if (!empty && clientChanges.isEmpty())
1447                             clientChanges.setNotEmpty(true);
1448                         releaseWriteOnly(writer);
1449                         assert (null != writer);
1450                     } finally  {
1451
1452                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1453
1454                     }
1455
1456
1457                 task.finish();
1458
1459             }
1460
1461         }, combine);
1462
1463     }
1464
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466         scheduleRequest(request, callback, notify, null);
1467     }
1468
1469     /* (non-Javadoc)
1470      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1471      */
1472     @Override
1473     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1474
1475         assert (request != null);
1476
1477         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1478
1479         requestManager.scheduleWrite(new SessionTask(request, thread) {
1480
1481             @Override
1482             public void run(int thread) {
1483
1484                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1485
1486                 performWriteOnly(request, notify, callback);
1487
1488                 task.finish();
1489
1490             }
1491
1492         }, combine);
1493
1494     }
1495
1496     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1497
1498         assert (request != null);
1499         assert (procedure != null);
1500
1501         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1502
1503         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1504
1505             @Override
1506             public void run(int thread) {
1507
1508                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1509
1510                 ListenerBase listener = getListenerBase(procedure);
1511
1512                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1513
1514                 try {
1515
1516                     if (listener != null) {
1517
1518                         try {
1519                                 
1520                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1521
1522                                         @Override
1523                                         public void exception(AsyncReadGraph graph, Throwable t) {
1524                                                 procedure.exception(graph, t);
1525                                                 if(throwable != null) {
1526                                                         throwable.set(t);
1527                                                 } else {
1528                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1529                                                 }
1530                                         }
1531
1532                                         @Override
1533                                         public void execute(AsyncReadGraph graph, T t) {
1534                                                 if(result != null) result.set(t);
1535                                                 procedure.execute(graph, t);
1536                                         }
1537
1538                                 };
1539                                 
1540                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1541                                 
1542                         } catch (Throwable t) {
1543                             // This is handled by the AsyncProcedure
1544                                 //Logger.defaultLogError("Internal error", t);
1545                         }
1546
1547                     } else {
1548
1549                         try {
1550
1551 //                            newGraph.state.barrier.inc();
1552
1553                             T t = request.perform(newGraph);
1554
1555                             try {
1556
1557                                 if(result != null) result.set(t);
1558                                 procedure.execute(newGraph, t);
1559
1560                             } catch (Throwable th) {
1561
1562                                 if(throwable != null) {
1563                                     throwable.set(th);
1564                                 } else {
1565                                     Logger.defaultLogError("Unhandled exception", th);
1566                                 }
1567
1568                             }
1569
1570                         } catch (Throwable t) {
1571
1572                             if (DEBUG)
1573                                 t.printStackTrace();
1574
1575                             if(throwable != null) {
1576                                 throwable.set(t);
1577                             } else {
1578                                 Logger.defaultLogError("Unhandled exception", t);
1579                             }
1580
1581                             try {
1582
1583                                 procedure.exception(newGraph, t);
1584
1585                             } catch (Throwable t2) {
1586
1587                                 if(throwable != null) {
1588                                     throwable.set(t2);
1589                                 } else {
1590                                     Logger.defaultLogError("Unhandled exception", t2);
1591                                 }
1592
1593                             }
1594
1595                         }
1596
1597 //                        newGraph.state.barrier.dec();
1598 //                        newGraph.waitAsync(request);
1599
1600                     }
1601
1602                 } finally {
1603
1604                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1605
1606                 }
1607
1608             }
1609
1610         });
1611
1612     }
1613
1614     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1615
1616         assert (request != null);
1617         assert (procedure != null);
1618
1619         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1620
1621         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1622
1623             @Override
1624             public void run(int thread) {
1625
1626                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1627
1628                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1629
1630                 try {
1631
1632                     if (listener != null) {
1633
1634                         try {
1635                                 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1636                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1637                                                 } catch (DatabaseException e) {
1638                                                         Logger.defaultLogError(e);
1639                                                 }
1640
1641                     } else {
1642
1643                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1644                                 procedure, "request");
1645
1646                         try {
1647
1648 //                            newGraph.state.barrier.inc();
1649
1650                             request.perform(newGraph, wrapper);
1651
1652 //                            newGraph.waitAsync(request);
1653
1654                         } catch (Throwable t) {
1655
1656                             wrapper.exception(newGraph, t);
1657 //                            newGraph.waitAsync(request);
1658
1659
1660                         }
1661
1662                     }
1663
1664                 } finally {
1665
1666                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1667
1668                 }
1669
1670             }
1671
1672         });
1673
1674     }
1675
1676     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1677
1678         assert (request != null);
1679         assert (procedure != null);
1680
1681         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1682
1683         int sync = notify != null ? thread : -1;
1684
1685         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1686
1687             @Override
1688             public void run(int thread) {
1689
1690                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1691
1692                 ListenerBase listener = getListenerBase(procedure);
1693
1694                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1695
1696                 try {
1697
1698                     if (listener != null) {
1699
1700                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1701
1702 //                        newGraph.waitAsync(request);
1703
1704                     } else {
1705
1706                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1707
1708                         try {
1709
1710                             request.perform(newGraph, wrapper);
1711
1712                         } catch (Throwable t) {
1713
1714                             t.printStackTrace();
1715
1716                         }
1717
1718                     }
1719
1720                 } finally {
1721
1722                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1723
1724                 }
1725
1726             }
1727
1728         });
1729
1730     }
1731
1732     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1733
1734         assert (request != null);
1735         assert (procedure != null);
1736
1737         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1738
1739         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1740
1741             @Override
1742             public void run(int thread) {
1743
1744                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1745
1746                 ListenerBase listener = getListenerBase(procedure);
1747
1748                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1749
1750                 try {
1751
1752                     if (listener != null) {
1753
1754                         try {
1755                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1756                         } catch (DatabaseException e) {
1757                             Logger.defaultLogError(e);
1758                         }
1759                         
1760                         
1761 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1762 //
1763 //                            @Override
1764 //                            public void exception(Throwable t) {
1765 //                                procedure.exception(t);
1766 //                                if(throwable != null) {
1767 //                                    throwable.set(t);
1768 //                                }
1769 //                            }
1770 //
1771 //                            @Override
1772 //                            public void execute(T t) {
1773 //                                if(result != null) result.set(t);
1774 //                                procedure.execute(t);
1775 //                            }
1776 //
1777 //                        }, listener);
1778
1779 //                        newGraph.waitAsync(request);
1780
1781                     } else {
1782
1783 //                        newGraph.state.barrier.inc();
1784
1785                         request.register(newGraph, new Listener<T>() {
1786
1787                             @Override
1788                             public void exception(Throwable t) {
1789                                 if(throwable != null) throwable.set(t);
1790                                 procedure.exception(t);
1791 //                                newGraph.state.barrier.dec();
1792                             }
1793
1794                             @Override
1795                             public void execute(T t) {
1796                                 if(result != null) result.set(t);
1797                                 procedure.execute(t);
1798 //                                newGraph.state.barrier.dec();
1799                             }
1800
1801                             @Override
1802                             public boolean isDisposed() {
1803                                 return true;
1804                             }
1805
1806                         });
1807
1808 //                        newGraph.waitAsync(request);
1809
1810                     }
1811
1812                 } finally {
1813
1814                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1815
1816                 }
1817
1818             }
1819
1820         });
1821
1822     }
1823
1824
1825     @Override
1826     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1827
1828         scheduleRequest(request, procedure, null, null, null);
1829
1830     }
1831
1832     @Override
1833     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1834         assertNotSession();
1835         Semaphore notify = new Semaphore(0);
1836         DataContainer<Throwable> container = new DataContainer<Throwable>();
1837         DataContainer<T> result = new DataContainer<T>();
1838         scheduleRequest(request, procedure, notify, container, result);
1839         acquire(notify, request);
1840         Throwable throwable = container.get();
1841         if(throwable != null) {
1842             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1843             else throw new DatabaseException("Unexpected exception", throwable);
1844         }
1845         return result.get();
1846     }
1847
1848     @Override
1849     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1850         assertNotSession();
1851         Semaphore notify = new Semaphore(0);
1852         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1853         final DataContainer<T> resultContainer = new DataContainer<T>();
1854         scheduleRequest(request, new AsyncProcedure<T>() {
1855             @Override
1856             public void exception(AsyncReadGraph graph, Throwable throwable) {
1857                 exceptionContainer.set(throwable);
1858                 procedure.exception(graph, throwable);
1859             }
1860             @Override
1861             public void execute(AsyncReadGraph graph, T result) {
1862                 resultContainer.set(result);
1863                 procedure.execute(graph, result);
1864             }
1865         }, getListenerBase(procedure), notify);
1866         acquire(notify, request, procedure);
1867         Throwable throwable = exceptionContainer.get();
1868         if (throwable != null) {
1869             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1870             else throw new DatabaseException("Unexpected exception", throwable);
1871         }
1872         return resultContainer.get();
1873     }
1874
1875
1876     @Override
1877     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1878         assertNotSession();
1879         Semaphore notify = new Semaphore(0);
1880         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1881         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1882             @Override
1883             public void exception(AsyncReadGraph graph, Throwable throwable) {
1884                 exceptionContainer.set(throwable);
1885                 procedure.exception(graph, throwable);
1886             }
1887             @Override
1888             public void execute(AsyncReadGraph graph, T result) {
1889                 procedure.execute(graph, result);
1890             }
1891             @Override
1892             public void finished(AsyncReadGraph graph) {
1893                 procedure.finished(graph);
1894             }
1895         }, notify);
1896         acquire(notify, request, procedure);
1897         Throwable throwable = exceptionContainer.get();
1898         if (throwable != null) {
1899             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1900             else throw new DatabaseException("Unexpected exception", throwable);
1901         }
1902         // TODO: implement return value
1903         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1904         return null;
1905     }
1906
1907     @Override
1908     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1909         return syncRequest(request, new ProcedureAdapter<T>());
1910
1911
1912 //        assert(request != null);
1913 //
1914 //        final DataContainer<T> result = new DataContainer<T>();
1915 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1916 //
1917 //        syncRequest(request, new Procedure<T>() {
1918 //
1919 //            @Override
1920 //            public void execute(T t) {
1921 //                result.set(t);
1922 //            }
1923 //
1924 //            @Override
1925 //            public void exception(Throwable t) {
1926 //                exception.set(t);
1927 //            }
1928 //
1929 //        });
1930 //
1931 //        Throwable t = exception.get();
1932 //        if(t != null) {
1933 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1934 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1935 //        }
1936 //
1937 //        return result.get();
1938
1939     }
1940
1941     @Override
1942     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1943         return syncRequest(request, (Procedure<T>)procedure);
1944     }
1945
1946
1947 //    @Override
1948 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1949 //        assertNotSession();
1950 //        Semaphore notify = new Semaphore(0);
1951 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1952 //        DataContainer<T> result = new DataContainer<T>();
1953 //        scheduleRequest(request, procedure, notify, container, result);
1954 //        acquire(notify, request);
1955 //        Throwable throwable = container.get();
1956 //        if(throwable != null) {
1957 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1958 //            else throw new DatabaseException("Unexpected exception", throwable);
1959 //        }
1960 //        return result.get();
1961 //    }
1962
1963
1964     @Override
1965     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1966         assertNotSession();
1967         Semaphore notify = new Semaphore(0);
1968         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1969         final DataContainer<T> result = new DataContainer<T>();
1970         scheduleRequest(request, procedure, notify, container, result);
1971         acquire(notify, request);
1972         Throwable throwable = container.get();
1973         if (throwable != null) {
1974             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1975             else throw new DatabaseException("Unexpected exception", throwable);
1976         }
1977         return result.get();
1978     }
1979
1980     @Override
1981     public void syncRequest(Write request) throws DatabaseException  {
1982         assertNotSession();
1983         assertAlive();
1984         Semaphore notify = new Semaphore(0);
1985         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1986         scheduleRequest(request, e -> exception.set(e), notify);
1987         acquire(notify, request);
1988         if(exception.get() != null) throw exception.get();
1989     }
1990
1991     @Override
1992     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1993         assertNotSession();
1994         Semaphore notify = new Semaphore(0);
1995         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1996         final DataContainer<T> result = new DataContainer<T>();
1997         scheduleRequest(request, new Procedure<T>() {
1998
1999             @Override
2000             public void exception(Throwable t) {
2001               exception.set(t);
2002             }
2003
2004             @Override
2005             public void execute(T t) {
2006               result.set(t);
2007             }
2008
2009         }, notify);
2010         acquire(notify, request);
2011         if(exception.get() != null) {
2012             Throwable t = exception.get();
2013             if(t instanceof DatabaseException) throw (DatabaseException)t;
2014             else throw new DatabaseException(t);
2015         }
2016         return result.get();
2017     }
2018
2019     @Override
2020     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2021         assertNotSession();
2022         Semaphore notify = new Semaphore(0);
2023         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2024         final DataContainer<T> result = new DataContainer<T>();
2025         scheduleRequest(request, new Procedure<T>() {
2026
2027             @Override
2028             public void exception(Throwable t) {
2029               exception.set(t);
2030             }
2031
2032             @Override
2033             public void execute(T t) {
2034               result.set(t);
2035             }
2036
2037         }, notify);
2038         acquire(notify, request);
2039         if(exception.get() != null) {
2040             Throwable t = exception.get();
2041             if(t instanceof DatabaseException) throw (DatabaseException)t;
2042             else throw new DatabaseException(t);
2043         }
2044         return result.get();
2045     }
2046
2047     @Override
2048     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2049         assertNotSession();
2050         Semaphore notify = new Semaphore(0);
2051         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2052         final DataContainer<T> result = new DataContainer<T>();
2053         scheduleRequest(request, new Procedure<T>() {
2054
2055             @Override
2056             public void exception(Throwable t) {
2057               exception.set(t);
2058             }
2059
2060             @Override
2061             public void execute(T t) {
2062               result.set(t);
2063             }
2064
2065         }, notify);
2066         acquire(notify, request);
2067         if(exception.get() != null) {
2068             Throwable t = exception.get();
2069             if(t instanceof DatabaseException) throw (DatabaseException)t;
2070             else throw new DatabaseException(t);
2071         }
2072         return result.get();
2073     }
2074
2075     @Override
2076     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2077         assertNotSession();
2078         Semaphore notify = new Semaphore(0);
2079         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2080         scheduleRequest(request, e -> exception.set(e), notify);
2081         acquire(notify, request);
2082         if(exception.get() != null) throw exception.get();
2083     }
2084
2085     @Override
2086     public void syncRequest(WriteOnly request) throws DatabaseException  {
2087         assertNotSession();
2088         assertAlive();
2089         Semaphore notify = new Semaphore(0);
2090         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2091         scheduleRequest(request, e -> exception.set(e), notify);
2092         acquire(notify, request);
2093         if(exception.get() != null) throw exception.get();
2094     }
2095
2096     /*
2097      *
2098      * GraphRequestProcessor interface
2099      */
2100
2101     @Override
2102     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2103
2104         scheduleRequest(request, procedure, null, null);
2105
2106     }
2107
2108     @Override
2109     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2110
2111         scheduleRequest(request, procedure, null);
2112
2113     }
2114
2115     @Override
2116     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2117
2118         scheduleRequest(request, procedure, null, null, null);
2119
2120     }
2121
2122     @Override
2123     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2124
2125         scheduleRequest(request, callback, null);
2126
2127     }
2128
2129     @Override
2130     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2131
2132         scheduleRequest(request, procedure, null);
2133
2134     }
2135
2136     @Override
2137     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2138
2139         scheduleRequest(request, procedure, null);
2140
2141     }
2142
2143     @Override
2144     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2145
2146         scheduleRequest(request, procedure, null);
2147
2148     }
2149
2150     @Override
2151     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2152
2153         scheduleRequest(request, callback, null);
2154
2155     }
2156
2157     @Override
2158     public void asyncRequest(final Write r) {
2159         asyncRequest(r, null);
2160     }
2161
2162     @Override
2163     public void asyncRequest(final DelayedWrite r) {
2164         asyncRequest(r, null);
2165     }
2166
2167     @Override
2168     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2169
2170         scheduleRequest(request, callback, null);
2171
2172     }
2173
2174     @Override
2175     public void asyncRequest(final WriteOnly request) {
2176
2177         asyncRequest(request, null);
2178
2179     }
2180
2181     @Override
2182     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2183         r.request(this, procedure);
2184     }
2185
2186     @Override
2187     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2188         r.request(this, procedure);
2189     }
2190
2191     @Override
2192     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2193         r.request(this, procedure);
2194     }
2195
2196     @Override
2197     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2198         r.request(this, procedure);
2199     }
2200
2201     @Override
2202     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2203         r.request(this, procedure);
2204     }
2205
2206     @Override
2207     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2208         r.request(this, procedure);
2209     }
2210
2211     @Override
2212     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2213         return r.request(this);
2214     }
2215
2216     @Override
2217     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2218         return r.request(this);
2219     }
2220
2221     @Override
2222     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2223         r.request(this, procedure);
2224     }
2225
2226     @Override
2227     public <T> void async(WriteInterface<T> r) {
2228         r.request(this, new ProcedureAdapter<T>());
2229     }
2230
2231     //@Override
2232     public void incAsync() {
2233         state.incAsync();
2234     }
2235
2236     //@Override
2237     public void decAsync() {
2238         state.decAsync();
2239     }
2240
2241     public long getCluster(ResourceImpl resource) {
2242         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2243         return cluster.getClusterId();
2244     }
2245
2246     public long getCluster(int id) {
2247         if (clusterTable == null)
2248             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2249         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2250     }
2251
2252     public ResourceImpl getResource(int id) {
2253         return new ResourceImpl(resourceSupport, id);
2254     }
2255
2256     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2257         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2258         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2259         int key = proxy.getClusterKey();
2260         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2261         return new ResourceImpl(resourceSupport, resourceKey);
2262     }
2263
2264     public ResourceImpl getResource2(int id) {
2265         assert (id != 0);
2266         return new ResourceImpl(resourceSupport, id);
2267     }
2268
2269     final public int getId(ResourceImpl impl) {
2270         return impl.id;
2271     }
2272
2273     public static final Charset UTF8 = Charset.forName("utf-8");
2274
2275
2276
2277
2278     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2279         for(TransientGraph g : support.providers) {
2280             if(g.isPending(subject)) return false;
2281         }
2282         return true;
2283     }
2284
2285     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2286         for(TransientGraph g : support.providers) {
2287             if(g.isPending(subject, predicate)) return false;
2288         }
2289         return true;
2290     }
2291
2292     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2293
2294         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2295
2296             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2297
2298             @Override
2299             public void accept(ReadGraphImpl graph) {
2300                 if(ready.decrementAndGet() == 0) {
2301                     runnable.accept(graph);
2302                 }
2303             }
2304
2305         };
2306
2307         for(TransientGraph g : support.providers) {
2308             if(g.isPending(subject)) {
2309                 try {
2310                     g.load(graph, subject, composite);
2311                 } catch (DatabaseException e) {
2312                     e.printStackTrace();
2313                 }
2314             } else {
2315                 composite.accept(graph);
2316             }
2317         }
2318
2319         composite.accept(graph);
2320
2321     }
2322
2323     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2324
2325         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2326
2327             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2328
2329             @Override
2330             public void accept(ReadGraphImpl graph) {
2331                 if(ready.decrementAndGet() == 0) {
2332                     runnable.accept(graph);
2333                 }
2334             }
2335
2336         };
2337
2338         for(TransientGraph g : support.providers) {
2339             if(g.isPending(subject, predicate)) {
2340                 try {
2341                     g.load(graph, subject, predicate, composite);
2342                 } catch (DatabaseException e) {
2343                     e.printStackTrace();
2344                 }
2345             } else {
2346                 composite.accept(graph);
2347             }
2348         }
2349
2350         composite.accept(graph);
2351
2352     }
2353
2354 //    void dumpHeap() {
2355 //
2356 //        try {
2357 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2358 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2359 //                    "d:/heap" + flushCounter + ".txt", true);
2360 //        } catch (IOException e) {
2361 //            e.printStackTrace();
2362 //        }
2363 //
2364 //    }
2365
2366     void fireReactionsToSynchronize(ChangeSet cs) {
2367
2368         // Do not fire empty events
2369         if (cs.isEmpty())
2370             return;
2371
2372         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2373 //        g.state.barrier.inc();
2374
2375         try {
2376
2377             if (!cs.isEmpty()) {
2378                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2379                 for (ChangeListener l : changeListeners2) {
2380                     try {
2381                         l.graphChanged(e2);
2382                     } catch (Exception ex) {
2383                         ex.printStackTrace();
2384                     }
2385                 }
2386             }
2387
2388         } finally {
2389
2390 //            g.state.barrier.dec();
2391 //            g.waitAsync(null);
2392
2393         }
2394
2395     }
2396
2397     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2398         try {
2399
2400             // Do not fire empty events
2401             if (cs2.isEmpty())
2402                 return;
2403
2404 //            graph.restart();
2405 //            graph.state.barrier.inc();
2406
2407             try {
2408
2409                 if (!cs2.isEmpty()) {
2410
2411                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2412                     for (ChangeListener l : changeListeners2) {
2413                         try {
2414                             l.graphChanged(e2);
2415 //                            System.out.println("changelistener " + l);
2416                         } catch (Exception ex) {
2417                             ex.printStackTrace();
2418                         }
2419                     }
2420
2421                 }
2422
2423             } finally {
2424
2425 //                graph.state.barrier.dec();
2426 //                graph.waitAsync(null);
2427
2428             }
2429         } catch (Throwable t) {
2430             t.printStackTrace();
2431
2432         }
2433     }
2434     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2435
2436         try {
2437
2438             // Do not fire empty events
2439             if (cs2.isEmpty())
2440                 return;
2441
2442             // Do not fire on virtual requests
2443             if(graph.getProvider() != null)
2444                 return;
2445
2446             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2447
2448             try {
2449
2450                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2451                 for (ChangeListener l : metadataListeners) {
2452                     try {
2453                         l.graphChanged(e2);
2454                     } catch (Throwable ex) {
2455                         ex.printStackTrace();
2456                     }
2457                 }
2458
2459             } finally {
2460
2461             }
2462
2463         } catch (Throwable t) {
2464             t.printStackTrace();
2465         }
2466
2467     }
2468
2469
2470     /*
2471      * (non-Javadoc)
2472      *
2473      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2474      */
2475     @Override
2476     public <T> T getService(Class<T> api) {
2477         T t = peekService(api);
2478         if (t == null) {
2479             if (state.isClosed())
2480                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2481             throw new ServiceNotFoundException(this, api);
2482         }
2483         return t;
2484     }
2485
2486     /**
2487      * @return
2488      */
2489     protected abstract ServerInformation getCachedServerInformation();
2490
2491         private Class<?> serviceKey1 = null;
2492         private Class<?> serviceKey2 = null;
2493         private Object service1 = null;
2494         private Object service2 = null;
2495
2496     /*
2497      * (non-Javadoc)
2498      *
2499      * @see
2500      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2501      */
2502     @SuppressWarnings("unchecked")
2503     @Override
2504     public synchronized <T> T peekService(Class<T> api) {
2505
2506                 if(serviceKey1 == api) {
2507                         return (T)service1;
2508                 } else if (serviceKey2 == api) {
2509                         // Promote this key
2510                         Object result = service2;
2511                         service2 = service1;
2512                         serviceKey2 = serviceKey1;
2513                         service1 = result;
2514                         serviceKey1 = api;
2515                         return (T)result;
2516                 }
2517
2518         if (Layer0.class == api)
2519                 return (T) L0;
2520         if (ServerInformation.class == api)
2521             return (T) getCachedServerInformation();
2522         else if (WriteGraphImpl.class == api)
2523             return (T) writeState.getGraph();
2524         else if (ClusterBuilder.class == api)
2525             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2526         else if (ClusterBuilderFactory.class == api)
2527             return (T)new ClusterBuilderFactoryImpl(this);
2528
2529                 service2 = service1;
2530                 serviceKey2 = serviceKey1;
2531
2532         service1 = serviceLocator.peekService(api);
2533         serviceKey1 = api;
2534
2535         return (T)service1;
2536
2537     }
2538
2539     /*
2540      * (non-Javadoc)
2541      *
2542      * @see
2543      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2544      */
2545     @Override
2546     public boolean hasService(Class<?> api) {
2547         return serviceLocator.hasService(api);
2548     }
2549
2550     /**
2551      * @param api the api that must be implemented by the specified service
2552      * @param service the service implementation
2553      */
2554     @Override
2555     public <T> void registerService(Class<T> api, T service) {
2556         if(Layer0.class == api) {
2557                 L0 = (Layer0)service;
2558                 return;
2559         }
2560         serviceLocator.registerService(api, service);
2561         if (TransactionPolicySupport.class == api) {
2562             transactionPolicy = (TransactionPolicySupport)service;
2563             state.resetTransactionPolicy();
2564         }
2565         if (api == serviceKey1)
2566                 service1 = service;
2567         else if (api == serviceKey2)
2568                 service2 = service;
2569     }
2570
2571     // ----------------
2572     // SessionMonitor
2573     // ----------------
2574
2575     void fireSessionVariableChange(String variable) {
2576         for (MonitorHandler h : monitorHandlers) {
2577             MonitorContext ctx = monitorContexts.getLeft(h);
2578             assert ctx != null;
2579             // SafeRunner functionality repeated here to avoid dependency.
2580             try {
2581                 h.valuesChanged(ctx);
2582             } catch (Exception e) {
2583                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2584             } catch (LinkageError e) {
2585                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2586             }
2587         }
2588     }
2589
2590
2591     class ResourceSerializerImpl implements ResourceSerializer {
2592
2593         public long createRandomAccessId(int id) throws DatabaseException {
2594             if(id < 0)
2595                 return id;
2596             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2597             long cluster = getCluster(id);
2598             if (0 == cluster)
2599                 return 0; // Better to return 0 then invalid id.
2600             long result = ClusterTraitsBase.createResourceId(cluster, index);
2601             return result;
2602         }
2603
2604         @Override
2605         public long getRandomAccessId(Resource resource) throws DatabaseException {
2606
2607             ResourceImpl resourceImpl = (ResourceImpl) resource;
2608             return createRandomAccessId(resourceImpl.id);
2609
2610         }
2611
2612         @Override
2613         public int getTransientId(Resource resource) throws DatabaseException {
2614
2615             ResourceImpl resourceImpl = (ResourceImpl) resource;
2616             return resourceImpl.id;
2617
2618         }
2619
2620         @Override
2621         public String createRandomAccessId(Resource resource)
2622         throws InvalidResourceReferenceException {
2623
2624             if(resource == null) throw new IllegalArgumentException();
2625
2626             ResourceImpl resourceImpl = (ResourceImpl) resource;
2627
2628             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2629
2630             int r;
2631             try {
2632                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2633             } catch (DatabaseException e1) {
2634                 throw new InvalidResourceReferenceException(e1);
2635             }
2636             try {
2637                 // Serialize as '<resource index>_<cluster id>'
2638                 return "" + r + "_" + getCluster(resourceImpl);
2639             } catch (Throwable e) {
2640                 e.printStackTrace();
2641                 throw new InvalidResourceReferenceException(e);
2642             } finally {
2643             }
2644         }
2645
2646         public int getTransientId(long serialized) throws DatabaseException {
2647             if (serialized <= 0)
2648                 return (int)serialized;
2649             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2650             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2651             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2652             if (cluster == null)
2653                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2654             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2655             return key;
2656         }
2657
2658         @Override
2659         public Resource getResource(long randomAccessId) throws DatabaseException {
2660             return getResourceByKey(getTransientId(randomAccessId));
2661         }
2662
2663         @Override
2664         public Resource getResource(int transientId) throws DatabaseException {
2665             return getResourceByKey(transientId);
2666         }
2667
2668         @Override
2669         public Resource getResource(String randomAccessId)
2670         throws InvalidResourceReferenceException {
2671             try {
2672                 int i = randomAccessId.indexOf('_');
2673                 if (i == -1)
2674                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2675                             + randomAccessId + "'");
2676                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2677                 if(r < 0) return getResourceByKey(r);
2678                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2679                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2680                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2681                 if (cluster.hasResource(key, clusterTranslator))
2682                     return getResourceByKey(key);
2683             } catch (InvalidResourceReferenceException e) {
2684                 throw e;
2685             } catch (NumberFormatException e) {
2686                 throw new InvalidResourceReferenceException(e);
2687             } catch (Throwable e) {
2688                 e.printStackTrace();
2689                 throw new InvalidResourceReferenceException(e);
2690             } finally {
2691             }
2692             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2693         }
2694
2695         @Override
2696         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2697             try {
2698                 return true;
2699             } catch (Throwable e) {
2700                 e.printStackTrace();
2701                 throw new InvalidResourceReferenceException(e);
2702             } finally {
2703             }
2704         }
2705     }
2706
2707     // Copied from old SessionImpl
2708
2709     void check() {
2710         if (state.isClosed())
2711             throw new Error("Session closed.");
2712     }
2713
2714
2715
2716     public ResourceImpl getNewResource(long clusterId) {
2717         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2718         int newId;
2719         try {
2720             newId = cluster.createResource(clusterTranslator);
2721         } catch (DatabaseException e) {
2722             Logger.defaultLogError(e);
2723             return null;
2724         }
2725         return new ResourceImpl(resourceSupport, newId);
2726     }
2727
2728     public ResourceImpl getNewResource(Resource clusterSet)
2729     throws DatabaseException {
2730         long resourceId = clusterSet.getResourceId();
2731         Long clusterId = clusterSetsSupport.get(resourceId);
2732         if (null == clusterId)
2733             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2734         if (Constants.NewClusterId == clusterId) {
2735             clusterId = getService(ClusteringSupport.class).createCluster();
2736             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2737                 createdClusters.add(clusterId);
2738             clusterSetsSupport.put(resourceId, clusterId);
2739             return getNewResource(clusterId);
2740         } else {
2741             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2742             ResourceImpl result;
2743             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2744                 clusterId = getService(ClusteringSupport.class).createCluster();
2745                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2746                     createdClusters.add(clusterId);
2747                 clusterSetsSupport.put(resourceId, clusterId);
2748                 return getNewResource(clusterId);
2749             } else {
2750                 result = getNewResource(clusterId);
2751                 int resultKey = querySupport.getId(result);
2752                 long resultCluster = querySupport.getClusterId(resultKey);
2753                 if (clusterId != resultCluster)
2754                     clusterSetsSupport.put(resourceId, resultCluster);
2755                 return result;
2756             }
2757         }
2758     }
2759
2760     public void getNewClusterSet(Resource clusterSet)
2761     throws DatabaseException {
2762         if (DEBUG)
2763             System.out.println("new cluster set=" + clusterSet);
2764         long resourceId = clusterSet.getResourceId();
2765         if (clusterSetsSupport.containsKey(resourceId))
2766             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2767         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2768     }
2769     public boolean containsClusterSet(Resource clusterSet)
2770     throws ServiceException {
2771         long resourceId = clusterSet.getResourceId();
2772         return clusterSetsSupport.containsKey(resourceId);
2773     }
2774     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2775         Resource r = defaultClusterSet;
2776         defaultClusterSet = clusterSet;
2777         return r;
2778     }
2779     void printDiagnostics() {
2780
2781         if (DIAGNOSTICS) {
2782
2783             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2784             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2785             for(ClusterI cluster : clusterTable.getClusters()) {
2786                 try {
2787                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2788                         residentClusters.set(residentClusters.get() + 1);
2789                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2790                     }
2791                 } catch (DatabaseException e) {
2792                     Logger.defaultLogError(e);
2793                 }
2794             }
2795
2796             System.out.println("--------------------------------");
2797             System.out.println("Cluster information:");
2798             System.out.println("-amount of resident clusters=" + residentClusters.get());
2799             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2800
2801             for(ClusterI cluster : clusterTable.getClusters()) {
2802                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2803                 try {
2804                     if (!cluster.isLoaded())
2805                         System.out.println("not loaded]");
2806                     else if (cluster.isEmpty())
2807                         System.out.println("is empty]");
2808                     else {
2809                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2810                         System.out.print(",references=" + cluster.getReferenceCount());
2811                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2812                     }
2813                 } catch (DatabaseException e) {
2814                     Logger.defaultLogError(e);
2815                     System.out.println("is corrupted]");
2816                 }
2817             }
2818
2819             queryProvider2.printDiagnostics();
2820             System.out.println("--------------------------------");
2821         }
2822
2823     }
2824
2825     public void fireStartReadTransaction() {
2826
2827         if (DIAGNOSTICS)
2828             System.out.println("StartReadTransaction");
2829
2830         for (SessionEventListener listener : eventListeners) {
2831             try {
2832                 listener.readTransactionStarted();
2833             } catch (Throwable t) {
2834                 t.printStackTrace();
2835             }
2836         }
2837
2838     }
2839
2840     public void fireFinishReadTransaction() {
2841
2842         if (DIAGNOSTICS)
2843             System.out.println("FinishReadTransaction");
2844
2845         for (SessionEventListener listener : eventListeners) {
2846             try {
2847                 listener.readTransactionFinished();
2848             } catch (Throwable t) {
2849                 t.printStackTrace();
2850             }
2851         }
2852
2853     }
2854
2855     public void fireStartWriteTransaction() {
2856
2857         if (DIAGNOSTICS)
2858             System.out.println("StartWriteTransaction");
2859
2860         for (SessionEventListener listener : eventListeners) {
2861             try {
2862                 listener.writeTransactionStarted();
2863             } catch (Throwable t) {
2864                 t.printStackTrace();
2865             }
2866         }
2867
2868     }
2869
2870     public void fireFinishWriteTransaction() {
2871
2872         if (DIAGNOSTICS)
2873             System.out.println("FinishWriteTransaction");
2874
2875         for (SessionEventListener listener : eventListeners) {
2876             try {
2877                 listener.writeTransactionFinished();
2878             } catch (Throwable t) {
2879                 t.printStackTrace();
2880             }
2881         }
2882
2883         Indexing.resetDependenciesIndexingDisabled();
2884
2885     }
2886
2887     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2888         throw new Error("Not supported at the moment.");
2889     }
2890
2891     State getState() {
2892         return state;
2893     }
2894
2895     ClusterTable getClusterTable() {
2896         return clusterTable;
2897     }
2898
2899     public GraphSession getGraphSession() {
2900         return graphSession;
2901     }
2902
2903     public QueryProcessor getQueryProvider2() {
2904         return queryProvider2;
2905     }
2906
2907     ClientChangesImpl getClientChanges() {
2908         return clientChanges;
2909     }
2910
2911     boolean getWriteOnly() {
2912         return writeOnly;
2913     }
2914
2915     static int counter = 0;
2916
2917     public void onClusterLoaded(long clusterId) {
2918
2919         clusterTable.updateSize();
2920
2921     }
2922
2923
2924
2925
2926     /*
2927      * Implementation of the interface RequestProcessor
2928      */
2929
2930     @Override
2931     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2932
2933         assertNotSession();
2934         assertAlive();
2935
2936         assert(request != null);
2937
2938         final DataContainer<T> result = new DataContainer<T>();
2939         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2940
2941         syncRequest(request, new AsyncProcedure<T>() {
2942
2943             @Override
2944             public void execute(AsyncReadGraph graph, T t) {
2945                 result.set(t);
2946             }
2947
2948             @Override
2949             public void exception(AsyncReadGraph graph, Throwable t) {
2950                 exception.set(t);
2951             }
2952
2953         });
2954
2955         Throwable t = exception.get();
2956         if(t != null) {
2957             if(t instanceof DatabaseException) throw (DatabaseException)t;
2958             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2959         }
2960
2961         return result.get();
2962
2963     }
2964
2965 //    @Override
2966 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2967 //        assertNotSession();
2968 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2969 //    }
2970
2971     @Override
2972     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2973         assertNotSession();
2974         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2975     }
2976
2977     @Override
2978     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2979         assertNotSession();
2980         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2981     }
2982
2983     @Override
2984     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2985         assertNotSession();
2986         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2987     }
2988
2989     @Override
2990     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2991         assertNotSession();
2992         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2993     }
2994
2995     @Override
2996     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2997
2998         assertNotSession();
2999
3000         assert(request != null);
3001
3002         final DataContainer<T> result = new DataContainer<T>();
3003         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3004
3005         syncRequest(request, new AsyncProcedure<T>() {
3006
3007             @Override
3008             public void execute(AsyncReadGraph graph, T t) {
3009                 result.set(t);
3010             }
3011
3012             @Override
3013             public void exception(AsyncReadGraph graph, Throwable t) {
3014                 exception.set(t);
3015             }
3016
3017         });
3018
3019         Throwable t = exception.get();
3020         if(t != null) {
3021             if(t instanceof DatabaseException) throw (DatabaseException)t;
3022             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3023         }
3024
3025         return result.get();
3026
3027     }
3028
3029     @Override
3030     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3031         assertNotSession();
3032         return syncRequest(request, (AsyncProcedure<T>)procedure);
3033     }
3034
3035     @Override
3036     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3037         assertNotSession();
3038         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3039     }
3040
3041     @Override
3042     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3043         assertNotSession();
3044         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3045     }
3046
3047     @Override
3048     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3049         assertNotSession();
3050         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3051     }
3052
3053     @Override
3054     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3055         assertNotSession();
3056         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3057     }
3058
3059     @Override
3060     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3061
3062         assertNotSession();
3063
3064         assert(request != null);
3065
3066         final ArrayList<T> result = new ArrayList<T>();
3067         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3068
3069         syncRequest(request, new AsyncMultiProcedure<T>() {
3070
3071             @Override
3072             public void execute(AsyncReadGraph graph, T t) {
3073                 synchronized(result) {
3074                     result.add(t);
3075                 }
3076             }
3077
3078             @Override
3079             public void finished(AsyncReadGraph graph) {
3080             }
3081
3082             @Override
3083             public void exception(AsyncReadGraph graph, Throwable t) {
3084                 exception.set(t);
3085             }
3086
3087
3088         });
3089
3090         Throwable t = exception.get();
3091         if(t != null) {
3092             if(t instanceof DatabaseException) throw (DatabaseException)t;
3093             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3094         }
3095
3096         return result;
3097
3098     }
3099
3100     @Override
3101     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3102         assertNotSession();
3103         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3104     }
3105
3106     @Override
3107     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3108         assertNotSession();
3109         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3110     }
3111
3112     @Override
3113     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3114         assertNotSession();
3115         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3116     }
3117
3118     @Override
3119     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3120         assertNotSession();
3121         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3122     }
3123
3124     @Override
3125     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3126         assertNotSession();
3127         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3128     }
3129
3130     @Override
3131     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3132
3133         assertNotSession();
3134
3135         assert(request != null);
3136
3137         final ArrayList<T> result = new ArrayList<T>();
3138         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3139
3140         syncRequest(request, new AsyncMultiProcedure<T>() {
3141
3142             @Override
3143             public void execute(AsyncReadGraph graph, T t) {
3144                 synchronized(result) {
3145                     result.add(t);
3146                 }
3147             }
3148
3149             @Override
3150             public void finished(AsyncReadGraph graph) {
3151             }
3152
3153             @Override
3154             public void exception(AsyncReadGraph graph, Throwable t) {
3155                 exception.set(t);
3156             }
3157
3158
3159         });
3160
3161         Throwable t = exception.get();
3162         if(t != null) {
3163             if(t instanceof DatabaseException) throw (DatabaseException)t;
3164             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3165         }
3166
3167         return result;
3168
3169     }
3170
3171     @Override
3172     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3173         assertNotSession();
3174         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3175     }
3176
3177     @Override
3178     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3179         assertNotSession();
3180         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3181     }
3182
3183     @Override
3184     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3185         assertNotSession();
3186        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3187     }
3188
3189     @Override
3190     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3191         assertNotSession();
3192         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3193     }
3194
3195     @Override
3196     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3197         assertNotSession();
3198         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3199     }
3200
3201
3202     @Override
3203     public <T> void asyncRequest(Read<T> request) {
3204
3205         asyncRequest(request, new ProcedureAdapter<T>() {
3206             @Override
3207             public void exception(Throwable t) {
3208                 t.printStackTrace();
3209             }
3210         });
3211
3212     }
3213
3214     @Override
3215     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3216         asyncRequest(request, (AsyncProcedure<T>)procedure);
3217     }
3218
3219     @Override
3220     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3221         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3222     }
3223
3224     @Override
3225     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3226         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3227     }
3228
3229     @Override
3230     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3231         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3232     }
3233
3234     @Override
3235     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3236         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3237     }
3238
3239     @Override
3240     final public <T> void asyncRequest(final AsyncRead<T> request) {
3241
3242         assert(request != null);
3243
3244         asyncRequest(request, new ProcedureAdapter<T>() {
3245             @Override
3246             public void exception(Throwable t) {
3247                 t.printStackTrace();
3248             }
3249         });
3250
3251     }
3252
3253     @Override
3254     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3255         scheduleRequest(request, procedure, procedure, null);
3256     }
3257
3258     @Override
3259     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3260         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3261     }
3262
3263     @Override
3264     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3265         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3266     }
3267
3268     @Override
3269     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3270         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3271     }
3272
3273     @Override
3274     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3275         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3276     }
3277
3278     @Override
3279     public <T> void asyncRequest(MultiRead<T> request) {
3280
3281         assert(request != null);
3282
3283         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3284             @Override
3285             public void exception(AsyncReadGraph graph, Throwable t) {
3286                 t.printStackTrace();
3287             }
3288         });
3289
3290     }
3291
3292     @Override
3293     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3294         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3295     }
3296
3297     @Override
3298     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3299         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3300     }
3301
3302     @Override
3303     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3304         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3305     }
3306
3307     @Override
3308     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3309         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3310     }
3311
3312     @Override
3313     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3314         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3315     }
3316
3317     @Override
3318     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3319
3320         assert(request != null);
3321
3322         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3323             @Override
3324             public void exception(AsyncReadGraph graph, Throwable t) {
3325                 t.printStackTrace();
3326             }
3327         });
3328
3329     }
3330
3331     @Override
3332     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3333         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3334     }
3335
3336     @Override
3337     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3338         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3339     }
3340
3341     @Override
3342     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3343         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3344     }
3345
3346     @Override
3347     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3348         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3349     }
3350
3351     @Override
3352     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3353         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3354     }
3355
3356     @Override
3357     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3358         assertNotSession();
3359         throw new Error("Not implemented!");
3360     }
3361
3362     @Override
3363     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3364         throw new Error("Not implemented!");
3365     }
3366
3367     @Override
3368     final public <T> void asyncRequest(final ExternalRead<T> request) {
3369
3370         assert(request != null);
3371
3372         asyncRequest(request, new ProcedureAdapter<T>() {
3373             @Override
3374             public void exception(Throwable t) {
3375                 t.printStackTrace();
3376             }
3377         });
3378
3379     }
3380
3381     @Override
3382     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3383         asyncRequest(request, (Procedure<T>)procedure);
3384     }
3385
3386
3387
3388     void check(Throwable t) throws DatabaseException {
3389         if(t != null) {
3390             if(t instanceof DatabaseException) throw (DatabaseException)t;
3391             else throw new DatabaseException("Unexpected exception", t);
3392         }
3393     }
3394
3395     void check(DataContainer<Throwable> container) throws DatabaseException {
3396         Throwable t = container.get();
3397         if(t != null) {
3398             if(t instanceof DatabaseException) throw (DatabaseException)t;
3399             else throw new DatabaseException("Unexpected exception", t);
3400         }
3401     }
3402
3403
3404
3405
3406
3407
3408     boolean sameProvider(Write request) {
3409         if(writeState.getGraph().provider != null) {
3410             return writeState.getGraph().provider.equals(request.getProvider());
3411         } else {
3412             return request.getProvider() == null;
3413         }
3414     }
3415
3416     boolean plainWrite(WriteGraphImpl graph) {
3417         if(graph == null) return false;
3418         if(graph.writeSupport.writeOnly()) return false;
3419         return true;
3420     }
3421
3422
3423     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3424     private void assertNotSession() throws DatabaseException {
3425         Thread current = Thread.currentThread();
3426         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3427     }
3428
3429     void assertAlive() {
3430         if (!state.isAlive())
3431             throw new RuntimeDatabaseException("Session has been shut down.");
3432     }
3433
3434     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3435         return querySupport.getValueStream(graph, querySupport.getId(resource));
3436     }
3437
3438     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3439         return querySupport.getValue(graph, querySupport.getId(resource));
3440     }
3441
3442     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3443         acquire(semaphore, request, null);
3444     }
3445
3446     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3447         assertAlive();
3448         try {
3449             long tryCount = 0;
3450             // Loop until the semaphore is acquired reporting requests that take a long time.
3451             while (true) {
3452
3453                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3454                     // Acquired OK.
3455                     return;
3456                 } else {
3457                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3458                         ++tryCount;
3459                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3460                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3461                                 * tryCount;
3462                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3463                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3464                     }
3465                 }
3466
3467                 assertAlive();
3468
3469             }
3470         } catch (InterruptedException e) {
3471             e.printStackTrace();
3472             // FIXME: Should perhaps do something else in this case ??
3473         }
3474     }
3475
3476
3477
3478 //    @Override
3479 //    public boolean holdOnToTransactionAfterCancel() {
3480 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3481 //    }
3482 //
3483 //    @Override
3484 //    public boolean holdOnToTransactionAfterCommit() {
3485 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3486 //    }
3487 //
3488 //    @Override
3489 //    public boolean holdOnToTransactionAfterRead() {
3490 //        return transactionPolicy.holdOnToTransactionAfterRead();
3491 //    }
3492 //
3493 //    @Override
3494 //    public void onRelinquish() {
3495 //        transactionPolicy.onRelinquish();
3496 //    }
3497 //
3498 //    @Override
3499 //    public void onRelinquishDone() {
3500 //        transactionPolicy.onRelinquishDone();
3501 //    }
3502 //
3503 //    @Override
3504 //    public void onRelinquishError() {
3505 //        transactionPolicy.onRelinquishError();
3506 //    }
3507
3508
3509     @Override
3510     public Session getSession() {
3511         return this;
3512     }
3513
3514
3515     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3516     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3517
3518     public void ceased(int thread) {
3519
3520         requestManager.ceased(thread);
3521
3522     }
3523
3524     public int getAmountOfQueryThreads() {
3525         // This must be a power of two
3526         return 4;
3527 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3528     }
3529
3530     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3531
3532         return new ResourceImpl(resourceSupport, key);
3533
3534     }
3535
3536     public void acquireWriteOnly() {
3537         writeOnly = true;
3538     }
3539
3540     public void releaseWriteOnly(ReadGraphImpl graph) {
3541         writeOnly = false;
3542         queryProvider2.releaseWrite(graph);
3543     }
3544
3545     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3546
3547         long start = System.nanoTime();
3548
3549         while(dirtyPrimitives) {
3550             dirtyPrimitives = false;
3551             getQueryProvider2().performDirtyUpdates(writer);
3552             getQueryProvider2().performScheduledUpdates(writer);
3553         }
3554
3555         fireMetadataListeners(writer, clientChanges);
3556
3557         if(DebugPolicy.PERFORMANCE_DATA) {
3558             long end = System.nanoTime();
3559             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3560         }
3561
3562     }
3563
3564     void removeTemporaryData() {
3565         File platform = Platform.getLocation().toFile();
3566         File tempFiles = new File(platform, "tempFiles");
3567         File temp = new File(tempFiles, "db");
3568         if(!temp.exists()) 
3569             return;
3570         try {
3571             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3572                 @Override
3573                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3574                     try {
3575                         Files.delete(file);
3576                     } catch (IOException e) {
3577                         Logger.defaultLogError(e);
3578                     }
3579                     return FileVisitResult.CONTINUE;
3580                 }
3581             });
3582         } catch (IOException e) {
3583             Logger.defaultLogError(e);
3584         }
3585     }
3586
3587     public void handleCreatedClusters() {
3588         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3589             createdClusters.forEach(new TLongProcedure() {
3590
3591                 @Override
3592                 public boolean execute(long value) {
3593                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3594                     cluster.setImmutable(true, clusterTranslator);
3595                     return true;
3596                 }
3597             });
3598         }
3599         createdClusters.clear();
3600     }
3601
3602     @Override
3603     public Object getModificationCounter() {
3604         return queryProvider2.modificationCounter;
3605     }
3606     
3607     @Override
3608     public void markUndoPoint() {
3609         state.setCombine(false);
3610     }
3611
3612 }