]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Delete temporary files after use in delayed writes and model TG export
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.UndoTraits;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.datastructures.Callback;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
162
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
165
166
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168
169     protected static final boolean DEBUG        = false;
170
171     private static final boolean DIAGNOSTICS       = false;
172
173     private TransactionPolicySupport transactionPolicy;
174
175     final private ClusterControl clusterControl;
176
177     final protected BuiltinSupportImpl builtinSupport;
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179     final protected ClusterSetsSupport clusterSetsSupport;
180     final protected LifecycleSupportImpl lifecycleSupport;
181
182     protected QuerySupportImpl querySupport;
183     protected ResourceSupportImpl resourceSupport;
184     protected WriteSupport writeSupport;
185     public ClusterTranslator clusterTranslator;
186
187     boolean dirtyPrimitives = false;
188
189     public static final int SERVICE_MODE_CREATE = 2;
190     public static final int SERVICE_MODE_ALLOW = 1;
191     public int serviceMode = 0;
192     public boolean createdImmutableClusters = false;
193     public TLongHashSet createdClusters = new TLongHashSet();
194
195     private Layer0 L0;
196
197     /**
198      * The service locator maintained by the workbench. These services are
199      * initialized during workbench during the <code>init</code> method.
200      */
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
202
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
204
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
206
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
210
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
212
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
214
215     final protected State                                        state              = new State();
216
217     protected GraphSession                                       graphSession       = null;
218
219     protected SessionManager                                     sessionManagerImpl = null;
220
221     protected UserAuthenticationAgent                            authAgent          = null;
222
223     protected UserAuthenticator                                  authenticator      = null;
224
225     protected Resource                                           user               = null;
226
227     protected ClusterStream                                      clusterStream      = null;
228
229     protected SessionRequestManager                         requestManager = null;
230
231     public ClusterTable                                       clusterTable       = null;
232
233     public QueryProcessor                                     queryProvider2 = null;
234
235     ClientChangesImpl                                  clientChanges      = null;
236
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
238
239 //    protected long                                               newClusterId       = Constants.NullClusterId;
240
241     protected int                                                flushCounter       = 0;
242
243     protected boolean                                            writeOnly          = false;
244
245     WriteState<?>                                                writeState = null;
246     WriteStateBase<?>                                            delayedWriteState = null;
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().
248
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250
251 //        if (authAgent == null)
252 //            throw new IllegalArgumentException("null authentication agent");
253
254         File t = StaticSessionProperties.virtualGraphStoragePath;
255         if (null == t)
256             t = new File(".");
257         this.clusterTable = new ClusterTable(this, t);
258         this.builtinSupport = new BuiltinSupportImpl(this);
259         this.sessionManagerImpl = sessionManagerImpl;
260         this.user = null;
261 //        this.authAgent = authAgent;
262
263         serviceLocator.registerService(Session.class, this);
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285         ServiceActivityUpdaterForWriteTransactions.register(this);
286
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290         this.lifecycleSupport = new LifecycleSupportImpl(this);
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292         this.transactionPolicy = new TransactionPolicyRelease();
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294         this.clusterControl = new ClusterControlImpl(this);
295         serviceLocator.registerService(ClusterControl.class, clusterControl);
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297         this.clusterSetsSupport.setReadDirectory(t.toPath());
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
300     }
301
302     /*
303      *
304      * Session interface
305      */
306
307
308     @Override
309     public Resource getRootLibrary() {
310         return queryProvider2.getRootLibraryResource();
311     }
312
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314         if (!graphSession.dbSession.refreshEnabled())
315             return;
316         try {
317             getClusterTable().refresh(csid, this, clusterUID);
318         } catch (Throwable t) {
319             Logger.defaultLogError("Refesh failed.", t);
320         }
321     }
322
323     static final class TaskHelper {
324         private final String name;
325         private Object result;
326         final Semaphore sema = new Semaphore(0);
327         private Throwable throwable = null;
328         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
329             @Override
330             public void run(DatabaseException e) {
331                 synchronized (TaskHelper.this) {
332                     throwable = e;
333                 }
334             }
335         };
336         final Procedure<Object> proc = new Procedure<Object>() {
337             @Override
338             public void execute(Object result) {
339                 callback.run(null);
340             }
341             @Override
342             public void exception(Throwable t) {
343                 if (t instanceof DatabaseException)
344                     callback.run((DatabaseException)t);
345                 else
346                     callback.run(new DatabaseException("" + name + "operation failed.", t));
347             }
348         };
349         final WriteTraits writeTraits = new WriteTraits() {
350             @Override
351             public UndoTraits getUndoTraits() {
352                 return null;
353             }
354             @Override
355             public VirtualGraph getProvider() {
356                 return null;
357             }
358         };
359         TaskHelper(String name) {
360             this.name = name;
361         }
362         <T> T getResult() {
363             return (T)result;
364         }
365         void setResult(Object result) {
366             this.result = result;
367         }
368 //        Throwable throwableGet() {
369 //            return throwable;
370 //        }
371         synchronized void throwableSet(Throwable t) {
372             throwable = t;
373         }
374 //        void throwableSet(String t) {
375 //            throwable = new InternalException("" + name + " operation failed. " + t);
376 //        }
377         synchronized void throwableCheck()
378         throws DatabaseException {
379             if (null != throwable)
380                 if (throwable instanceof DatabaseException)
381                     throw (DatabaseException)throwable;
382                 else
383                     throw new DatabaseException("Undo operation failed.", throwable);
384         }
385         void throw_(String message)
386         throws DatabaseException {
387             throw new DatabaseException("" + name + " operation failed. " + message);
388         }
389     }
390
391
392
393     private ListenerBase getListenerBase(Object procedure) {
394         if (procedure instanceof ListenerBase)
395             return (ListenerBase) procedure;
396         else
397             return null;
398     }
399
400     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
401         scheduleRequest(request, callback, notify, null);
402     }
403
404     /* (non-Javadoc)
405      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
406      */
407     @Override
408     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
409
410         assert (request != null);
411
412         if(Development.DEVELOPMENT) {
413             try {
414             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
415                 System.err.println("schedule write '" + request + "'");
416             } catch (Throwable t) {
417                 Logger.defaultLogError(t);
418             }
419         }
420
421         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
422
423         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
424
425             @Override
426             public void run(int thread) {
427
428                 if(Development.DEVELOPMENT) {
429                     try {
430                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
431                         System.err.println("perform write '" + request + "'");
432                     } catch (Throwable t) {
433                         Logger.defaultLogError(t);
434                     }
435                 }
436
437                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
438
439                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
440
441                 try {
442
443                     flushCounter = 0;
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
445
446                     VirtualGraph vg = getProvider(request.getProvider());
447
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450
451                         @Override
452                         public void execute(Object result) {
453                             if(callback != null) callback.run(null);
454                         }
455
456                         @Override
457                         public void exception(Throwable t) {
458                             if(callback != null) callback.run((DatabaseException)t);
459                         }
460
461                     });
462
463                     assert (null != writer);
464 //                    writer.state.barrier.inc();
465
466                     try {
467                         request.perform(writer);
468                         assert (null != writer);
469                     } catch (Throwable t) {
470                         if (!(t instanceof CancelTransactionException))
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472                         writeState.except(t);
473                     } finally {
474 //                        writer.state.barrier.dec();
475 //                        writer.waitAsync(request);
476                     }
477
478
479                     assert(!queryProvider2.dirty);
480
481                 } catch (Throwable e) {
482
483                     // Log it first, just to be safe that the error is always logged.
484                     if (!(e instanceof CancelTransactionException))
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
486
487 //                    writeState.getGraph().state.barrier.dec();
488 //                    writeState.getGraph().waitAsync(request);
489
490                     writeState.except(e);
491
492 //                    try {
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 //                        // All we can do here is to log those, can't really pass them anywhere.
495 //                        if (callback != null) {
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 //                            else callback.run(new DatabaseException(e));
498 //                        }
499 //                    } catch (Throwable e2) {
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
501 //                    }
502 //                    boolean empty = clusterStream.reallyFlush();
503 //                    int callerThread = -1;
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
506 //                    try {
507 //                        // Send all local changes to server so it can calculate correct reverse change set.
508 //                        // This will call clusterStream.accept().
509 //                        state.cancelCommit(context, clusterStream);
510 //                        if (!empty) {
511 //                            if (!context.isOk()) // this is a blocking operation
512 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
514 //                        }
515 //                        state.cancelCommit2(context, clusterStream);
516 //                    } catch (DatabaseException e3) {
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
518 //                    } finally {
519 //                        clusterStream.setOff(false);
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
521 //                    }
522
523                 } finally {
524
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
526
527                 }
528
529                 task.finish();
530
531                 if(Development.DEVELOPMENT) {
532                     try {
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534                         System.err.println("finish write '" + request + "'");
535                     } catch (Throwable t) {
536                         Logger.defaultLogError(t);
537                     }
538                 }
539
540             }
541
542         }, combine);
543
544     }
545
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547         scheduleRequest(request, procedure, notify, null);
548     }
549
550     /* (non-Javadoc)
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
552      */
553     @Override
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
555
556         assert (request != null);
557
558         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
559
560         requestManager.scheduleWrite(new SessionTask(request, thread) {
561
562             @Override
563             public void run(int thread) {
564
565                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
566
567                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
568
569                 flushCounter = 0;
570                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
571
572                 VirtualGraph vg = getProvider(request.getProvider());
573                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
574
575                 try {
576                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
577                     writeState = writeStateT;
578
579                     assert (null != writer);
580 //                    writer.state.barrier.inc();
581                     writeStateT.setResult(request.perform(writer));
582                     assert (null != writer);
583
584 //                    writer.state.barrier.dec();
585 //                    writer.waitAsync(null);
586
587                 } catch (Throwable e) {
588
589 //                    writer.state.barrier.dec();
590 //                    writer.waitAsync(null);
591
592                     writeState.except(e);
593
594 //                  state.stopWriteTransaction(clusterStream);
595 //
596 //              } catch (Throwable e) {
597 //                  // Log it first, just to be safe that the error is always logged.
598 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
599 //
600 //                  try {
601 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
602 //                      // All we can do here is to log those, can't really pass them anywhere.
603 //                      if (procedure != null) {
604 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
605 //                          else procedure.exception(new DatabaseException(e));
606 //                      }
607 //                  } catch (Throwable e2) {
608 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
609 //                  }
610 //
611 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
612 //
613 //                  state.stopWriteTransaction(clusterStream);
614
615                 } finally {
616                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
617                 }
618
619 //              if(notify != null) notify.release();
620
621                 task.finish();
622
623             }
624
625         }, combine);
626
627     }
628
629     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
630         scheduleRequest(request, callback, notify, null);
631     }
632
633     /* (non-Javadoc)
634      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
635      */
636     @Override
637     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
638
639         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
640
641         assert (request != null);
642
643         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
644
645         requestManager.scheduleWrite(new SessionTask(request, thread) {
646
647             @Override
648             public void run(int thread) {
649                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
650
651                 Procedure<Object> stateProcedure = new Procedure<Object>() {
652                     @Override
653                     public void execute(Object result) {
654                         if (callback != null)
655                             callback.run(null);
656                     }
657                     @Override
658                     public void exception(Throwable t) {
659                         if (callback != null) {
660                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);
661                             else callback.run(new DatabaseException(t));
662                         } else
663                             Logger.defaultLogError("Unhandled exception", t);
664                     }
665                 };
666
667                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
668                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
669                 DelayedWriteGraph dwg = null;
670 //                newGraph.state.barrier.inc();
671
672                 try {
673                     dwg = new DelayedWriteGraph(newGraph);
674                     request.perform(dwg);
675                 } catch (Throwable e) {
676                     delayedWriteState.except(e);
677                     total.finish();
678                     dwg.close();
679                     return;
680                 } finally {
681 //                    newGraph.state.barrier.dec();
682 //                    newGraph.waitAsync(request);
683                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
684                 }
685
686                 delayedWriteState = null;
687
688                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
689                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
690
691                 flushCounter = 0;
692                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
693
694                 acquireWriteOnly();
695
696                 VirtualGraph vg = getProvider(request.getProvider());
697                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
698
699                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
700
701                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
702
703                 assert (null != writer);
704 //                writer.state.barrier.inc();
705
706                 try {
707                     // Cannot cancel
708                     dwg.commit(writer, request);
709
710                         if(defaultClusterSet != null) {
711                                 XSupport xs = getService(XSupport.class);
712                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
713                                 ClusteringSupport cs = getService(ClusteringSupport.class);
714                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
715                         }
716
717                     // This makes clusters available from server
718                     clusterStream.reallyFlush();
719
720                     releaseWriteOnly(writer);
721                     handleUpdatesAndMetadata(writer);
722
723                 } catch (ServiceException e) {
724 //                    writer.state.barrier.dec();
725 //                    writer.waitAsync(null);
726
727                     // These shall be requested from server
728                     clusterTable.removeWriteOnlyClusters();
729                     // This makes clusters available from server
730                     clusterStream.reallyFlush();
731
732                     releaseWriteOnly(writer);
733                     writeState.except(e);
734                 } finally {
735                     // Debugging & Profiling
736                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
737                     task2.finish();
738                     total.finish();
739                 }
740             }
741
742         }, combine);
743
744     }
745
746     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
747         scheduleRequest(request, procedure, notify, null);
748     }
749
750     /* (non-Javadoc)
751      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
752      */
753     @Override
754     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
755         throw new Error("Not implemented");
756     }
757
758     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
759         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
760         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
761             createdClusters.add(cluster.clusterId);
762         }
763         return cluster;
764     }
765
    /**
     * {@link WriteSupport} implementation that the write-only and delayed-write
     * scheduling code of this file instantiates when the request has no
     * virtual graph provider.
     * <p>
     * Statement and value claims are applied directly to clusters obtained
     * from the cluster table. New resources are allocated from
     * {@link #currentCluster}, which is replaced when it fills up or when the
     * default cluster set changes. Metadata and undo related calls are
     * forwarded to the {@code writeSupport} visible from the enclosing scope
     * (presumably the session's regular write support -- confirm).
     */
    class WriteOnlySupport implements WriteSupport {

        // Initialized from the enclosing session's clusterStream; not read inside this class.
        ClusterStream stream;
        // Cluster that createResource/createResourceKey allocate from; null means
        // "pick a cluster on the next allocation".
        ClusterImpl currentCluster;

        public WriteOnlySupport() {
            this.stream = clusterStream;
        }

        /**
         * Unwraps the resource keys of the given resources and delegates to
         * the key based overload.
         */
        @Override
        public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
            claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
        }

        /**
         * Adds the statement (s, p, o) to the cluster of the subject.
         * A {@link DatabaseException} from the cluster is logged, not rethrown.
         *
         * @throws ImmutableException when the subject's cluster is immutable,
         *         the service mode lacks {@code SERVICE_MODE_ALLOW} and the
         *         subject is not the root library
         */
        @Override
        public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);

            if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
                if(s != queryProvider2.getRootLibrary())
                    throw new ImmutableException("Trying to modify immutable resource key=" + s);

            try {
                // addRelation may hand back a different cluster instance; maintainCluster swaps it in.
                maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
                //throw e;
            }

            clientChanges.invalidate(s);

            // Statement queries are only updated for clusters that are not write-only.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateStatements(s, p);

        }

        /**
         * Claims the whole byte array as the value of the resource by
         * delegating to the key based overload.
         */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
            claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
        }

        /**
         * Sets the first {@code length} bytes of {@code value} as the value of
         * resource key {@code rid}. A {@link DatabaseException} from the
         * cluster is logged, not rethrown.
         */
        @Override
        public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            // Value queries are only updated for clusters that are not write-only.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Claims a value of {@code amount} bytes read from {@code reader}.
         * Values shorter than 65536 bytes are read in one go and passed to the
         * byte array overload; longer values are written to the cluster in
         * blocks of at most 65536 bytes.
         */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {

            if(amount < 65536) {
                claimValue(provider,resource, reader.readBytes(null, amount));
                return;
            }

            // Reused transfer buffer for the block-wise write below.
            byte[] bytes = new byte[65536];

            int rid = ((ResourceImpl)resource).id;
            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                int left = amount;
                while(left > 0) {
                    int block = Math.min(left, 65536);
                    reader.readBytes(bytes, block);
                    // amount-left is the offset of this block within the whole value.
                    maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
                    left -= block;
                }
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Registers a cluster instance returned by a modifying cluster call
         * when it differs from the instance the call was made on: the cluster
         * table entry is replaced and {@link #currentCluster} is updated if it
         * referred to the old instance.
         */
        private void maintainCluster(ClusterImpl before, ClusterI after_) {
            if(after_ != null && after_ != before) {
                ClusterImpl after = (ClusterImpl)after_;
                if(currentCluster == before) currentCluster = after;
                clusterTable.replaceCluster(after);
            }
        }

        /**
         * Creates a new resource in the current cluster and returns the value
         * of {@code createResource} on that cluster. When the current cluster
         * holds exactly {@code ClusterTable.CLUSTER_FILL_SIZE} resources, a new
         * cluster is taken into use and its {@code foreignLookup} is sized by
         * {@code foreignCounter}.
         */
        // NOTE(review): the fill check uses == here but >= in createResource(VirtualGraph) -- confirm this is intended.
        public int createResourceKey(int foreignCounter) throws DatabaseException {
            if(currentCluster == null)
                currentCluster = getNewResourceCluster();
            if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
                ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
                newCluster.foreignLookup = new byte[foreignCounter];
                currentCluster = newCluster;
                if (DEBUG)
                    System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
            }
            return currentCluster.createResource(clusterTranslator);
        }

        /**
         * Creates a new resource. While there is no current cluster, or the
         * current cluster has reached {@code ClusterTable.CLUSTER_FILL_SIZE}:
         * with a default cluster set the resource is created through
         * {@code getNewResource(defaultClusterSet)} and its cluster becomes
         * the current one; without one a new resource cluster is taken into
         * use and the resource is created in it.
         */
        @Override
        public Resource createResource(VirtualGraph provider) throws DatabaseException {
            if(currentCluster == null) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
        }

        /** Delegates to {@code getNewResource(clusterId)}; the provider is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, long clusterId)
        throws DatabaseException {
            return getNewResource(clusterId);
        }

        /** Delegates to {@code getNewResource(clusterSet)}; the provider is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, Resource clusterSet)
        throws DatabaseException {
            return getNewResource(clusterSet);
        }

        /** Delegates to {@code getNewClusterSet(clusterSet)}; the provider is not used. */
        @Override
        public void createClusterSet(VirtualGraph provider, Resource clusterSet)
                throws DatabaseException {
            getNewClusterSet(clusterSet);
        }

        /** Delegates to {@code containsClusterSet(clusterSet)}; the first argument is not used. */
        @Override
        public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
        throws ServiceException {
            return containsClusterSet(clusterSet);
        }

        /**
         * Makes the cluster with the given id the current cluster and stores
         * it in {@code clusterSetsSupport} under the set resource id that
         * {@code clusterSetsSupport.getSet} reports for it.
         */
        public void selectCluster(long cluster) {
                currentCluster = clusterTable.getClusterByClusterId(cluster);
                long setResourceId = clusterSetsSupport.getSet(cluster);
                clusterSetsSupport.put(setResourceId, cluster);
        }

        /**
         * Changes the default cluster set through
         * {@code setDefaultClusterSet4NewResource} and re-targets the current
         * cluster: to the cluster registered for the set when
         * {@code clusterSet} is non-null, to none otherwise.
         *
         * @return the result of {@code setDefaultClusterSet4NewResource} when
         *         {@code clusterSet} is non-null, otherwise <code>null</code>
         */
        @Override
        public Resource setDefaultClusterSet(Resource clusterSet)
        throws ServiceException {
                Resource result = setDefaultClusterSet4NewResource(clusterSet);
                if(clusterSet != null) {
                        long id = clusterSetsSupport.get(clusterSet.getResourceId());
                        currentCluster = clusterTable.getClusterByClusterId(id);
                        return result;
                } else {
                        currentCluster = null;
                        // The result of setDefaultClusterSet4NewResource is discarded on this path.
                        return null;
                }
        }

        /**
         * Removes the value of the resource. Without a virtual graph the
         * removal is recorded through the cluster translator and the resource
         * is invalidated; with one, the removal is applied to the virtual
         * graph.
         */
        @Override
        public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {

                 // Resolve the provider; a null result selects the cluster based path below.
                 provider = getProvider(provider);
             if (null == provider) {
                 int key = ((ResourceImpl)resource).id;



                 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
//                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
//                      if(key != queryProvider2.getRootLibrary())
//                              throw new ImmutableException("Trying to modify immutable resource key=" + key);

//                 try {
//                     cluster.removeValue(key, clusterTranslator);
//                 } catch (DatabaseException e) {
//                     Logger.defaultLogError(e);
//                     return;
//                 }

                 clusterTable.writeOnlyInvalidate(cluster);

                 try {
                         // Record the delete operation for the resource index, then the value removal.
                         int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
                         clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
                         clusterTranslator.removeValue(cluster);
                 } catch (DatabaseException e) {
                         // Logged only; the invalidations below still run.
                         Logger.defaultLogError(e);
                 }

                 queryProvider2.invalidateResource(key);
                 clientChanges.invalidate(key);

             } else {
                 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
                 queryProvider2.updateValue(querySupport.getId(resource));
                 clientChanges.claimValue(resource);
             }


        }

        /** Not supported by this write support; always throws. */
        @Override
        public void flush(boolean intermediate) {
            throw new UnsupportedOperationException();
        }

        /**
         * Flushes through the cluster table, maps the default cluster set
         * (if any) to {@code Constants.NewClusterId} and forgets the current
         * cluster so that the next allocation selects one again.
         */
        @Override
        public void flushCluster() {
            clusterTable.flushCluster(graphSession);
            if(defaultClusterSet != null) {
                clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
            }
            currentCluster = null;
        }

        /** Not supported by this write support; always throws. */
        @Override
        public void flushCluster(Resource r) {
            throw new UnsupportedOperationException("flushCluster resource " + r);
        }

        /** Does nothing. */
        @Override
        public void gc() {
        }

        /**
         * Removes the statement (subject, predicate, object). Without a
         * virtual graph the removal is recorded through the cluster
         * translator; with one it is denied from the virtual graph. In both
         * cases the subject is invalidated (on the cluster based path only if
         * recording the removal did not throw).
         *
         * @return always <code>true</code>
         */
        @Override
        public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
                Resource object) {

                int s = ((ResourceImpl)subject).id;
                int p = ((ResourceImpl)predicate).id;
                int o = ((ResourceImpl)object).id;

                provider = getProvider(provider);
                if (null == provider) {

                        ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
                        clusterTable.writeOnlyInvalidate(cluster);

                        try {

                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
                                int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
                                int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);

                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);

                                // All three indexes are registered against the subject's cluster;
                                // only the subject entry carries the REMOVE operation.
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);

                                queryProvider2.invalidateResource(s);
                                clientChanges.invalidate(s);

                        } catch (DatabaseException e) {

                                // Logged only; true is still returned below.
                                Logger.defaultLogError(e);

                        }

                        return true;

                } else {
                        
                        ((VirtualGraphImpl)provider).deny(s, p, o);
                        queryProvider2.invalidateResource(s);
                        clientChanges.invalidate(s);

                        return true;

                }

        }

        /** Not supported by this write support; always throws. */
        @Override
        public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
            throw new UnsupportedOperationException();
        }

        /** @return always <code>true</code> */
        @Override
        public boolean writeOnly() {
            return true;
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
            writeSupport.performWriteRequest(graph, request);
        }

        /** Not supported by this write support; always throws. */
        @Override
        public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this write support; always throws. */
        @Override
        public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public <T> void addMetadata(Metadata data) throws ServiceException {
            writeSupport.addMetadata(data);
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
            return writeSupport.getMetadata(clazz);
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public TreeMap<String, byte[]> getMetadata() {
            return writeSupport.getMetadata();
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public void commitDone(WriteTraits writeTraits, long csid) {
            writeSupport.commitDone(writeTraits, csid);
        }

        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public void clearUndoList(WriteTraits writeTraits) {
            writeSupport.clearUndoList(writeTraits);
        }
        /** Forwards to the {@code writeSupport} of the enclosing scope. */
        @Override
        public int clearMetadata() {
            return writeSupport.clearMetadata();
        }

                /** Forwards to the {@code writeSupport} of the enclosing scope. */
                @Override
                public void startUndo() {
                        writeSupport.startUndo();
                }
    }
1123
1124     class VirtualWriteOnlySupport implements WriteSupport {
1125
1126 //        @Override
1127 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 //                Resource object) {
1129 //            throw new UnsupportedOperationException();
1130 //        }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144
1145             TransientGraph impl = (TransientGraph)provider;
1146             impl.claim(subject, predicate, object);
1147             getQueryProvider2().updateStatements(subject, predicate);
1148             clientChanges.claim(subject, predicate, object);
1149
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160             getQueryProvider2().updateValue(resource);
1161             clientChanges.claimValue(resource);
1162         }
1163
1164         @Override
1165         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166             byte[] value = reader.readBytes(null, amount);
1167             claimValue(provider, resource, value);
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider) {
1172             TransientGraph impl = (TransientGraph)provider;
1173             return impl.getResource(impl.newResource(false));
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, long clusterId) {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws DatabaseException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195         throws ServiceException {
1196             throw new UnsupportedOperationException();
1197         }
1198
1199         @Override
1200         public Resource setDefaultClusterSet(Resource clusterSet)
1201         throws ServiceException {
1202                 return null;
1203         }
1204
1205         @Override
1206         public void denyValue(VirtualGraph provider, Resource resource) {
1207             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208             getQueryProvider2().updateValue(querySupport.getId(resource));
1209             // NOTE: this only keeps track of value changes by-resource.
1210             clientChanges.claimValue(resource);
1211         }
1212
1213         @Override
1214         public void flush(boolean intermediate) {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster() {
1220             throw new UnsupportedOperationException();
1221         }
1222
1223         @Override
1224         public void flushCluster(Resource r) {
1225             throw new UnsupportedOperationException("Resource " + r);
1226         }
1227
1228         @Override
1229         public void gc() {
1230         }
1231
1232         @Override
1233         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234             TransientGraph impl = (TransientGraph) provider;
1235             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237             clientChanges.deny(subject, predicate, object);
1238             return true;
1239         }
1240
1241         @Override
1242         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public boolean writeOnly() {
1248             return true;
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void addMetadata(Metadata data) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public TreeMap<String, byte[]> getMetadata() {
1278             throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public void commitDone(WriteTraits writeTraits, long csid) {
1283         }
1284
1285         @Override
1286         public void clearUndoList(WriteTraits writeTraits) {
1287         }
1288
1289         @Override
1290         public int clearMetadata() {
1291             return 0;
1292         }
1293
1294                 @Override
1295                 public void startUndo() {
1296                 }
1297     }
1298
1299     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300
1301         try {
1302
1303             int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304
1305             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306
1307             flushCounter = 0;
1308             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1309
1310             acquireWriteOnly();
1311
1312             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1313
1314             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1315
1316             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1317             writeState = writeStateT;
1318
1319             assert (null != writer);
1320 //            writer.state.barrier.inc();
1321             long start = System.nanoTime();
1322             T result = request.perform(writer);
1323             long duration = System.nanoTime() - start;
1324             if (DEBUG) {
1325                 System.err.println("################");
1326                 System.err.println("WriteOnly duration " + 1e-9*duration);
1327             }
1328             writeStateT.setResult(result);
1329
1330             // This makes clusters available from server
1331             clusterStream.reallyFlush();
1332
1333             // This will trigger query updates
1334             releaseWriteOnly(writer);
1335             assert (null != writer);
1336
1337             handleUpdatesAndMetadata(writer);
1338
1339         } catch (CancelTransactionException e) {
1340
1341             releaseWriteOnly(writeState.getGraph());
1342
1343             clusterTable.removeWriteOnlyClusters();
1344             state.stopWriteTransaction(clusterStream);
1345
1346         } catch (Throwable e) {
1347
1348             e.printStackTrace();
1349
1350             releaseWriteOnly(writeState.getGraph());
1351
1352             clusterTable.removeWriteOnlyClusters();
1353
1354             if (callback != null)
1355                 callback.exception(new DatabaseException(e));
1356
1357             state.stopWriteTransaction(clusterStream);
1358             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1359
1360         } finally  {
1361
1362             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1363
1364         }
1365
1366     }
1367
1368     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1369         scheduleRequest(request, callback, notify, null);
1370     }
1371
1372     @Override
1373     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1374
1375         assertAlive();
1376
1377         assert (request != null);
1378
1379         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1380
1381         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1382
1383             @Override
1384             public void run(int thread) {
1385
1386                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1387
1388                     try {
1389
1390                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1391
1392                         flushCounter = 0;
1393                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1394
1395                         acquireWriteOnly();
1396
1397                         VirtualGraph vg = getProvider(request.getProvider());
1398                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1399
1400                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1401
1402                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1403
1404                             @Override
1405                             public void execute(Object result) {
1406                                 if(callback != null) callback.run(null);
1407                             }
1408
1409                             @Override
1410                             public void exception(Throwable t) {
1411                                 if(callback != null) callback.run((DatabaseException)t);
1412                             }
1413
1414                         });
1415
1416                         assert (null != writer);
1417 //                        writer.state.barrier.inc();
1418
1419                         try {
1420
1421                             request.perform(writer);
1422
1423                         } catch (Throwable e) {
1424
1425 //                            writer.state.barrier.dec();
1426 //                            writer.waitAsync(null);
1427
1428                             releaseWriteOnly(writer);
1429
1430                             clusterTable.removeWriteOnlyClusters();
1431
1432                             if(!(e instanceof CancelTransactionException)) {
1433                                 if (callback != null)
1434                                     callback.run(new DatabaseException(e));
1435                             }
1436
1437                             writeState.except(e);
1438
1439                             return;
1440
1441                         }
1442
1443                         // This makes clusters available from server
1444                         boolean empty = clusterStream.reallyFlush();
1445                         // This was needed to make WO requests call metadata listeners.
1446                         // NOTE: the calling event does not contain clientChanges information.
1447                         if (!empty && clientChanges.isEmpty())
1448                             clientChanges.setNotEmpty(true);
1449                         releaseWriteOnly(writer);
1450                         assert (null != writer);
1451                     } finally  {
1452
1453                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1454
1455                     }
1456
1457
1458                 task.finish();
1459
1460             }
1461
1462         }, combine);
1463
1464     }
1465
1466     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1467         scheduleRequest(request, callback, notify, null);
1468     }
1469
1470     /* (non-Javadoc)
1471      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1472      */
1473     @Override
1474     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1475
1476         assert (request != null);
1477
1478         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1479
1480         requestManager.scheduleWrite(new SessionTask(request, thread) {
1481
1482             @Override
1483             public void run(int thread) {
1484
1485                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1486
1487                 performWriteOnly(request, notify, callback);
1488
1489                 task.finish();
1490
1491             }
1492
1493         }, combine);
1494
1495     }
1496
1497     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1498
1499         assert (request != null);
1500         assert (procedure != null);
1501
1502         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1503
1504         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1505
1506             @Override
1507             public void run(int thread) {
1508
1509                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1510
1511                 ListenerBase listener = getListenerBase(procedure);
1512
1513                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1514
1515                 try {
1516
1517                     if (listener != null) {
1518
1519                         try {
1520                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1521
1522                                         @Override
1523                                         public void exception(AsyncReadGraph graph, Throwable t) {
1524                                                 procedure.exception(graph, t);
1525                                                 if(throwable != null) {
1526                                                         throwable.set(t);
1527                                                 } else {
1528                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1529                                                 }
1530                                         }
1531
1532                                         @Override
1533                                         public void execute(AsyncReadGraph graph, T t) {
1534                                                 if(result != null) result.set(t);
1535                                                 procedure.execute(graph, t);
1536                                         }
1537
1538                                 }, listener);
1539                         } catch (Throwable t) {
1540                             // This is handled by the AsyncProcedure
1541                                 //Logger.defaultLogError("Internal error", t);
1542                         }
1543
1544                     } else {
1545
1546                         try {
1547
1548 //                            newGraph.state.barrier.inc();
1549
1550                             T t = request.perform(newGraph);
1551
1552                             try {
1553
1554                                 if(result != null) result.set(t);
1555                                 procedure.execute(newGraph, t);
1556
1557                             } catch (Throwable th) {
1558
1559                                 if(throwable != null) {
1560                                     throwable.set(th);
1561                                 } else {
1562                                     Logger.defaultLogError("Unhandled exception", th);
1563                                 }
1564
1565                             }
1566
1567                         } catch (Throwable t) {
1568
1569                             if (DEBUG)
1570                                 t.printStackTrace();
1571
1572                             if(throwable != null) {
1573                                 throwable.set(t);
1574                             } else {
1575                                 Logger.defaultLogError("Unhandled exception", t);
1576                             }
1577
1578                             try {
1579
1580                                 procedure.exception(newGraph, t);
1581
1582                             } catch (Throwable t2) {
1583
1584                                 if(throwable != null) {
1585                                     throwable.set(t2);
1586                                 } else {
1587                                     Logger.defaultLogError("Unhandled exception", t2);
1588                                 }
1589
1590                             }
1591
1592                         }
1593
1594 //                        newGraph.state.barrier.dec();
1595 //                        newGraph.waitAsync(request);
1596
1597                     }
1598
1599                 } finally {
1600
1601                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1602
1603                 }
1604
1605             }
1606
1607         });
1608
1609     }
1610
1611     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1612
1613         assert (request != null);
1614         assert (procedure != null);
1615
1616         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1617
1618         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1619
1620             @Override
1621             public void run(int thread) {
1622
1623                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1624
1625                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1626
1627                 try {
1628
1629                     if (listener != null) {
1630
1631                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1632
1633 //                        newGraph.waitAsync(request);
1634
1635                     } else {
1636
1637                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1638                                 procedure, "request");
1639
1640                         try {
1641
1642 //                            newGraph.state.barrier.inc();
1643
1644                             request.perform(newGraph, wrapper);
1645
1646 //                            newGraph.waitAsync(request);
1647
1648                         } catch (Throwable t) {
1649
1650                             wrapper.exception(newGraph, t);
1651 //                            newGraph.waitAsync(request);
1652
1653
1654                         }
1655
1656                     }
1657
1658                 } finally {
1659
1660                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1661
1662                 }
1663
1664             }
1665
1666         });
1667
1668     }
1669
1670     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1671
1672         assert (request != null);
1673         assert (procedure != null);
1674
1675         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1676
1677         int sync = notify != null ? thread : -1;
1678
1679         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1680
1681             @Override
1682             public void run(int thread) {
1683
1684                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1685
1686                 ListenerBase listener = getListenerBase(procedure);
1687
1688                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1689
1690                 try {
1691
1692                     if (listener != null) {
1693
1694                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1695
1696 //                        newGraph.waitAsync(request);
1697
1698                     } else {
1699
1700                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1701
1702                         try {
1703
1704                             request.perform(newGraph, wrapper);
1705
1706                         } catch (Throwable t) {
1707
1708                             t.printStackTrace();
1709
1710                         }
1711
1712                     }
1713
1714                 } finally {
1715
1716                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1717
1718                 }
1719
1720             }
1721
1722         });
1723
1724     }
1725
1726     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1727
1728         assert (request != null);
1729         assert (procedure != null);
1730
1731         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1732
1733         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1734
1735             @Override
1736             public void run(int thread) {
1737
1738                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1739
1740                 ListenerBase listener = getListenerBase(procedure);
1741
1742                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1743
1744                 try {
1745
1746                     if (listener != null) {
1747
1748                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1749
1750                             @Override
1751                             public void exception(Throwable t) {
1752                                 procedure.exception(t);
1753                                 if(throwable != null) {
1754                                     throwable.set(t);
1755                                 }
1756                             }
1757
1758                             @Override
1759                             public void execute(T t) {
1760                                 if(result != null) result.set(t);
1761                                 procedure.execute(t);
1762                             }
1763
1764                         }, listener);
1765
1766 //                        newGraph.waitAsync(request);
1767
1768                     } else {
1769
1770 //                        newGraph.state.barrier.inc();
1771
1772                         request.register(newGraph, new Listener<T>() {
1773
1774                             @Override
1775                             public void exception(Throwable t) {
1776                                 if(throwable != null) throwable.set(t);
1777                                 procedure.exception(t);
1778 //                                newGraph.state.barrier.dec();
1779                             }
1780
1781                             @Override
1782                             public void execute(T t) {
1783                                 if(result != null) result.set(t);
1784                                 procedure.execute(t);
1785 //                                newGraph.state.barrier.dec();
1786                             }
1787
1788                             @Override
1789                             public boolean isDisposed() {
1790                                 return true;
1791                             }
1792
1793                         });
1794
1795 //                        newGraph.waitAsync(request);
1796
1797                     }
1798
1799                 } finally {
1800
1801                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1802
1803                 }
1804
1805             }
1806
1807         });
1808
1809     }
1810
1811
1812     @Override
1813     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1814
1815         scheduleRequest(request, procedure, null, null, null);
1816
1817     }
1818
1819     @Override
1820     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1821         assertNotSession();
1822         Semaphore notify = new Semaphore(0);
1823         DataContainer<Throwable> container = new DataContainer<Throwable>();
1824         DataContainer<T> result = new DataContainer<T>();
1825         scheduleRequest(request, procedure, notify, container, result);
1826         acquire(notify, request);
1827         Throwable throwable = container.get();
1828         if(throwable != null) {
1829             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1830             else throw new DatabaseException("Unexpected exception", throwable);
1831         }
1832         return result.get();
1833     }
1834
1835     @Override
1836     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1837         assertNotSession();
1838         Semaphore notify = new Semaphore(0);
1839         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1840         final DataContainer<T> resultContainer = new DataContainer<T>();
1841         scheduleRequest(request, new AsyncProcedure<T>() {
1842             @Override
1843             public void exception(AsyncReadGraph graph, Throwable throwable) {
1844                 exceptionContainer.set(throwable);
1845                 procedure.exception(graph, throwable);
1846             }
1847             @Override
1848             public void execute(AsyncReadGraph graph, T result) {
1849                 resultContainer.set(result);
1850                 procedure.execute(graph, result);
1851             }
1852         }, getListenerBase(procedure), notify);
1853         acquire(notify, request, procedure);
1854         Throwable throwable = exceptionContainer.get();
1855         if (throwable != null) {
1856             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1857             else throw new DatabaseException("Unexpected exception", throwable);
1858         }
1859         return resultContainer.get();
1860     }
1861
1862
1863     @Override
1864     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1865         assertNotSession();
1866         Semaphore notify = new Semaphore(0);
1867         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1868         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1869             @Override
1870             public void exception(AsyncReadGraph graph, Throwable throwable) {
1871                 exceptionContainer.set(throwable);
1872                 procedure.exception(graph, throwable);
1873             }
1874             @Override
1875             public void execute(AsyncReadGraph graph, T result) {
1876                 procedure.execute(graph, result);
1877             }
1878             @Override
1879             public void finished(AsyncReadGraph graph) {
1880                 procedure.finished(graph);
1881             }
1882         }, notify);
1883         acquire(notify, request, procedure);
1884         Throwable throwable = exceptionContainer.get();
1885         if (throwable != null) {
1886             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1887             else throw new DatabaseException("Unexpected exception", throwable);
1888         }
1889         // TODO: implement return value
1890         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1891         return null;
1892     }
1893
1894     @Override
1895     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1896         return syncRequest(request, new ProcedureAdapter<T>());
1897
1898
1899 //        assert(request != null);
1900 //
1901 //        final DataContainer<T> result = new DataContainer<T>();
1902 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1903 //
1904 //        syncRequest(request, new Procedure<T>() {
1905 //
1906 //            @Override
1907 //            public void execute(T t) {
1908 //                result.set(t);
1909 //            }
1910 //
1911 //            @Override
1912 //            public void exception(Throwable t) {
1913 //                exception.set(t);
1914 //            }
1915 //
1916 //        });
1917 //
1918 //        Throwable t = exception.get();
1919 //        if(t != null) {
1920 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1921 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1922 //        }
1923 //
1924 //        return result.get();
1925
1926     }
1927
1928     @Override
1929     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1930         return syncRequest(request, (Procedure<T>)procedure);
1931     }
1932
1933
1934 //    @Override
1935 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1936 //        assertNotSession();
1937 //        Semaphore notify = new Semaphore(0);
1938 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1939 //        DataContainer<T> result = new DataContainer<T>();
1940 //        scheduleRequest(request, procedure, notify, container, result);
1941 //        acquire(notify, request);
1942 //        Throwable throwable = container.get();
1943 //        if(throwable != null) {
1944 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1945 //            else throw new DatabaseException("Unexpected exception", throwable);
1946 //        }
1947 //        return result.get();
1948 //    }
1949
1950
1951     @Override
1952     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1953         assertNotSession();
1954         Semaphore notify = new Semaphore(0);
1955         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1956         final DataContainer<T> result = new DataContainer<T>();
1957         scheduleRequest(request, procedure, notify, container, result);
1958         acquire(notify, request);
1959         Throwable throwable = container.get();
1960         if (throwable != null) {
1961             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1962             else throw new DatabaseException("Unexpected exception", throwable);
1963         }
1964         return result.get();
1965     }
1966
1967     @Override
1968     public void syncRequest(Write request) throws DatabaseException  {
1969         assertNotSession();
1970         assertAlive();
1971         Semaphore notify = new Semaphore(0);
1972         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1973         scheduleRequest(request, new Callback<DatabaseException>() {
1974
1975             @Override
1976             public void run(DatabaseException e) {
1977               exception.set(e);
1978             }
1979
1980         }, notify);
1981         acquire(notify, request);
1982         if(exception.get() != null) throw exception.get();
1983     }
1984
1985     @Override
1986     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1987         assertNotSession();
1988         Semaphore notify = new Semaphore(0);
1989         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1990         final DataContainer<T> result = new DataContainer<T>();
1991         scheduleRequest(request, new Procedure<T>() {
1992
1993             @Override
1994             public void exception(Throwable t) {
1995               exception.set(t);
1996             }
1997
1998             @Override
1999             public void execute(T t) {
2000               result.set(t);
2001             }
2002
2003         }, notify);
2004         acquire(notify, request);
2005         if(exception.get() != null) {
2006             Throwable t = exception.get();
2007             if(t instanceof DatabaseException) throw (DatabaseException)t;
2008             else throw new DatabaseException(t);
2009         }
2010         return result.get();
2011     }
2012
2013     @Override
2014     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2015         assertNotSession();
2016         Semaphore notify = new Semaphore(0);
2017         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2018         final DataContainer<T> result = new DataContainer<T>();
2019         scheduleRequest(request, new Procedure<T>() {
2020
2021             @Override
2022             public void exception(Throwable t) {
2023               exception.set(t);
2024             }
2025
2026             @Override
2027             public void execute(T t) {
2028               result.set(t);
2029             }
2030
2031         }, notify);
2032         acquire(notify, request);
2033         if(exception.get() != null) {
2034             Throwable t = exception.get();
2035             if(t instanceof DatabaseException) throw (DatabaseException)t;
2036             else throw new DatabaseException(t);
2037         }
2038         return result.get();
2039     }
2040
2041     @Override
2042     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2043         assertNotSession();
2044         Semaphore notify = new Semaphore(0);
2045         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2046         final DataContainer<T> result = new DataContainer<T>();
2047         scheduleRequest(request, new Procedure<T>() {
2048
2049             @Override
2050             public void exception(Throwable t) {
2051               exception.set(t);
2052             }
2053
2054             @Override
2055             public void execute(T t) {
2056               result.set(t);
2057             }
2058
2059         }, notify);
2060         acquire(notify, request);
2061         if(exception.get() != null) {
2062             Throwable t = exception.get();
2063             if(t instanceof DatabaseException) throw (DatabaseException)t;
2064             else throw new DatabaseException(t);
2065         }
2066         return result.get();
2067     }
2068
2069     @Override
2070     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2071         assertNotSession();
2072         Semaphore notify = new Semaphore(0);
2073         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2074         scheduleRequest(request, new Callback<DatabaseException>() {
2075             @Override
2076             public void run(DatabaseException e) {
2077                 exception.set(e);
2078             }
2079         }, notify);
2080         acquire(notify, request);
2081         if(exception.get() != null) throw exception.get();
2082     }
2083
2084     @Override
2085     public void syncRequest(WriteOnly request) throws DatabaseException  {
2086         assertNotSession();
2087         assertAlive();
2088         Semaphore notify = new Semaphore(0);
2089         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2090         scheduleRequest(request, new Callback<DatabaseException>() {
2091             @Override
2092             public void run(DatabaseException e) {
2093                 exception.set(e);
2094             }
2095         }, notify);
2096         acquire(notify, request);
2097         if(exception.get() != null) throw exception.get();
2098     }
2099
2100     /*
2101      *
2102      * GraphRequestProcessor interface
2103      */
2104
2105     @Override
2106     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2107
2108         scheduleRequest(request, procedure, null, null);
2109
2110     }
2111
2112     @Override
2113     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2114
2115         scheduleRequest(request, procedure, null);
2116
2117     }
2118
2119     @Override
2120     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2121
2122         scheduleRequest(request, procedure, null, null, null);
2123
2124     }
2125
2126     @Override
2127     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2128
2129         scheduleRequest(request, callback, null);
2130
2131     }
2132
2133     @Override
2134     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2135
2136         scheduleRequest(request, procedure, null);
2137
2138     }
2139
2140     @Override
2141     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2142
2143         scheduleRequest(request, procedure, null);
2144
2145     }
2146
2147     @Override
2148     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2149
2150         scheduleRequest(request, procedure, null);
2151
2152     }
2153
2154     @Override
2155     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2156
2157         scheduleRequest(request, callback, null);
2158
2159     }
2160
2161     @Override
2162     public void asyncRequest(final Write r) {
2163         asyncRequest(r, null);
2164     }
2165
2166     @Override
2167     public void asyncRequest(final DelayedWrite r) {
2168         asyncRequest(r, null);
2169     }
2170
2171     @Override
2172     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2173
2174         scheduleRequest(request, callback, null);
2175
2176     }
2177
2178     @Override
2179     public void asyncRequest(final WriteOnly request) {
2180
2181         asyncRequest(request, null);
2182
2183     }
2184
2185     @Override
2186     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2187         r.request(this, procedure);
2188     }
2189
2190     @Override
2191     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2192         r.request(this, procedure);
2193     }
2194
2195     @Override
2196     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2197         r.request(this, procedure);
2198     }
2199
2200     @Override
2201     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2202         r.request(this, procedure);
2203     }
2204
2205     @Override
2206     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2207         r.request(this, procedure);
2208     }
2209
2210     @Override
2211     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2212         r.request(this, procedure);
2213     }
2214
2215     @Override
2216     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2217         return r.request(this);
2218     }
2219
2220     @Override
2221     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2222         return r.request(this);
2223     }
2224
2225     @Override
2226     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2227         r.request(this, procedure);
2228     }
2229
2230     @Override
2231     public <T> void async(WriteInterface<T> r) {
2232         r.request(this, new ProcedureAdapter<T>());
2233     }
2234
2235     //@Override
2236     public void incAsync() {
2237         state.incAsync();
2238     }
2239
2240     //@Override
2241     public void decAsync() {
2242         state.decAsync();
2243     }
2244
2245     public long getCluster(ResourceImpl resource) {
2246         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2247         return cluster.getClusterId();
2248     }
2249
2250     public long getCluster(int id) {
2251         if (clusterTable == null)
2252             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2253         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2254     }
2255
2256     public ResourceImpl getResource(int id) {
2257         return new ResourceImpl(resourceSupport, id);
2258     }
2259
2260     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2261         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2262         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2263         int key = proxy.getClusterKey();
2264         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2265         return new ResourceImpl(resourceSupport, resourceKey);
2266     }
2267
2268     public ResourceImpl getResource2(int id) {
2269         assert (id != 0);
2270         return new ResourceImpl(resourceSupport, id);
2271     }
2272
2273     final public int getId(ResourceImpl impl) {
2274         return impl.id;
2275     }
2276
2277     public static final Charset UTF8 = Charset.forName("utf-8");
2278
2279
2280
2281
2282     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2283         for(TransientGraph g : support.providers) {
2284             if(g.isPending(subject)) return false;
2285         }
2286         return true;
2287     }
2288
2289     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2290         for(TransientGraph g : support.providers) {
2291             if(g.isPending(subject, predicate)) return false;
2292         }
2293         return true;
2294     }
2295
2296     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2297
2298         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2299
2300             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2301
2302             @Override
2303             public void run(ReadGraphImpl graph) {
2304                 if(ready.decrementAndGet() == 0) {
2305                     runnable.run(graph);
2306                 }
2307             }
2308
2309         };
2310
2311         for(TransientGraph g : support.providers) {
2312             if(g.isPending(subject)) {
2313                 try {
2314                     g.load(graph, subject, composite);
2315                 } catch (DatabaseException e) {
2316                     e.printStackTrace();
2317                 }
2318             } else {
2319                 composite.run(graph);
2320             }
2321         }
2322
2323         composite.run(graph);
2324
2325     }
2326
2327     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2328
2329         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2330
2331             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2332
2333             @Override
2334             public void run(ReadGraphImpl graph) {
2335                 if(ready.decrementAndGet() == 0) {
2336                     runnable.run(graph);
2337                 }
2338             }
2339
2340         };
2341
2342         for(TransientGraph g : support.providers) {
2343             if(g.isPending(subject, predicate)) {
2344                 try {
2345                     g.load(graph, subject, predicate, composite);
2346                 } catch (DatabaseException e) {
2347                     e.printStackTrace();
2348                 }
2349             } else {
2350                 composite.run(graph);
2351             }
2352         }
2353
2354         composite.run(graph);
2355
2356     }
2357
2358 //    void dumpHeap() {
2359 //
2360 //        try {
2361 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2362 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2363 //                    "d:/heap" + flushCounter + ".txt", true);
2364 //        } catch (IOException e) {
2365 //            e.printStackTrace();
2366 //        }
2367 //
2368 //    }
2369
2370     void fireReactionsToSynchronize(ChangeSet cs) {
2371
2372         // Do not fire empty events
2373         if (cs.isEmpty())
2374             return;
2375
2376         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2377 //        g.state.barrier.inc();
2378
2379         try {
2380
2381             if (!cs.isEmpty()) {
2382                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2383                 for (ChangeListener l : changeListeners2) {
2384                     try {
2385                         l.graphChanged(e2);
2386                     } catch (Exception ex) {
2387                         ex.printStackTrace();
2388                     }
2389                 }
2390             }
2391
2392         } finally {
2393
2394 //            g.state.barrier.dec();
2395 //            g.waitAsync(null);
2396
2397         }
2398
2399     }
2400
2401     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2402         try {
2403
2404             // Do not fire empty events
2405             if (cs2.isEmpty())
2406                 return;
2407
2408 //            graph.restart();
2409 //            graph.state.barrier.inc();
2410
2411             try {
2412
2413                 if (!cs2.isEmpty()) {
2414
2415                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2416                     for (ChangeListener l : changeListeners2) {
2417                         try {
2418                             l.graphChanged(e2);
2419 //                            System.out.println("changelistener " + l);
2420                         } catch (Exception ex) {
2421                             ex.printStackTrace();
2422                         }
2423                     }
2424
2425                 }
2426
2427             } finally {
2428
2429 //                graph.state.barrier.dec();
2430 //                graph.waitAsync(null);
2431
2432             }
2433         } catch (Throwable t) {
2434             t.printStackTrace();
2435
2436         }
2437     }
2438     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2439
2440         try {
2441
2442             // Do not fire empty events
2443             if (cs2.isEmpty())
2444                 return;
2445
2446             // Do not fire on virtual requests
2447             if(graph.getProvider() != null)
2448                 return;
2449
2450             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2451
2452             try {
2453
2454                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2455                 for (ChangeListener l : metadataListeners) {
2456                     try {
2457                         l.graphChanged(e2);
2458                     } catch (Throwable ex) {
2459                         ex.printStackTrace();
2460                     }
2461                 }
2462
2463             } finally {
2464
2465             }
2466
2467         } catch (Throwable t) {
2468             t.printStackTrace();
2469         }
2470
2471     }
2472
2473
2474     /*
2475      * (non-Javadoc)
2476      *
2477      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2478      */
2479     @Override
2480     public <T> T getService(Class<T> api) {
2481         T t = peekService(api);
2482         if (t == null) {
2483             if (state.isClosed())
2484                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2485             throw new ServiceNotFoundException(this, api);
2486         }
2487         return t;
2488     }
2489
    /**
     * Supplies the server information object that
     * {@link #peekService(Class)} hands out for {@code ServerInformation}.
     *
     * @return the server information as provided by the subclass
     */
    protected abstract ServerInformation getCachedServerInformation();
