]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
8bbccd080512af308765c0a15c1a413526c5acf1
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryProcessor;
94 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
95 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
96 import org.simantics.db.impl.service.QueryDebug;
97 import org.simantics.db.impl.support.VirtualGraphServerSupport;
98 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
99 import org.simantics.db.procedure.AsyncListener;
100 import org.simantics.db.procedure.AsyncMultiListener;
101 import org.simantics.db.procedure.AsyncMultiProcedure;
102 import org.simantics.db.procedure.AsyncProcedure;
103 import org.simantics.db.procedure.Listener;
104 import org.simantics.db.procedure.ListenerBase;
105 import org.simantics.db.procedure.MultiListener;
106 import org.simantics.db.procedure.MultiProcedure;
107 import org.simantics.db.procedure.Procedure;
108 import org.simantics.db.procedure.SyncListener;
109 import org.simantics.db.procedure.SyncMultiListener;
110 import org.simantics.db.procedure.SyncMultiProcedure;
111 import org.simantics.db.procedure.SyncProcedure;
112 import org.simantics.db.procore.cluster.ClusterImpl;
113 import org.simantics.db.procore.cluster.ClusterTraits;
114 import org.simantics.db.procore.protocol.Constants;
115 import org.simantics.db.procore.protocol.DebugPolicy;
116 import org.simantics.db.request.AsyncMultiRead;
117 import org.simantics.db.request.AsyncRead;
118 import org.simantics.db.request.DelayedWrite;
119 import org.simantics.db.request.DelayedWriteResult;
120 import org.simantics.db.request.ExternalRead;
121 import org.simantics.db.request.MultiRead;
122 import org.simantics.db.request.Read;
123 import org.simantics.db.request.ReadInterface;
124 import org.simantics.db.request.Write;
125 import org.simantics.db.request.WriteInterface;
126 import org.simantics.db.request.WriteOnly;
127 import org.simantics.db.request.WriteOnlyResult;
128 import org.simantics.db.request.WriteResult;
129 import org.simantics.db.request.WriteTraits;
130 import org.simantics.db.service.ByteReader;
131 import org.simantics.db.service.ClusterBuilder;
132 import org.simantics.db.service.ClusterBuilderFactory;
133 import org.simantics.db.service.ClusterControl;
134 import org.simantics.db.service.ClusterSetsSupport;
135 import org.simantics.db.service.ClusterUID;
136 import org.simantics.db.service.ClusteringSupport;
137 import org.simantics.db.service.CollectionSupport;
138 import org.simantics.db.service.DebugSupport;
139 import org.simantics.db.service.DirectQuerySupport;
140 import org.simantics.db.service.GraphChangeListenerSupport;
141 import org.simantics.db.service.InitSupport;
142 import org.simantics.db.service.LifecycleSupport;
143 import org.simantics.db.service.ManagementSupport;
144 import org.simantics.db.service.QueryControl;
145 import org.simantics.db.service.SerialisationSupport;
146 import org.simantics.db.service.ServerInformation;
147 import org.simantics.db.service.ServiceActivityMonitor;
148 import org.simantics.db.service.SessionEventSupport;
149 import org.simantics.db.service.SessionMonitorSupport;
150 import org.simantics.db.service.SessionUserSupport;
151 import org.simantics.db.service.StatementSupport;
152 import org.simantics.db.service.TransactionPolicySupport;
153 import org.simantics.db.service.TransactionSupport;
154 import org.simantics.db.service.TransferableGraphSupport;
155 import org.simantics.db.service.UndoRedoSupport;
156 import org.simantics.db.service.VirtualGraphSupport;
157 import org.simantics.db.service.XSupport;
158 import org.simantics.layer0.Layer0;
159 import org.simantics.utils.DataContainer;
160 import org.simantics.utils.Development;
161 import org.simantics.utils.threads.logger.ITask;
162 import org.simantics.utils.threads.logger.ThreadLogger;
163
164 import gnu.trove.procedure.TLongProcedure;
165 import gnu.trove.set.hash.TLongHashSet;
166
167
168 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
169
170     protected static final boolean DEBUG        = false;
171
172     private static final boolean DIAGNOSTICS       = false;
173
174     private TransactionPolicySupport transactionPolicy;
175
176     final private ClusterControl clusterControl;
177
178     final protected BuiltinSupportImpl builtinSupport;
179     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
180     final protected ClusterSetsSupport clusterSetsSupport;
181     final protected LifecycleSupportImpl lifecycleSupport;
182
183     protected QuerySupportImpl querySupport;
184     protected ResourceSupportImpl resourceSupport;
185     protected WriteSupport writeSupport;
186     public ClusterTranslator clusterTranslator;
187
188     boolean dirtyPrimitives = false;
189
190     public static final int SERVICE_MODE_CREATE = 2;
191     public static final int SERVICE_MODE_ALLOW = 1;
192     public int serviceMode = 0;
193     public boolean createdImmutableClusters = false;
194     public TLongHashSet createdClusters = new TLongHashSet();
195
196     private Layer0 L0;
197
198     /**
199      * The service locator maintained by the workbench. These services are
200      * initialized during workbench during the <code>init</code> method.
201      */
202     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
203
204     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
205
206     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
209
210     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
211
212     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
213
214     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
215
216     final protected State                                        state              = new State();
217
218     protected GraphSession                                       graphSession       = null;
219
220     protected SessionManager                                     sessionManagerImpl = null;
221
222     protected UserAuthenticationAgent                            authAgent          = null;
223
224     protected UserAuthenticator                                  authenticator      = null;
225
226     protected Resource                                           user               = null;
227
228     protected ClusterStream                                      clusterStream      = null;
229
230     protected SessionRequestManager                         requestManager = null;
231
232     public ClusterTable                                       clusterTable       = null;
233
234     public QueryProcessor                                     queryProvider2 = null;
235
236     ClientChangesImpl                                  clientChanges      = null;
237
238     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
239
240 //    protected long                                               newClusterId       = Constants.NullClusterId;
241
242     protected int                                                flushCounter       = 0;
243
244     protected boolean                                            writeOnly          = false;
245
246     WriteState<?>                                                writeState = null;
247     WriteStateBase<?>                                            delayedWriteState = null;
248     protected Resource defaultClusterSet = null; // If not null then used for newResource().
249
250     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
251
252 //        if (authAgent == null)
253 //            throw new IllegalArgumentException("null authentication agent");
254
255         File t = StaticSessionProperties.virtualGraphStoragePath;
256         if (null == t)
257             t = new File(".");
258         this.clusterTable = new ClusterTable(this, t);
259         this.builtinSupport = new BuiltinSupportImpl(this);
260         this.sessionManagerImpl = sessionManagerImpl;
261         this.user = null;
262 //        this.authAgent = authAgent;
263
264         serviceLocator.registerService(Session.class, this);
265         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
266         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
267         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
268         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
269         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
270         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
271         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
272         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
273         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
274         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
275         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
276         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
277         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
278         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
279         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
280         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
281         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
282         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
283         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
284         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
285         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
286         ServiceActivityUpdaterForWriteTransactions.register(this);
287
288         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
289         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
290         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
291         this.lifecycleSupport = new LifecycleSupportImpl(this);
292         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
293         this.transactionPolicy = new TransactionPolicyRelease();
294         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
295         this.clusterControl = new ClusterControlImpl(this);
296         serviceLocator.registerService(ClusterControl.class, clusterControl);
297         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
298         this.clusterSetsSupport.setReadDirectory(t.toPath());
299         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
300         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
301     }
302
303     /*
304      *
305      * Session interface
306      */
307
308
309     @Override
310     public Resource getRootLibrary() {
311         return queryProvider2.getRootLibraryResource();
312     }
313
314     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
315         if (!graphSession.dbSession.refreshEnabled())
316             return;
317         try {
318             getClusterTable().refresh(csid, this, clusterUID);
319         } catch (Throwable t) {
320             Logger.defaultLogError("Refesh failed.", t);
321         }
322     }
323
324     static final class TaskHelper {
325         private final String name;
326         private Object result;
327         final Semaphore sema = new Semaphore(0);
328         private Throwable throwable = null;
329         final Consumer<DatabaseException> callback = e -> {
330             synchronized (TaskHelper.this) {
331                 throwable = e;
332             }
333         };
334         final Procedure<Object> proc = new Procedure<Object>() {
335             @Override
336             public void execute(Object result) {
337                 callback.accept(null);
338             }
339             @Override
340             public void exception(Throwable t) {
341                 if (t instanceof DatabaseException)
342                     callback.accept((DatabaseException)t);
343                 else
344                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
345             }
346         };
347         final WriteTraits writeTraits = new WriteTraits() {};
348         TaskHelper(String name) {
349             this.name = name;
350         }
351         @SuppressWarnings("unchecked")
352         <T> T getResult() {
353             return (T)result;
354         }
355         void setResult(Object result) {
356             this.result = result;
357         }
358 //        Throwable throwableGet() {
359 //            return throwable;
360 //        }
361         synchronized void throwableSet(Throwable t) {
362             throwable = t;
363         }
364 //        void throwableSet(String t) {
365 //            throwable = new InternalException("" + name + " operation failed. " + t);
366 //        }
367         synchronized void throwableCheck()
368         throws DatabaseException {
369             if (null != throwable)
370                 if (throwable instanceof DatabaseException)
371                     throw (DatabaseException)throwable;
372                 else
373                     throw new DatabaseException("Undo operation failed.", throwable);
374         }
375         void throw_(String message)
376         throws DatabaseException {
377             throw new DatabaseException("" + name + " operation failed. " + message);
378         }
379     }
380
381
382
383     private ListenerBase getListenerBase(Object procedure) {
384         if (procedure instanceof ListenerBase)
385             return (ListenerBase) procedure;
386         else
387             return null;
388     }
389
390     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
391         scheduleRequest(request, callback, notify, null);
392     }
393
394     /* (non-Javadoc)
395      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396      */
397     @Override
398     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399
400         assert (request != null);
401
402         if(Development.DEVELOPMENT) {
403             try {
404             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405                 System.err.println("schedule write '" + request + "'");
406             } catch (Throwable t) {
407                 Logger.defaultLogError(t);
408             }
409         }
410
411         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412
413         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414
415             @Override
416             public void run(int thread) {
417
418                 if(Development.DEVELOPMENT) {
419                     try {
420                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421                         System.err.println("perform write '" + request + "'");
422                     } catch (Throwable t) {
423                         Logger.defaultLogError(t);
424                     }
425                 }
426
427                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428
429                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
430
431                 try {
432
433                     flushCounter = 0;
434                     Disposable.safeDispose(clientChanges);
435                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436
437                     VirtualGraph vg = getProvider(request.getProvider());
438
439                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
440                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
441
442                         @Override
443                         public void execute(Object result) {
444                             if(callback != null) callback.accept(null);
445                         }
446
447                         @Override
448                         public void exception(Throwable t) {
449                             if(callback != null) callback.accept((DatabaseException)t);
450                         }
451
452                     });
453
454                     assert (null != writer);
455 //                    writer.state.barrier.inc();
456
457                     try {
458                         request.perform(writer);
459                         assert (null != writer);
460                     } catch (Throwable t) {
461                         if (!(t instanceof CancelTransactionException))
462                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
463                         writeState.except(t);
464                     } finally {
465 //                        writer.state.barrier.dec();
466 //                        writer.waitAsync(request);
467                     }
468
469
470                     assert(!queryProvider2.cache.dirty);
471
472                 } catch (Throwable e) {
473
474                     // Log it first, just to be safe that the error is always logged.
475                     if (!(e instanceof CancelTransactionException))
476                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477
478 //                    writeState.getGraph().state.barrier.dec();
479 //                    writeState.getGraph().waitAsync(request);
480
481                     writeState.except(e);
482
483 //                    try {
484 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
485 //                        // All we can do here is to log those, can't really pass them anywhere.
486 //                        if (callback != null) {
487 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
488 //                            else callback.run(new DatabaseException(e));
489 //                        }
490 //                    } catch (Throwable e2) {
491 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 //                    }
493 //                    boolean empty = clusterStream.reallyFlush();
494 //                    int callerThread = -1;
495 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
496 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 //                    try {
498 //                        // Send all local changes to server so it can calculate correct reverse change set.
499 //                        // This will call clusterStream.accept().
500 //                        state.cancelCommit(context, clusterStream);
501 //                        if (!empty) {
502 //                            if (!context.isOk()) // this is a blocking operation
503 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
504 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 //                        }
506 //                        state.cancelCommit2(context, clusterStream);
507 //                    } catch (DatabaseException e3) {
508 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 //                    } finally {
510 //                        clusterStream.setOff(false);
511 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
512 //                    }
513
514                 } finally {
515
516                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
517
518                 }
519
520                 task.finish();
521
522                 if(Development.DEVELOPMENT) {
523                     try {
524                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
525                         System.err.println("finish write '" + request + "'");
526                     } catch (Throwable t) {
527                         Logger.defaultLogError(t);
528                     }
529                 }
530
531             }
532
533         }, combine);
534
535     }
536
537     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
538         scheduleRequest(request, procedure, notify, null);
539     }
540
541     /* (non-Javadoc)
542      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
543      */
544     @Override
545     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546
547         assert (request != null);
548
549         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550
551         requestManager.scheduleWrite(new SessionTask(request, thread) {
552
553             @Override
554             public void run(int thread) {
555
556                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557
558                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
559
560                 flushCounter = 0;
561                 Disposable.safeDispose(clientChanges);
562                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
563
564                 VirtualGraph vg = getProvider(request.getProvider());
565                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
566
567                 try {
568                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
569                     writeState = writeStateT;
570
571                     assert (null != writer);
572 //                    writer.state.barrier.inc();
573                     writeStateT.setResult(request.perform(writer));
574                     assert (null != writer);
575
576 //                    writer.state.barrier.dec();
577 //                    writer.waitAsync(null);
578
579                 } catch (Throwable e) {
580
581 //                    writer.state.barrier.dec();
582 //                    writer.waitAsync(null);
583
584                     writeState.except(e);
585
586 //                  state.stopWriteTransaction(clusterStream);
587 //
588 //              } catch (Throwable e) {
589 //                  // Log it first, just to be safe that the error is always logged.
590 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
591 //
592 //                  try {
593 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
594 //                      // All we can do here is to log those, can't really pass them anywhere.
595 //                      if (procedure != null) {
596 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
597 //                          else procedure.exception(new DatabaseException(e));
598 //                      }
599 //                  } catch (Throwable e2) {
600 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
601 //                  }
602 //
603 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
604 //
605 //                  state.stopWriteTransaction(clusterStream);
606
607                 } finally {
608                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
609                 }
610
611 //              if(notify != null) notify.release();
612
613                 task.finish();
614
615             }
616
617         }, combine);
618
619     }
620
621     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
622         scheduleRequest(request, callback, notify, null);
623     }
624
625     /* (non-Javadoc)
626      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
627      */
628     @Override
629     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
630
631         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
632
633         assert (request != null);
634
635         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
636
637         requestManager.scheduleWrite(new SessionTask(request, thread) {
638
639             @Override
640             public void run(int thread) {
641                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
642
643                 Procedure<Object> stateProcedure = new Procedure<Object>() {
644                     @Override
645                     public void execute(Object result) {
646                         if (callback != null)
647                             callback.accept(null);
648                     }
649                     @Override
650                     public void exception(Throwable t) {
651                         if (callback != null) {
652                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653                             else callback.accept(new DatabaseException(t));
654                         } else
655                             Logger.defaultLogError("Unhandled exception", t);
656                     }
657                 };
658
659                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661                 DelayedWriteGraph dwg = null;
662 //                newGraph.state.barrier.inc();
663
664                 try {
665                     dwg = new DelayedWriteGraph(newGraph);
666                     request.perform(dwg);
667                 } catch (Throwable e) {
668                     delayedWriteState.except(e);
669                     total.finish();
670                     dwg.close();
671                     return;
672                 } finally {
673 //                    newGraph.state.barrier.dec();
674 //                    newGraph.waitAsync(request);
675                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
676                 }
677
678                 delayedWriteState = null;
679
680                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682
683                 flushCounter = 0;
684                 Disposable.safeDispose(clientChanges);
685                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686
687                 acquireWriteOnly();
688
689                 VirtualGraph vg = getProvider(request.getProvider());
690                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
691
692                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
693
694                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
695
696                 assert (null != writer);
697 //                writer.state.barrier.inc();
698
699                 try {
700                     // Cannot cancel
701                     dwg.commit(writer, request);
702
703                         if(defaultClusterSet != null) {
704                                 XSupport xs = getService(XSupport.class);
705                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706                                 ClusteringSupport cs = getService(ClusteringSupport.class);
707                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
708                         }
709
710                     // This makes clusters available from server
711                     clusterStream.reallyFlush();
712
713                     releaseWriteOnly(writer);
714                     handleUpdatesAndMetadata(writer);
715
716                 } catch (ServiceException e) {
717 //                    writer.state.barrier.dec();
718 //                    writer.waitAsync(null);
719
720                     // These shall be requested from server
721                     clusterTable.removeWriteOnlyClusters();
722                     // This makes clusters available from server
723                     clusterStream.reallyFlush();
724
725                     releaseWriteOnly(writer);
726                     writeState.except(e);
727                 } finally {
728                     // Debugging & Profiling
729                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
730                     task2.finish();
731                     total.finish();
732                 }
733             }
734
735         }, combine);
736
737     }
738
739     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740         scheduleRequest(request, procedure, notify, null);
741     }
742
743     /* (non-Javadoc)
744      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
745      */
746     @Override
747     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748         throw new Error("Not implemented");
749     }
750
751     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754             createdClusters.add(cluster.clusterId);
755         }
756         return cluster;
757     }
758
759     class WriteOnlySupport implements WriteSupport {
760
761         ClusterStream stream;
762         ClusterImpl currentCluster;
763
764         public WriteOnlySupport() {
765             this.stream = clusterStream;
766         }
767
768         @Override
769         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771         }
772
773         @Override
774         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
775
776             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
777
778             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779                 if(s != queryProvider2.getRootLibrary())
780                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
781
782             try {
783                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784             } catch (DatabaseException e) {
785                 Logger.defaultLogError(e);
786                 //throw e;
787             }
788
789             clientChanges.invalidate(s);
790
791             if (cluster.isWriteOnly())
792                 return;
793             queryProvider2.updateStatements(s, p);
794
795         }
796
797         @Override
798         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800         }
801
802         @Override
803         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
804
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808             } catch (DatabaseException e) {
809                 Logger.defaultLogError(e);
810             }
811
812             clientChanges.invalidate(rid);
813
814             if (cluster.isWriteOnly())
815                 return;
816             queryProvider2.updateValue(rid);
817
818         }
819
820         @Override
821         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
822
823             if(amount < 65536) {
824                 claimValue(provider,resource, reader.readBytes(null, amount));
825                 return;
826             }
827
828             byte[] bytes = new byte[65536];
829
830             int rid = ((ResourceImpl)resource).id;
831             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832             try {
833                 int left = amount;
834                 while(left > 0) {
835                     int block = Math.min(left, 65536);
836                     reader.readBytes(bytes, block);
837                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
838                     left -= block;
839                 }
840             } catch (DatabaseException e) {
841                 Logger.defaultLogError(e);
842             }
843
844             clientChanges.invalidate(rid);
845
846             if (cluster.isWriteOnly())
847                 return;
848             queryProvider2.updateValue(rid);
849
850         }
851
852         private void maintainCluster(ClusterImpl before, ClusterI after_) {
853             if(after_ != null && after_ != before) {
854                 ClusterImpl after = (ClusterImpl)after_;
855                 if(currentCluster == before) {
856                     currentCluster = after;
857                 }
858                 clusterTable.replaceCluster(after);
859             }
860         }
861
862         public int createResourceKey(int foreignCounter) throws DatabaseException {
863             if(currentCluster == null) {
864                 currentCluster = getNewResourceCluster();
865             }
866             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
867                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868                 newCluster.foreignLookup = new byte[foreignCounter];
869                 currentCluster = newCluster;
870                 if (DEBUG)
871                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
872             }
873             return currentCluster.createResource(clusterTranslator);
874         }
875
876         @Override
877         public Resource createResource(VirtualGraph provider) throws DatabaseException {
878             if(currentCluster == null) {
879                 if (null != defaultClusterSet) {
880                         ResourceImpl result = getNewResource(defaultClusterSet);
881                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
882                     return result;
883                 } else {
884                     currentCluster = getNewResourceCluster();
885                 }
886             }
887             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888                 if (null != defaultClusterSet) {
889                         ResourceImpl result = getNewResource(defaultClusterSet);
890                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
891                     return result;
892                 } else {
893                     currentCluster = getNewResourceCluster();
894                 }
895             }
896             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
897         }
898
899         @Override
900         public Resource createResource(VirtualGraph provider, long clusterId)
901         throws DatabaseException {
902             return getNewResource(clusterId);
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, Resource clusterSet)
907         throws DatabaseException {
908             return getNewResource(clusterSet);
909         }
910
911         @Override
912         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913                 throws DatabaseException {
914             getNewClusterSet(clusterSet);
915         }
916
917         @Override
918         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919         throws ServiceException {
920             return containsClusterSet(clusterSet);
921         }
922
923         public void selectCluster(long cluster) {
924                 currentCluster = clusterTable.getClusterByClusterId(cluster);
925                 long setResourceId = clusterSetsSupport.getSet(cluster);
926                 clusterSetsSupport.put(setResourceId, cluster);
927         }
928
929         @Override
930         public Resource setDefaultClusterSet(Resource clusterSet)
931         throws ServiceException {
932                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
933                 if(clusterSet != null) {
934                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
935                         currentCluster = clusterTable.getClusterByClusterId(id);
936                         return result;
937                 } else {
938                         currentCluster = null;
939                         return null;
940                 }
941         }
942
943         @Override
944         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
945
946                  provider = getProvider(provider);
947              if (null == provider) {
948                  int key = ((ResourceImpl)resource).id;
949
950
951
952                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 //                      if(key != queryProvider2.getRootLibrary())
955 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
956
957 //                 try {
958 //                     cluster.removeValue(key, clusterTranslator);
959 //                 } catch (DatabaseException e) {
960 //                     Logger.defaultLogError(e);
961 //                     return;
962 //                 }
963
964                  clusterTable.writeOnlyInvalidate(cluster);
965
966                  try {
967                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969                          clusterTranslator.removeValue(cluster);
970                  } catch (DatabaseException e) {
971                          Logger.defaultLogError(e);
972                  }
973
974                  queryProvider2.invalidateResource(key);
975                  clientChanges.invalidate(key);
976
977              } else {
978                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979                  queryProvider2.updateValue(querySupport.getId(resource));
980                  clientChanges.claimValue(resource);
981              }
982
983
984         }
985
986         @Override
987         public void flush(boolean intermediate) {
988             throw new UnsupportedOperationException();
989         }
990
991         @Override
992         public void flushCluster() {
993             clusterTable.flushCluster(graphSession);
994             if(defaultClusterSet != null) {
995                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
996             }
997             currentCluster = null;
998         }
999
1000         @Override
1001         public void flushCluster(Resource r) {
1002             throw new UnsupportedOperationException("flushCluster resource " + r);
1003         }
1004
1005         @Override
1006         public void gc() {
1007         }
1008
1009         @Override
1010         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1011                 Resource object) {
1012
1013                 int s = ((ResourceImpl)subject).id;
1014                 int p = ((ResourceImpl)predicate).id;
1015                 int o = ((ResourceImpl)object).id;
1016
1017                 provider = getProvider(provider);
1018                 if (null == provider) {
1019
1020                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021                         clusterTable.writeOnlyInvalidate(cluster);
1022
1023                         try {
1024
1025                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026
1027                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1028                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1029
1030                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1031                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033                                 clusterTranslator.removeStatement(cluster);
1034
1035                                 queryProvider2.invalidateResource(s);
1036                                 clientChanges.invalidate(s);
1037
1038                         } catch (DatabaseException e) {
1039
1040                                 Logger.defaultLogError(e);
1041
1042                         }
1043
1044                         return true;
1045
1046                 } else {
1047                         
1048                         ((VirtualGraphImpl)provider).deny(s, p, o);
1049                         queryProvider2.invalidateResource(s);
1050                         clientChanges.invalidate(s);
1051
1052                         return true;
1053
1054                 }
1055
1056         }
1057
1058         @Override
1059         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1060             throw new UnsupportedOperationException();
1061         }
1062
1063         @Override
1064         public boolean writeOnly() {
1065             return true;
1066         }
1067
1068         @Override
1069         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1070             writeSupport.performWriteRequest(graph, request);
1071         }
1072
1073         @Override
1074         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1075             throw new UnsupportedOperationException();
1076         }
1077
1078         @Override
1079         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1080             throw new UnsupportedOperationException();
1081         }
1082
1083         @Override
1084         public <T> void addMetadata(Metadata data) throws ServiceException {
1085             writeSupport.addMetadata(data);
1086         }
1087
1088         @Override
1089         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1090             return writeSupport.getMetadata(clazz);
1091         }
1092
1093         @Override
1094         public TreeMap<String, byte[]> getMetadata() {
1095             return writeSupport.getMetadata();
1096         }
1097
1098         @Override
1099         public void commitDone(WriteTraits writeTraits, long csid) {
1100             writeSupport.commitDone(writeTraits, csid);
1101         }
1102
1103         @Override
1104         public void clearUndoList(WriteTraits writeTraits) {
1105             writeSupport.clearUndoList(writeTraits);
1106         }
1107         @Override
1108         public int clearMetadata() {
1109             return writeSupport.clearMetadata();
1110         }
1111
1112                 @Override
1113                 public void startUndo() {
1114                         writeSupport.startUndo();
1115                 }
1116     }
1117
1118     class VirtualWriteOnlySupport implements WriteSupport {
1119
1120 //        @Override
1121 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1122 //                Resource object) {
1123 //            throw new UnsupportedOperationException();
1124 //        }
1125
1126         @Override
1127         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1128
1129             TransientGraph impl = (TransientGraph)provider;
1130             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1131             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1132             clientChanges.claim(subject, predicate, object);
1133
1134         }
1135
1136         @Override
1137         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1138
1139             TransientGraph impl = (TransientGraph)provider;
1140             impl.claim(subject, predicate, object);
1141             getQueryProvider2().updateStatements(subject, predicate);
1142             clientChanges.claim(subject, predicate, object);
1143
1144         }
1145
1146         @Override
1147         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1148             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1149         }
1150
1151         @Override
1152         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1153             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1154             getQueryProvider2().updateValue(resource);
1155             clientChanges.claimValue(resource);
1156         }
1157
1158         @Override
1159         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1160             byte[] value = reader.readBytes(null, amount);
1161             claimValue(provider, resource, value);
1162         }
1163
1164         @Override
1165         public Resource createResource(VirtualGraph provider) {
1166             TransientGraph impl = (TransientGraph)provider;
1167             return impl.getResource(impl.newResource(false));
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider, long clusterId) {
1172             throw new UnsupportedOperationException();
1173         }
1174
1175         @Override
1176         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1177         throws DatabaseException {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws ServiceException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public Resource setDefaultClusterSet(Resource clusterSet)
1195         throws ServiceException {
1196                 return null;
1197         }
1198
1199         @Override
1200         public void denyValue(VirtualGraph provider, Resource resource) {
1201             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1202             getQueryProvider2().updateValue(querySupport.getId(resource));
1203             // NOTE: this only keeps track of value changes by-resource.
1204             clientChanges.claimValue(resource);
1205         }
1206
1207         @Override
1208         public void flush(boolean intermediate) {
1209             throw new UnsupportedOperationException();
1210         }
1211
1212         @Override
1213         public void flushCluster() {
1214             throw new UnsupportedOperationException();
1215         }
1216
1217         @Override
1218         public void flushCluster(Resource r) {
1219             throw new UnsupportedOperationException("Resource " + r);
1220         }
1221
1222         @Override
1223         public void gc() {
1224         }
1225
1226         @Override
1227         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1228             TransientGraph impl = (TransientGraph) provider;
1229             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1230             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1231             clientChanges.deny(subject, predicate, object);
1232             return true;
1233         }
1234
1235         @Override
1236         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1237             throw new UnsupportedOperationException();
1238         }
1239
1240         @Override
1241         public boolean writeOnly() {
1242             return true;
1243         }
1244
1245         @Override
1246         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1247             throw new UnsupportedOperationException();
1248         }
1249
1250         @Override
1251         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T> void addMetadata(Metadata data) throws ServiceException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public TreeMap<String, byte[]> getMetadata() {
1272             throw new UnsupportedOperationException();
1273         }
1274
1275         @Override
1276         public void commitDone(WriteTraits writeTraits, long csid) {
1277         }
1278
1279         @Override
1280         public void clearUndoList(WriteTraits writeTraits) {
1281         }
1282
1283         @Override
1284         public int clearMetadata() {
1285             return 0;
1286         }
1287
1288                 @Override
1289                 public void startUndo() {
1290                 }
1291     }
1292
1293     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1294
1295         try {
1296
1297             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1298
1299             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1300
1301             flushCounter = 0;
1302             Disposable.safeDispose(clientChanges);
1303             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1304
1305             acquireWriteOnly();
1306
1307             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1308
1309             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1310
1311             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1312             writeState = writeStateT;
1313
1314             assert (null != writer);
1315 //            writer.state.barrier.inc();
1316             long start = System.nanoTime();
1317             T result = request.perform(writer);
1318             long duration = System.nanoTime() - start;
1319             if (DEBUG) {
1320                 System.err.println("################");
1321                 System.err.println("WriteOnly duration " + 1e-9*duration);
1322             }
1323             writeStateT.setResult(result);
1324
1325             // This makes clusters available from server
1326             clusterStream.reallyFlush();
1327
1328             // This will trigger query updates
1329             releaseWriteOnly(writer);
1330             assert (null != writer);
1331
1332             handleUpdatesAndMetadata(writer);
1333
1334         } catch (CancelTransactionException e) {
1335
1336             releaseWriteOnly(writeState.getGraph());
1337
1338             clusterTable.removeWriteOnlyClusters();
1339             state.stopWriteTransaction(clusterStream);
1340
1341         } catch (Throwable e) {
1342
1343             e.printStackTrace();
1344
1345             releaseWriteOnly(writeState.getGraph());
1346
1347             clusterTable.removeWriteOnlyClusters();
1348
1349             if (callback != null)
1350                 callback.exception(new DatabaseException(e));
1351
1352             state.stopWriteTransaction(clusterStream);
1353             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1354
1355         } finally  {
1356
1357             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358
1359         }
1360
1361     }
1362
1363     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1364         scheduleRequest(request, callback, notify, null);
1365     }
1366
1367     @Override
1368     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1369
1370         assertAlive();
1371
1372         assert (request != null);
1373
1374         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1375
1376         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1377
1378             @Override
1379             public void run(int thread) {
1380
1381                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1382
1383                     try {
1384
1385                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1386
1387                         flushCounter = 0;
1388                         Disposable.safeDispose(clientChanges);
1389                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1390
1391                         acquireWriteOnly();
1392
1393                         VirtualGraph vg = getProvider(request.getProvider());
1394                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1395
1396                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1397
1398                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1399
1400                             @Override
1401                             public void execute(Object result) {
1402                                 if(callback != null) callback.accept(null);
1403                             }
1404
1405                             @Override
1406                             public void exception(Throwable t) {
1407                                 if(callback != null) callback.accept((DatabaseException)t);
1408                             }
1409
1410                         });
1411
1412                         assert (null != writer);
1413 //                        writer.state.barrier.inc();
1414
1415                         try {
1416
1417                             request.perform(writer);
1418
1419                         } catch (Throwable e) {
1420
1421 //                            writer.state.barrier.dec();
1422 //                            writer.waitAsync(null);
1423
1424                             releaseWriteOnly(writer);
1425
1426                             clusterTable.removeWriteOnlyClusters();
1427
1428                             if(!(e instanceof CancelTransactionException)) {
1429                                 if (callback != null)
1430                                     callback.accept(new DatabaseException(e));
1431                             }
1432
1433                             writeState.except(e);
1434
1435                             return;
1436
1437                         }
1438
1439                         // This makes clusters available from server
1440                         boolean empty = clusterStream.reallyFlush();
1441                         // This was needed to make WO requests call metadata listeners.
1442                         // NOTE: the calling event does not contain clientChanges information.
1443                         if (!empty && clientChanges.isEmpty())
1444                             clientChanges.setNotEmpty(true);
1445                         releaseWriteOnly(writer);
1446                         assert (null != writer);
1447                     } finally  {
1448
1449                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1450
1451                     }
1452
1453
1454                 task.finish();
1455
1456             }
1457
1458         }, combine);
1459
1460     }
1461
1462     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1463         scheduleRequest(request, callback, notify, null);
1464     }
1465
1466     /* (non-Javadoc)
1467      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1468      */
1469     @Override
1470     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1471
1472         assert (request != null);
1473
1474         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1475
1476         requestManager.scheduleWrite(new SessionTask(request, thread) {
1477
1478             @Override
1479             public void run(int thread) {
1480
1481                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1482
1483                 performWriteOnly(request, notify, callback);
1484
1485                 task.finish();
1486
1487             }
1488
1489         }, combine);
1490
1491     }
1492
1493     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1494
1495         assert (request != null);
1496         assert (procedure != null);
1497
1498         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1499
1500         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1501
1502             @Override
1503             public void run(int thread) {
1504
1505                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1506
1507                 ListenerBase listener = getListenerBase(procedure);
1508
1509                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1510
1511                 try {
1512
1513                     if (listener != null) {
1514
1515                         try {
1516                                 
1517                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1518
1519                                         @Override
1520                                         public void exception(AsyncReadGraph graph, Throwable t) {
1521                                                 procedure.exception(graph, t);
1522                                                 if(throwable != null) {
1523                                                         throwable.set(t);
1524                                                 } else {
1525                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1526                                                 }
1527                                         }
1528
1529                                         @Override
1530                                         public void execute(AsyncReadGraph graph, T t) {
1531                                                 if(result != null) result.set(t);
1532                                                 procedure.execute(graph, t);
1533                                         }
1534
1535                                 };
1536                                 
1537                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1538                                 
1539                         } catch (Throwable t) {
1540                             // This is handled by the AsyncProcedure
1541                                 //Logger.defaultLogError("Internal error", t);
1542                         }
1543
1544                     } else {
1545
1546                         try {
1547
1548 //                            newGraph.state.barrier.inc();
1549
1550                             T t = request.perform(newGraph);
1551
1552                             try {
1553
1554                                 if(result != null) result.set(t);
1555                                 procedure.execute(newGraph, t);
1556
1557                             } catch (Throwable th) {
1558
1559                                 if(throwable != null) {
1560                                     throwable.set(th);
1561                                 } else {
1562                                     Logger.defaultLogError("Unhandled exception", th);
1563                                 }
1564
1565                             }
1566
1567                         } catch (Throwable t) {
1568
1569                             if (DEBUG)
1570                                 t.printStackTrace();
1571
1572                             if(throwable != null) {
1573                                 throwable.set(t);
1574                             } else {
1575                                 Logger.defaultLogError("Unhandled exception", t);
1576                             }
1577
1578                             try {
1579
1580                                 procedure.exception(newGraph, t);
1581
1582                             } catch (Throwable t2) {
1583
1584                                 if(throwable != null) {
1585                                     throwable.set(t2);
1586                                 } else {
1587                                     Logger.defaultLogError("Unhandled exception", t2);
1588                                 }
1589
1590                             }
1591
1592                         }
1593
1594 //                        newGraph.state.barrier.dec();
1595 //                        newGraph.waitAsync(request);
1596
1597                     }
1598
1599                 } finally {
1600
1601                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1602
1603                 }
1604
1605             }
1606
1607         });
1608
1609     }
1610
1611     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1612
1613         assert (request != null);
1614         assert (procedure != null);
1615
1616         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1617
1618         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1619
1620             @Override
1621             public void run(int thread) {
1622
1623                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1624
1625                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1626
1627                 try {
1628
1629                     if (listener != null) {
1630
1631                         try {
1632                                 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1633                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1634                                                 } catch (DatabaseException e) {
1635                                                         Logger.defaultLogError(e);
1636                                                 }
1637
1638                     } else {
1639
1640                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1641                                 procedure, "request");
1642
1643                         try {
1644
1645 //                            newGraph.state.barrier.inc();
1646
1647                             request.perform(newGraph, wrapper);
1648
1649 //                            newGraph.waitAsync(request);
1650
1651                         } catch (Throwable t) {
1652
1653                             wrapper.exception(newGraph, t);
1654 //                            newGraph.waitAsync(request);
1655
1656
1657                         }
1658
1659                     }
1660
1661                 } finally {
1662
1663                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1664
1665                 }
1666
1667             }
1668
1669         });
1670
1671     }
1672
1673     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1674
1675         assert (request != null);
1676         assert (procedure != null);
1677
1678         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1679
1680         int sync = notify != null ? thread : -1;
1681
1682         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1683
1684             @Override
1685             public void run(int thread) {
1686
1687                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1688
1689                 ListenerBase listener = getListenerBase(procedure);
1690
1691                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1692
1693                 try {
1694
1695                     if (listener != null) {
1696
1697                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1698
1699 //                        newGraph.waitAsync(request);
1700
1701                     } else {
1702
1703                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1704
1705                         try {
1706
1707                             request.perform(newGraph, wrapper);
1708
1709                         } catch (Throwable t) {
1710
1711                             t.printStackTrace();
1712
1713                         }
1714
1715                     }
1716
1717                 } finally {
1718
1719                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1720
1721                 }
1722
1723             }
1724
1725         });
1726
1727     }
1728
1729     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1730
1731         assert (request != null);
1732         assert (procedure != null);
1733
1734         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1735
1736         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1737
1738             @Override
1739             public void run(int thread) {
1740
1741                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1742
1743                 ListenerBase listener = getListenerBase(procedure);
1744
1745                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1746
1747                 try {
1748
1749                     if (listener != null) {
1750
1751                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1752
1753                             @Override
1754                             public void exception(Throwable t) {
1755                                 procedure.exception(t);
1756                                 if(throwable != null) {
1757                                     throwable.set(t);
1758                                 }
1759                             }
1760
1761                             @Override
1762                             public void execute(T t) {
1763                                 if(result != null) result.set(t);
1764                                 procedure.execute(t);
1765                             }
1766
1767                         }, listener);
1768
1769 //                        newGraph.waitAsync(request);
1770
1771                     } else {
1772
1773 //                        newGraph.state.barrier.inc();
1774
1775                         request.register(newGraph, new Listener<T>() {
1776
1777                             @Override
1778                             public void exception(Throwable t) {
1779                                 if(throwable != null) throwable.set(t);
1780                                 procedure.exception(t);
1781 //                                newGraph.state.barrier.dec();
1782                             }
1783
1784                             @Override
1785                             public void execute(T t) {
1786                                 if(result != null) result.set(t);
1787                                 procedure.execute(t);
1788 //                                newGraph.state.barrier.dec();
1789                             }
1790
1791                             @Override
1792                             public boolean isDisposed() {
1793                                 return true;
1794                             }
1795
1796                         });
1797
1798 //                        newGraph.waitAsync(request);
1799
1800                     }
1801
1802                 } finally {
1803
1804                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1805
1806                 }
1807
1808             }
1809
1810         });
1811
1812     }
1813
1814
1815     @Override
1816     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1817
1818         scheduleRequest(request, procedure, null, null, null);
1819
1820     }
1821
1822     @Override
1823     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1824         assertNotSession();
1825         Semaphore notify = new Semaphore(0);
1826         DataContainer<Throwable> container = new DataContainer<Throwable>();
1827         DataContainer<T> result = new DataContainer<T>();
1828         scheduleRequest(request, procedure, notify, container, result);
1829         acquire(notify, request);
1830         Throwable throwable = container.get();
1831         if(throwable != null) {
1832             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1833             else throw new DatabaseException("Unexpected exception", throwable);
1834         }
1835         return result.get();
1836     }
1837
1838     @Override
1839     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1840         assertNotSession();
1841         Semaphore notify = new Semaphore(0);
1842         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1843         final DataContainer<T> resultContainer = new DataContainer<T>();
1844         scheduleRequest(request, new AsyncProcedure<T>() {
1845             @Override
1846             public void exception(AsyncReadGraph graph, Throwable throwable) {
1847                 exceptionContainer.set(throwable);
1848                 procedure.exception(graph, throwable);
1849             }
1850             @Override
1851             public void execute(AsyncReadGraph graph, T result) {
1852                 resultContainer.set(result);
1853                 procedure.execute(graph, result);
1854             }
1855         }, getListenerBase(procedure), notify);
1856         acquire(notify, request, procedure);
1857         Throwable throwable = exceptionContainer.get();
1858         if (throwable != null) {
1859             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1860             else throw new DatabaseException("Unexpected exception", throwable);
1861         }
1862         return resultContainer.get();
1863     }
1864
1865
1866     @Override
1867     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1868         assertNotSession();
1869         Semaphore notify = new Semaphore(0);
1870         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1871         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1872             @Override
1873             public void exception(AsyncReadGraph graph, Throwable throwable) {
1874                 exceptionContainer.set(throwable);
1875                 procedure.exception(graph, throwable);
1876             }
1877             @Override
1878             public void execute(AsyncReadGraph graph, T result) {
1879                 procedure.execute(graph, result);
1880             }
1881             @Override
1882             public void finished(AsyncReadGraph graph) {
1883                 procedure.finished(graph);
1884             }
1885         }, notify);
1886         acquire(notify, request, procedure);
1887         Throwable throwable = exceptionContainer.get();
1888         if (throwable != null) {
1889             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1890             else throw new DatabaseException("Unexpected exception", throwable);
1891         }
1892         // TODO: implement return value
1893         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1894         return null;
1895     }
1896
1897     @Override
1898     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1899         return syncRequest(request, new ProcedureAdapter<T>());
1900
1901
1902 //        assert(request != null);
1903 //
1904 //        final DataContainer<T> result = new DataContainer<T>();
1905 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1906 //
1907 //        syncRequest(request, new Procedure<T>() {
1908 //
1909 //            @Override
1910 //            public void execute(T t) {
1911 //                result.set(t);
1912 //            }
1913 //
1914 //            @Override
1915 //            public void exception(Throwable t) {
1916 //                exception.set(t);
1917 //            }
1918 //
1919 //        });
1920 //
1921 //        Throwable t = exception.get();
1922 //        if(t != null) {
1923 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1924 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1925 //        }
1926 //
1927 //        return result.get();
1928
1929     }
1930
1931     @Override
1932     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1933         return syncRequest(request, (Procedure<T>)procedure);
1934     }
1935
1936
1937 //    @Override
1938 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1939 //        assertNotSession();
1940 //        Semaphore notify = new Semaphore(0);
1941 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1942 //        DataContainer<T> result = new DataContainer<T>();
1943 //        scheduleRequest(request, procedure, notify, container, result);
1944 //        acquire(notify, request);
1945 //        Throwable throwable = container.get();
1946 //        if(throwable != null) {
1947 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1948 //            else throw new DatabaseException("Unexpected exception", throwable);
1949 //        }
1950 //        return result.get();
1951 //    }
1952
1953
1954     @Override
1955     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1956         assertNotSession();
1957         Semaphore notify = new Semaphore(0);
1958         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1959         final DataContainer<T> result = new DataContainer<T>();
1960         scheduleRequest(request, procedure, notify, container, result);
1961         acquire(notify, request);
1962         Throwable throwable = container.get();
1963         if (throwable != null) {
1964             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1965             else throw new DatabaseException("Unexpected exception", throwable);
1966         }
1967         return result.get();
1968     }
1969
1970     @Override
1971     public void syncRequest(Write request) throws DatabaseException  {
1972         assertNotSession();
1973         assertAlive();
1974         Semaphore notify = new Semaphore(0);
1975         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1976         scheduleRequest(request, e -> exception.set(e), notify);
1977         acquire(notify, request);
1978         if(exception.get() != null) throw exception.get();
1979     }
1980
1981     @Override
1982     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1983         assertNotSession();
1984         Semaphore notify = new Semaphore(0);
1985         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1986         final DataContainer<T> result = new DataContainer<T>();
1987         scheduleRequest(request, new Procedure<T>() {
1988
1989             @Override
1990             public void exception(Throwable t) {
1991               exception.set(t);
1992             }
1993
1994             @Override
1995             public void execute(T t) {
1996               result.set(t);
1997             }
1998
1999         }, notify);
2000         acquire(notify, request);
2001         if(exception.get() != null) {
2002             Throwable t = exception.get();
2003             if(t instanceof DatabaseException) throw (DatabaseException)t;
2004             else throw new DatabaseException(t);
2005         }
2006         return result.get();
2007     }
2008
2009     @Override
2010     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2011         assertNotSession();
2012         Semaphore notify = new Semaphore(0);
2013         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2014         final DataContainer<T> result = new DataContainer<T>();
2015         scheduleRequest(request, new Procedure<T>() {
2016
2017             @Override
2018             public void exception(Throwable t) {
2019               exception.set(t);
2020             }
2021
2022             @Override
2023             public void execute(T t) {
2024               result.set(t);
2025             }
2026
2027         }, notify);
2028         acquire(notify, request);
2029         if(exception.get() != null) {
2030             Throwable t = exception.get();
2031             if(t instanceof DatabaseException) throw (DatabaseException)t;
2032             else throw new DatabaseException(t);
2033         }
2034         return result.get();
2035     }
2036
2037     @Override
2038     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2039         assertNotSession();
2040         Semaphore notify = new Semaphore(0);
2041         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2042         final DataContainer<T> result = new DataContainer<T>();
2043         scheduleRequest(request, new Procedure<T>() {
2044
2045             @Override
2046             public void exception(Throwable t) {
2047               exception.set(t);
2048             }
2049
2050             @Override
2051             public void execute(T t) {
2052               result.set(t);
2053             }
2054
2055         }, notify);
2056         acquire(notify, request);
2057         if(exception.get() != null) {
2058             Throwable t = exception.get();
2059             if(t instanceof DatabaseException) throw (DatabaseException)t;
2060             else throw new DatabaseException(t);
2061         }
2062         return result.get();
2063     }
2064
2065     @Override
2066     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2067         assertNotSession();
2068         Semaphore notify = new Semaphore(0);
2069         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2070         scheduleRequest(request, e -> exception.set(e), notify);
2071         acquire(notify, request);
2072         if(exception.get() != null) throw exception.get();
2073     }
2074
2075     @Override
2076     public void syncRequest(WriteOnly request) throws DatabaseException  {
2077         assertNotSession();
2078         assertAlive();
2079         Semaphore notify = new Semaphore(0);
2080         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2081         scheduleRequest(request, e -> exception.set(e), notify);
2082         acquire(notify, request);
2083         if(exception.get() != null) throw exception.get();
2084     }
2085
2086     /*
2087      *
2088      * GraphRequestProcessor interface
2089      */
2090
2091     @Override
2092     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2093
2094         scheduleRequest(request, procedure, null, null);
2095
2096     }
2097
2098     @Override
2099     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2100
2101         scheduleRequest(request, procedure, null);
2102
2103     }
2104
2105     @Override
2106     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2107
2108         scheduleRequest(request, procedure, null, null, null);
2109
2110     }
2111
2112     @Override
2113     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2114
2115         scheduleRequest(request, callback, null);
2116
2117     }
2118
2119     @Override
2120     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2121
2122         scheduleRequest(request, procedure, null);
2123
2124     }
2125
2126     @Override
2127     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2128
2129         scheduleRequest(request, procedure, null);
2130
2131     }
2132
2133     @Override
2134     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2135
2136         scheduleRequest(request, procedure, null);
2137
2138     }
2139
2140     @Override
2141     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2142
2143         scheduleRequest(request, callback, null);
2144
2145     }
2146
2147     @Override
2148     public void asyncRequest(final Write r) {
2149         asyncRequest(r, null);
2150     }
2151
2152     @Override
2153     public void asyncRequest(final DelayedWrite r) {
2154         asyncRequest(r, null);
2155     }
2156
2157     @Override
2158     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2159
2160         scheduleRequest(request, callback, null);
2161
2162     }
2163
2164     @Override
2165     public void asyncRequest(final WriteOnly request) {
2166
2167         asyncRequest(request, null);
2168
2169     }
2170
2171     @Override
2172     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2173         r.request(this, procedure);
2174     }
2175
2176     @Override
2177     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2178         r.request(this, procedure);
2179     }
2180
2181     @Override
2182     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2183         r.request(this, procedure);
2184     }
2185
2186     @Override
2187     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2188         r.request(this, procedure);
2189     }
2190
2191     @Override
2192     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2193         r.request(this, procedure);
2194     }
2195
2196     @Override
2197     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2198         r.request(this, procedure);
2199     }
2200
2201     @Override
2202     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2203         return r.request(this);
2204     }
2205
2206     @Override
2207     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2208         return r.request(this);
2209     }
2210
2211     @Override
2212     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2213         r.request(this, procedure);
2214     }
2215
2216     @Override
2217     public <T> void async(WriteInterface<T> r) {
2218         r.request(this, new ProcedureAdapter<T>());
2219     }
2220
2221     //@Override
2222     public void incAsync() {
2223         state.incAsync();
2224     }
2225
2226     //@Override
2227     public void decAsync() {
2228         state.decAsync();
2229     }
2230
2231     public long getCluster(ResourceImpl resource) {
2232         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2233         return cluster.getClusterId();
2234     }
2235
2236     public long getCluster(int id) {
2237         if (clusterTable == null)
2238             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2239         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2240     }
2241
2242     public ResourceImpl getResource(int id) {
2243         return new ResourceImpl(resourceSupport, id);
2244     }
2245
2246     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2247         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2248         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2249         int key = proxy.getClusterKey();
2250         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2251         return new ResourceImpl(resourceSupport, resourceKey);
2252     }
2253
2254     public ResourceImpl getResource2(int id) {
2255         assert (id != 0);
2256         return new ResourceImpl(resourceSupport, id);
2257     }
2258
2259     final public int getId(ResourceImpl impl) {
2260         return impl.id;
2261     }
2262
2263     public static final Charset UTF8 = Charset.forName("utf-8");
2264
2265
2266
2267
2268     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2269         for(TransientGraph g : support.providers) {
2270             if(g.isPending(subject)) return false;
2271         }
2272         return true;
2273     }
2274
2275     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2276         for(TransientGraph g : support.providers) {
2277             if(g.isPending(subject, predicate)) return false;
2278         }
2279         return true;
2280     }
2281
2282     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2283
2284         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2285
2286             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2287
2288             @Override
2289             public void accept(ReadGraphImpl graph) {
2290                 if(ready.decrementAndGet() == 0) {
2291                     runnable.accept(graph);
2292                 }
2293             }
2294
2295         };
2296
2297         for(TransientGraph g : support.providers) {
2298             if(g.isPending(subject)) {
2299                 try {
2300                     g.load(graph, subject, composite);
2301                 } catch (DatabaseException e) {
2302                     e.printStackTrace();
2303                 }
2304             } else {
2305                 composite.accept(graph);
2306             }
2307         }
2308
2309         composite.accept(graph);
2310
2311     }
2312
2313     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2314
2315         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2316
2317             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2318
2319             @Override
2320             public void accept(ReadGraphImpl graph) {
2321                 if(ready.decrementAndGet() == 0) {
2322                     runnable.accept(graph);
2323                 }
2324             }
2325
2326         };
2327
2328         for(TransientGraph g : support.providers) {
2329             if(g.isPending(subject, predicate)) {
2330                 try {
2331                     g.load(graph, subject, predicate, composite);
2332                 } catch (DatabaseException e) {
2333                     e.printStackTrace();
2334                 }
2335             } else {
2336                 composite.accept(graph);
2337             }
2338         }
2339
2340         composite.accept(graph);
2341
2342     }
2343
2344 //    void dumpHeap() {
2345 //
2346 //        try {
2347 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2348 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2349 //                    "d:/heap" + flushCounter + ".txt", true);
2350 //        } catch (IOException e) {
2351 //            e.printStackTrace();
2352 //        }
2353 //
2354 //    }
2355
2356     void fireReactionsToSynchronize(ChangeSet cs) {
2357
2358         // Do not fire empty events
2359         if (cs.isEmpty())
2360             return;
2361
2362         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2363 //        g.state.barrier.inc();
2364
2365         try {
2366
2367             if (!cs.isEmpty()) {
2368                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2369                 for (ChangeListener l : changeListeners2) {
2370                     try {
2371                         l.graphChanged(e2);
2372                     } catch (Exception ex) {
2373                         ex.printStackTrace();
2374                     }
2375                 }
2376             }
2377
2378         } finally {
2379
2380 //            g.state.barrier.dec();
2381 //            g.waitAsync(null);
2382
2383         }
2384
2385     }
2386
2387     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2388         try {
2389
2390             // Do not fire empty events
2391             if (cs2.isEmpty())
2392                 return;
2393
2394 //            graph.restart();
2395 //            graph.state.barrier.inc();
2396
2397             try {
2398
2399                 if (!cs2.isEmpty()) {
2400
2401                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2402                     for (ChangeListener l : changeListeners2) {
2403                         try {
2404                             l.graphChanged(e2);
2405 //                            System.out.println("changelistener " + l);
2406                         } catch (Exception ex) {
2407                             ex.printStackTrace();
2408                         }
2409                     }
2410
2411                 }
2412
2413             } finally {
2414
2415 //                graph.state.barrier.dec();
2416 //                graph.waitAsync(null);
2417
2418             }
2419         } catch (Throwable t) {
2420             t.printStackTrace();
2421
2422         }
2423     }
2424     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2425
2426         try {
2427
2428             // Do not fire empty events
2429             if (cs2.isEmpty())
2430                 return;
2431
2432             // Do not fire on virtual requests
2433             if(graph.getProvider() != null)
2434                 return;
2435
2436             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2437
2438             try {
2439
2440                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2441                 for (ChangeListener l : metadataListeners) {
2442                     try {
2443                         l.graphChanged(e2);
2444                     } catch (Throwable ex) {
2445                         ex.printStackTrace();
2446                     }
2447                 }
2448
2449             } finally {
2450
2451             }
2452
2453         } catch (Throwable t) {
2454             t.printStackTrace();
2455         }
2456
2457     }
2458
2459
2460     /*
2461      * (non-Javadoc)
2462      *
2463      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2464      */
2465     @Override
2466     public <T> T getService(Class<T> api) {
2467         T t = peekService(api);
2468         if (t == null) {
2469             if (state.isClosed())
2470                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2471             throw new ServiceNotFoundException(this, api);
2472         }
2473         return t;
2474     }
2475
2476     /**
2477      * @return
2478      */
2479     protected abstract ServerInformation getCachedServerInformation();
2480
2481         private Class<?> serviceKey1 = null;
2482         private Class<?> serviceKey2 = null;
2483         private Object service1 = null;
2484         private Object service2 = null;
2485
2486     /*
2487      * (non-Javadoc)
2488      *
2489      * @see
2490      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2491      */
2492     @SuppressWarnings("unchecked")
2493     @Override
2494     public synchronized <T> T peekService(Class<T> api) {
2495
2496                 if(serviceKey1 == api) {
2497                         return (T)service1;
2498                 } else if (serviceKey2 == api) {
2499                         // Promote this key
2500                         Object result = service2;
2501                         service2 = service1;
2502                         serviceKey2 = serviceKey1;
2503                         service1 = result;
2504                         serviceKey1 = api;
2505                         return (T)result;
2506                 }
2507
2508         if (Layer0.class == api)
2509                 return (T) L0;
2510         if (ServerInformation.class == api)
2511             return (T) getCachedServerInformation();
2512         else if (WriteGraphImpl.class == api)
2513             return (T) writeState.getGraph();
2514         else if (ClusterBuilder.class == api)
2515             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2516         else if (ClusterBuilderFactory.class == api)
2517             return (T)new ClusterBuilderFactoryImpl(this);
2518
2519                 service2 = service1;
2520                 serviceKey2 = serviceKey1;
2521
2522         service1 = serviceLocator.peekService(api);
2523         serviceKey1 = api;
2524
2525         return (T)service1;
2526
2527     }
2528
2529     /*
2530      * (non-Javadoc)
2531      *
2532      * @see
2533      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2534      */
2535     @Override
2536     public boolean hasService(Class<?> api) {
2537         return serviceLocator.hasService(api);
2538     }
2539
2540     /**
2541      * @param api the api that must be implemented by the specified service
2542      * @param service the service implementation
2543      */
2544     @Override
2545     public <T> void registerService(Class<T> api, T service) {
2546         if(Layer0.class == api) {
2547                 L0 = (Layer0)service;
2548                 return;
2549         }
2550         serviceLocator.registerService(api, service);
2551         if (TransactionPolicySupport.class == api) {
2552             transactionPolicy = (TransactionPolicySupport)service;
2553             state.resetTransactionPolicy();
2554         }
2555         if (api == serviceKey1)
2556                 service1 = service;
2557         else if (api == serviceKey2)
2558                 service2 = service;
2559     }
2560
2561     // ----------------
2562     // SessionMonitor
2563     // ----------------
2564
2565     void fireSessionVariableChange(String variable) {
2566         for (MonitorHandler h : monitorHandlers) {
2567             MonitorContext ctx = monitorContexts.getLeft(h);
2568             assert ctx != null;
2569             // SafeRunner functionality repeated here to avoid dependency.
2570             try {
2571                 h.valuesChanged(ctx);
2572             } catch (Exception e) {
2573                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2574             } catch (LinkageError e) {
2575                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2576             }
2577         }
2578     }
2579
2580
2581     class ResourceSerializerImpl implements ResourceSerializer {
2582
2583         public long createRandomAccessId(int id) throws DatabaseException {
2584             if(id < 0)
2585                 return id;
2586             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2587             long cluster = getCluster(id);
2588             if (0 == cluster)
2589                 return 0; // Better to return 0 then invalid id.
2590             long result = ClusterTraitsBase.createResourceId(cluster, index);
2591             return result;
2592         }
2593
2594         @Override
2595         public long getRandomAccessId(Resource resource) throws DatabaseException {
2596
2597             ResourceImpl resourceImpl = (ResourceImpl) resource;
2598             return createRandomAccessId(resourceImpl.id);
2599
2600         }
2601
2602         @Override
2603         public int getTransientId(Resource resource) throws DatabaseException {
2604
2605             ResourceImpl resourceImpl = (ResourceImpl) resource;
2606             return resourceImpl.id;
2607
2608         }
2609
2610         @Override
2611         public String createRandomAccessId(Resource resource)
2612         throws InvalidResourceReferenceException {
2613
2614             if(resource == null) throw new IllegalArgumentException();
2615
2616             ResourceImpl resourceImpl = (ResourceImpl) resource;
2617
2618             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2619
2620             int r;
2621             try {
2622                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2623             } catch (DatabaseException e1) {
2624                 throw new InvalidResourceReferenceException(e1);
2625             }
2626             try {
2627                 // Serialize as '<resource index>_<cluster id>'
2628                 return "" + r + "_" + getCluster(resourceImpl);
2629             } catch (Throwable e) {
2630                 e.printStackTrace();
2631                 throw new InvalidResourceReferenceException(e);
2632             } finally {
2633             }
2634         }
2635
2636         public int getTransientId(long serialized) throws DatabaseException {
2637             if (serialized <= 0)
2638                 return (int)serialized;
2639             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2640             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2641             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2642             if (cluster == null)
2643                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2644             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2645             return key;
2646         }
2647
2648         @Override
2649         public Resource getResource(long randomAccessId) throws DatabaseException {
2650             return getResourceByKey(getTransientId(randomAccessId));
2651         }
2652
2653         @Override
2654         public Resource getResource(int transientId) throws DatabaseException {
2655             return getResourceByKey(transientId);
2656         }
2657
2658         @Override
2659         public Resource getResource(String randomAccessId)
2660         throws InvalidResourceReferenceException {
2661             try {
2662                 int i = randomAccessId.indexOf('_');
2663                 if (i == -1)
2664                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2665                             + randomAccessId + "'");
2666                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2667                 if(r < 0) return getResourceByKey(r);
2668                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2669                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2670                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2671                 if (cluster.hasResource(key, clusterTranslator))
2672                     return getResourceByKey(key);
2673             } catch (InvalidResourceReferenceException e) {
2674                 throw e;
2675             } catch (NumberFormatException e) {
2676                 throw new InvalidResourceReferenceException(e);
2677             } catch (Throwable e) {
2678                 e.printStackTrace();
2679                 throw new InvalidResourceReferenceException(e);
2680             } finally {
2681             }
2682             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2683         }
2684
2685         @Override
2686         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2687             try {
2688                 return true;
2689             } catch (Throwable e) {
2690                 e.printStackTrace();
2691                 throw new InvalidResourceReferenceException(e);
2692             } finally {
2693             }
2694         }
2695     }
2696
2697     // Copied from old SessionImpl
2698
2699     void check() {
2700         if (state.isClosed())
2701             throw new Error("Session closed.");
2702     }
2703
2704
2705
2706     public ResourceImpl getNewResource(long clusterId) {
2707         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2708         int newId;
2709         try {
2710             newId = cluster.createResource(clusterTranslator);
2711         } catch (DatabaseException e) {
2712             Logger.defaultLogError(e);
2713             return null;
2714         }
2715         return new ResourceImpl(resourceSupport, newId);
2716     }
2717
2718     public ResourceImpl getNewResource(Resource clusterSet)
2719     throws DatabaseException {
2720         long resourceId = clusterSet.getResourceId();
2721         Long clusterId = clusterSetsSupport.get(resourceId);
2722         if (null == clusterId)
2723             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2724         if (Constants.NewClusterId == clusterId) {
2725             clusterId = getService(ClusteringSupport.class).createCluster();
2726             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2727                 createdClusters.add(clusterId);
2728             clusterSetsSupport.put(resourceId, clusterId);
2729             return getNewResource(clusterId);
2730         } else {
2731             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2732             ResourceImpl result;
2733             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2734                 clusterId = getService(ClusteringSupport.class).createCluster();
2735                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2736                     createdClusters.add(clusterId);
2737                 clusterSetsSupport.put(resourceId, clusterId);
2738                 return getNewResource(clusterId);
2739             } else {
2740                 result = getNewResource(clusterId);
2741                 int resultKey = querySupport.getId(result);
2742                 long resultCluster = querySupport.getClusterId(resultKey);
2743                 if (clusterId != resultCluster)
2744                     clusterSetsSupport.put(resourceId, resultCluster);
2745                 return result;
2746             }
2747         }
2748     }
2749
2750     public void getNewClusterSet(Resource clusterSet)
2751     throws DatabaseException {
2752         if (DEBUG)
2753             System.out.println("new cluster set=" + clusterSet);
2754         long resourceId = clusterSet.getResourceId();
2755         if (clusterSetsSupport.containsKey(resourceId))
2756             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2757         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2758     }
2759     public boolean containsClusterSet(Resource clusterSet)
2760     throws ServiceException {
2761         long resourceId = clusterSet.getResourceId();
2762         return clusterSetsSupport.containsKey(resourceId);
2763     }
2764     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2765         Resource r = defaultClusterSet;
2766         defaultClusterSet = clusterSet;
2767         return r;
2768     }
2769     void printDiagnostics() {
2770
2771         if (DIAGNOSTICS) {
2772
2773             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2774             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2775             for(ClusterI cluster : clusterTable.getClusters()) {
2776                 try {
2777                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2778                         residentClusters.set(residentClusters.get() + 1);
2779                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2780                     }
2781                 } catch (DatabaseException e) {
2782                     Logger.defaultLogError(e);
2783                 }
2784             }
2785
2786             System.out.println("--------------------------------");
2787             System.out.println("Cluster information:");
2788             System.out.println("-amount of resident clusters=" + residentClusters.get());
2789             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2790
2791             for(ClusterI cluster : clusterTable.getClusters()) {
2792                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2793                 try {
2794                     if (!cluster.isLoaded())
2795                         System.out.println("not loaded]");
2796                     else if (cluster.isEmpty())
2797                         System.out.println("is empty]");
2798                     else {
2799                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2800                         System.out.print(",references=" + cluster.getReferenceCount());
2801                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2802                     }
2803                 } catch (DatabaseException e) {
2804                     Logger.defaultLogError(e);
2805                     System.out.println("is corrupted]");
2806                 }
2807             }
2808
2809             queryProvider2.printDiagnostics();
2810             System.out.println("--------------------------------");
2811         }
2812
2813     }
2814
2815     public void fireStartReadTransaction() {
2816
2817         if (DIAGNOSTICS)
2818             System.out.println("StartReadTransaction");
2819
2820         for (SessionEventListener listener : eventListeners) {
2821             try {
2822                 listener.readTransactionStarted();
2823             } catch (Throwable t) {
2824                 t.printStackTrace();
2825             }
2826         }
2827
2828     }
2829
2830     public void fireFinishReadTransaction() {
2831
2832         if (DIAGNOSTICS)
2833             System.out.println("FinishReadTransaction");
2834
2835         for (SessionEventListener listener : eventListeners) {
2836             try {
2837                 listener.readTransactionFinished();
2838             } catch (Throwable t) {
2839                 t.printStackTrace();
2840             }
2841         }
2842
2843     }
2844
2845     public void fireStartWriteTransaction() {
2846
2847         if (DIAGNOSTICS)
2848             System.out.println("StartWriteTransaction");
2849
2850         for (SessionEventListener listener : eventListeners) {
2851             try {
2852                 listener.writeTransactionStarted();
2853             } catch (Throwable t) {
2854                 t.printStackTrace();
2855             }
2856         }
2857
2858     }
2859
2860     public void fireFinishWriteTransaction() {
2861
2862         if (DIAGNOSTICS)
2863             System.out.println("FinishWriteTransaction");
2864
2865         for (SessionEventListener listener : eventListeners) {
2866             try {
2867                 listener.writeTransactionFinished();
2868             } catch (Throwable t) {
2869                 t.printStackTrace();
2870             }
2871         }
2872
2873         Indexing.resetDependenciesIndexingDisabled();
2874
2875     }
2876
2877     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2878         throw new Error("Not supported at the moment.");
2879     }
2880
2881     State getState() {
2882         return state;
2883     }
2884
2885     ClusterTable getClusterTable() {
2886         return clusterTable;
2887     }
2888
2889     public GraphSession getGraphSession() {
2890         return graphSession;
2891     }
2892
2893     public QueryProcessor getQueryProvider2() {
2894         return queryProvider2;
2895     }
2896
2897     ClientChangesImpl getClientChanges() {
2898         return clientChanges;
2899     }
2900
2901     boolean getWriteOnly() {
2902         return writeOnly;
2903     }
2904
2905     static int counter = 0;
2906
2907     public void onClusterLoaded(long clusterId) {
2908
2909         clusterTable.updateSize();
2910
2911     }
2912
2913
2914
2915
2916     /*
2917      * Implementation of the interface RequestProcessor
2918      */
2919
2920     @Override
2921     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2922
2923         assertNotSession();
2924         assertAlive();
2925
2926         assert(request != null);
2927
2928         final DataContainer<T> result = new DataContainer<T>();
2929         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2930
2931         syncRequest(request, new AsyncProcedure<T>() {
2932
2933             @Override
2934             public void execute(AsyncReadGraph graph, T t) {
2935                 result.set(t);
2936             }
2937
2938             @Override
2939             public void exception(AsyncReadGraph graph, Throwable t) {
2940                 exception.set(t);
2941             }
2942
2943         });
2944
2945         Throwable t = exception.get();
2946         if(t != null) {
2947             if(t instanceof DatabaseException) throw (DatabaseException)t;
2948             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2949         }
2950
2951         return result.get();
2952
2953     }
2954
2955 //    @Override
2956 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2957 //        assertNotSession();
2958 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2959 //    }
2960
2961     @Override
2962     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2963         assertNotSession();
2964         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2965     }
2966
2967     @Override
2968     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2969         assertNotSession();
2970         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2971     }
2972
2973     @Override
2974     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2975         assertNotSession();
2976         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2977     }
2978
2979     @Override
2980     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2981         assertNotSession();
2982         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2983     }
2984
2985     @Override
2986     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2987
2988         assertNotSession();
2989
2990         assert(request != null);
2991
2992         final DataContainer<T> result = new DataContainer<T>();
2993         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2994
2995         syncRequest(request, new AsyncProcedure<T>() {
2996
2997             @Override
2998             public void execute(AsyncReadGraph graph, T t) {
2999                 result.set(t);
3000             }
3001
3002             @Override
3003             public void exception(AsyncReadGraph graph, Throwable t) {
3004                 exception.set(t);
3005             }
3006
3007         });
3008
3009         Throwable t = exception.get();
3010         if(t != null) {
3011             if(t instanceof DatabaseException) throw (DatabaseException)t;
3012             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3013         }
3014
3015         return result.get();
3016
3017     }
3018
3019     @Override
3020     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3021         assertNotSession();
3022         return syncRequest(request, (AsyncProcedure<T>)procedure);
3023     }
3024
3025     @Override
3026     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3027         assertNotSession();
3028         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3029     }
3030
3031     @Override
3032     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3033         assertNotSession();
3034         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3035     }
3036
3037     @Override
3038     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3039         assertNotSession();
3040         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3041     }
3042
3043     @Override
3044     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3045         assertNotSession();
3046         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3047     }
3048
3049     @Override
3050     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3051
3052         assertNotSession();
3053
3054         assert(request != null);
3055
3056         final ArrayList<T> result = new ArrayList<T>();
3057         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3058
3059         syncRequest(request, new AsyncMultiProcedure<T>() {
3060
3061             @Override
3062             public void execute(AsyncReadGraph graph, T t) {
3063                 synchronized(result) {
3064                     result.add(t);
3065                 }
3066             }
3067
3068             @Override
3069             public void finished(AsyncReadGraph graph) {
3070             }
3071
3072             @Override
3073             public void exception(AsyncReadGraph graph, Throwable t) {
3074                 exception.set(t);
3075             }
3076
3077
3078         });
3079
3080         Throwable t = exception.get();
3081         if(t != null) {
3082             if(t instanceof DatabaseException) throw (DatabaseException)t;
3083             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3084         }
3085
3086         return result;
3087
3088     }
3089
3090     @Override
3091     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3092         assertNotSession();
3093         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3094     }
3095
3096     @Override
3097     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3098         assertNotSession();
3099         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3100     }
3101
3102     @Override
3103     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3104         assertNotSession();
3105         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3106     }
3107
3108     @Override
3109     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3110         assertNotSession();
3111         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3112     }
3113
3114     @Override
3115     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3116         assertNotSession();
3117         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3118     }
3119
3120     @Override
3121     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3122
3123         assertNotSession();
3124
3125         assert(request != null);
3126
3127         final ArrayList<T> result = new ArrayList<T>();
3128         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3129
3130         syncRequest(request, new AsyncMultiProcedure<T>() {
3131
3132             @Override
3133             public void execute(AsyncReadGraph graph, T t) {
3134                 synchronized(result) {
3135                     result.add(t);
3136                 }
3137             }
3138
3139             @Override
3140             public void finished(AsyncReadGraph graph) {
3141             }
3142
3143             @Override
3144             public void exception(AsyncReadGraph graph, Throwable t) {
3145                 exception.set(t);
3146             }
3147
3148
3149         });
3150
3151         Throwable t = exception.get();
3152         if(t != null) {
3153             if(t instanceof DatabaseException) throw (DatabaseException)t;
3154             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3155         }
3156
3157         return result;
3158
3159     }
3160
3161     @Override
3162     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3163         assertNotSession();
3164         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3165     }
3166
3167     @Override
3168     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3169         assertNotSession();
3170         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3171     }
3172
3173     @Override
3174     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3175         assertNotSession();
3176        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3177     }
3178
3179     @Override
3180     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3181         assertNotSession();
3182         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3183     }
3184
3185     @Override
3186     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3187         assertNotSession();
3188         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3189     }
3190
3191
3192     @Override
3193     public <T> void asyncRequest(Read<T> request) {
3194
3195         asyncRequest(request, new ProcedureAdapter<T>() {
3196             @Override
3197             public void exception(Throwable t) {
3198                 t.printStackTrace();
3199             }
3200         });
3201
3202     }
3203
3204     @Override
3205     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3206         asyncRequest(request, (AsyncProcedure<T>)procedure);
3207     }
3208
3209     @Override
3210     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3211         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3212     }
3213
3214     @Override
3215     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3216         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3217     }
3218
3219     @Override
3220     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3221         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3222     }
3223
3224     @Override
3225     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3226         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3227     }
3228
3229     @Override
3230     final public <T> void asyncRequest(final AsyncRead<T> request) {
3231
3232         assert(request != null);
3233
3234         asyncRequest(request, new ProcedureAdapter<T>() {
3235             @Override
3236             public void exception(Throwable t) {
3237                 t.printStackTrace();
3238             }
3239         });
3240
3241     }
3242
3243     @Override
3244     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3245         scheduleRequest(request, procedure, procedure, null);
3246     }
3247
3248     @Override
3249     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3250         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3251     }
3252
3253     @Override
3254     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3255         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3256     }
3257
3258     @Override
3259     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3260         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3261     }
3262
3263     @Override
3264     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3265         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3266     }
3267
3268     @Override
3269     public <T> void asyncRequest(MultiRead<T> request) {
3270
3271         assert(request != null);
3272
3273         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3274             @Override
3275             public void exception(AsyncReadGraph graph, Throwable t) {
3276                 t.printStackTrace();
3277             }
3278         });
3279
3280     }
3281
3282     @Override
3283     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3284         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3285     }
3286
3287     @Override
3288     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3289         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3290     }
3291
3292     @Override
3293     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3294         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3295     }
3296
3297     @Override
3298     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3299         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3300     }
3301
3302     @Override
3303     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3304         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3305     }
3306
3307     @Override
3308     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3309
3310         assert(request != null);
3311
3312         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3313             @Override
3314             public void exception(AsyncReadGraph graph, Throwable t) {
3315                 t.printStackTrace();
3316             }
3317         });
3318
3319     }
3320
3321     @Override
3322     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3323         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3324     }
3325
3326     @Override
3327     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3328         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3329     }
3330
3331     @Override
3332     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3333         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3334     }
3335
3336     @Override
3337     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3338         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3339     }
3340
3341     @Override
3342     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3343         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3344     }
3345
3346     @Override
3347     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3348         assertNotSession();
3349         throw new Error("Not implemented!");
3350     }
3351
3352     @Override
3353     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3354         throw new Error("Not implemented!");
3355     }
3356
3357     @Override
3358     final public <T> void asyncRequest(final ExternalRead<T> request) {
3359
3360         assert(request != null);
3361
3362         asyncRequest(request, new ProcedureAdapter<T>() {
3363             @Override
3364             public void exception(Throwable t) {
3365                 t.printStackTrace();
3366             }
3367         });
3368
3369     }
3370
3371     @Override
3372     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3373         asyncRequest(request, (Procedure<T>)procedure);
3374     }
3375
3376
3377
3378     void check(Throwable t) throws DatabaseException {
3379         if(t != null) {
3380             if(t instanceof DatabaseException) throw (DatabaseException)t;
3381             else throw new DatabaseException("Unexpected exception", t);
3382         }
3383     }
3384
3385     void check(DataContainer<Throwable> container) throws DatabaseException {
3386         Throwable t = container.get();
3387         if(t != null) {
3388             if(t instanceof DatabaseException) throw (DatabaseException)t;
3389             else throw new DatabaseException("Unexpected exception", t);
3390         }
3391     }
3392
3393
3394
3395
3396
3397
3398     boolean sameProvider(Write request) {
3399         if(writeState.getGraph().provider != null) {
3400             return writeState.getGraph().provider.equals(request.getProvider());
3401         } else {
3402             return request.getProvider() == null;
3403         }
3404     }
3405
3406     boolean plainWrite(WriteGraphImpl graph) {
3407         if(graph == null) return false;
3408         if(graph.writeSupport.writeOnly()) return false;
3409         return true;
3410     }
3411
3412
3413     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3414     private void assertNotSession() throws DatabaseException {
3415         Thread current = Thread.currentThread();
3416         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3417     }
3418
3419     void assertAlive() {
3420         if (!state.isAlive())
3421             throw new RuntimeDatabaseException("Session has been shut down.");
3422     }
3423
3424     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3425         return querySupport.getValueStream(graph, querySupport.getId(resource));
3426     }
3427
3428     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3429         return querySupport.getValue(graph, querySupport.getId(resource));
3430     }
3431
3432     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3433         acquire(semaphore, request, null);
3434     }
3435
3436     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3437         assertAlive();
3438         try {
3439             long tryCount = 0;
3440             // Loop until the semaphore is acquired reporting requests that take a long time.
3441             while (true) {
3442
3443                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3444                     // Acquired OK.
3445                     return;
3446                 } else {
3447                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3448                         ++tryCount;
3449                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3450                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3451                                 * tryCount;
3452                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3453                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3454                     }
3455                 }
3456
3457                 assertAlive();
3458
3459             }
3460         } catch (InterruptedException e) {
3461             e.printStackTrace();
3462             // FIXME: Should perhaps do something else in this case ??
3463         }
3464     }
3465
3466
3467
3468 //    @Override
3469 //    public boolean holdOnToTransactionAfterCancel() {
3470 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3471 //    }
3472 //
3473 //    @Override
3474 //    public boolean holdOnToTransactionAfterCommit() {
3475 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3476 //    }
3477 //
3478 //    @Override
3479 //    public boolean holdOnToTransactionAfterRead() {
3480 //        return transactionPolicy.holdOnToTransactionAfterRead();
3481 //    }
3482 //
3483 //    @Override
3484 //    public void onRelinquish() {
3485 //        transactionPolicy.onRelinquish();
3486 //    }
3487 //
3488 //    @Override
3489 //    public void onRelinquishDone() {
3490 //        transactionPolicy.onRelinquishDone();
3491 //    }
3492 //
3493 //    @Override
3494 //    public void onRelinquishError() {
3495 //        transactionPolicy.onRelinquishError();
3496 //    }
3497
3498
3499     @Override
3500     public Session getSession() {
3501         return this;
3502     }
3503
3504
3505     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3506     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3507
3508     public void ceased(int thread) {
3509
3510         requestManager.ceased(thread);
3511
3512     }
3513
3514     public int getAmountOfQueryThreads() {
3515         // This must be a power of two
3516         return 4;
3517 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3518     }
3519
3520     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3521
3522         return new ResourceImpl(resourceSupport, key);
3523
3524     }
3525
3526     public void acquireWriteOnly() {
3527         writeOnly = true;
3528     }
3529
3530     public void releaseWriteOnly(ReadGraphImpl graph) {
3531         writeOnly = false;
3532         queryProvider2.releaseWrite(graph);
3533     }
3534
3535     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3536
3537         long start = System.nanoTime();
3538
3539         while(dirtyPrimitives) {
3540             dirtyPrimitives = false;
3541             getQueryProvider2().performDirtyUpdates(writer);
3542             getQueryProvider2().performScheduledUpdates(writer);
3543         }
3544
3545         fireMetadataListeners(writer, clientChanges);
3546
3547         if(DebugPolicy.PERFORMANCE_DATA) {
3548             long end = System.nanoTime();
3549             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3550         }
3551
3552     }
3553
3554     void removeTemporaryData() {
3555         File platform = Platform.getLocation().toFile();
3556         File tempFiles = new File(platform, "tempFiles");
3557         File temp = new File(tempFiles, "db");
3558         if(!temp.exists()) 
3559             return;
3560         try {
3561             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3562                 @Override
3563                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3564                     try {
3565                         Files.delete(file);
3566                     } catch (IOException e) {
3567                         Logger.defaultLogError(e);
3568                     }
3569                     return FileVisitResult.CONTINUE;
3570                 }
3571             });
3572         } catch (IOException e) {
3573             Logger.defaultLogError(e);
3574         }
3575     }
3576
3577     public void handleCreatedClusters() {
3578         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3579             createdClusters.forEach(new TLongProcedure() {
3580
3581                 @Override
3582                 public boolean execute(long value) {
3583                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3584                     cluster.setImmutable(true, clusterTranslator);
3585                     return true;
3586                 }
3587             });
3588         }
3589         createdClusters.clear();
3590     }
3591
3592     @Override
3593     public Object getModificationCounter() {
3594         return queryProvider2.modificationCounter;
3595     }
3596     
3597     @Override
3598     public void markUndoPoint() {
3599         state.setCombine(false);
3600     }
3601
3602 }