]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
91faecfb752986a6eb1694f4d70e2b671271b5f8
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator maintained by the workbench. These services are
206      * initialized by the workbench during the <code>init</code> method.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
418
419         requestManager.scheduleWrite(new SessionTask(true) {
420
421             @Override
422             public void run(int thread) {
423
424                 if(Development.DEVELOPMENT) {
425                     try {
426                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
427                         System.err.println("perform write '" + request + "'");
428                     } catch (Throwable t) {
429                         Logger.defaultLogError(t);
430                     }
431                 }
432
433                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
434
435                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
436
437                 try {
438
439                     flushCounter = 0;
440                     Disposable.safeDispose(clientChanges);
441                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
442
443                     VirtualGraph vg = getProvider(request.getProvider());
444
445                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
446                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
447
448                         @Override
449                         public void execute(Object result) {
450                             if(callback != null) callback.accept(null);
451                         }
452
453                         @Override
454                         public void exception(Throwable t) {
455                             if(callback != null) callback.accept((DatabaseException)t);
456                         }
457
458                     });
459
460                     assert (null != writer);
461 //                    writer.state.barrier.inc();
462
463                     try {
464                         request.perform(writer);
465                         assert (null != writer);
466                     } catch (Throwable t) {
467                         if (!(t instanceof CancelTransactionException))
468                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
469                         writeState.except(t);
470                     } finally {
471 //                        writer.state.barrier.dec();
472 //                        writer.waitAsync(request);
473                     }
474
475
476                     assert(!queryProvider2.cache.dirty);
477
478                 } catch (Throwable e) {
479
480                     // Log it first, just to be safe that the error is always logged.
481                     if (!(e instanceof CancelTransactionException))
482                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
483
484 //                    writeState.getGraph().state.barrier.dec();
485 //                    writeState.getGraph().waitAsync(request);
486
487                     writeState.except(e);
488
489 //                    try {
490 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
491 //                        // All we can do here is to log those, can't really pass them anywhere.
492 //                        if (callback != null) {
493 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
494 //                            else callback.run(new DatabaseException(e));
495 //                        }
496 //                    } catch (Throwable e2) {
497 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
498 //                    }
499 //                    boolean empty = clusterStream.reallyFlush();
500 //                    int callerThread = -1;
501 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
502 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
503 //                    try {
504 //                        // Send all local changes to server so it can calculate correct reverse change set.
505 //                        // This will call clusterStream.accept().
506 //                        state.cancelCommit(context, clusterStream);
507 //                        if (!empty) {
508 //                            if (!context.isOk()) // this is a blocking operation
509 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
510 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
511 //                        }
512 //                        state.cancelCommit2(context, clusterStream);
513 //                    } catch (DatabaseException e3) {
514 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
515 //                    } finally {
516 //                        clusterStream.setOff(false);
517 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
518 //                    }
519
520                 } finally {
521
522                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
523
524                 }
525
526                 task.finish();
527
528                 if(Development.DEVELOPMENT) {
529                     try {
530                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
531                         System.err.println("finish write '" + request + "'");
532                     } catch (Throwable t) {
533                         Logger.defaultLogError(t);
534                     }
535                 }
536
537             }
538
539         }, combine);
540
541     }
542
543     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
544         scheduleRequest(request, procedure, notify, null);
545     }
546
547     /* (non-Javadoc)
548      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
549      */
550     @Override
551     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
552
553         assert (request != null);
554
555         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
556
557         requestManager.scheduleWrite(new SessionTask(true) {
558
559             @Override
560             public void run(int thread) {
561
562                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
563
564                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
565
566                 flushCounter = 0;
567                 Disposable.safeDispose(clientChanges);
568                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
569
570                 VirtualGraph vg = getProvider(request.getProvider());
571                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
572
573                 try {
574                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                     writeState = writeStateT;
576
577                     assert (null != writer);
578 //                    writer.state.barrier.inc();
579                     writeStateT.setResult(request.perform(writer));
580                     assert (null != writer);
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                 } catch (Throwable e) {
586
587 //                    writer.state.barrier.dec();
588 //                    writer.waitAsync(null);
589
590                     writeState.except(e);
591
592 //                  state.stopWriteTransaction(clusterStream);
593 //
594 //              } catch (Throwable e) {
595 //                  // Log it first, just to be safe that the error is always logged.
596 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
597 //
598 //                  try {
599 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
600 //                      // All we can do here is to log those, can't really pass them anywhere.
601 //                      if (procedure != null) {
602 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
603 //                          else procedure.exception(new DatabaseException(e));
604 //                      }
605 //                  } catch (Throwable e2) {
606 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
607 //                  }
608 //
609 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
610 //
611 //                  state.stopWriteTransaction(clusterStream);
612
613                 } finally {
614                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
615                 }
616
617 //              if(notify != null) notify.release();
618
619                 task.finish();
620
621             }
622
623         }, combine);
624
625     }
626
627     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
628         scheduleRequest(request, callback, notify, null);
629     }
630
631     /* (non-Javadoc)
632      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
633      */
634     @Override
635     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
636
637         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
638
639         assert (request != null);
640
641         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
642
643         requestManager.scheduleWrite(new SessionTask(true) {
644
645             @Override
646             public void run(int thread) {
647                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
648
649                 Procedure<Object> stateProcedure = new Procedure<Object>() {
650                     @Override
651                     public void execute(Object result) {
652                         if (callback != null)
653                             callback.accept(null);
654                     }
655                     @Override
656                     public void exception(Throwable t) {
657                         if (callback != null) {
658                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
659                             else callback.accept(new DatabaseException(t));
660                         } else
661                             Logger.defaultLogError("Unhandled exception", t);
662                     }
663                 };
664
665                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
666                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
667                 DelayedWriteGraph dwg = null;
668 //                newGraph.state.barrier.inc();
669
670                 try {
671                     dwg = new DelayedWriteGraph(newGraph);
672                     request.perform(dwg);
673                 } catch (Throwable e) {
674                     delayedWriteState.except(e);
675                     total.finish();
676                     dwg.close();
677                     return;
678                 } finally {
679 //                    newGraph.state.barrier.dec();
680 //                    newGraph.waitAsync(request);
681                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
682                 }
683
684                 delayedWriteState = null;
685
686                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
687                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
688
689                 flushCounter = 0;
690                 Disposable.safeDispose(clientChanges);
691                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
692
693                 acquireWriteOnly();
694
695                 VirtualGraph vg = getProvider(request.getProvider());
696                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
697
698                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
699
700                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
701
702                 assert (null != writer);
703 //                writer.state.barrier.inc();
704
705                 try {
706                     // Cannot cancel
707                     dwg.commit(writer, request);
708
709                         if(defaultClusterSet != null) {
710                                 XSupport xs = getService(XSupport.class);
711                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712                                 ClusteringSupport cs = getService(ClusteringSupport.class);
713                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
714                         }
715
716                     // This makes clusters available from server
717                     clusterStream.reallyFlush();
718
719                     releaseWriteOnly(writer);
720                     handleUpdatesAndMetadata(writer);
721
722                 } catch (ServiceException e) {
723 //                    writer.state.barrier.dec();
724 //                    writer.waitAsync(null);
725
726                     // These shall be requested from server
727                     clusterTable.removeWriteOnlyClusters();
728                     // This makes clusters available from server
729                     clusterStream.reallyFlush();
730
731                     releaseWriteOnly(writer);
732                     writeState.except(e);
733                 } finally {
734                     // Debugging & Profiling
735                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736                     task2.finish();
737                     total.finish();
738                 }
739             }
740
741         }, combine);
742
743     }
744
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746         scheduleRequest(request, procedure, notify, null);
747     }
748
749     /* (non-Javadoc)
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
751      */
752     @Override
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754         throw new Error("Not implemented");
755     }
756
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760             createdClusters.add(cluster.clusterId);
761         }
762         return cluster;
763     }
764
765     class WriteOnlySupport implements WriteSupport {
766
767         ClusterStream stream;
768         ClusterImpl currentCluster;
769
770         public WriteOnlySupport() {
771             this.stream = clusterStream;
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
777         }
778
779         @Override
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785                 if(s != queryProvider2.getRootLibrary())
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
787
788             try {
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790             } catch (DatabaseException e) {
791                 Logger.defaultLogError(e);
792                 //throw e;
793             }
794
795             clientChanges.invalidate(s);
796
797             if (cluster.isWriteOnly())
798                 return;
799             queryProvider2.updateStatements(s, p);
800
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
806         }
807
808         @Override
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812             try {
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         @Override
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
828
829             if(amount < 65536) {
830                 claimValue(provider,resource, reader.readBytes(null, amount));
831                 return;
832             }
833
834             byte[] bytes = new byte[65536];
835
836             int rid = ((ResourceImpl)resource).id;
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838             try {
839                 int left = amount;
840                 while(left > 0) {
841                     int block = Math.min(left, 65536);
842                     reader.readBytes(bytes, block);
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
844                     left -= block;
845                 }
846             } catch (DatabaseException e) {
847                 Logger.defaultLogError(e);
848             }
849
850             clientChanges.invalidate(rid);
851
852             if (cluster.isWriteOnly())
853                 return;
854             queryProvider2.updateValue(rid);
855
856         }
857
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {
859             if(after_ != null && after_ != before) {
860                 ClusterImpl after = (ClusterImpl)after_;
861                 if(currentCluster == before) {
862                     currentCluster = after;
863                 }
864                 clusterTable.replaceCluster(after);
865             }
866         }
867
868         public int createResourceKey(int foreignCounter) throws DatabaseException {
869             if(currentCluster == null) {
870                 currentCluster = getNewResourceCluster();
871             }
872             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
873                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
874                 newCluster.foreignLookup = new byte[foreignCounter];
875                 currentCluster = newCluster;
876                 if (DEBUG)
877                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
878             }
879             return currentCluster.createResource(clusterTranslator);
880         }
881
882         @Override
883         public Resource createResource(VirtualGraph provider) throws DatabaseException {
884             if(currentCluster == null) {
885                 if (null != defaultClusterSet) {
886                         ResourceImpl result = getNewResource(defaultClusterSet);
887                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
888                     return result;
889                 } else {
890                     currentCluster = getNewResourceCluster();
891                 }
892             }
893             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
894                 if (null != defaultClusterSet) {
895                         ResourceImpl result = getNewResource(defaultClusterSet);
896                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
897                     return result;
898                 } else {
899                     currentCluster = getNewResourceCluster();
900                 }
901             }
902             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, long clusterId)
907         throws DatabaseException {
908             return getNewResource(clusterId);
909         }
910
911         @Override
912         public Resource createResource(VirtualGraph provider, Resource clusterSet)
913         throws DatabaseException {
914             return getNewResource(clusterSet);
915         }
916
917         @Override
918         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
919                 throws DatabaseException {
920             getNewClusterSet(clusterSet);
921         }
922
923         @Override
924         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
925         throws ServiceException {
926             return containsClusterSet(clusterSet);
927         }
928
929         public void selectCluster(long cluster) {
930                 currentCluster = clusterTable.getClusterByClusterId(cluster);
931                 long setResourceId = clusterSetsSupport.getSet(cluster);
932                 clusterSetsSupport.put(setResourceId, cluster);
933         }
934
935         @Override
936         public Resource setDefaultClusterSet(Resource clusterSet)
937         throws ServiceException {
938                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
939                 if(clusterSet != null) {
940                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
941                         currentCluster = clusterTable.getClusterByClusterId(id);
942                         return result;
943                 } else {
944                         currentCluster = null;
945                         return null;
946                 }
947         }
948
949         @Override
950         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
951
952                  provider = getProvider(provider);
953              if (null == provider) {
954                  int key = ((ResourceImpl)resource).id;
955
956
957
958                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
959 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
960 //                      if(key != queryProvider2.getRootLibrary())
961 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
962
963 //                 try {
964 //                     cluster.removeValue(key, clusterTranslator);
965 //                 } catch (DatabaseException e) {
966 //                     Logger.defaultLogError(e);
967 //                     return;
968 //                 }
969
970                  clusterTable.writeOnlyInvalidate(cluster);
971
972                  try {
973                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
974                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
975                          clusterTranslator.removeValue(cluster);
976                  } catch (DatabaseException e) {
977                          Logger.defaultLogError(e);
978                  }
979
980                  queryProvider2.invalidateResource(key);
981                  clientChanges.invalidate(key);
982
983              } else {
984                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
985                  queryProvider2.updateValue(querySupport.getId(resource));
986                  clientChanges.claimValue(resource);
987              }
988
989
990         }
991
992         @Override
993         public void flush(boolean intermediate) {
994             throw new UnsupportedOperationException();
995         }
996
997         @Override
998         public void flushCluster() {
999             clusterTable.flushCluster(graphSession);
1000             if(defaultClusterSet != null) {
1001                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1002             }
1003             currentCluster = null;
1004         }
1005
1006         @Override
1007         public void flushCluster(Resource r) {
1008             throw new UnsupportedOperationException("flushCluster resource " + r);
1009         }
1010
1011         @Override
1012         public void gc() {
1013         }
1014
1015         @Override
1016         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1017                 Resource object) {
1018
1019                 int s = ((ResourceImpl)subject).id;
1020                 int p = ((ResourceImpl)predicate).id;
1021                 int o = ((ResourceImpl)object).id;
1022
1023                 provider = getProvider(provider);
1024                 if (null == provider) {
1025
1026                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1027                         clusterTable.writeOnlyInvalidate(cluster);
1028
1029                         try {
1030
1031                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1032
1033                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035
1036                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039                                 clusterTranslator.removeStatement(cluster);
1040
1041                                 queryProvider2.invalidateResource(s);
1042                                 clientChanges.invalidate(s);
1043
1044                         } catch (DatabaseException e) {
1045
1046                                 Logger.defaultLogError(e);
1047
1048                         }
1049
1050                         return true;
1051
1052                 } else {
1053                         
1054                         ((VirtualGraphImpl)provider).deny(s, p, o);
1055                         queryProvider2.invalidateResource(s);
1056                         clientChanges.invalidate(s);
1057
1058                         return true;
1059
1060                 }
1061
1062         }
1063
1064         @Override
1065         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066             throw new UnsupportedOperationException();
1067         }
1068
1069         @Override
1070         public boolean writeOnly() {
1071             return true;
1072         }
1073
1074         @Override
1075         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076             writeSupport.performWriteRequest(graph, request);
1077         }
1078
1079         @Override
1080         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086             throw new UnsupportedOperationException();
1087         }
1088
1089         @Override
1090         public <T> void addMetadata(Metadata data) throws ServiceException {
1091             writeSupport.addMetadata(data);
1092         }
1093
1094         @Override
1095         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096             return writeSupport.getMetadata(clazz);
1097         }
1098
1099         @Override
1100         public TreeMap<String, byte[]> getMetadata() {
1101             return writeSupport.getMetadata();
1102         }
1103
1104         @Override
1105         public void commitDone(WriteTraits writeTraits, long csid) {
1106             writeSupport.commitDone(writeTraits, csid);
1107         }
1108
1109         @Override
1110         public void clearUndoList(WriteTraits writeTraits) {
1111             writeSupport.clearUndoList(writeTraits);
1112         }
1113         @Override
1114         public int clearMetadata() {
1115             return writeSupport.clearMetadata();
1116         }
1117
1118                 @Override
1119                 public void startUndo() {
1120                         writeSupport.startUndo();
1121                 }
1122     }
1123
1124     class VirtualWriteOnlySupport implements WriteSupport {
1125
1126 //        @Override
1127 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 //                Resource object) {
1129 //            throw new UnsupportedOperationException();
1130 //        }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144
1145             TransientGraph impl = (TransientGraph)provider;
1146             impl.claim(subject, predicate, object);
1147             getQueryProvider2().updateStatements(subject, predicate);
1148             clientChanges.claim(subject, predicate, object);
1149
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160             getQueryProvider2().updateValue(resource);
1161             clientChanges.claimValue(resource);
1162         }
1163
1164         @Override
1165         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166             byte[] value = reader.readBytes(null, amount);
1167             claimValue(provider, resource, value);
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider) {
1172             TransientGraph impl = (TransientGraph)provider;
1173             return impl.getResource(impl.newResource(false));
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, long clusterId) {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws DatabaseException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195         throws ServiceException {
1196             throw new UnsupportedOperationException();
1197         }
1198
1199         @Override
1200         public Resource setDefaultClusterSet(Resource clusterSet)
1201         throws ServiceException {
1202                 return null;
1203         }
1204
1205         @Override
1206         public void denyValue(VirtualGraph provider, Resource resource) {
1207             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208             getQueryProvider2().updateValue(querySupport.getId(resource));
1209             // NOTE: this only keeps track of value changes by-resource.
1210             clientChanges.claimValue(resource);
1211         }
1212
1213         @Override
1214         public void flush(boolean intermediate) {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster() {
1220             throw new UnsupportedOperationException();
1221         }
1222
1223         @Override
1224         public void flushCluster(Resource r) {
1225             throw new UnsupportedOperationException("Resource " + r);
1226         }
1227
1228         @Override
1229         public void gc() {
1230         }
1231
1232         @Override
1233         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234             TransientGraph impl = (TransientGraph) provider;
1235             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237             clientChanges.deny(subject, predicate, object);
1238             return true;
1239         }
1240
1241         @Override
1242         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public boolean writeOnly() {
1248             return true;
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void addMetadata(Metadata data) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public TreeMap<String, byte[]> getMetadata() {
1278             throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public void commitDone(WriteTraits writeTraits, long csid) {
1283         }
1284
1285         @Override
1286         public void clearUndoList(WriteTraits writeTraits) {
1287         }
1288
1289         @Override
1290         public int clearMetadata() {
1291             return 0;
1292         }
1293
1294                 @Override
1295                 public void startUndo() {
1296                 }
1297     }
1298
1299     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300
1301         try {
1302
1303             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304
1305             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306
1307             flushCounter = 0;
1308             Disposable.safeDispose(clientChanges);
1309             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310
1311             acquireWriteOnly();
1312
1313             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314
1315             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316
1317             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1318             writeState = writeStateT;
1319
1320             assert (null != writer);
1321 //            writer.state.barrier.inc();
1322             long start = System.nanoTime();
1323             T result = request.perform(writer);
1324             long duration = System.nanoTime() - start;
1325             if (DEBUG) {
1326                 System.err.println("################");
1327                 System.err.println("WriteOnly duration " + 1e-9*duration);
1328             }
1329             writeStateT.setResult(result);
1330
1331             // This makes clusters available from server
1332             clusterStream.reallyFlush();
1333
1334             // This will trigger query updates
1335             releaseWriteOnly(writer);
1336             assert (null != writer);
1337
1338             handleUpdatesAndMetadata(writer);
1339
1340         } catch (CancelTransactionException e) {
1341
1342             releaseWriteOnly(writeState.getGraph());
1343
1344             clusterTable.removeWriteOnlyClusters();
1345             state.stopWriteTransaction(clusterStream);
1346
1347         } catch (Throwable e) {
1348
1349             e.printStackTrace();
1350
1351             releaseWriteOnly(writeState.getGraph());
1352
1353             clusterTable.removeWriteOnlyClusters();
1354
1355             if (callback != null)
1356                 callback.exception(new DatabaseException(e));
1357
1358             state.stopWriteTransaction(clusterStream);
1359             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360
1361         } finally  {
1362
1363             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1364
1365         }
1366
1367     }
1368
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1370         scheduleRequest(request, callback, notify, null);
1371     }
1372
1373     @Override
1374     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375
1376         assertAlive();
1377
1378         assert (request != null);
1379
1380         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1381
1382         requestManager.scheduleWrite(new SessionTask(true) {
1383
1384             @Override
1385             public void run(int thread) {
1386
1387                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388
1389                     try {
1390
1391                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392
1393                         flushCounter = 0;
1394                         Disposable.safeDispose(clientChanges);
1395                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396
1397                         acquireWriteOnly();
1398
1399                         VirtualGraph vg = getProvider(request.getProvider());
1400                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1401
1402                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1403
1404                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405
1406                             @Override
1407                             public void execute(Object result) {
1408                                 if(callback != null) callback.accept(null);
1409                             }
1410
1411                             @Override
1412                             public void exception(Throwable t) {
1413                                 if(callback != null) callback.accept((DatabaseException)t);
1414                             }
1415
1416                         });
1417
1418                         assert (null != writer);
1419 //                        writer.state.barrier.inc();
1420
1421                         try {
1422
1423                             request.perform(writer);
1424
1425                         } catch (Throwable e) {
1426
1427 //                            writer.state.barrier.dec();
1428 //                            writer.waitAsync(null);
1429
1430                             releaseWriteOnly(writer);
1431
1432                             clusterTable.removeWriteOnlyClusters();
1433
1434                             if(!(e instanceof CancelTransactionException)) {
1435                                 if (callback != null)
1436                                     callback.accept(new DatabaseException(e));
1437                             }
1438
1439                             writeState.except(e);
1440
1441                             return;
1442
1443                         }
1444
1445                         // This makes clusters available from server
1446                         boolean empty = clusterStream.reallyFlush();
1447                         // This was needed to make WO requests call metadata listeners.
1448                         // NOTE: the calling event does not contain clientChanges information.
1449                         if (!empty && clientChanges.isEmpty())
1450                             clientChanges.setNotEmpty(true);
1451                         releaseWriteOnly(writer);
1452                         assert (null != writer);
1453                     } finally  {
1454
1455                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1456
1457                     }
1458
1459
1460                 task.finish();
1461
1462             }
1463
1464         }, combine);
1465
1466     }
1467
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1469         scheduleRequest(request, callback, notify, null);
1470     }
1471
1472     /* (non-Javadoc)
1473      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474      */
1475     @Override
1476     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1477
1478         assert (request != null);
1479
1480         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1481
1482         requestManager.scheduleWrite(new SessionTask(true) {
1483
1484             @Override
1485             public void run(int thread) {
1486
1487                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1488
1489                 performWriteOnly(request, notify, callback);
1490
1491                 task.finish();
1492
1493             }
1494
1495         }, combine);
1496
1497     }
1498
1499     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1500         // Queues a read task on the request manager; the task evaluates request with a fresh ReadGraphImpl.
1501         assert (request != null);
1502         assert (procedure != null);
1503
1504         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1505
1506         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1507
1508             @Override
1509             public void run(int thread) {
1510
1511                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1512                 // A non-null listener base sends the request through QueryCache; otherwise it is performed directly.
1513                 ListenerBase listener = getListenerBase(procedure);
1514
1515                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516
1517                 try {
1518
1519                     if (listener != null) {
1520
1521                         try {
1522                                 // Wrapper that forwards to procedure and also fills the throwable/result containers when present.
1523                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1524
1525                                         @Override
1526                                         public void exception(AsyncReadGraph graph, Throwable t) {
1527                                                 procedure.exception(graph, t);
1528                                                 if(throwable != null) {
1529                                                         throwable.set(t);
1530                                                 } else {
1531                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1532                                                 }
1533                                         }
1534
1535                                         @Override
1536                                         public void execute(AsyncReadGraph graph, T t) {
1537                                                 if(result != null) result.set(t);
1538                                                 procedure.execute(graph, t);
1539                                         }
1540
1541                                 };
1542                                 
1543                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1544                                 
1545                         } catch (Throwable t) {
1546                             // This is handled by the AsyncProcedure
1547                                 //Logger.defaultLogError("Internal error", t);
1548                         }
1549
1550                     } else {
1551
1552                         try {
1553                             // Direct execution: perform the request, then hand the value to result and procedure.
1554 //                            newGraph.state.barrier.inc();
1555
1556                             T t = request.perform(newGraph);
1557
1558                             try {
1559
1560                                 if(result != null) result.set(t);
1561                                 procedure.execute(newGraph, t);
1562
1563                             } catch (Throwable th) {
1564                                 // Thrown while delivering the value: recorded or logged; procedure.exception is not invoked here.
1565                                 if(throwable != null) {
1566                                     throwable.set(th);
1567                                 } else {
1568                                     Logger.defaultLogError("Unhandled exception", th);
1569                                 }
1570
1571                             }
1572
1573                         } catch (Throwable t) {
1574                             // request.perform failed: record or log the failure, then report it to procedure.exception.
1575                             if (DEBUG)
1576                                 t.printStackTrace();
1577
1578                             if(throwable != null) {
1579                                 throwable.set(t);
1580                             } else {
1581                                 Logger.defaultLogError("Unhandled exception", t);
1582                             }
1583
1584                             try {
1585
1586                                 procedure.exception(newGraph, t);
1587
1588                             } catch (Throwable t2) {
1589                                 // procedure.exception itself threw; t2 is stored in the container (or logged) as well.
1590                                 if(throwable != null) {
1591                                     throwable.set(t2);
1592                                 } else {
1593                                     Logger.defaultLogError("Unhandled exception", t2);
1594                                 }
1595
1596                             }
1597
1598                         }
1599
1600 //                        newGraph.state.barrier.dec();
1601 //                        newGraph.waitAsync(request);
1602
1603                     }
1604
1605                 } finally {
1606
1607                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1608
1609                 }
1610
1611             }
1612
1613         });
1614
1615     }
1616
1617     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1618
1619         assert (request != null);
1620         assert (procedure != null);
1621
1622         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1623
1624         requestManager.scheduleRead(new SessionRead(null, notify) {
1625
1626             @Override
1627             public void run(int thread) {
1628
1629                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1630
1631                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1632
1633                 try {
1634
1635                     if (listener != null) {
1636
1637                         try {
1638                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1639                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1640                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1641                                                 } catch (DatabaseException e) {
1642                                                         Logger.defaultLogError(e);
1643                                                 }
1644
1645                     } else {
1646
1647 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1648 //                                procedure, "request");
1649
1650                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1651
1652                         try {
1653
1654                             request.perform(newGraph, wrap);
1655                             wrap.get();
1656 <<<<<<< Upstream, based on branch 'private/antti_threads' of ssh://villberg@gerrit.simantics.org:29418/simantics/platform.git
1657 =======
1658
1659                         } catch (DatabaseException e) {
1660
1661                                                         Logger.defaultLogError(e);
1662
1663 //                              wrap.exception(newGraph, t);
1664 //                            wrapper.exception(newGraph, t);
1665 //                            newGraph.waitAsync(request);
1666 >>>>>>> 82fa68e Generate parts of db client query code
1667
1668 <<<<<<< Upstream, based on branch 'private/antti_threads' of ssh://villberg@gerrit.simantics.org:29418/simantics/platform.git
1669                         } catch (Throwable t) {
1670
1671                                 wrap.exception(newGraph, t);
1672                                 
1673 //                            wrapper.exception(newGraph, t);
1674 //                            newGraph.waitAsync(request);
1675
1676
1677 =======
1678 >>>>>>> 82fa68e Generate parts of db client query code
1679                         }
1680
1681                     }
1682
1683                 } finally {
1684
1685                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1686
1687                 }
1688
1689             }
1690
1691         });
1692
1693     }
1694
1695     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1696
1697         assert (request != null);
1698         assert (procedure != null);
1699
1700         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1701
1702         int sync = notify != null ? thread : -1;
1703
1704         requestManager.scheduleRead(new SessionRead(null, notify) {
1705
1706             @Override
1707             public void run(int thread) {
1708
1709                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1710
1711                 ListenerBase listener = getListenerBase(procedure);
1712
1713                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1714
1715                 try {
1716
1717                     if (listener != null) {
1718
1719                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1720
1721 //                        newGraph.waitAsync(request);
1722
1723                     } else {
1724
1725                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1726
1727                         try {
1728
1729                             request.perform(newGraph, wrapper);
1730
1731                         } catch (Throwable t) {
1732
1733                             t.printStackTrace();
1734
1735                         }
1736
1737                     }
1738
1739                 } finally {
1740
1741                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1742
1743                 }
1744
1745             }
1746
1747         });
1748
1749     }
1750     
1751     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1752
1753         assert (request != null);
1754         assert (procedure != null);
1755
1756         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1757
1758         int sync = notify != null ? thread : -1;
1759
1760         requestManager.scheduleRead(new SessionRead(null, notify) {
1761
1762             @Override
1763             public void run(int thread) {
1764
1765                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1766
1767                 ListenerBase listener = getListenerBase(procedure);
1768
1769                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1770
1771                 try {
1772
1773                     if (listener != null) {
1774
1775                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1776
1777 //                        newGraph.waitAsync(request);
1778
1779                     } else {
1780
1781                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1782
1783                         try {
1784
1785                             request.perform(newGraph, wrapper);
1786
1787                         } catch (Throwable t) {
1788
1789                             t.printStackTrace();
1790
1791                         }
1792
1793                     }
1794
1795                 } finally {
1796
1797                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1798
1799                 }
1800
1801             }
1802
1803         });
1804
1805     }
1806
1807     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1808
1809         assert (request != null);
1810         assert (procedure != null);
1811
1812         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1813
1814         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1815
1816             @Override
1817             public void run(int thread) {
1818
1819                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1820
1821                 ListenerBase listener = getListenerBase(procedure);
1822
1823                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1824
1825                 try {
1826
1827                     if (listener != null) {
1828
1829                         try {
1830                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1831                         } catch (DatabaseException e) {
1832                             Logger.defaultLogError(e);
1833                         }
1834                         
1835                         
1836 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1837 //
1838 //                            @Override
1839 //                            public void exception(Throwable t) {
1840 //                                procedure.exception(t);
1841 //                                if(throwable != null) {
1842 //                                    throwable.set(t);
1843 //                                }
1844 //                            }
1845 //
1846 //                            @Override
1847 //                            public void execute(T t) {
1848 //                                if(result != null) result.set(t);
1849 //                                procedure.execute(t);
1850 //                            }
1851 //
1852 //                        }, listener);
1853
1854 //                        newGraph.waitAsync(request);
1855
1856                     } else {
1857
1858 //                        newGraph.state.barrier.inc();
1859
1860                         request.register(newGraph, new Listener<T>() {
1861
1862                             @Override
1863                             public void exception(Throwable t) {
1864                                 if(throwable != null) throwable.set(t);
1865                                 procedure.exception(t);
1866 //                                newGraph.state.barrier.dec();
1867                             }
1868
1869                             @Override
1870                             public void execute(T t) {
1871                                 if(result != null) result.set(t);
1872                                 procedure.execute(t);
1873 //                                newGraph.state.barrier.dec();
1874                             }
1875
1876                             @Override
1877                             public boolean isDisposed() {
1878                                 return true;
1879                             }
1880
1881                         });
1882
1883 //                        newGraph.waitAsync(request);
1884
1885                     }
1886
1887                 } finally {
1888
1889                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1890
1891                 }
1892
1893             }
1894
1895         });
1896
1897     }
1898
1899
1900     @Override
1901     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1902
1903         scheduleRequest(request, procedure, null, null, null);
1904
1905     }
1906
1907     @Override
1908     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1909         assertNotSession();
1910         Semaphore notify = new Semaphore(0);
1911         DataContainer<Throwable> container = new DataContainer<Throwable>();
1912         DataContainer<T> result = new DataContainer<T>();
1913         scheduleRequest(request, procedure, notify, container, result);
1914         acquire(notify, request);
1915         Throwable throwable = container.get();
1916         if(throwable != null) {
1917             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1918             else throw new DatabaseException("Unexpected exception", throwable);
1919         }
1920         return result.get();
1921     }
1922
1923     @Override
1924     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1925         assertNotSession();
1926         Semaphore notify = new Semaphore(0);
1927         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1928         final DataContainer<T> resultContainer = new DataContainer<T>();
1929         scheduleRequest(request, new AsyncProcedure<T>() {
1930             @Override
1931             public void exception(AsyncReadGraph graph, Throwable throwable) {
1932                 exceptionContainer.set(throwable);
1933                 procedure.exception(graph, throwable);
1934             }
1935             @Override
1936             public void execute(AsyncReadGraph graph, T result) {
1937                 resultContainer.set(result);
1938                 procedure.execute(graph, result);
1939             }
1940         }, getListenerBase(procedure), notify);
1941         acquire(notify, request, procedure);
1942         Throwable throwable = exceptionContainer.get();
1943         if (throwable != null) {
1944             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1945             else throw new DatabaseException("Unexpected exception", throwable);
1946         }
1947         return resultContainer.get();
1948     }
1949
1950
1951     @Override
1952     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1953         assertNotSession();
1954         Semaphore notify = new Semaphore(0);
1955         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1956         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1957             @Override
1958             public void exception(AsyncReadGraph graph, Throwable throwable) {
1959                 exceptionContainer.set(throwable);
1960                 procedure.exception(graph, throwable);
1961             }
1962             @Override
1963             public void execute(AsyncReadGraph graph, T result) {
1964                 procedure.execute(graph, result);
1965             }
1966             @Override
1967             public void finished(AsyncReadGraph graph) {
1968                 procedure.finished(graph);
1969             }
1970         }, notify);
1971         acquire(notify, request, procedure);
1972         Throwable throwable = exceptionContainer.get();
1973         if (throwable != null) {
1974             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1975             else throw new DatabaseException("Unexpected exception", throwable);
1976         }
1977         // TODO: implement return value
1978         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1979         return null;
1980     }
1981
1982     @Override
1983     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1984         return syncRequest(request, new ProcedureAdapter<T>());
1985
1986
1987 //        assert(request != null);
1988 //
1989 //        final DataContainer<T> result = new DataContainer<T>();
1990 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1991 //
1992 //        syncRequest(request, new Procedure<T>() {
1993 //
1994 //            @Override
1995 //            public void execute(T t) {
1996 //                result.set(t);
1997 //            }
1998 //
1999 //            @Override
2000 //            public void exception(Throwable t) {
2001 //                exception.set(t);
2002 //            }
2003 //
2004 //        });
2005 //
2006 //        Throwable t = exception.get();
2007 //        if(t != null) {
2008 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
2009 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
2010 //        }
2011 //
2012 //        return result.get();
2013
2014     }
2015
2016     @Override
2017     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
2018         return syncRequest(request, (Procedure<T>)procedure);
2019     }
2020
2021
2022 //    @Override
2023 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
2024 //        assertNotSession();
2025 //        Semaphore notify = new Semaphore(0);
2026 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
2027 //        DataContainer<T> result = new DataContainer<T>();
2028 //        scheduleRequest(request, procedure, notify, container, result);
2029 //        acquire(notify, request);
2030 //        Throwable throwable = container.get();
2031 //        if(throwable != null) {
2032 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2033 //            else throw new DatabaseException("Unexpected exception", throwable);
2034 //        }
2035 //        return result.get();
2036 //    }
2037
2038
2039     @Override
2040     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2041         assertNotSession();
2042         Semaphore notify = new Semaphore(0);
2043         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2044         final DataContainer<T> result = new DataContainer<T>();
2045         scheduleRequest(request, procedure, notify, container, result);
2046         acquire(notify, request);
2047         Throwable throwable = container.get();
2048         if (throwable != null) {
2049             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2050             else throw new DatabaseException("Unexpected exception", throwable);
2051         }
2052         return result.get();
2053     }
2054
2055     @Override
2056     public void syncRequest(Write request) throws DatabaseException  {
2057         assertNotSession();
2058         assertAlive();
2059         Semaphore notify = new Semaphore(0);
2060         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2061         scheduleRequest(request, e -> exception.set(e), notify);
2062         acquire(notify, request);
2063         if(exception.get() != null) throw exception.get();
2064     }
2065
2066     @Override
2067     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2068         assertNotSession();
2069         Semaphore notify = new Semaphore(0);
2070         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2071         final DataContainer<T> result = new DataContainer<T>();
2072         scheduleRequest(request, new Procedure<T>() {
2073
2074             @Override
2075             public void exception(Throwable t) {
2076               exception.set(t);
2077             }
2078
2079             @Override
2080             public void execute(T t) {
2081               result.set(t);
2082             }
2083
2084         }, notify);
2085         acquire(notify, request);
2086         if(exception.get() != null) {
2087             Throwable t = exception.get();
2088             if(t instanceof DatabaseException) throw (DatabaseException)t;
2089             else throw new DatabaseException(t);
2090         }
2091         return result.get();
2092     }
2093
2094     @Override
2095     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2096         assertNotSession();
2097         Semaphore notify = new Semaphore(0);
2098         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2099         final DataContainer<T> result = new DataContainer<T>();
2100         scheduleRequest(request, new Procedure<T>() {
2101
2102             @Override
2103             public void exception(Throwable t) {
2104               exception.set(t);
2105             }
2106
2107             @Override
2108             public void execute(T t) {
2109               result.set(t);
2110             }
2111
2112         }, notify);
2113         acquire(notify, request);
2114         if(exception.get() != null) {
2115             Throwable t = exception.get();
2116             if(t instanceof DatabaseException) throw (DatabaseException)t;
2117             else throw new DatabaseException(t);
2118         }
2119         return result.get();
2120     }
2121
2122     @Override
2123     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2124         assertNotSession();
2125         Semaphore notify = new Semaphore(0);
2126         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2127         final DataContainer<T> result = new DataContainer<T>();
2128         scheduleRequest(request, new Procedure<T>() {
2129
2130             @Override
2131             public void exception(Throwable t) {
2132               exception.set(t);
2133             }
2134
2135             @Override
2136             public void execute(T t) {
2137               result.set(t);
2138             }
2139
2140         }, notify);
2141         acquire(notify, request);
2142         if(exception.get() != null) {
2143             Throwable t = exception.get();
2144             if(t instanceof DatabaseException) throw (DatabaseException)t;
2145             else throw new DatabaseException(t);
2146         }
2147         return result.get();
2148     }
2149
2150     @Override
2151     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2152         assertNotSession();
2153         Semaphore notify = new Semaphore(0);
2154         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2155         scheduleRequest(request, e -> exception.set(e), notify);
2156         acquire(notify, request);
2157         if(exception.get() != null) throw exception.get();
2158     }
2159
2160     @Override
2161     public void syncRequest(WriteOnly request) throws DatabaseException  {
2162         assertNotSession();
2163         assertAlive();
2164         Semaphore notify = new Semaphore(0);
2165         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2166         scheduleRequest(request, e -> exception.set(e), notify);
2167         acquire(notify, request);
2168         if(exception.get() != null) throw exception.get();
2169     }
2170
2171     /*
2172      *
2173      * GraphRequestProcessor interface
2174      */
2175
2176     @Override
2177     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2178
2179         scheduleRequest(request, procedure, null, null);
2180
2181     }
2182
2183     @Override
2184     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2185
2186         scheduleRequest(request, procedure, null);
2187
2188     }
2189
2190     @Override
2191     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2192
2193         scheduleRequest(request, procedure, null, null, null);
2194
2195     }
2196
2197     @Override
2198     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2199
2200         scheduleRequest(request, callback, null);
2201
2202     }
2203
2204     @Override
2205     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2206
2207         scheduleRequest(request, procedure, null);
2208
2209     }
2210
2211     @Override
2212     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2213
2214         scheduleRequest(request, procedure, null);
2215
2216     }
2217
2218     @Override
2219     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2220
2221         scheduleRequest(request, procedure, null);
2222
2223     }
2224
2225     @Override
2226     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2227
2228         scheduleRequest(request, callback, null);
2229
2230     }
2231
2232     @Override
2233     public void asyncRequest(final Write r) {
2234         asyncRequest(r, null);
2235     }
2236
2237     @Override
2238     public void asyncRequest(final DelayedWrite r) {
2239         asyncRequest(r, null);
2240     }
2241
2242     @Override
2243     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2244
2245         scheduleRequest(request, callback, null);
2246
2247     }
2248
2249     @Override
2250     public void asyncRequest(final WriteOnly request) {
2251
2252         asyncRequest(request, null);
2253
2254     }
2255
2256     @Override
2257     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2258         r.request(this, procedure);
2259     }
2260
2261     @Override
2262     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2263         r.request(this, procedure);
2264     }
2265
2266     @Override
2267     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2268         r.request(this, procedure);
2269     }
2270
2271     @Override
2272     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2273         r.request(this, procedure);
2274     }
2275
2276     @Override
2277     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2278         r.request(this, procedure);
2279     }
2280
2281     @Override
2282     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2283         r.request(this, procedure);
2284     }
2285
2286     @Override
2287     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2288         return r.request(this);
2289     }
2290
2291     @Override
2292     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2293         return r.request(this);
2294     }
2295
2296     @Override
2297     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2298         r.request(this, procedure);
2299     }
2300
2301     @Override
2302     public <T> void async(WriteInterface<T> r) {
2303         r.request(this, new ProcedureAdapter<T>());
2304     }
2305
2306     //@Override
2307     public void incAsync() {
2308         state.incAsync();
2309     }
2310
2311     //@Override
2312     public void decAsync() {
2313         state.decAsync();
2314     }
2315
2316     public long getCluster(ResourceImpl resource) {
2317         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2318         return cluster.getClusterId();
2319     }
2320
2321     public long getCluster(int id) {
2322         if (clusterTable == null)
2323             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2324         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2325     }
2326
2327     public ResourceImpl getResource(int id) {
2328         return new ResourceImpl(resourceSupport, id);
2329     }
2330
2331     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2332         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2333         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2334         int key = proxy.getClusterKey();
2335         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2336         return new ResourceImpl(resourceSupport, resourceKey);
2337     }
2338
2339     public ResourceImpl getResource2(int id) {
2340         assert (id != 0);
2341         return new ResourceImpl(resourceSupport, id);
2342     }
2343
2344     final public int getId(ResourceImpl impl) {
2345         return impl.id;
2346     }
2347
2348     public static final Charset UTF8 = Charset.forName("utf-8");
2349
2350
2351
2352
2353     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2354         for(TransientGraph g : support.providers) {
2355             if(g.isPending(subject)) return false;
2356         }
2357         return true;
2358     }
2359
2360     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2361         for(TransientGraph g : support.providers) {
2362             if(g.isPending(subject, predicate)) return false;
2363         }
2364         return true;
2365     }
2366
2367     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2368
2369         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2370
2371             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2372
2373             @Override
2374             public void accept(ReadGraphImpl graph) {
2375                 if(ready.decrementAndGet() == 0) {
2376                     runnable.accept(graph);
2377                 }
2378             }
2379
2380         };
2381
2382         for(TransientGraph g : support.providers) {
2383             if(g.isPending(subject)) {
2384                 try {
2385                     g.load(graph, subject, composite);
2386                 } catch (DatabaseException e) {
2387                     e.printStackTrace();
2388                 }
2389             } else {
2390                 composite.accept(graph);
2391             }
2392         }
2393
2394         composite.accept(graph);
2395
2396     }
2397
2398     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2399
2400         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2401
2402             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2403
2404             @Override
2405             public void accept(ReadGraphImpl graph) {
2406                 if(ready.decrementAndGet() == 0) {
2407                     runnable.accept(graph);
2408                 }
2409             }
2410
2411         };
2412
2413         for(TransientGraph g : support.providers) {
2414             if(g.isPending(subject, predicate)) {
2415                 try {
2416                     g.load(graph, subject, predicate, composite);
2417                 } catch (DatabaseException e) {
2418                     e.printStackTrace();
2419                 }
2420             } else {
2421                 composite.accept(graph);
2422             }
2423         }
2424
2425         composite.accept(graph);
2426
2427     }
2428
2429 //    void dumpHeap() {
2430 //
2431 //        try {
2432 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2433 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2434 //                    "d:/heap" + flushCounter + ".txt", true);
2435 //        } catch (IOException e) {
2436 //            e.printStackTrace();
2437 //        }
2438 //
2439 //    }
2440
2441     void fireReactionsToSynchronize(ChangeSet cs) {
2442
2443         // Do not fire empty events
2444         if (cs.isEmpty())
2445             return;
2446
2447         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2448 //        g.state.barrier.inc();
2449
2450         try {
2451
2452             if (!cs.isEmpty()) {
2453                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2454                 for (ChangeListener l : changeListeners2) {
2455                     try {
2456                         l.graphChanged(e2);
2457                     } catch (Exception ex) {
2458                         ex.printStackTrace();
2459                     }
2460                 }
2461             }
2462
2463         } finally {
2464
2465 //            g.state.barrier.dec();
2466 //            g.waitAsync(null);
2467
2468         }
2469
2470     }
2471
2472     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2473         try {
2474
2475             // Do not fire empty events
2476             if (cs2.isEmpty())
2477                 return;
2478
2479 //            graph.restart();
2480 //            graph.state.barrier.inc();
2481
2482             try {
2483
2484                 if (!cs2.isEmpty()) {
2485
2486                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2487                     for (ChangeListener l : changeListeners2) {
2488                         try {
2489                             l.graphChanged(e2);
2490 //                            System.out.println("changelistener " + l);
2491                         } catch (Exception ex) {
2492                             ex.printStackTrace();
2493                         }
2494                     }
2495
2496                 }
2497
2498             } finally {
2499
2500 //                graph.state.barrier.dec();
2501 //                graph.waitAsync(null);
2502
2503             }
2504         } catch (Throwable t) {
2505             t.printStackTrace();
2506
2507         }
2508     }
2509     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2510
2511         try {
2512
2513             // Do not fire empty events
2514             if (cs2.isEmpty())
2515                 return;
2516
2517             // Do not fire on virtual requests
2518             if(graph.getProvider() != null)
2519                 return;
2520
2521             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2522
2523             try {
2524
2525                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2526                 for (ChangeListener l : metadataListeners) {
2527                     try {
2528                         l.graphChanged(e2);
2529                     } catch (Throwable ex) {
2530                         ex.printStackTrace();
2531                     }
2532                 }
2533
2534             } finally {
2535
2536             }
2537
2538         } catch (Throwable t) {
2539             t.printStackTrace();
2540         }
2541
2542     }
2543
2544
2545     /*
2546      * (non-Javadoc)
2547      *
2548      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2549      */
2550     @Override
2551     public <T> T getService(Class<T> api) {
2552         T t = peekService(api);
2553         if (t == null) {
2554             if (state.isClosed())
2555                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2556             throw new ServiceNotFoundException(this, api);
2557         }
2558         return t;
2559     }
2560
2561     /**
2562      * @return
2563      */
2564     protected abstract ServerInformation getCachedServerInformation();
2565
2566         private Class<?> serviceKey1 = null;
2567         private Class<?> serviceKey2 = null;
2568         private Object service1 = null;
2569         private Object service2 = null;
2570
2571     /*
2572      * (non-Javadoc)
2573      *
2574      * @see
2575      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2576      */
2577     @SuppressWarnings("unchecked")
2578     @Override
2579     public synchronized <T> T peekService(Class<T> api) {
2580
2581                 if(serviceKey1 == api) {
2582                         return (T)service1;
2583                 } else if (serviceKey2 == api) {
2584                         // Promote this key
2585                         Object result = service2;
2586                         service2 = service1;
2587                         serviceKey2 = serviceKey1;
2588                         service1 = result;
2589                         serviceKey1 = api;
2590                         return (T)result;
2591                 }
2592
2593         if (Layer0.class == api)
2594                 return (T) L0;
2595         if (ServerInformation.class == api)
2596             return (T) getCachedServerInformation();
2597         else if (WriteGraphImpl.class == api)
2598             return (T) writeState.getGraph();
2599         else if (ClusterBuilder.class == api)
2600             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2601         else if (ClusterBuilderFactory.class == api)
2602             return (T)new ClusterBuilderFactoryImpl(this);
2603
2604                 service2 = service1;
2605                 serviceKey2 = serviceKey1;
2606
2607         service1 = serviceLocator.peekService(api);
2608         serviceKey1 = api;
2609
2610         return (T)service1;
2611
2612     }
2613
2614     /*
2615      * (non-Javadoc)
2616      *
2617      * @see
2618      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2619      */
2620     @Override
2621     public boolean hasService(Class<?> api) {
2622         return serviceLocator.hasService(api);
2623     }
2624
2625     /**
2626      * @param api the api that must be implemented by the specified service
2627      * @param service the service implementation
2628      */
2629     @Override
2630     public <T> void registerService(Class<T> api, T service) {
2631         if(Layer0.class == api) {
2632                 L0 = (Layer0)service;
2633                 return;
2634         }
2635         serviceLocator.registerService(api, service);
2636         if (TransactionPolicySupport.class == api) {
2637             transactionPolicy = (TransactionPolicySupport)service;
2638             state.resetTransactionPolicy();
2639         }
2640         if (api == serviceKey1)
2641                 service1 = service;
2642         else if (api == serviceKey2)
2643                 service2 = service;
2644     }
2645
2646     // ----------------
2647     // SessionMonitor
2648     // ----------------
2649
2650     void fireSessionVariableChange(String variable) {
2651         for (MonitorHandler h : monitorHandlers) {
2652             MonitorContext ctx = monitorContexts.getLeft(h);
2653             assert ctx != null;
2654             // SafeRunner functionality repeated here to avoid dependency.
2655             try {
2656                 h.valuesChanged(ctx);
2657             } catch (Exception e) {
2658                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2659             } catch (LinkageError e) {
2660                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2661             }
2662         }
2663     }
2664
2665
2666     class ResourceSerializerImpl implements ResourceSerializer {
2667
2668         public long createRandomAccessId(int id) throws DatabaseException {
2669             if(id < 0)
2670                 return id;
2671             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2672             long cluster = getCluster(id);
2673             if (0 == cluster)
2674                 return 0; // Better to return 0 then invalid id.
2675             long result = ClusterTraitsBase.createResourceId(cluster, index);
2676             return result;
2677         }
2678
2679         @Override
2680         public long getRandomAccessId(Resource resource) throws DatabaseException {
2681
2682             ResourceImpl resourceImpl = (ResourceImpl) resource;
2683             return createRandomAccessId(resourceImpl.id);
2684
2685         }
2686
2687         @Override
2688         public int getTransientId(Resource resource) throws DatabaseException {
2689
2690             ResourceImpl resourceImpl = (ResourceImpl) resource;
2691             return resourceImpl.id;
2692
2693         }
2694
2695         @Override
2696         public String createRandomAccessId(Resource resource)
2697         throws InvalidResourceReferenceException {
2698
2699             if(resource == null) throw new IllegalArgumentException();
2700
2701             ResourceImpl resourceImpl = (ResourceImpl) resource;
2702
2703             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2704
2705             int r;
2706             try {
2707                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2708             } catch (DatabaseException e1) {
2709                 throw new InvalidResourceReferenceException(e1);
2710             }
2711             try {
2712                 // Serialize as '<resource index>_<cluster id>'
2713                 return "" + r + "_" + getCluster(resourceImpl);
2714             } catch (Throwable e) {
2715                 e.printStackTrace();
2716                 throw new InvalidResourceReferenceException(e);
2717             } finally {
2718             }
2719         }
2720
2721         public int getTransientId(long serialized) throws DatabaseException {
2722             if (serialized <= 0)
2723                 return (int)serialized;
2724             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2725             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2726             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2727             if (cluster == null)
2728                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2729             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2730             return key;
2731         }
2732
2733         @Override
2734         public Resource getResource(long randomAccessId) throws DatabaseException {
2735             return getResourceByKey(getTransientId(randomAccessId));
2736         }
2737
2738         @Override
2739         public Resource getResource(int transientId) throws DatabaseException {
2740             return getResourceByKey(transientId);
2741         }
2742
2743         @Override
2744         public Resource getResource(String randomAccessId)
2745         throws InvalidResourceReferenceException {
2746             try {
2747                 int i = randomAccessId.indexOf('_');
2748                 if (i == -1)
2749                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2750                             + randomAccessId + "'");
2751                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2752                 if(r < 0) return getResourceByKey(r);
2753                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2754                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2755                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2756                 if (cluster.hasResource(key, clusterTranslator))
2757                     return getResourceByKey(key);
2758             } catch (InvalidResourceReferenceException e) {
2759                 throw e;
2760             } catch (NumberFormatException e) {
2761                 throw new InvalidResourceReferenceException(e);
2762             } catch (Throwable e) {
2763                 e.printStackTrace();
2764                 throw new InvalidResourceReferenceException(e);
2765             } finally {
2766             }
2767             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2768         }
2769
2770         @Override
2771         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2772             try {
2773                 return true;
2774             } catch (Throwable e) {
2775                 e.printStackTrace();
2776                 throw new InvalidResourceReferenceException(e);
2777             } finally {
2778             }
2779         }
2780     }
2781
2782     // Copied from old SessionImpl
2783
2784     void check() {
2785         if (state.isClosed())
2786             throw new Error("Session closed.");
2787     }
2788
2789
2790
2791     public ResourceImpl getNewResource(long clusterId) {
2792         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2793         int newId;
2794         try {
2795             newId = cluster.createResource(clusterTranslator);
2796         } catch (DatabaseException e) {
2797             Logger.defaultLogError(e);
2798             return null;
2799         }
2800         return new ResourceImpl(resourceSupport, newId);
2801     }
2802
2803     public ResourceImpl getNewResource(Resource clusterSet)
2804     throws DatabaseException {
2805         long resourceId = clusterSet.getResourceId();
2806         Long clusterId = clusterSetsSupport.get(resourceId);
2807         if (null == clusterId)
2808             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2809         if (Constants.NewClusterId == clusterId) {
2810             clusterId = getService(ClusteringSupport.class).createCluster();
2811             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2812                 createdClusters.add(clusterId);
2813             clusterSetsSupport.put(resourceId, clusterId);
2814             return getNewResource(clusterId);
2815         } else {
2816             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2817             ResourceImpl result;
2818             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2819                 clusterId = getService(ClusteringSupport.class).createCluster();
2820                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2821                     createdClusters.add(clusterId);
2822                 clusterSetsSupport.put(resourceId, clusterId);
2823                 return getNewResource(clusterId);
2824             } else {
2825                 result = getNewResource(clusterId);
2826                 int resultKey = querySupport.getId(result);
2827                 long resultCluster = querySupport.getClusterId(resultKey);
2828                 if (clusterId != resultCluster)
2829                     clusterSetsSupport.put(resourceId, resultCluster);
2830                 return result;
2831             }
2832         }
2833     }
2834
2835     public void getNewClusterSet(Resource clusterSet)
2836     throws DatabaseException {
2837         if (DEBUG)
2838             System.out.println("new cluster set=" + clusterSet);
2839         long resourceId = clusterSet.getResourceId();
2840         if (clusterSetsSupport.containsKey(resourceId))
2841             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2842         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2843     }
2844     public boolean containsClusterSet(Resource clusterSet)
2845     throws ServiceException {
2846         long resourceId = clusterSet.getResourceId();
2847         return clusterSetsSupport.containsKey(resourceId);
2848     }
2849     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2850         Resource r = defaultClusterSet;
2851         defaultClusterSet = clusterSet;
2852         return r;
2853     }
2854     void printDiagnostics() {
2855
2856         if (DIAGNOSTICS) {
2857
2858             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2859             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2860             for(ClusterI cluster : clusterTable.getClusters()) {
2861                 try {
2862                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2863                         residentClusters.set(residentClusters.get() + 1);
2864                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2865                     }
2866                 } catch (DatabaseException e) {
2867                     Logger.defaultLogError(e);
2868                 }
2869             }
2870
2871             System.out.println("--------------------------------");
2872             System.out.println("Cluster information:");
2873             System.out.println("-amount of resident clusters=" + residentClusters.get());
2874             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2875
2876             for(ClusterI cluster : clusterTable.getClusters()) {
2877                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2878                 try {
2879                     if (!cluster.isLoaded())
2880                         System.out.println("not loaded]");
2881                     else if (cluster.isEmpty())
2882                         System.out.println("is empty]");
2883                     else {
2884                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2885                         System.out.print(",references=" + cluster.getReferenceCount());
2886                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2887                     }
2888                 } catch (DatabaseException e) {
2889                     Logger.defaultLogError(e);
2890                     System.out.println("is corrupted]");
2891                 }
2892             }
2893
2894             queryProvider2.printDiagnostics();
2895             System.out.println("--------------------------------");
2896         }
2897
2898     }
2899
2900     public void fireStartReadTransaction() {
2901
2902         if (DIAGNOSTICS)
2903             System.out.println("StartReadTransaction");
2904
2905         for (SessionEventListener listener : eventListeners) {
2906             try {
2907                 listener.readTransactionStarted();
2908             } catch (Throwable t) {
2909                 t.printStackTrace();
2910             }
2911         }
2912
2913     }
2914
2915     public void fireFinishReadTransaction() {
2916
2917         if (DIAGNOSTICS)
2918             System.out.println("FinishReadTransaction");
2919
2920         for (SessionEventListener listener : eventListeners) {
2921             try {
2922                 listener.readTransactionFinished();
2923             } catch (Throwable t) {
2924                 t.printStackTrace();
2925             }
2926         }
2927
2928     }
2929
2930     public void fireStartWriteTransaction() {
2931
2932         if (DIAGNOSTICS)
2933             System.out.println("StartWriteTransaction");
2934
2935         for (SessionEventListener listener : eventListeners) {
2936             try {
2937                 listener.writeTransactionStarted();
2938             } catch (Throwable t) {
2939                 t.printStackTrace();
2940             }
2941         }
2942
2943     }
2944
2945     public void fireFinishWriteTransaction() {
2946
2947         if (DIAGNOSTICS)
2948             System.out.println("FinishWriteTransaction");
2949
2950         for (SessionEventListener listener : eventListeners) {
2951             try {
2952                 listener.writeTransactionFinished();
2953             } catch (Throwable t) {
2954                 t.printStackTrace();
2955             }
2956         }
2957
2958         Indexing.resetDependenciesIndexingDisabled();
2959
2960     }
2961
2962     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2963         throw new Error("Not supported at the moment.");
2964     }
2965
2966     State getState() {
2967         return state;
2968     }
2969
2970     ClusterTable getClusterTable() {
2971         return clusterTable;
2972     }
2973
2974     public GraphSession getGraphSession() {
2975         return graphSession;
2976     }
2977
2978     public QueryProcessor getQueryProvider2() {
2979         return queryProvider2;
2980     }
2981
2982     ClientChangesImpl getClientChanges() {
2983         return clientChanges;
2984     }
2985
2986     boolean getWriteOnly() {
2987         return writeOnly;
2988     }
2989
2990     static int counter = 0;
2991
2992     public void onClusterLoaded(long clusterId) {
2993
2994         clusterTable.updateSize();
2995
2996     }
2997
2998
2999
3000
3001     /*
3002      * Implementation of the interface RequestProcessor
3003      */
3004
3005     @Override
3006     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
3007
3008         assertNotSession();
3009         assertAlive();
3010
3011         assert(request != null);
3012
3013         final DataContainer<T> result = new DataContainer<T>();
3014         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3015
3016         syncRequest(request, new AsyncProcedure<T>() {
3017
3018             @Override
3019             public void execute(AsyncReadGraph graph, T t) {
3020                 result.set(t);
3021             }
3022
3023             @Override
3024             public void exception(AsyncReadGraph graph, Throwable t) {
3025                 exception.set(t);
3026             }
3027
3028         });
3029
3030         Throwable t = exception.get();
3031         if(t != null) {
3032             if(t instanceof DatabaseException) throw (DatabaseException)t;
3033             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3034         }
3035
3036         return result.get();
3037
3038     }
3039
3040 //    @Override
3041 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3042 //        assertNotSession();
3043 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
3044 //    }
3045
3046     @Override
3047     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3048         assertNotSession();
3049         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3050     }
3051
3052     @Override
3053     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3054         assertNotSession();
3055         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3056     }
3057
3058     @Override
3059     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3060         assertNotSession();
3061         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3062     }
3063
3064     @Override
3065     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3066         assertNotSession();
3067         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3068     }
3069
3070     @Override
3071     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3072
3073         assertNotSession();
3074
3075         assert(request != null);
3076
3077         final DataContainer<T> result = new DataContainer<T>();
3078         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3079
3080         syncRequest(request, new AsyncProcedure<T>() {
3081
3082             @Override
3083             public void execute(AsyncReadGraph graph, T t) {
3084                 result.set(t);
3085             }
3086
3087             @Override
3088             public void exception(AsyncReadGraph graph, Throwable t) {
3089                 exception.set(t);
3090             }
3091
3092         });
3093
3094         Throwable t = exception.get();
3095         if(t != null) {
3096             if(t instanceof DatabaseException) throw (DatabaseException)t;
3097             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3098         }
3099
3100         return result.get();
3101
3102     }
3103
3104     @Override
3105     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, (AsyncProcedure<T>)procedure);
3108     }
3109
3110     @Override
3111     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3114     }
3115
3116     @Override
3117     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3118         assertNotSession();
3119         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3120     }
3121
3122     @Override
3123     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3124         assertNotSession();
3125         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3126     }
3127
3128     @Override
3129     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3130         assertNotSession();
3131         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3132     }
3133
3134     @Override
3135     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3136
3137         assertNotSession();
3138
3139         assert(request != null);
3140
3141         final ArrayList<T> result = new ArrayList<T>();
3142         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3143
3144         syncRequest(request, new SyncMultiProcedure<T>() {
3145
3146             @Override
3147             public void execute(ReadGraph graph, T t) {
3148                 synchronized(result) {
3149                     result.add(t);
3150                 }
3151             }
3152
3153             @Override
3154             public void finished(ReadGraph graph) {
3155             }
3156
3157             @Override
3158             public void exception(ReadGraph graph, Throwable t) {
3159                 exception.set(t);
3160             }
3161
3162
3163         });
3164
3165         Throwable t = exception.get();
3166         if(t != null) {
3167             if(t instanceof DatabaseException) throw (DatabaseException)t;
3168             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3169         }
3170
3171         return result;
3172
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3177         assertNotSession();
3178         throw new Error("Not implemented!");
3179     }
3180
3181     @Override
3182     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3183         assertNotSession();
3184         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3185     }
3186
3187     @Override
3188     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3189         assertNotSession();
3190         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3191     }
3192
3193     @Override
3194     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3195         assertNotSession();
3196         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3197     }
3198
3199     @Override
3200     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3201
3202         assertNotSession();
3203
3204         assert(request != null);
3205
3206         final ArrayList<T> result = new ArrayList<T>();
3207         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3208
3209         syncRequest(request, new AsyncMultiProcedure<T>() {
3210
3211             @Override
3212             public void execute(AsyncReadGraph graph, T t) {
3213                 synchronized(result) {
3214                     result.add(t);
3215                 }
3216             }
3217
3218             @Override
3219             public void finished(AsyncReadGraph graph) {
3220             }
3221
3222             @Override
3223             public void exception(AsyncReadGraph graph, Throwable t) {
3224                 exception.set(t);
3225             }
3226
3227
3228         });
3229
3230         Throwable t = exception.get();
3231         if(t != null) {
3232             if(t instanceof DatabaseException) throw (DatabaseException)t;
3233             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3234         }
3235
3236         return result;
3237
3238     }
3239
3240     @Override
3241     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3242         assertNotSession();
3243         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3244     }
3245
3246     @Override
3247     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3248         assertNotSession();
3249         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3250     }
3251
3252     @Override
3253     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3254         assertNotSession();
3255        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3256     }
3257
3258     @Override
3259     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3260         assertNotSession();
3261         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3262     }
3263
3264     @Override
3265     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3266         assertNotSession();
3267         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3268     }
3269
3270
3271     @Override
3272     public <T> void asyncRequest(Read<T> request) {
3273
3274         asyncRequest(request, new ProcedureAdapter<T>() {
3275             @Override
3276             public void exception(Throwable t) {
3277                 t.printStackTrace();
3278             }
3279         });
3280
3281     }
3282
3283     @Override
3284     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3285         asyncRequest(request, (AsyncProcedure<T>)procedure);
3286     }
3287
3288     @Override
3289     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3290         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3291     }
3292
3293     @Override
3294     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3295         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3296     }
3297
3298     @Override
3299     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3300         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3301     }
3302
3303     @Override
3304     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3305         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3306     }
3307
3308     @Override
3309     final public <T> void asyncRequest(final AsyncRead<T> request) {
3310
3311         assert(request != null);
3312
3313         asyncRequest(request, new ProcedureAdapter<T>() {
3314             @Override
3315             public void exception(Throwable t) {
3316                 t.printStackTrace();
3317             }
3318         });
3319
3320     }
3321
3322     @Override
3323     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3324         scheduleRequest(request, procedure, procedure, null);
3325     }
3326
3327     @Override
3328     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3329         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3330     }
3331
3332     @Override
3333     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3334         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3335     }
3336
3337     @Override
3338     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3339         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3340     }
3341
3342     @Override
3343     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3344         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3345     }
3346
3347     @Override
3348     public <T> void asyncRequest(MultiRead<T> request) {
3349
3350         assert(request != null);
3351
3352         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3353             @Override
3354             public void exception(ReadGraph graph, Throwable t) {
3355                 t.printStackTrace();
3356             }
3357         });
3358
3359     }
3360
3361     @Override
3362     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3363         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3364     }
3365
3366     @Override
3367     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3368         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3369     }
3370
3371     @Override
3372     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3373         scheduleRequest(request, procedure, null);
3374     }
3375
3376     @Override
3377     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3378         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3379     }
3380
3381     @Override
3382     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3383
3384         assert(request != null);
3385
3386         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3387             @Override
3388             public void exception(AsyncReadGraph graph, Throwable t) {
3389                 t.printStackTrace();
3390             }
3391         });
3392
3393     }
3394
3395     @Override
3396     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3397         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3398     }
3399
3400     @Override
3401     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3402         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3403     }
3404
3405     @Override
3406     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3407         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3408     }
3409
3410     @Override
3411     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3412         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3413     }
3414
3415     @Override
3416     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3417         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3418     }
3419
3420     @Override
3421     final public <T> void asyncRequest(final ExternalRead<T> request) {
3422
3423         assert(request != null);
3424
3425         asyncRequest(request, new ProcedureAdapter<T>() {
3426             @Override
3427             public void exception(Throwable t) {
3428                 t.printStackTrace();
3429             }
3430         });
3431
3432     }
3433
3434     @Override
3435     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3436         asyncRequest(request, (Procedure<T>)procedure);
3437     }
3438
3439
3440
3441     void check(Throwable t) throws DatabaseException {
3442         if(t != null) {
3443             if(t instanceof DatabaseException) throw (DatabaseException)t;
3444             else throw new DatabaseException("Unexpected exception", t);
3445         }
3446     }
3447
3448     void check(DataContainer<Throwable> container) throws DatabaseException {
3449         Throwable t = container.get();
3450         if(t != null) {
3451             if(t instanceof DatabaseException) throw (DatabaseException)t;
3452             else throw new DatabaseException("Unexpected exception", t);
3453         }
3454     }
3455
3456
3457
3458
3459
3460
3461     boolean sameProvider(Write request) {
3462         if(writeState.getGraph().provider != null) {
3463             return writeState.getGraph().provider.equals(request.getProvider());
3464         } else {
3465             return request.getProvider() == null;
3466         }
3467     }
3468
3469     boolean plainWrite(WriteGraphImpl graph) {
3470         if(graph == null) return false;
3471         if(graph.writeSupport.writeOnly()) return false;
3472         return true;
3473     }
3474
3475
3476     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3477     private void assertNotSession() throws DatabaseException {
3478         Thread current = Thread.currentThread();
3479         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3480     }
3481
3482     void assertAlive() {
3483         if (!state.isAlive())
3484             throw new RuntimeDatabaseException("Session has been shut down.");
3485     }
3486
3487     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3488         return querySupport.getValueStream(graph, querySupport.getId(resource));
3489     }
3490
3491     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3492         return querySupport.getValue(graph, querySupport.getId(resource));
3493     }
3494
3495     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3496         acquire(semaphore, request, null);
3497     }
3498
3499     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3500         assertAlive();
3501         try {
3502             long tryCount = 0;
3503             // Loop until the semaphore is acquired reporting requests that take a long time.
3504             while (true) {
3505
3506                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3507                     // Acquired OK.
3508                     return;
3509                 } else {
3510                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3511                         ++tryCount;
3512                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3513                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3514                                 * tryCount;
3515                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3516                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3517                     }
3518                 }
3519
3520                 assertAlive();
3521
3522             }
3523         } catch (InterruptedException e) {
3524             e.printStackTrace();
3525             // FIXME: Should perhaps do something else in this case ??
3526         }
3527     }
3528
3529
3530
3531 //    @Override
3532 //    public boolean holdOnToTransactionAfterCancel() {
3533 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3534 //    }
3535 //
3536 //    @Override
3537 //    public boolean holdOnToTransactionAfterCommit() {
3538 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3539 //    }
3540 //
3541 //    @Override
3542 //    public boolean holdOnToTransactionAfterRead() {
3543 //        return transactionPolicy.holdOnToTransactionAfterRead();
3544 //    }
3545 //
3546 //    @Override
3547 //    public void onRelinquish() {
3548 //        transactionPolicy.onRelinquish();
3549 //    }
3550 //
3551 //    @Override
3552 //    public void onRelinquishDone() {
3553 //        transactionPolicy.onRelinquishDone();
3554 //    }
3555 //
3556 //    @Override
3557 //    public void onRelinquishError() {
3558 //        transactionPolicy.onRelinquishError();
3559 //    }
3560
3561
3562     @Override
3563     public Session getSession() {
3564         return this;
3565     }
3566
3567
    /**
     * Subclass hook mapping a {@link VirtualGraph} to a {@link VirtualGraph}.
     * NOTE(review): presumably resolves the provider actually used for the given
     * virtual graph - confirm against the implementing subclass.
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);

    /**
     * Subclass hook producing a {@link ResourceImpl}.
     * NOTE(review): presumably allocates a new resource - confirm against the
     * implementing subclass.
     *
     * @throws DatabaseException as declared; failure conditions are defined by the implementation
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3570
3571     public void ceased(int thread) {
3572
3573         requestManager.ceased(thread);
3574
3575     }
3576
3577     public int getAmountOfQueryThreads() {
3578         // This must be a power of two
3579         return 1;
3580 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3581     }
3582
3583     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3584
3585         return new ResourceImpl(resourceSupport, key);
3586
3587     }
3588
3589     public void acquireWriteOnly() {
3590         writeOnly = true;
3591     }
3592
3593     public void releaseWriteOnly(ReadGraphImpl graph) {
3594         writeOnly = false;
3595         queryProvider2.releaseWrite(graph);
3596     }
3597
3598     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3599
3600         long start = System.nanoTime();
3601
3602         while(dirtyPrimitives) {
3603             dirtyPrimitives = false;
3604             getQueryProvider2().performDirtyUpdates(writer);
3605             getQueryProvider2().performScheduledUpdates(writer);
3606         }
3607
3608         fireMetadataListeners(writer, clientChanges);
3609
3610         if(DebugPolicy.PERFORMANCE_DATA) {
3611             long end = System.nanoTime();
3612             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3613         }
3614
3615     }
3616
3617     void removeTemporaryData() {
3618         File platform = Platform.getLocation().toFile();
3619         File tempFiles = new File(platform, "tempFiles");
3620         File temp = new File(tempFiles, "db");
3621         if(!temp.exists()) 
3622             return;
3623         try {
3624             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3625                 @Override
3626                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3627                     try {
3628                         Files.delete(file);
3629                     } catch (IOException e) {
3630                         Logger.defaultLogError(e);
3631                     }
3632                     return FileVisitResult.CONTINUE;
3633                 }
3634             });
3635         } catch (IOException e) {
3636             Logger.defaultLogError(e);
3637         }
3638     }
3639
3640     public void handleCreatedClusters() {
3641         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3642             createdClusters.forEach(new TLongProcedure() {
3643
3644                 @Override
3645                 public boolean execute(long value) {
3646                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3647                     cluster.setImmutable(true, clusterTranslator);
3648                     return true;
3649                 }
3650             });
3651         }
3652         createdClusters.clear();
3653     }
3654
3655     @Override
3656     public Object getModificationCounter() {
3657         return queryProvider2.modificationCounter;
3658     }
3659     
3660     @Override
3661     public void markUndoPoint() {
3662         state.setCombine(false);
3663     }
3664
3665 }