2494
2495         private Class<?> serviceKey1 = null;
2496         private Class<?> serviceKey2 = null;
2497         private Object service1 = null;
2498         private Object service2 = null;
2499
2500     /*
2501      * (non-Javadoc)
2502      *
2503      * @see
2504      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2505      */
2506     @SuppressWarnings("unchecked")
2507     @Override
2508     public synchronized <T> T peekService(Class<T> api) {
2509
2510                 if(serviceKey1 == api) {
2511                         return (T)service1;
2512                 } else if (serviceKey2 == api) {
2513                         // Promote this key
2514                         Object result = service2;
2515                         service2 = service1;
2516                         serviceKey2 = serviceKey1;
2517                         service1 = result;
2518                         serviceKey1 = api;
2519                         return (T)result;
2520                 }
2521
2522         if (Layer0.class == api)
2523                 return (T) L0;
2524         if (ServerInformation.class == api)
2525             return (T) getCachedServerInformation();
2526         else if (WriteGraphImpl.class == api)
2527             return (T) writeState.getGraph();
2528         else if (ClusterBuilder.class == api)
2529             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2530         else if (ClusterBuilderFactory.class == api)
2531             return (T)new ClusterBuilderFactoryImpl(this);
2532
2533                 service2 = service1;
2534                 serviceKey2 = serviceKey1;
2535
2536         service1 = serviceLocator.peekService(api);
2537         serviceKey1 = api;
2538
2539         return (T)service1;
2540
2541     }
2542
2543     /*
2544      * (non-Javadoc)
2545      *
2546      * @see
2547      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2548      */
2549     @Override
2550     public boolean hasService(Class<?> api) {
2551         return serviceLocator.hasService(api);
2552     }
2553
2554     /**
2555      * @param api the api that must be implemented by the specified service
2556      * @param service the service implementation
2557      */
2558     @Override
2559     public <T> void registerService(Class<T> api, T service) {
2560         if(Layer0.class == api) {
2561                 L0 = (Layer0)service;
2562                 return;
2563         }
2564         serviceLocator.registerService(api, service);
2565         if (TransactionPolicySupport.class == api) {
2566             transactionPolicy = (TransactionPolicySupport)service;
2567             state.resetTransactionPolicy();
2568         }
2569         if (api == serviceKey1)
2570                 service1 = service;
2571         else if (api == serviceKey2)
2572                 service2 = service;
2573     }
2574
2575     // ----------------
2576     // SessionMonitor
2577     // ----------------
2578
2579     void fireSessionVariableChange(String variable) {
2580         for (MonitorHandler h : monitorHandlers) {
2581             MonitorContext ctx = monitorContexts.getLeft(h);
2582             assert ctx != null;
2583             // SafeRunner functionality repeated here to avoid dependency.
2584             try {
2585                 h.valuesChanged(ctx);
2586             } catch (Exception e) {
2587                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2588             } catch (LinkageError e) {
2589                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2590             }
2591         }
2592     }
2593
2594
2595     class ResourceSerializerImpl implements ResourceSerializer {
2596
2597         public long createRandomAccessId(int id) throws DatabaseException {
2598             if(id < 0)
2599                 return id;
2600             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2601             long cluster = getCluster(id);
2602             if (0 == cluster)
2603                 return 0; // Better to return 0 then invalid id.
2604             long result = ClusterTraitsBase.createResourceId(cluster, index);
2605             return result;
2606         }
2607
2608         @Override
2609         public long getRandomAccessId(Resource resource) throws DatabaseException {
2610
2611             ResourceImpl resourceImpl = (ResourceImpl) resource;
2612             return createRandomAccessId(resourceImpl.id);
2613
2614         }
2615
2616         @Override
2617         public int getTransientId(Resource resource) throws DatabaseException {
2618
2619             ResourceImpl resourceImpl = (ResourceImpl) resource;
2620             return resourceImpl.id;
2621
2622         }
2623
2624         @Override
2625         public String createRandomAccessId(Resource resource)
2626         throws InvalidResourceReferenceException {
2627
2628             if(resource == null) throw new IllegalArgumentException();
2629
2630             ResourceImpl resourceImpl = (ResourceImpl) resource;
2631
2632             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2633
2634             int r;
2635             try {
2636                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2637             } catch (DatabaseException e1) {
2638                 throw new InvalidResourceReferenceException(e1);
2639             }
2640             try {
2641                 // Serialize as '<resource index>_<cluster id>'
2642                 return "" + r + "_" + getCluster(resourceImpl);
2643             } catch (Throwable e) {
2644                 e.printStackTrace();
2645                 throw new InvalidResourceReferenceException(e);
2646             } finally {
2647             }
2648         }
2649
2650         public int getTransientId(long serialized) throws DatabaseException {
2651             if (serialized <= 0)
2652                 return (int)serialized;
2653             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2654             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2655             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2656             if (cluster == null)
2657                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2658             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2659             return key;
2660         }
2661
2662         @Override
2663         public Resource getResource(long randomAccessId) throws DatabaseException {
2664             return getResourceByKey(getTransientId(randomAccessId));
2665         }
2666
2667         @Override
2668         public Resource getResource(int transientId) throws DatabaseException {
2669             return getResourceByKey(transientId);
2670         }
2671
2672         @Override
2673         public Resource getResource(String randomAccessId)
2674         throws InvalidResourceReferenceException {
2675             try {
2676                 int i = randomAccessId.indexOf('_');
2677                 if (i == -1)
2678                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2679                             + randomAccessId + "'");
2680                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2681                 if(r < 0) return getResourceByKey(r);
2682                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2683                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2684                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2685                 if (cluster.hasResource(key, clusterTranslator))
2686                     return getResourceByKey(key);
2687             } catch (InvalidResourceReferenceException e) {
2688                 throw e;
2689             } catch (NumberFormatException e) {
2690                 throw new InvalidResourceReferenceException(e);
2691             } catch (Throwable e) {
2692                 e.printStackTrace();
2693                 throw new InvalidResourceReferenceException(e);
2694             } finally {
2695             }
2696             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2697         }
2698
2699         @Override
2700         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2701             try {
2702                 return true;
2703             } catch (Throwable e) {
2704                 e.printStackTrace();
2705                 throw new InvalidResourceReferenceException(e);
2706             } finally {
2707             }
2708         }
2709     }
2710
2711     // Copied from old SessionImpl
2712
2713     void check() {
2714         if (state.isClosed())
2715             throw new Error("Session closed.");
2716     }
2717
2718
2719
2720     public ResourceImpl getNewResource(long clusterId) {
2721         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2722         int newId;
2723         try {
2724             newId = cluster.createResource(clusterTranslator);
2725         } catch (DatabaseException e) {
2726             Logger.defaultLogError(e);
2727             return null;
2728         }
2729         return new ResourceImpl(resourceSupport, newId);
2730     }
2731
2732     public ResourceImpl getNewResource(Resource clusterSet)
2733     throws DatabaseException {
2734         long resourceId = clusterSet.getResourceId();
2735         Long clusterId = clusterSetsSupport.get(resourceId);
2736         if (null == clusterId)
2737             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2738         if (Constants.NewClusterId == clusterId) {
2739             clusterId = getService(ClusteringSupport.class).createCluster();
2740             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2741                 createdClusters.add(clusterId);
2742             clusterSetsSupport.put(resourceId, clusterId);
2743             return getNewResource(clusterId);
2744         } else {
2745             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2746             ResourceImpl result;
2747             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2748                 clusterId = getService(ClusteringSupport.class).createCluster();
2749                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2750                     createdClusters.add(clusterId);
2751                 clusterSetsSupport.put(resourceId, clusterId);
2752                 return getNewResource(clusterId);
2753             } else {
2754                 result = getNewResource(clusterId);
2755                 int resultKey = querySupport.getId(result);
2756                 long resultCluster = querySupport.getClusterId(resultKey);
2757                 if (clusterId != resultCluster)
2758                     clusterSetsSupport.put(resourceId, resultCluster);
2759                 return result;
2760             }
2761         }
2762     }
2763
2764     public void getNewClusterSet(Resource clusterSet)
2765     throws DatabaseException {
2766         if (DEBUG)
2767             System.out.println("new cluster set=" + clusterSet);
2768         long resourceId = clusterSet.getResourceId();
2769         if (clusterSetsSupport.containsKey(resourceId))
2770             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2771         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2772     }
2773     public boolean containsClusterSet(Resource clusterSet)
2774     throws ServiceException {
2775         long resourceId = clusterSet.getResourceId();
2776         return clusterSetsSupport.containsKey(resourceId);
2777     }
2778     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2779         Resource r = defaultClusterSet;
2780         defaultClusterSet = clusterSet;
2781         return r;
2782     }
2783     void printDiagnostics() {
2784
2785         if (DIAGNOSTICS) {
2786
2787             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2788             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2789             for(ClusterI cluster : clusterTable.getClusters()) {
2790                 try {
2791                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2792                         residentClusters.set(residentClusters.get() + 1);
2793                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2794                     }
2795                 } catch (DatabaseException e) {
2796                     Logger.defaultLogError(e);
2797                 }
2798             }
2799
2800             System.out.println("--------------------------------");
2801             System.out.println("Cluster information:");
2802             System.out.println("-amount of resident clusters=" + residentClusters.get());
2803             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2804
2805             for(ClusterI cluster : clusterTable.getClusters()) {
2806                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2807                 try {
2808                     if (!cluster.isLoaded())
2809                         System.out.println("not loaded]");
2810                     else if (cluster.isEmpty())
2811                         System.out.println("is empty]");
2812                     else {
2813                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2814                         System.out.print(",references=" + cluster.getReferenceCount());
2815                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2816                     }
2817                 } catch (DatabaseException e) {
2818                     Logger.defaultLogError(e);
2819                     System.out.println("is corrupted]");
2820                 }
2821             }
2822
2823             queryProvider2.printDiagnostics();
2824             System.out.println("--------------------------------");
2825         }
2826
2827     }
2828
2829     public void fireStartReadTransaction() {
2830
2831         if (DIAGNOSTICS)
2832             System.out.println("StartReadTransaction");
2833
2834         for (SessionEventListener listener : eventListeners) {
2835             try {
2836                 listener.readTransactionStarted();
2837             } catch (Throwable t) {
2838                 t.printStackTrace();
2839             }
2840         }
2841
2842     }
2843
2844     public void fireFinishReadTransaction() {
2845
2846         if (DIAGNOSTICS)
2847             System.out.println("FinishReadTransaction");
2848
2849         for (SessionEventListener listener : eventListeners) {
2850             try {
2851                 listener.readTransactionFinished();
2852             } catch (Throwable t) {
2853                 t.printStackTrace();
2854             }
2855         }
2856
2857     }
2858
2859     public void fireStartWriteTransaction() {
2860
2861         if (DIAGNOSTICS)
2862             System.out.println("StartWriteTransaction");
2863
2864         for (SessionEventListener listener : eventListeners) {
2865             try {
2866                 listener.writeTransactionStarted();
2867             } catch (Throwable t) {
2868                 t.printStackTrace();
2869             }
2870         }
2871
2872     }
2873
2874     public void fireFinishWriteTransaction() {
2875
2876         if (DIAGNOSTICS)
2877             System.out.println("FinishWriteTransaction");
2878
2879         for (SessionEventListener listener : eventListeners) {
2880             try {
2881                 listener.writeTransactionFinished();
2882             } catch (Throwable t) {
2883                 t.printStackTrace();
2884             }
2885         }
2886
2887         Indexing.resetDependenciesIndexingDisabled();
2888
2889     }
2890
2891     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2892         throw new Error("Not supported at the moment.");
2893     }
2894
2895     State getState() {
2896         return state;
2897     }
2898
2899     ClusterTable getClusterTable() {
2900         return clusterTable;
2901     }
2902
2903     public GraphSession getGraphSession() {
2904         return graphSession;
2905     }
2906
2907     public QueryProcessor getQueryProvider2() {
2908         return queryProvider2;
2909     }
2910
2911     ClientChangesImpl getClientChanges() {
2912         return clientChanges;
2913     }
2914
2915     boolean getWriteOnly() {
2916         return writeOnly;
2917     }
2918
2919     static int counter = 0;
2920
2921     public void onClusterLoaded(long clusterId) {
2922
2923         clusterTable.updateSize();
2924
2925     }
2926
2927
2928
2929
2930     /*
2931      * Implementation of the interface RequestProcessor
2932      */
2933
2934     @Override
2935     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2936
2937         assertNotSession();
2938         assertAlive();
2939
2940         assert(request != null);
2941
2942         final DataContainer<T> result = new DataContainer<T>();
2943         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2944
2945         syncRequest(request, new AsyncProcedure<T>() {
2946
2947             @Override
2948             public void execute(AsyncReadGraph graph, T t) {
2949                 result.set(t);
2950             }
2951
2952             @Override
2953             public void exception(AsyncReadGraph graph, Throwable t) {
2954                 exception.set(t);
2955             }
2956
2957         });
2958
2959         Throwable t = exception.get();
2960         if(t != null) {
2961             if(t instanceof DatabaseException) throw (DatabaseException)t;
2962             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2963         }
2964
2965         return result.get();
2966
2967     }
2968
2969 //    @Override
2970 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2971 //        assertNotSession();
2972 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2973 //    }
2974
2975     @Override
2976     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2977         assertNotSession();
2978         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2979     }
2980
2981     @Override
2982     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2983         assertNotSession();
2984         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2985     }
2986
2987     @Override
2988     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2989         assertNotSession();
2990         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2991     }
2992
2993     @Override
2994     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2995         assertNotSession();
2996         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2997     }
2998
2999     @Override
3000     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3001
3002         assertNotSession();
3003
3004         assert(request != null);
3005
3006         final DataContainer<T> result = new DataContainer<T>();
3007         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3008
3009         syncRequest(request, new AsyncProcedure<T>() {
3010
3011             @Override
3012             public void execute(AsyncReadGraph graph, T t) {
3013                 result.set(t);
3014             }
3015
3016             @Override
3017             public void exception(AsyncReadGraph graph, Throwable t) {
3018                 exception.set(t);
3019             }
3020
3021         });
3022
3023         Throwable t = exception.get();
3024         if(t != null) {
3025             if(t instanceof DatabaseException) throw (DatabaseException)t;
3026             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3027         }
3028
3029         return result.get();
3030
3031     }
3032
3033     @Override
3034     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3035         assertNotSession();
3036         return syncRequest(request, (AsyncProcedure<T>)procedure);
3037     }
3038
3039     @Override
3040     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3041         assertNotSession();
3042         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3043     }
3044
3045     @Override
3046     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3047         assertNotSession();
3048         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3049     }
3050
3051     @Override
3052     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3053         assertNotSession();
3054         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3055     }
3056
3057     @Override
3058     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3059         assertNotSession();
3060         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3061     }
3062
3063     @Override
3064     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3065
3066         assertNotSession();
3067
3068         assert(request != null);
3069
3070         final ArrayList<T> result = new ArrayList<T>();
3071         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3072
3073         syncRequest(request, new AsyncMultiProcedure<T>() {
3074
3075             @Override
3076             public void execute(AsyncReadGraph graph, T t) {
3077                 synchronized(result) {
3078                     result.add(t);
3079                 }
3080             }
3081
3082             @Override
3083             public void finished(AsyncReadGraph graph) {
3084             }
3085
3086             @Override
3087             public void exception(AsyncReadGraph graph, Throwable t) {
3088                 exception.set(t);
3089             }
3090
3091
3092         });
3093
3094         Throwable t = exception.get();
3095         if(t != null) {
3096             if(t instanceof DatabaseException) throw (DatabaseException)t;
3097             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3098         }
3099
3100         return result;
3101
3102     }
3103
3104     @Override
3105     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3108     }
3109
3110     @Override
3111     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3114     }
3115
3116     @Override
3117     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3118         assertNotSession();
3119         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3120     }
3121
3122     @Override
3123     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3124         assertNotSession();
3125         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3126     }
3127
3128     @Override
3129     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3130         assertNotSession();
3131         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3132     }
3133
3134     @Override
3135     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3136
3137         assertNotSession();
3138
3139         assert(request != null);
3140
3141         final ArrayList<T> result = new ArrayList<T>();
3142         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3143
3144         syncRequest(request, new AsyncMultiProcedure<T>() {
3145
3146             @Override
3147             public void execute(AsyncReadGraph graph, T t) {
3148                 synchronized(result) {
3149                     result.add(t);
3150                 }
3151             }
3152
3153             @Override
3154             public void finished(AsyncReadGraph graph) {
3155             }
3156
3157             @Override
3158             public void exception(AsyncReadGraph graph, Throwable t) {
3159                 exception.set(t);
3160             }
3161
3162
3163         });
3164
3165         Throwable t = exception.get();
3166         if(t != null) {
3167             if(t instanceof DatabaseException) throw (DatabaseException)t;
3168             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3169         }
3170
3171         return result;
3172
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3177         assertNotSession();
3178         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3179     }
3180
3181     @Override
3182     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3183         assertNotSession();
3184         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3185     }
3186
3187     @Override
3188     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3189         assertNotSession();
3190        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3191     }
3192
3193     @Override
3194     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3195         assertNotSession();
3196         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3197     }
3198
3199     @Override
3200     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3201         assertNotSession();
3202         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3203     }
3204
3205
3206     @Override
3207     public <T> void asyncRequest(Read<T> request) {
3208
3209         asyncRequest(request, new ProcedureAdapter<T>() {
3210             @Override
3211             public void exception(Throwable t) {
3212                 t.printStackTrace();
3213             }
3214         });
3215
3216     }
3217
3218     @Override
3219     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3220         asyncRequest(request, (AsyncProcedure<T>)procedure);
3221     }
3222
3223     @Override
3224     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3225         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3226     }
3227
3228     @Override
3229     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3230         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3231     }
3232
3233     @Override
3234     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3235         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3236     }
3237
3238     @Override
3239     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3240         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3241     }
3242
3243     @Override
3244     final public <T> void asyncRequest(final AsyncRead<T> request) {
3245
3246         assert(request != null);
3247
3248         asyncRequest(request, new ProcedureAdapter<T>() {
3249             @Override
3250             public void exception(Throwable t) {
3251                 t.printStackTrace();
3252             }
3253         });
3254
3255     }
3256
    /**
     * Asynchronous read with an asynchronous listener. Hands the request to
     * {@code scheduleRequest}, passing the listener as both the second and the
     * third argument and {@code null} as the fourth.
     */
    // NOTE(review): presumably the listener serves as both result procedure and
    // listener handle here; confirm against scheduleRequest's signature.
    @Override
    public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
        scheduleRequest(request, procedure, procedure, null);
    }
3261
3262     @Override
3263     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3264         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3265     }
3266
3267     @Override
3268     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3269         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3270     }
3271
3272     @Override
3273     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3274         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3275     }
3276
3277     @Override
3278     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3279         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3280     }
3281
3282     @Override
3283     public <T> void asyncRequest(MultiRead<T> request) {
3284
3285         assert(request != null);
3286
3287         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3288             @Override
3289             public void exception(AsyncReadGraph graph, Throwable t) {
3290                 t.printStackTrace();
3291             }
3292         });
3293
3294     }
3295
3296     @Override
3297     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3298         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3299     }
3300
3301     @Override
3302     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3303         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3304     }
3305
3306     @Override
3307     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3308         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3309     }
3310
3311     @Override
3312     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3313         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3314     }
3315
3316     @Override
3317     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3318         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3319     }
3320
3321     @Override
3322     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3323
3324         assert(request != null);
3325
3326         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3327             @Override
3328             public void exception(AsyncReadGraph graph, Throwable t) {
3329                 t.printStackTrace();
3330             }
3331         });
3332
3333     }
3334
3335     @Override
3336     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3337         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3338     }
3339
3340     @Override
3341     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3342         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3343     }
3344
3345     @Override
3346     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3347         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3348     }
3349
3350     @Override
3351     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3352         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3353     }
3354
3355     @Override
3356     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3357         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3358     }
3359
3360     @Override
3361     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3362         assertNotSession();
3363         throw new Error("Not implemented!");
3364     }
3365
3366     @Override
3367     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3368         throw new Error("Not implemented!");
3369     }
3370
3371     @Override
3372     final public <T> void asyncRequest(final ExternalRead<T> request) {
3373
3374         assert(request != null);
3375
3376         asyncRequest(request, new ProcedureAdapter<T>() {
3377             @Override
3378             public void exception(Throwable t) {
3379                 t.printStackTrace();
3380             }
3381         });
3382
3383     }
3384
3385     @Override
3386     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3387         asyncRequest(request, (Procedure<T>)procedure);
3388     }
3389
3390
3391
3392     void check(Throwable t) throws DatabaseException {
3393         if(t != null) {
3394             if(t instanceof DatabaseException) throw (DatabaseException)t;
3395             else throw new DatabaseException("Unexpected exception", t);
3396         }
3397     }
3398
3399     void check(DataContainer<Throwable> container) throws DatabaseException {
3400         Throwable t = container.get();
3401         if(t != null) {
3402             if(t instanceof DatabaseException) throw (DatabaseException)t;
3403             else throw new DatabaseException("Unexpected exception", t);
3404         }
3405     }
3406
3407
3408
3409
3410
3411
3412     boolean sameProvider(Write request) {
3413         if(writeState.getGraph().provider != null) {
3414             return writeState.getGraph().provider.equals(request.getProvider());
3415         } else {
3416             return request.getProvider() == null;
3417         }
3418     }
3419
3420     boolean plainWrite(WriteGraphImpl graph) {
3421         if(graph == null) return false;
3422         if(graph.writeSupport.writeOnly()) return false;
3423         return true;
3424     }
3425
3426
    /** Thread group named "Session Thread Group". */
    // NOTE(review): presumably the session's worker threads are created in this group; confirm at the thread creation sites.
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3428     private void assertNotSession() throws DatabaseException {
3429         Thread current = Thread.currentThread();
3430         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3431     }
3432
3433     void assertAlive() {
3434         if (!state.isAlive())
3435             throw new RuntimeDatabaseException("Session has been shut down.");
3436     }
3437
3438     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3439         return querySupport.getValueStream(graph, querySupport.getId(resource));
3440     }
3441
3442     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3443         return querySupport.getValue(graph, querySupport.getId(resource));
3444     }
3445
3446     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3447         acquire(semaphore, request, null);
3448     }
3449
3450     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3451         assertAlive();
3452         try {
3453             long tryCount = 0;
3454             // Loop until the semaphore is acquired reporting requests that take a long time.
3455             while (true) {
3456
3457                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3458                     // Acquired OK.
3459                     return;
3460                 } else {
3461                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3462                         ++tryCount;
3463                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3464                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3465                                 * tryCount;
3466                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3467                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3468                     }
3469                 }
3470
3471                 assertAlive();
3472
3473             }
3474         } catch (InterruptedException e) {
3475             e.printStackTrace();
3476             // FIXME: Should perhaps do something else in this case ??
3477         }
3478     }
3479
3480
3481
3482 //    @Override
3483 //    public boolean holdOnToTransactionAfterCancel() {
3484 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3485 //    }
3486 //
3487 //    @Override
3488 //    public boolean holdOnToTransactionAfterCommit() {
3489 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3490 //    }
3491 //
3492 //    @Override
3493 //    public boolean holdOnToTransactionAfterRead() {
3494 //        return transactionPolicy.holdOnToTransactionAfterRead();
3495 //    }
3496 //
3497 //    @Override
3498 //    public void onRelinquish() {
3499 //        transactionPolicy.onRelinquish();
3500 //    }
3501 //
3502 //    @Override
3503 //    public void onRelinquishDone() {
3504 //        transactionPolicy.onRelinquishDone();
3505 //    }
3506 //
3507 //    @Override
3508 //    public void onRelinquishError() {
3509 //        transactionPolicy.onRelinquishError();
3510 //    }
3511
3512
3513     @Override
3514     public Session getSession() {
3515         return this;
3516     }
3517
3518
    // Subclass hook mapping a virtual graph to a provider.
    // NOTE(review): the exact mapping contract is not visible here; confirm in implementations.
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    // Subclass hook producing a new resource; may fail with a DatabaseException.
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3521
3522     public void ceased(int thread) {
3523
3524         requestManager.ceased(thread);
3525
3526     }
3527
3528     public int getAmountOfQueryThreads() {
3529         // This must be a power of two
3530         return 1;
3531 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3532     }
3533
3534     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3535
3536         return new ResourceImpl(resourceSupport, key);
3537
3538     }
3539
3540     public void acquireWriteOnly() {
3541         writeOnly = true;
3542     }
3543
3544     public void releaseWriteOnly(ReadGraphImpl graph) {
3545         writeOnly = false;
3546         queryProvider2.releaseWrite(graph);
3547     }
3548
3549     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3550
3551         long start = System.nanoTime();
3552
3553         while(dirtyPrimitives) {
3554             dirtyPrimitives = false;
3555             getQueryProvider2().performDirtyUpdates(writer);
3556             getQueryProvider2().performScheduledUpdates(writer);
3557         }
3558
3559         fireMetadataListeners(writer, clientChanges);
3560
3561         if(DebugPolicy.PERFORMANCE_DATA) {
3562             long end = System.nanoTime();
3563             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3564         }
3565
3566     }
3567
3568     void removeTemporaryData() {
3569         File platform = Platform.getLocation().toFile();
3570         File tempFiles = new File(platform, "tempFiles");
3571         File temp = new File(tempFiles, "db");
3572         if(!temp.exists()) 
3573             return;
3574         try {
3575             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3576                 @Override
3577                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3578                     try {
3579                         Files.delete(file);
3580                     } catch (IOException e) {
3581                         Logger.defaultLogError(e);
3582                     }
3583                     return FileVisitResult.CONTINUE;
3584                 }
3585             });
3586         } catch (IOException e) {
3587             Logger.defaultLogError(e);
3588         }
3589     }
3590
3591     public void handleCreatedClusters() {
3592         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3593             createdClusters.forEach(new TLongProcedure() {
3594
3595                 @Override
3596                 public boolean execute(long value) {
3597                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3598                     cluster.setImmutable(true, clusterTranslator);
3599                     return true;
3600                 }
3601             });
3602         }
3603         createdClusters.clear();
3604     }
3605
3606     @Override
3607     public Object getModificationCounter() {
3608         return queryProvider2.modificationCounter;
3609     }
3610     
3611     @Override
3612     public void markUndoPoint() {
3613         state.setCombine(false);
3614     }
3615
3616 }