]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
030d512eb736e2f6303f0d68c1d0b0e3d2e57b97
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.EventSupport;
147 import org.simantics.db.service.GraphChangeListenerSupport;
148 import org.simantics.db.service.InitSupport;
149 import org.simantics.db.service.LifecycleSupport;
150 import org.simantics.db.service.ManagementSupport;
151 import org.simantics.db.service.QueryControl;
152 import org.simantics.db.service.SerialisationSupport;
153 import org.simantics.db.service.ServerInformation;
154 import org.simantics.db.service.ServiceActivityMonitor;
155 import org.simantics.db.service.SessionEventSupport;
156 import org.simantics.db.service.SessionMonitorSupport;
157 import org.simantics.db.service.SessionUserSupport;
158 import org.simantics.db.service.StatementSupport;
159 import org.simantics.db.service.TransactionPolicySupport;
160 import org.simantics.db.service.TransactionSupport;
161 import org.simantics.db.service.TransferableGraphSupport;
162 import org.simantics.db.service.UndoRedoSupport;
163 import org.simantics.db.service.VirtualGraphSupport;
164 import org.simantics.db.service.XSupport;
165 import org.simantics.layer0.Layer0;
166 import org.simantics.utils.DataContainer;
167 import org.simantics.utils.Development;
168 import org.simantics.utils.threads.logger.ITask;
169 import org.simantics.utils.threads.logger.ThreadLogger;
170 import org.slf4j.LoggerFactory;
171
172 import gnu.trove.procedure.TLongProcedure;
173 import gnu.trove.set.hash.TLongHashSet;
174
175
176 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
177
178     private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
179
180     protected static final boolean DEBUG        = false;
181
182     private static final boolean DIAGNOSTICS       = false;
183
184     private TransactionPolicySupport transactionPolicy;
185
186     final private ClusterControl clusterControl;
187
188     final protected BuiltinSupportImpl builtinSupport;
189     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
190     final protected ClusterSetsSupport clusterSetsSupport;
191     final protected LifecycleSupportImpl lifecycleSupport;
192
193     protected QuerySupportImpl querySupport;
194     protected ResourceSupportImpl resourceSupport;
195     protected WriteSupport writeSupport;
196     public ClusterTranslator clusterTranslator;
197
198     boolean dirtyPrimitives = false;
199
200     public static final int SERVICE_MODE_CREATE = 2;
201     public static final int SERVICE_MODE_ALLOW = 1;
202     public int serviceMode = 0;
203     public boolean createdImmutableClusters = false;
204     public TLongHashSet createdClusters = new TLongHashSet();
205
206     private Layer0 L0;
207
208     /**
209      * The service locator maintained by the workbench. These services are
210      * initialized during workbench during the <code>init</code> method.
211      */
212     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
213
214     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
215
216     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
217
218     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
219
220     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
221
222     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
223
224     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
225
226     final protected State                                        state              = new State();
227
228     protected GraphSession                                       graphSession       = null;
229
230     protected SessionManager                                     sessionManagerImpl = null;
231
232     protected UserAuthenticationAgent                            authAgent          = null;
233
234     protected UserAuthenticator                                  authenticator      = null;
235
236     protected Resource                                           user               = null;
237
238     protected ClusterStream                                      clusterStream      = null;
239
240     protected SessionRequestManager                         requestManager = null;
241
242     public ClusterTable                                       clusterTable       = null;
243
244     public QueryProcessor                                     queryProvider2 = null;
245
246     ClientChangesImpl                                  clientChanges      = null;
247
248     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
249
250 //    protected long                                               newClusterId       = Constants.NullClusterId;
251
252     protected int                                                flushCounter       = 0;
253
254     protected boolean                                            writeOnly          = false;
255
256     WriteState<?>                                                writeState = null;
257     WriteStateBase<?>                                            delayedWriteState = null;
258     protected Resource defaultClusterSet = null; // If not null then used for newResource().
259
260     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
261
262 //        if (authAgent == null)
263 //            throw new IllegalArgumentException("null authentication agent");
264
265         File t = StaticSessionProperties.virtualGraphStoragePath;
266         if (null == t)
267             t = new File(".");
268         this.clusterTable = new ClusterTable(this, t);
269         this.builtinSupport = new BuiltinSupportImpl(this);
270         this.sessionManagerImpl = sessionManagerImpl;
271         this.user = null;
272 //        this.authAgent = authAgent;
273
274         serviceLocator.registerService(Session.class, this);
275         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
276         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
277         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
278         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
279         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
280         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
281         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
282         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
283         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
284         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
285         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
286         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
287         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
288         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
289         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
290         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
291         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
292         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
293         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
294         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
295         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
296         serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
297         ServiceActivityUpdaterForWriteTransactions.register(this);
298
299         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
300         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
301         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
302         this.lifecycleSupport = new LifecycleSupportImpl(this);
303         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
304         this.transactionPolicy = new TransactionPolicyRelease();
305         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
306         this.clusterControl = new ClusterControlImpl(this);
307         serviceLocator.registerService(ClusterControl.class, clusterControl);
308         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
309         this.clusterSetsSupport.setReadDirectory(t.toPath());
310         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
311         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
312     }
313
314     /*
315      *
316      * Session interface
317      */
318
319
320     @Override
321     public Resource getRootLibrary() {
322         return queryProvider2.getRootLibraryResource();
323     }
324
325     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
326         if (!graphSession.dbSession.refreshEnabled())
327             return;
328         try {
329             getClusterTable().refresh(csid, this, clusterUID);
330         } catch (Throwable t) {
331             Logger.defaultLogError("Refesh failed.", t);
332         }
333     }
334
335     static final class TaskHelper {
336         private final String name;
337         private Object result;
338         final Semaphore sema = new Semaphore(0);
339         private Throwable throwable = null;
340         final Consumer<DatabaseException> callback = e -> {
341             synchronized (TaskHelper.this) {
342                 throwable = e;
343             }
344         };
345         final Procedure<Object> proc = new Procedure<Object>() {
346             @Override
347             public void execute(Object result) {
348                 callback.accept(null);
349             }
350             @Override
351             public void exception(Throwable t) {
352                 if (t instanceof DatabaseException)
353                     callback.accept((DatabaseException)t);
354                 else
355                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
356             }
357         };
358         final WriteTraits writeTraits = new WriteTraits() {};
359         TaskHelper(String name) {
360             this.name = name;
361         }
362         @SuppressWarnings("unchecked")
363         <T> T getResult() {
364             return (T)result;
365         }
366         void setResult(Object result) {
367             this.result = result;
368         }
369 //        Throwable throwableGet() {
370 //            return throwable;
371 //        }
372         synchronized void throwableSet(Throwable t) {
373             throwable = t;
374         }
375 //        void throwableSet(String t) {
376 //            throwable = new InternalException("" + name + " operation failed. " + t);
377 //        }
378         synchronized void throwableCheck()
379         throws DatabaseException {
380             if (null != throwable)
381                 if (throwable instanceof DatabaseException)
382                     throw (DatabaseException)throwable;
383                 else
384                     throw new DatabaseException("Undo operation failed.", throwable);
385         }
386         void throw_(String message)
387         throws DatabaseException {
388             throw new DatabaseException("" + name + " operation failed. " + message);
389         }
390     }
391
392
393
394     private ListenerBase getListenerBase(Object procedure) {
395         if (procedure instanceof ListenerBase)
396             return (ListenerBase) procedure;
397         else
398             return null;
399     }
400
401     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
402         scheduleRequest(request, callback, notify, null);
403     }
404
405     /* (non-Javadoc)
406      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
407      */
408     @Override
409     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
410
411         assert (request != null);
412
413         if(Development.DEVELOPMENT) {
414             try {
415             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
416                 System.err.println("schedule write '" + request + "'");
417             } catch (Throwable t) {
418                 Logger.defaultLogError(t);
419             }
420         }
421
422         requestManager.scheduleWrite(new SessionTask(null) {
423
424             @Override
425             public void run0(int thread) {
426
427                 if(Development.DEVELOPMENT) {
428                     try {
429                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
430                         System.err.println("perform write '" + request + "'");
431                     } catch (Throwable t) {
432                         Logger.defaultLogError(t);
433                     }
434                 }
435
436                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
437
438                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
439
440                 try {
441
442                     flushCounter = 0;
443                     Disposable.safeDispose(clientChanges);
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
445
446                     VirtualGraph vg = getProvider(request.getProvider());
447
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450
451                         @Override
452                         public void execute(Object result) {
453                             if(callback != null) callback.accept(null);
454                         }
455
456                         @Override
457                         public void exception(Throwable t) {
458                             if(callback != null) callback.accept((DatabaseException)t);
459                         }
460
461                     });
462
463                     assert (null != writer);
464 //                    writer.state.barrier.inc();
465
466                     try {
467                         request.perform(writer);
468                         assert (null != writer);
469                     } catch (Throwable t) {
470                         if (!(t instanceof CancelTransactionException))
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472                         writeState.except(t);
473                     } finally {
474 //                        writer.state.barrier.dec();
475 //                        writer.waitAsync(request);
476                     }
477
478
479                     assert(!queryProvider2.cache.dirty);
480
481                 } catch (Throwable e) {
482
483                     // Log it first, just to be safe that the error is always logged.
484                     if (!(e instanceof CancelTransactionException))
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
486
487 //                    writeState.getGraph().state.barrier.dec();
488 //                    writeState.getGraph().waitAsync(request);
489
490                     writeState.except(e);
491
492 //                    try {
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 //                        // All we can do here is to log those, can't really pass them anywhere.
495 //                        if (callback != null) {
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 //                            else callback.run(new DatabaseException(e));
498 //                        }
499 //                    } catch (Throwable e2) {
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
501 //                    }
502 //                    boolean empty = clusterStream.reallyFlush();
503 //                    int callerThread = -1;
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
506 //                    try {
507 //                        // Send all local changes to server so it can calculate correct reverse change set.
508 //                        // This will call clusterStream.accept().
509 //                        state.cancelCommit(context, clusterStream);
510 //                        if (!empty) {
511 //                            if (!context.isOk()) // this is a blocking operation
512 //                                throw new InternalException("Cancel failed. This should never happen.");
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
514 //                        }
515 //                        state.cancelCommit2(context, clusterStream);
516 //                    } catch (DatabaseException e3) {
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
518 //                    } finally {
519 //                        clusterStream.setOff(false);
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
521 //                    }
522
523                 } finally {
524
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
526
527                 }
528
529                 task.finish();
530
531                 if(Development.DEVELOPMENT) {
532                     try {
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534                         System.err.println("finish write '" + request + "'");
535                     } catch (Throwable t) {
536                         Logger.defaultLogError(t);
537                     }
538                 }
539
540             }
541
542         }, combine);
543
544     }
545
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547         scheduleRequest(request, procedure, notify, null);
548     }
549
550     /* (non-Javadoc)
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
552      */
553     @Override
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
555
556         assert (request != null);
557
558         requestManager.scheduleWrite(new SessionTask(null) {
559
560             @Override
561             public void run0(int thread) {
562
563                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
564
565                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
566
567                 flushCounter = 0;
568                 Disposable.safeDispose(clientChanges);
569                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
570
571                 VirtualGraph vg = getProvider(request.getProvider());
572                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
573
574                 try {
575                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
576                     writeState = writeStateT;
577
578                     assert (null != writer);
579 //                    writer.state.barrier.inc();
580                     writeStateT.setResult(request.perform(writer));
581                     assert (null != writer);
582
583 //                    writer.state.barrier.dec();
584 //                    writer.waitAsync(null);
585
586                 } catch (Throwable e) {
587
588 //                    writer.state.barrier.dec();
589 //                    writer.waitAsync(null);
590
591                     writeState.except(e);
592
593 //                  state.stopWriteTransaction(clusterStream);
594 //
595 //              } catch (Throwable e) {
596 //                  // Log it first, just to be safe that the error is always logged.
597 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
598 //
599 //                  try {
600 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
601 //                      // All we can do here is to log those, can't really pass them anywhere.
602 //                      if (procedure != null) {
603 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
604 //                          else procedure.exception(new DatabaseException(e));
605 //                      }
606 //                  } catch (Throwable e2) {
607 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
608 //                  }
609 //
610 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
611 //
612 //                  state.stopWriteTransaction(clusterStream);
613
614                 } finally {
615                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
616                 }
617
618 //              if(notify != null) notify.release();
619
620                 task.finish();
621
622             }
623
624         }, combine);
625
626     }
627
628     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
629         scheduleRequest(request, callback, notify, null);
630     }
631
632     /* (non-Javadoc)
633      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
634      */
635     @Override
636     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
637
638         final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
639
640         assert (request != null);
641
642         requestManager.scheduleWrite(new SessionTask(null) {
643
644             @Override
645             public void run0(int thread) {
646                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
647
648                 Procedure<Object> stateProcedure = new Procedure<Object>() {
649                     @Override
650                     public void execute(Object result) {
651                         if (callback != null)
652                             callback.accept(null);
653                     }
654                     @Override
655                     public void exception(Throwable t) {
656                         if (callback != null) {
657                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
658                             else callback.accept(new DatabaseException(t));
659                         } else
660                             Logger.defaultLogError("Unhandled exception", t);
661                     }
662                 };
663
664                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
665                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
666                 DelayedWriteGraph dwg = null;
667 //                newGraph.state.barrier.inc();
668
669                 try {
670                     dwg = new DelayedWriteGraph(newGraph);
671                     request.perform(dwg);
672                 } catch (Throwable e) {
673                     delayedWriteState.except(e);
674                     total.finish();
675                     dwg.close();
676                     return;
677                 } finally {
678 //                    newGraph.state.barrier.dec();
679 //                    newGraph.waitAsync(request);
680                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
681                 }
682
683                 delayedWriteState = null;
684
685                 ITask task2 = ThreadLogger.task("DelayedWriteCommit");
686                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
687
688                 flushCounter = 0;
689                 Disposable.safeDispose(clientChanges);
690                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
691
692                 acquireWriteOnly();
693
694                 VirtualGraph vg = getProvider(request.getProvider());
695                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
696
697                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
698
699                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
700
701                 assert (null != writer);
702 //                writer.state.barrier.inc();
703
704                 try {
705                     // Cannot cancel
706                     dwg.commit(writer, request);
707
708                         if(defaultClusterSet != null) {
709                                 XSupport xs = getService(XSupport.class);
710                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
711                                 ClusteringSupport cs = getService(ClusteringSupport.class);
712                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
713                         }
714
715                     // This makes clusters available from server
716                     clusterStream.reallyFlush();
717
718                     releaseWriteOnly(writer);
719                     handleUpdatesAndMetadata(writer);
720
721                 } catch (ServiceException e) {
722 //                    writer.state.barrier.dec();
723 //                    writer.waitAsync(null);
724
725                     // These shall be requested from server
726                     clusterTable.removeWriteOnlyClusters();
727                     // This makes clusters available from server
728                     clusterStream.reallyFlush();
729
730                     releaseWriteOnly(writer);
731                     writeState.except(e);
732                 } finally {
733                     // Debugging & Profiling
734                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
735                     task2.finish();
736                     total.finish();
737                 }
738             }
739
740         }, combine);
741
742     }
743
744     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
745         scheduleRequest(request, procedure, notify, null);
746     }
747
748     /* (non-Javadoc)
749      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
750      */
751     @Override
752     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
753         throw new Error("Not implemented");
754     }
755
756     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
757         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
758         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
759             createdClusters.add(cluster.clusterId);
760         }
761         return cluster;
762     }
763
764     class WriteOnlySupport implements WriteSupport {
765
766         ClusterStream stream;
767         ClusterImpl currentCluster;
768
769         public WriteOnlySupport() {
770             this.stream = clusterStream;
771         }
772
773         @Override
774         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
775             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
776         }
777
778         @Override
779         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
780
781             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
782
783             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
784                 if(s != queryProvider2.getRootLibrary())
785                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
786
787             try {
788                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
789             } catch (DatabaseException e) {
790                 Logger.defaultLogError(e);
791                 //throw e;
792             }
793
794             clientChanges.invalidate(s);
795
796             if (cluster.isWriteOnly())
797                 return;
798             queryProvider2.updateStatements(s, p);
799
800         }
801
802         @Override
803         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
804             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
805         }
806
807         @Override
808         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
809
810             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
811             try {
812                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
813             } catch (DatabaseException e) {
814                 Logger.defaultLogError(e);
815             }
816
817             clientChanges.invalidate(rid);
818
819             if (cluster.isWriteOnly())
820                 return;
821             queryProvider2.updateValue(rid);
822
823         }
824
825         @Override
826         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
827
828             if(amount < 65536) {
829                 claimValue(provider,resource, reader.readBytes(null, amount));
830                 return;
831             }
832
833             byte[] bytes = new byte[65536];
834
835             int rid = ((ResourceImpl)resource).id;
836             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
837             try {
838                 int left = amount;
839                 while(left > 0) {
840                     int block = Math.min(left, 65536);
841                     reader.readBytes(bytes, block);
842                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
843                     left -= block;
844                 }
845             } catch (DatabaseException e) {
846                 Logger.defaultLogError(e);
847             }
848
849             clientChanges.invalidate(rid);
850
851             if (cluster.isWriteOnly())
852                 return;
853             queryProvider2.updateValue(rid);
854
855         }
856
857         private void maintainCluster(ClusterImpl before, ClusterI after_) {
858             if(after_ != null && after_ != before) {
859                 ClusterImpl after = (ClusterImpl)after_;
860                 if(currentCluster == before) {
861                     currentCluster = after;
862                 }
863                 clusterTable.replaceCluster(after);
864             }
865         }
866
867         public int createResourceKey(int foreignCounter) throws DatabaseException {
868             if(currentCluster == null) {
869                 currentCluster = getNewResourceCluster();
870             }
871             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
872                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
873                 newCluster.foreignLookup = new byte[foreignCounter];
874                 currentCluster = newCluster;
875                 if (DEBUG)
876                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
877             }
878             return currentCluster.createResource(clusterTranslator);
879         }
880
881         @Override
882         public Resource createResource(VirtualGraph provider) throws DatabaseException {
883             if(currentCluster == null) {
884                 if (null != defaultClusterSet) {
885                         ResourceImpl result = getNewResource(defaultClusterSet);
886                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
887                     return result;
888                 } else {
889                     currentCluster = getNewResourceCluster();
890                 }
891             }
892             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
893                 if (null != defaultClusterSet) {
894                         ResourceImpl result = getNewResource(defaultClusterSet);
895                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
896                     return result;
897                 } else {
898                     currentCluster = getNewResourceCluster();
899                 }
900             }
901             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
902         }
903
904         @Override
905         public Resource createResource(VirtualGraph provider, long clusterId)
906         throws DatabaseException {
907             return getNewResource(clusterId);
908         }
909
910         @Override
911         public Resource createResource(VirtualGraph provider, Resource clusterSet)
912         throws DatabaseException {
913             return getNewResource(clusterSet);
914         }
915
916         @Override
917         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
918                 throws DatabaseException {
919             getNewClusterSet(clusterSet);
920         }
921
922         @Override
923         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
924         throws ServiceException {
925             return containsClusterSet(clusterSet);
926         }
927
928         public void selectCluster(long cluster) {
929                 currentCluster = clusterTable.getClusterByClusterId(cluster);
930                 long setResourceId = clusterSetsSupport.getSet(cluster);
931                 clusterSetsSupport.put(setResourceId, cluster);
932         }
933
934         @Override
935         public Resource setDefaultClusterSet(Resource clusterSet)
936         throws ServiceException {
937                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
938                 if(clusterSet != null) {
939                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
940                         currentCluster = clusterTable.getClusterByClusterId(id);
941                         return result;
942                 } else {
943                         currentCluster = null;
944                         return null;
945                 }
946         }
947
948         @Override
949         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
950
951                  provider = getProvider(provider);
952              if (null == provider) {
953                  int key = ((ResourceImpl)resource).id;
954
955
956
957                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
958 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
959 //                      if(key != queryProvider2.getRootLibrary())
960 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
961
962 //                 try {
963 //                     cluster.removeValue(key, clusterTranslator);
964 //                 } catch (DatabaseException e) {
965 //                     Logger.defaultLogError(e);
966 //                     return;
967 //                 }
968
969                  clusterTable.writeOnlyInvalidate(cluster);
970
971                  try {
972                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
973                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
974                          clusterTranslator.removeValue(cluster);
975                  } catch (DatabaseException e) {
976                          Logger.defaultLogError(e);
977                  }
978
979                  queryProvider2.invalidateResource(key);
980                  clientChanges.invalidate(key);
981
982              } else {
983                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
984                  queryProvider2.updateValue(querySupport.getId(resource));
985                  clientChanges.claimValue(resource);
986              }
987
988
989         }
990
991         @Override
992         public void flush(boolean intermediate) {
993             throw new UnsupportedOperationException();
994         }
995
996         @Override
997         public void flushCluster() {
998             clusterTable.flushCluster(graphSession);
999             if(defaultClusterSet != null) {
1000                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1001             }
1002             currentCluster = null;
1003         }
1004
1005         @Override
1006         public void flushCluster(Resource r) {
1007             throw new UnsupportedOperationException("flushCluster resource " + r);
1008         }
1009
1010         @Override
1011         public void gc() {
1012         }
1013
1014         @Override
1015         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1016                 Resource object) {
1017
1018                 int s = ((ResourceImpl)subject).id;
1019                 int p = ((ResourceImpl)predicate).id;
1020                 int o = ((ResourceImpl)object).id;
1021
1022                 provider = getProvider(provider);
1023                 if (null == provider) {
1024
1025                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1026                         clusterTable.writeOnlyInvalidate(cluster);
1027
1028                         try {
1029
1030                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1031
1032                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1033                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1034
1035                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1036                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.removeStatement(cluster);
1039
1040                                 queryProvider2.invalidateResource(s);
1041                                 clientChanges.invalidate(s);
1042
1043                         } catch (DatabaseException e) {
1044
1045                                 Logger.defaultLogError(e);
1046
1047                         }
1048
1049                         return true;
1050
1051                 } else {
1052                         
1053                         ((VirtualGraphImpl)provider).deny(s, p, o);
1054                         queryProvider2.invalidateResource(s);
1055                         clientChanges.invalidate(s);
1056
1057                         return true;
1058
1059                 }
1060
1061         }
1062
1063         @Override
1064         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1065             throw new UnsupportedOperationException();
1066         }
1067
1068         @Override
1069         public boolean writeOnly() {
1070             return true;
1071         }
1072
1073         @Override
1074         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1075             writeSupport.performWriteRequest(graph, request);
1076         }
1077
1078         @Override
1079         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1080             throw new UnsupportedOperationException();
1081         }
1082
1083         @Override
1084         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1085             throw new UnsupportedOperationException();
1086         }
1087
1088         @Override
1089         public <T> void addMetadata(Metadata data) throws ServiceException {
1090             writeSupport.addMetadata(data);
1091         }
1092
1093         @Override
1094         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1095             return writeSupport.getMetadata(clazz);
1096         }
1097
1098         @Override
1099         public TreeMap<String, byte[]> getMetadata() {
1100             return writeSupport.getMetadata();
1101         }
1102
1103         @Override
1104         public void commitDone(WriteTraits writeTraits, long csid) {
1105             writeSupport.commitDone(writeTraits, csid);
1106         }
1107
1108         @Override
1109         public void clearUndoList(WriteTraits writeTraits) {
1110             writeSupport.clearUndoList(writeTraits);
1111         }
1112         @Override
1113         public int clearMetadata() {
1114             return writeSupport.clearMetadata();
1115         }
1116
1117                 @Override
1118                 public void startUndo() {
1119                         writeSupport.startUndo();
1120                 }
1121     }
1122
1123     class VirtualWriteOnlySupport implements WriteSupport {
1124
1125 //        @Override
1126 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1127 //                Resource object) {
1128 //            throw new UnsupportedOperationException();
1129 //        }
1130
1131         @Override
1132         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1133
1134             TransientGraph impl = (TransientGraph)provider;
1135             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1136             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1137             clientChanges.claim(subject, predicate, object);
1138
1139         }
1140
1141         @Override
1142         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1143
1144             TransientGraph impl = (TransientGraph)provider;
1145             impl.claim(subject, predicate, object);
1146             getQueryProvider2().updateStatements(subject, predicate);
1147             clientChanges.claim(subject, predicate, object);
1148
1149         }
1150
1151         @Override
1152         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1153             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1154         }
1155
1156         @Override
1157         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1158             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1159             getQueryProvider2().updateValue(resource);
1160             clientChanges.claimValue(resource);
1161         }
1162
1163         @Override
1164         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1165             byte[] value = reader.readBytes(null, amount);
1166             claimValue(provider, resource, value);
1167         }
1168
1169         @Override
1170         public Resource createResource(VirtualGraph provider) {
1171             TransientGraph impl = (TransientGraph)provider;
1172             return impl.getResource(impl.newResource(false));
1173         }
1174
1175         @Override
1176         public Resource createResource(VirtualGraph provider, long clusterId) {
1177             throw new UnsupportedOperationException();
1178         }
1179
1180         @Override
1181         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1182         throws DatabaseException {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1188         throws DatabaseException {
1189             throw new UnsupportedOperationException();
1190         }
1191
1192         @Override
1193         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1194         throws ServiceException {
1195             throw new UnsupportedOperationException();
1196         }
1197
1198         @Override
1199         public Resource setDefaultClusterSet(Resource clusterSet)
1200         throws ServiceException {
1201                 return null;
1202         }
1203
1204         @Override
1205         public void denyValue(VirtualGraph provider, Resource resource) {
1206             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1207             getQueryProvider2().updateValue(querySupport.getId(resource));
1208             // NOTE: this only keeps track of value changes by-resource.
1209             clientChanges.claimValue(resource);
1210         }
1211
1212         @Override
1213         public void flush(boolean intermediate) {
1214             throw new UnsupportedOperationException();
1215         }
1216
1217         @Override
1218         public void flushCluster() {
1219             throw new UnsupportedOperationException();
1220         }
1221
1222         @Override
1223         public void flushCluster(Resource r) {
1224             throw new UnsupportedOperationException("Resource " + r);
1225         }
1226
1227         @Override
1228         public void gc() {
1229         }
1230
1231         @Override
1232         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1233             TransientGraph impl = (TransientGraph) provider;
1234             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1235             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1236             clientChanges.deny(subject, predicate, object);
1237             return true;
1238         }
1239
1240         @Override
1241         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1242             throw new UnsupportedOperationException();
1243         }
1244
1245         @Override
1246         public boolean writeOnly() {
1247             return true;
1248         }
1249
1250         @Override
1251         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T> void addMetadata(Metadata data) throws ServiceException {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1272             throw new UnsupportedOperationException();
1273         }
1274
1275         @Override
1276         public TreeMap<String, byte[]> getMetadata() {
1277             throw new UnsupportedOperationException();
1278         }
1279
1280         @Override
1281         public void commitDone(WriteTraits writeTraits, long csid) {
1282         }
1283
1284         @Override
1285         public void clearUndoList(WriteTraits writeTraits) {
1286         }
1287
1288         @Override
1289         public int clearMetadata() {
1290             return 0;
1291         }
1292
1293                 @Override
1294                 public void startUndo() {
1295                 }
1296     }
1297
1298     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1299
1300         try {
1301
1302             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1303
1304             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1305
1306             flushCounter = 0;
1307             Disposable.safeDispose(clientChanges);
1308             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1309
1310             acquireWriteOnly();
1311
1312             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1313
1314             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1315
1316             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1317             writeState = writeStateT;
1318
1319             assert (null != writer);
1320 //            writer.state.barrier.inc();
1321             long start = System.nanoTime();
1322             T result = request.perform(writer);
1323             long duration = System.nanoTime() - start;
1324             if (DEBUG) {
1325                 System.err.println("################");
1326                 System.err.println("WriteOnly duration " + 1e-9*duration);
1327             }
1328             writeStateT.setResult(result);
1329
1330             // This makes clusters available from server
1331             clusterStream.reallyFlush();
1332
1333             // This will trigger query updates
1334             releaseWriteOnly(writer);
1335             assert (null != writer);
1336
1337             handleUpdatesAndMetadata(writer);
1338
1339         } catch (CancelTransactionException e) {
1340
1341             releaseWriteOnly(writeState.getGraph());
1342
1343             clusterTable.removeWriteOnlyClusters();
1344             state.stopWriteTransaction(clusterStream);
1345
1346         } catch (Throwable e) {
1347
1348             e.printStackTrace();
1349
1350             releaseWriteOnly(writeState.getGraph());
1351
1352             clusterTable.removeWriteOnlyClusters();
1353
1354             if (callback != null)
1355                 callback.exception(new DatabaseException(e));
1356
1357             state.stopWriteTransaction(clusterStream);
1358             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1359
1360         } finally  {
1361
1362             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1363
1364         }
1365
1366     }
1367
1368     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1369         scheduleRequest(request, callback, notify, null);
1370     }
1371
1372     @Override
1373     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1374
1375         assertAlive();
1376
1377         assert (request != null);
1378
1379         requestManager.scheduleWrite(new SessionTask(null) {
1380
1381             @Override
1382             public void run0(int thread) {
1383
1384                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1385
1386                     try {
1387
1388                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1389
1390                         flushCounter = 0;
1391                         Disposable.safeDispose(clientChanges);
1392                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1393
1394                         acquireWriteOnly();
1395
1396                         VirtualGraph vg = getProvider(request.getProvider());
1397                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1398
1399                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1400
1401                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1402
1403                             @Override
1404                             public void execute(Object result) {
1405                                 if(callback != null) callback.accept(null);
1406                             }
1407
1408                             @Override
1409                             public void exception(Throwable t) {
1410                                 if(callback != null) callback.accept((DatabaseException)t);
1411                             }
1412
1413                         });
1414
1415                         assert (null != writer);
1416 //                        writer.state.barrier.inc();
1417
1418                         try {
1419
1420                             request.perform(writer);
1421
1422                         } catch (Throwable e) {
1423
1424 //                            writer.state.barrier.dec();
1425 //                            writer.waitAsync(null);
1426
1427                             releaseWriteOnly(writer);
1428
1429                             clusterTable.removeWriteOnlyClusters();
1430
1431                             if(!(e instanceof CancelTransactionException)) {
1432                                 if (callback != null)
1433                                     callback.accept(new DatabaseException(e));
1434                             }
1435
1436                             writeState.except(e);
1437
1438                             return;
1439
1440                         }
1441
1442                         // This makes clusters available from server
1443                         boolean empty = clusterStream.reallyFlush();
1444                         // This was needed to make WO requests call metadata listeners.
1445                         // NOTE: the calling event does not contain clientChanges information.
1446                         if (!empty && clientChanges.isEmpty())
1447                             clientChanges.setNotEmpty(true);
1448                         releaseWriteOnly(writer);
1449                         assert (null != writer);
1450                     } finally  {
1451
1452                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1453
1454                     }
1455
1456
1457                 task.finish();
1458
1459             }
1460
1461         }, combine);
1462
1463     }
1464
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466         scheduleRequest(request, callback, notify, null);
1467     }
1468
1469     /* (non-Javadoc)
1470      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1471      */
1472     @Override
1473     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1474
1475         assert (request != null);
1476
1477         requestManager.scheduleWrite(new SessionTask(null) {
1478
1479             @Override
1480             public void run0(int thread) {
1481
1482                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1483
1484                 performWriteOnly(request, notify, callback);
1485
1486                 task.finish();
1487
1488             }
1489
1490         }, combine);
1491
1492     }
1493
1494     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1495
1496         assert (request != null);
1497         assert (procedure != null);
1498
1499         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1500
1501             @Override
1502             public void run0(int thread) {
1503
1504                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1505
1506                 ITask task = ThreadLogger.task(request);
1507
1508                 ListenerBase listener = getListenerBase(procedure);
1509
1510                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1511
1512                 // This is never synced but increase to prevent it from visiting 0
1513                 newGraph.asyncBarrier.inc();
1514
1515                 try {
1516
1517                     if (listener != null) {
1518
1519                         try {
1520                                 
1521                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1522
1523                                         @Override
1524                                         public void exception(AsyncReadGraph graph, Throwable t) {
1525                                                 procedure.exception(graph, t);
1526                                                 if(throwable != null) {
1527                                                         throwable.set(t);
1528                                                 } else {
1529                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1530                                                 }
1531                                         }
1532
1533                                         @Override
1534                                         public void execute(AsyncReadGraph graph, T t) {
1535                                                 if(result != null) result.set(t);
1536                                                 procedure.execute(graph, t);
1537                                         }
1538
1539                                 };
1540                                 
1541                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1542                                 
1543                         } catch (Throwable t) {
1544                             // This is handled by the AsyncProcedure
1545                                 //Logger.defaultLogError("Internal error", t);
1546                         }
1547
1548                     } else {
1549
1550                         try {
1551
1552                             T t = request.perform(newGraph);
1553
1554                             try {
1555
1556                                 if(result != null) result.set(t);
1557                                 procedure.execute(newGraph, t);
1558
1559                             } catch (Throwable th) {
1560
1561                                 if(throwable != null) {
1562                                     throwable.set(th);
1563                                 } else {
1564                                     Logger.defaultLogError("Unhandled exception", th);
1565                                 }
1566
1567                             }
1568
1569                         } catch (Throwable t) {
1570
1571                             if (DEBUG)
1572                                 t.printStackTrace();
1573
1574                             if(throwable != null) {
1575                                 throwable.set(t);
1576                             } else {
1577                                 Logger.defaultLogError("Unhandled exception", t);
1578                             }
1579
1580                             try {
1581
1582                                 procedure.exception(newGraph, t);
1583
1584                             } catch (Throwable t2) {
1585
1586                                 if(throwable != null) {
1587                                     throwable.set(t2);
1588                                 } else {
1589                                     Logger.defaultLogError("Unhandled exception", t2);
1590                                 }
1591
1592                             }
1593
1594                         }
1595                         
1596                         task.finish();
1597
1598                     }
1599
1600                 } finally {
1601
1602                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1603
1604                 }
1605
1606             }
1607
1608         });
1609
1610     }
1611
1612     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1613
1614         assert (request != null);
1615         assert (procedure != null);
1616
1617         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1618
1619         requestManager.scheduleRead(new SessionRead(null, notify) {
1620
1621             @Override
1622             public void run0(int thread) {
1623
1624                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1625
1626                 ITask task = ThreadLogger.task(request);
1627
1628                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1629
1630                 try {
1631
1632                     if (listener != null) {
1633
1634                         try {
1635                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1636                                                 } catch (DatabaseException e) {
1637                                                         Logger.defaultLogError(e);
1638                                                 }
1639
1640                     } else {
1641
1642                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
1643                                 
1644                                 public void execute(AsyncReadGraph graph_, T result) {
1645                                         task.finish();
1646                                         super.execute(graph_, result);
1647                                 }
1648                                 
1649                                 public void exception(AsyncReadGraph graph_, Throwable t) {
1650                                         task.finish();
1651                                         super.exception(graph_, t);
1652                                 }
1653                                 
1654                         };
1655
1656                         try {
1657
1658                             request.perform(newGraph, wrap);
1659                             wrap.dec();
1660                             wrap.get();
1661
1662                         } catch (DatabaseException e) {
1663
1664                                                         Logger.defaultLogError(e);
1665
1666                         }
1667
1668                     }
1669
1670                 } finally {
1671
1672                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1673
1674                 }
1675
1676             }
1677
1678         });
1679
1680     }
1681
1682     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1683
1684         assert (request != null);
1685         assert (procedure != null);
1686
1687         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1688
1689         int sync = notify != null ? thread : -1;
1690
1691         requestManager.scheduleRead(new SessionRead(null, notify) {
1692
1693             @Override
1694             public void run0(int thread) {
1695
1696                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1697
1698                 ListenerBase listener = getListenerBase(procedure);
1699
1700                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1701
1702                 try {
1703
1704                     if (listener != null) {
1705
1706                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1707
1708 //                        newGraph.waitAsync(request);
1709
1710                     } else {
1711
1712                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1713
1714                         try {
1715
1716                             request.perform(newGraph, wrapper);
1717
1718                         } catch (Throwable t) {
1719
1720                             t.printStackTrace();
1721
1722                         }
1723
1724                     }
1725
1726                 } finally {
1727
1728                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1729
1730                 }
1731
1732             }
1733
1734         });
1735
1736     }
1737     
1738     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1739
1740         assert (request != null);
1741         assert (procedure != null);
1742
1743         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1744
1745         int sync = notify != null ? thread : -1;
1746
1747         requestManager.scheduleRead(new SessionRead(null, notify) {
1748
1749             @Override
1750             public void run0(int thread) {
1751
1752                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1753
1754                 ListenerBase listener = getListenerBase(procedure);
1755
1756                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1757
1758                 try {
1759
1760                     if (listener != null) {
1761
1762                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1763
1764 //                        newGraph.waitAsync(request);
1765
1766                     } else {
1767
1768                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1769
1770                         try {
1771
1772                             request.perform(newGraph, wrapper);
1773
1774                         } catch (Throwable t) {
1775
1776                             t.printStackTrace();
1777
1778                         }
1779
1780                     }
1781
1782                 } finally {
1783
1784                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1785
1786                 }
1787
1788             }
1789
1790         });
1791
1792     }
1793
1794     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1795
1796         assert (request != null);
1797         assert (procedure != null);
1798
1799         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1800
1801         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1802
1803             @Override
1804             public void run0(int thread) {
1805
1806                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1807
1808                 ListenerBase listener = getListenerBase(procedure);
1809
1810                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1811
1812                 try {
1813
1814                     if (listener != null) {
1815
1816                         try {
1817                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1818                         } catch (DatabaseException e) {
1819                             Logger.defaultLogError(e);
1820                         }
1821
1822                     } else {
1823
1824 //                        newGraph.state.barrier.inc();
1825
1826                         request.register(newGraph, new Listener<T>() {
1827
1828                             @Override
1829                             public void exception(Throwable t) {
1830                                 if(throwable != null) throwable.set(t);
1831                                 procedure.exception(t);
1832 //                                newGraph.state.barrier.dec();
1833                             }
1834
1835                             @Override
1836                             public void execute(T t) {
1837                                 if(result != null) result.set(t);
1838                                 procedure.execute(t);
1839 //                                newGraph.state.barrier.dec();
1840                             }
1841
1842                             @Override
1843                             public boolean isDisposed() {
1844                                 return true;
1845                             }
1846
1847                         });
1848
1849 //                        newGraph.waitAsync(request);
1850
1851                     }
1852
1853                 } finally {
1854
1855                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1856
1857                 }
1858
1859             }
1860
1861         });
1862
1863     }
1864
1865
1866     @Override
1867     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1868
1869         scheduleRequest(request, procedure, null, null, null);
1870
1871     }
1872
1873     @Override
1874     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1875         assertNotSession();
1876         Semaphore notify = new Semaphore(0);
1877         DataContainer<Throwable> container = new DataContainer<Throwable>();
1878         DataContainer<T> result = new DataContainer<T>();
1879         scheduleRequest(request, procedure, notify, container, result);
1880         acquire(notify, request);
1881         Throwable throwable = container.get();
1882         if(throwable != null) {
1883             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1884             else throw new DatabaseException("Unexpected exception", throwable);
1885         }
1886         return result.get();
1887     }
1888
1889     @Override
1890     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1891         assertNotSession();
1892         Semaphore notify = new Semaphore(0);
1893         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1894         final DataContainer<T> resultContainer = new DataContainer<T>();
1895         scheduleRequest(request, new AsyncProcedure<T>() {
1896             @Override
1897             public void exception(AsyncReadGraph graph, Throwable throwable) {
1898                 exceptionContainer.set(throwable);
1899                 procedure.exception(graph, throwable);
1900             }
1901             @Override
1902             public void execute(AsyncReadGraph graph, T result) {
1903                 resultContainer.set(result);
1904                 procedure.execute(graph, result);
1905             }
1906         }, getListenerBase(procedure), notify);
1907         acquire(notify, request, procedure);
1908         Throwable throwable = exceptionContainer.get();
1909         if (throwable != null) {
1910             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1911             else throw new DatabaseException("Unexpected exception", throwable);
1912         }
1913         return resultContainer.get();
1914     }
1915
1916
1917     @Override
1918     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1919         assertNotSession();
1920         Semaphore notify = new Semaphore(0);
1921         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1922         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1923             @Override
1924             public void exception(AsyncReadGraph graph, Throwable throwable) {
1925                 exceptionContainer.set(throwable);
1926                 procedure.exception(graph, throwable);
1927             }
1928             @Override
1929             public void execute(AsyncReadGraph graph, T result) {
1930                 procedure.execute(graph, result);
1931             }
1932             @Override
1933             public void finished(AsyncReadGraph graph) {
1934                 procedure.finished(graph);
1935             }
1936         }, notify);
1937         acquire(notify, request, procedure);
1938         Throwable throwable = exceptionContainer.get();
1939         if (throwable != null) {
1940             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1941             else throw new DatabaseException("Unexpected exception", throwable);
1942         }
1943         // TODO: implement return value
1944         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1945         return null;
1946     }
1947
1948     @Override
1949     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1950         return syncRequest(request, new ProcedureAdapter<T>());
1951
1952
1953 //        assert(request != null);
1954 //
1955 //        final DataContainer<T> result = new DataContainer<T>();
1956 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1957 //
1958 //        syncRequest(request, new Procedure<T>() {
1959 //
1960 //            @Override
1961 //            public void execute(T t) {
1962 //                result.set(t);
1963 //            }
1964 //
1965 //            @Override
1966 //            public void exception(Throwable t) {
1967 //                exception.set(t);
1968 //            }
1969 //
1970 //        });
1971 //
1972 //        Throwable t = exception.get();
1973 //        if(t != null) {
1974 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1975 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1976 //        }
1977 //
1978 //        return result.get();
1979
1980     }
1981
1982     @Override
1983     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1984         return syncRequest(request, (Procedure<T>)procedure);
1985     }
1986
1987
1988 //    @Override
1989 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1990 //        assertNotSession();
1991 //        Semaphore notify = new Semaphore(0);
1992 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1993 //        DataContainer<T> result = new DataContainer<T>();
1994 //        scheduleRequest(request, procedure, notify, container, result);
1995 //        acquire(notify, request);
1996 //        Throwable throwable = container.get();
1997 //        if(throwable != null) {
1998 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1999 //            else throw new DatabaseException("Unexpected exception", throwable);
2000 //        }
2001 //        return result.get();
2002 //    }
2003
2004
2005     @Override
2006     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2007         assertNotSession();
2008         Semaphore notify = new Semaphore(0);
2009         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2010         final DataContainer<T> result = new DataContainer<T>();
2011         scheduleRequest(request, procedure, notify, container, result);
2012         acquire(notify, request);
2013         Throwable throwable = container.get();
2014         if (throwable != null) {
2015             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2016             else throw new DatabaseException("Unexpected exception", throwable);
2017         }
2018         return result.get();
2019     }
2020
2021     @Override
2022     public void syncRequest(Write request) throws DatabaseException  {
2023         assertNotSession();
2024         assertAlive();
2025         Semaphore notify = new Semaphore(0);
2026         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2027         scheduleRequest(request, e -> exception.set(e), notify);
2028         acquire(notify, request);
2029         if(exception.get() != null) throw exception.get();
2030     }
2031
2032     @Override
2033     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2034         assertNotSession();
2035         Semaphore notify = new Semaphore(0);
2036         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2037         final DataContainer<T> result = new DataContainer<T>();
2038         scheduleRequest(request, new Procedure<T>() {
2039
2040             @Override
2041             public void exception(Throwable t) {
2042               exception.set(t);
2043             }
2044
2045             @Override
2046             public void execute(T t) {
2047               result.set(t);
2048             }
2049
2050         }, notify);
2051         acquire(notify, request);
2052         if(exception.get() != null) {
2053             Throwable t = exception.get();
2054             if(t instanceof DatabaseException) throw (DatabaseException)t;
2055             else throw new DatabaseException(t);
2056         }
2057         return result.get();
2058     }
2059
2060     @Override
2061     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2062         assertNotSession();
2063         Semaphore notify = new Semaphore(0);
2064         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2065         final DataContainer<T> result = new DataContainer<T>();
2066         scheduleRequest(request, new Procedure<T>() {
2067
2068             @Override
2069             public void exception(Throwable t) {
2070               exception.set(t);
2071             }
2072
2073             @Override
2074             public void execute(T t) {
2075               result.set(t);
2076             }
2077
2078         }, notify);
2079         acquire(notify, request);
2080         if(exception.get() != null) {
2081             Throwable t = exception.get();
2082             if(t instanceof DatabaseException) throw (DatabaseException)t;
2083             else throw new DatabaseException(t);
2084         }
2085         return result.get();
2086     }
2087
2088     @Override
2089     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2090         assertNotSession();
2091         Semaphore notify = new Semaphore(0);
2092         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2093         final DataContainer<T> result = new DataContainer<T>();
2094         scheduleRequest(request, new Procedure<T>() {
2095
2096             @Override
2097             public void exception(Throwable t) {
2098               exception.set(t);
2099             }
2100
2101             @Override
2102             public void execute(T t) {
2103               result.set(t);
2104             }
2105
2106         }, notify);
2107         acquire(notify, request);
2108         if(exception.get() != null) {
2109             Throwable t = exception.get();
2110             if(t instanceof DatabaseException) throw (DatabaseException)t;
2111             else throw new DatabaseException(t);
2112         }
2113         return result.get();
2114     }
2115
2116     @Override
2117     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2118         assertNotSession();
2119         Semaphore notify = new Semaphore(0);
2120         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2121         scheduleRequest(request, e -> exception.set(e), notify);
2122         acquire(notify, request);
2123         if(exception.get() != null) throw exception.get();
2124     }
2125
2126     @Override
2127     public void syncRequest(WriteOnly request) throws DatabaseException  {
2128         assertNotSession();
2129         assertAlive();
2130         Semaphore notify = new Semaphore(0);
2131         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2132         scheduleRequest(request, e -> exception.set(e), notify);
2133         acquire(notify, request);
2134         if(exception.get() != null) throw exception.get();
2135     }
2136
2137     /*
2138      *
2139      * GraphRequestProcessor interface
2140      */
2141
2142     @Override
2143     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2144
2145         scheduleRequest(request, procedure, null, null);
2146
2147     }
2148
2149     @Override
2150     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2151
2152         scheduleRequest(request, procedure, null);
2153
2154     }
2155
2156     @Override
2157     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2158
2159         scheduleRequest(request, procedure, null, null, null);
2160
2161     }
2162
2163     @Override
2164     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2165
2166         scheduleRequest(request, callback, null);
2167
2168     }
2169
2170     @Override
2171     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2172
2173         scheduleRequest(request, procedure, null);
2174
2175     }
2176
2177     @Override
2178     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2179
2180         scheduleRequest(request, procedure, null);
2181
2182     }
2183
2184     @Override
2185     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2186
2187         scheduleRequest(request, procedure, null);
2188
2189     }
2190
2191     @Override
2192     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2193
2194         scheduleRequest(request, callback, null);
2195
2196     }
2197
2198     @Override
2199     public void asyncRequest(final Write r) {
2200         asyncRequest(r, null);
2201     }
2202
2203     @Override
2204     public void asyncRequest(final DelayedWrite r) {
2205         asyncRequest(r, null);
2206     }
2207
2208     @Override
2209     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2210
2211         scheduleRequest(request, callback, null);
2212
2213     }
2214
2215     @Override
2216     public void asyncRequest(final WriteOnly request) {
2217
2218         asyncRequest(request, null);
2219
2220     }
2221
2222     @Override
2223     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2224         r.request(this, procedure);
2225     }
2226
2227     @Override
2228     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2229         r.request(this, procedure);
2230     }
2231
2232     @Override
2233     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2234         r.request(this, procedure);
2235     }
2236
2237     @Override
2238     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2239         r.request(this, procedure);
2240     }
2241
2242     @Override
2243     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2244         r.request(this, procedure);
2245     }
2246
2247     @Override
2248     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2249         r.request(this, procedure);
2250     }
2251
2252     @Override
2253     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2254         return r.request(this);
2255     }
2256
2257     @Override
2258     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2259         return r.request(this);
2260     }
2261
2262     @Override
2263     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2264         r.request(this, procedure);
2265     }
2266
2267     @Override
2268     public <T> void async(WriteInterface<T> r) {
2269         r.request(this, new ProcedureAdapter<T>());
2270     }
2271
2272     //@Override
2273     public void incAsync() {
2274         state.incAsync();
2275     }
2276
2277     //@Override
2278     public void decAsync() {
2279         state.decAsync();
2280     }
2281
2282     public long getCluster(ResourceImpl resource) {
2283         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2284         return cluster.getClusterId();
2285     }
2286
2287     public long getCluster(int id) {
2288         if (clusterTable == null)
2289             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2290         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2291     }
2292
2293     public ResourceImpl getResource(int id) {
2294         return new ResourceImpl(resourceSupport, id);
2295     }
2296
2297     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2298         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2299         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2300         int key = proxy.getClusterKey();
2301         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2302         return new ResourceImpl(resourceSupport, resourceKey);
2303     }
2304
2305     public ResourceImpl getResource2(int id) {
2306         assert (id != 0);
2307         return new ResourceImpl(resourceSupport, id);
2308     }
2309
2310     final public int getId(ResourceImpl impl) {
2311         return impl.id;
2312     }
2313
2314     public static final Charset UTF8 = Charset.forName("utf-8");
2315
2316
2317
2318
2319     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2320         for(TransientGraph g : support.providers) {
2321             if(g.isPending(subject)) return false;
2322         }
2323         return true;
2324     }
2325
2326     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2327         for(TransientGraph g : support.providers) {
2328             if(g.isPending(subject, predicate)) return false;
2329         }
2330         return true;
2331     }
2332
2333     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2334
2335         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2336
2337             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2338
2339             @Override
2340             public void accept(ReadGraphImpl graph) {
2341                 if(ready.decrementAndGet() == 0) {
2342                     runnable.accept(graph);
2343                 }
2344             }
2345
2346         };
2347
2348         for(TransientGraph g : support.providers) {
2349             if(g.isPending(subject)) {
2350                 try {
2351                     g.load(graph, subject, composite);
2352                 } catch (DatabaseException e) {
2353                     e.printStackTrace();
2354                 }
2355             } else {
2356                 composite.accept(graph);
2357             }
2358         }
2359
2360         composite.accept(graph);
2361
2362     }
2363
2364     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2365
2366         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2367
2368             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2369
2370             @Override
2371             public void accept(ReadGraphImpl graph) {
2372                 if(ready.decrementAndGet() == 0) {
2373                     runnable.accept(graph);
2374                 }
2375             }
2376
2377         };
2378
2379         for(TransientGraph g : support.providers) {
2380             if(g.isPending(subject, predicate)) {
2381                 try {
2382                     g.load(graph, subject, predicate, composite);
2383                 } catch (DatabaseException e) {
2384                     e.printStackTrace();
2385                 }
2386             } else {
2387                 composite.accept(graph);
2388             }
2389         }
2390
2391         composite.accept(graph);
2392
2393     }
2394
2395 //    void dumpHeap() {
2396 //
2397 //        try {
2398 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2399 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2400 //                    "d:/heap" + flushCounter + ".txt", true);
2401 //        } catch (IOException e) {
2402 //            e.printStackTrace();
2403 //        }
2404 //
2405 //    }
2406
2407     void fireReactionsToSynchronize(ChangeSet cs) {
2408
2409         // Do not fire empty events
2410         if (cs.isEmpty())
2411             return;
2412
2413         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2414 //        g.state.barrier.inc();
2415
2416         try {
2417
2418             if (!cs.isEmpty()) {
2419                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2420                 for (ChangeListener l : changeListeners2) {
2421                     try {
2422                         l.graphChanged(e2);
2423                     } catch (Exception ex) {
2424                         ex.printStackTrace();
2425                     }
2426                 }
2427             }
2428
2429         } finally {
2430
2431 //            g.state.barrier.dec();
2432 //            g.waitAsync(null);
2433
2434         }
2435
2436     }
2437
2438     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2439         try {
2440
2441             // Do not fire empty events
2442             if (cs2.isEmpty())
2443                 return;
2444
2445 //            graph.restart();
2446 //            graph.state.barrier.inc();
2447
2448             try {
2449
2450                 if (!cs2.isEmpty()) {
2451
2452                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2453                     for (ChangeListener l : changeListeners2) {
2454                         try {
2455                             l.graphChanged(e2);
2456 //                            System.out.println("changelistener " + l);
2457                         } catch (Exception ex) {
2458                             ex.printStackTrace();
2459                         }
2460                     }
2461
2462                 }
2463
2464             } finally {
2465
2466 //                graph.state.barrier.dec();
2467 //                graph.waitAsync(null);
2468
2469             }
2470         } catch (Throwable t) {
2471             t.printStackTrace();
2472
2473         }
2474     }
2475     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2476
2477         try {
2478
2479             // Do not fire empty events
2480             if (cs2.isEmpty())
2481                 return;
2482
2483             // Do not fire on virtual requests
2484             if(graph.getProvider() != null)
2485                 return;
2486
2487             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2488
2489             try {
2490
2491                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2492                 for (ChangeListener l : metadataListeners) {
2493                     try {
2494                         l.graphChanged(e2);
2495                     } catch (Throwable ex) {
2496                         LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2497                     }
2498                 }
2499
2500             } finally {
2501
2502             }
2503
2504         } catch (Throwable t) {
2505             LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2506         }
2507
2508     }
2509
2510
2511     /*
2512      * (non-Javadoc)
2513      *
2514      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2515      */
2516     @Override
2517     public <T> T getService(Class<T> api) {
2518         T t = peekService(api);
2519         if (t == null) {
2520             if (state.isClosed())
2521                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2522             throw new ServiceNotFoundException(this, api);
2523         }
2524         return t;
2525     }
2526
2527     /**
2528      * @return
2529      */
2530     protected abstract ServerInformation getCachedServerInformation();
2531
2532         private Class<?> serviceKey1 = null;
2533         private Class<?> serviceKey2 = null;
2534         private Object service1 = null;
2535         private Object service2 = null;
2536
2537     /*
2538      * (non-Javadoc)
2539      *
2540      * @see
2541      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2542      */
2543     @SuppressWarnings("unchecked")
2544     @Override
2545     public synchronized <T> T peekService(Class<T> api) {
2546
2547                 if(serviceKey1 == api) {
2548                         return (T)service1;
2549                 } else if (serviceKey2 == api) {
2550                         // Promote this key
2551                         Object result = service2;
2552                         service2 = service1;
2553                         serviceKey2 = serviceKey1;
2554                         service1 = result;
2555                         serviceKey1 = api;
2556                         return (T)result;
2557                 }
2558
2559         if (Layer0.class == api)
2560                 return (T) L0;
2561         if (ServerInformation.class == api)
2562             return (T) getCachedServerInformation();
2563         else if (WriteGraphImpl.class == api)
2564             return (T) writeState.getGraph();
2565         else if (ClusterBuilder.class == api)
2566             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2567         else if (ClusterBuilderFactory.class == api)
2568             return (T)new ClusterBuilderFactoryImpl(this);
2569
2570                 service2 = service1;
2571                 serviceKey2 = serviceKey1;
2572
2573         service1 = serviceLocator.peekService(api);
2574         serviceKey1 = api;
2575
2576         return (T)service1;
2577
2578     }
2579
2580     /*
2581      * (non-Javadoc)
2582      *
2583      * @see
2584      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2585      */
2586     @Override
2587     public boolean hasService(Class<?> api) {
2588         return serviceLocator.hasService(api);
2589     }
2590
2591     /**
2592      * @param api the api that must be implemented by the specified service
2593      * @param service the service implementation
2594      */
2595     @Override
2596     public <T> void registerService(Class<T> api, T service) {
2597         if(Layer0.class == api) {
2598                 L0 = (Layer0)service;
2599                 return;
2600         }
2601         serviceLocator.registerService(api, service);
2602         if (TransactionPolicySupport.class == api) {
2603             transactionPolicy = (TransactionPolicySupport)service;
2604             state.resetTransactionPolicy();
2605         }
2606         if (api == serviceKey1)
2607                 service1 = service;
2608         else if (api == serviceKey2)
2609                 service2 = service;
2610     }
2611
2612     // ----------------
2613     // SessionMonitor
2614     // ----------------
2615
2616     void fireSessionVariableChange(String variable) {
2617         for (MonitorHandler h : monitorHandlers) {
2618             MonitorContext ctx = monitorContexts.getLeft(h);
2619             assert ctx != null;
2620             // SafeRunner functionality repeated here to avoid dependency.
2621             try {
2622                 h.valuesChanged(ctx);
2623             } catch (Exception e) {
2624                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2625             } catch (LinkageError e) {
2626                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2627             }
2628         }
2629     }
2630
2631
2632     class ResourceSerializerImpl implements ResourceSerializer {
2633
2634         public long createRandomAccessId(int id) throws DatabaseException {
2635             if(id < 0)
2636                 return id;
2637             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2638             long cluster = getCluster(id);
2639             if (0 == cluster)
2640                 return 0; // Better to return 0 then invalid id.
2641             long result = ClusterTraitsBase.createResourceId(cluster, index);
2642             return result;
2643         }
2644
2645         @Override
2646         public long getRandomAccessId(Resource resource) throws DatabaseException {
2647
2648             ResourceImpl resourceImpl = (ResourceImpl) resource;
2649             return createRandomAccessId(resourceImpl.id);
2650
2651         }
2652
2653         @Override
2654         public int getTransientId(Resource resource) throws DatabaseException {
2655
2656             ResourceImpl resourceImpl = (ResourceImpl) resource;
2657             return resourceImpl.id;
2658
2659         }
2660
2661         @Override
2662         public String createRandomAccessId(Resource resource)
2663         throws InvalidResourceReferenceException {
2664
2665             if(resource == null) throw new IllegalArgumentException();
2666
2667             ResourceImpl resourceImpl = (ResourceImpl) resource;
2668
2669             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2670
2671             int r;
2672             try {
2673                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2674             } catch (DatabaseException e1) {
2675                 throw new InvalidResourceReferenceException(e1);
2676             }
2677             try {
2678                 // Serialize as '<resource index>_<cluster id>'
2679                 return "" + r + "_" + getCluster(resourceImpl);
2680             } catch (Throwable e) {
2681                 e.printStackTrace();
2682                 throw new InvalidResourceReferenceException(e);
2683             } finally {
2684             }
2685         }
2686
2687         public int getTransientId(long serialized) throws DatabaseException {
2688             if (serialized <= 0)
2689                 return (int)serialized;
2690             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2691             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2692             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2693             if (cluster == null)
2694                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2695             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2696             return key;
2697         }
2698
2699         @Override
2700         public Resource getResource(long randomAccessId) throws DatabaseException {
2701             return getResourceByKey(getTransientId(randomAccessId));
2702         }
2703
2704         @Override
2705         public Resource getResource(int transientId) throws DatabaseException {
2706             return getResourceByKey(transientId);
2707         }
2708
2709         @Override
2710         public Resource getResource(String randomAccessId)
2711         throws InvalidResourceReferenceException {
2712             try {
2713                 int i = randomAccessId.indexOf('_');
2714                 if (i == -1)
2715                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2716                             + randomAccessId + "'");
2717                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2718                 if(r < 0) return getResourceByKey(r);
2719                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2720                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2721                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2722                 if (cluster.hasResource(key, clusterTranslator))
2723                     return getResourceByKey(key);
2724             } catch (InvalidResourceReferenceException e) {
2725                 throw e;
2726             } catch (NumberFormatException e) {
2727                 throw new InvalidResourceReferenceException(e);
2728             } catch (Throwable e) {
2729                 e.printStackTrace();
2730                 throw new InvalidResourceReferenceException(e);
2731             } finally {
2732             }
2733             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2734         }
2735
2736         @Override
2737         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2738             try {
2739                 return true;
2740             } catch (Throwable e) {
2741                 e.printStackTrace();
2742                 throw new InvalidResourceReferenceException(e);
2743             } finally {
2744             }
2745         }
2746     }
2747
2748     // Copied from old SessionImpl
2749
2750     void check() {
2751         if (state.isClosed())
2752             throw new Error("Session closed.");
2753     }
2754
2755
2756
2757     public ResourceImpl getNewResource(long clusterId) {
2758         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2759         int newId;
2760         try {
2761             newId = cluster.createResource(clusterTranslator);
2762         } catch (DatabaseException e) {
2763             Logger.defaultLogError(e);
2764             return null;
2765         }
2766         return new ResourceImpl(resourceSupport, newId);
2767     }
2768
2769     public ResourceImpl getNewResource(Resource clusterSet)
2770     throws DatabaseException {
2771         long resourceId = clusterSet.getResourceId();
2772         Long clusterId = clusterSetsSupport.get(resourceId);
2773         if (null == clusterId)
2774             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2775         if (Constants.NewClusterId == clusterId) {
2776             clusterId = getService(ClusteringSupport.class).createCluster();
2777             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2778                 createdClusters.add(clusterId);
2779             clusterSetsSupport.put(resourceId, clusterId);
2780             return getNewResource(clusterId);
2781         } else {
2782             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2783             ResourceImpl result;
2784             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2785                 clusterId = getService(ClusteringSupport.class).createCluster();
2786                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2787                     createdClusters.add(clusterId);
2788                 clusterSetsSupport.put(resourceId, clusterId);
2789                 return getNewResource(clusterId);
2790             } else {
2791                 result = getNewResource(clusterId);
2792                 int resultKey = querySupport.getId(result);
2793                 long resultCluster = querySupport.getClusterId(resultKey);
2794                 if (clusterId != resultCluster)
2795                     clusterSetsSupport.put(resourceId, resultCluster);
2796                 return result;
2797             }
2798         }
2799     }
2800
2801     public void getNewClusterSet(Resource clusterSet)
2802     throws DatabaseException {
2803         if (DEBUG)
2804             System.out.println("new cluster set=" + clusterSet);
2805         long resourceId = clusterSet.getResourceId();
2806         if (clusterSetsSupport.containsKey(resourceId))
2807             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2808         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2809     }
2810     public boolean containsClusterSet(Resource clusterSet)
2811     throws ServiceException {
2812         long resourceId = clusterSet.getResourceId();
2813         return clusterSetsSupport.containsKey(resourceId);
2814     }
2815     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2816         Resource r = defaultClusterSet;
2817         defaultClusterSet = clusterSet;
2818         return r;
2819     }
2820     void printDiagnostics() {
2821
2822         if (DIAGNOSTICS) {
2823
2824             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2825             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2826             for(ClusterI cluster : clusterTable.getClusters()) {
2827                 try {
2828                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2829                         residentClusters.set(residentClusters.get() + 1);
2830                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2831                     }
2832                 } catch (DatabaseException e) {
2833                     Logger.defaultLogError(e);
2834                 }
2835             }
2836
2837             System.out.println("--------------------------------");
2838             System.out.println("Cluster information:");
2839             System.out.println("-amount of resident clusters=" + residentClusters.get());
2840             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2841
2842             for(ClusterI cluster : clusterTable.getClusters()) {
2843                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2844                 try {
2845                     if (!cluster.isLoaded())
2846                         System.out.println("not loaded]");
2847                     else if (cluster.isEmpty())
2848                         System.out.println("is empty]");
2849                     else {
2850                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2851                         System.out.print(",references=" + cluster.getReferenceCount());
2852                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2853                     }
2854                 } catch (DatabaseException e) {
2855                     Logger.defaultLogError(e);
2856                     System.out.println("is corrupted]");
2857                 }
2858             }
2859
2860             queryProvider2.printDiagnostics();
2861             System.out.println("--------------------------------");
2862         }
2863
2864     }
2865
2866     public void fireStartReadTransaction() {
2867
2868         if (DIAGNOSTICS)
2869             System.out.println("StartReadTransaction");
2870
2871         for (SessionEventListener listener : eventListeners) {
2872             try {
2873                 listener.readTransactionStarted();
2874             } catch (Throwable t) {
2875                 t.printStackTrace();
2876             }
2877         }
2878
2879     }
2880
2881     public void fireFinishReadTransaction() {
2882
2883         if (DIAGNOSTICS)
2884             System.out.println("FinishReadTransaction");
2885
2886         for (SessionEventListener listener : eventListeners) {
2887             try {
2888                 listener.readTransactionFinished();
2889             } catch (Throwable t) {
2890                 t.printStackTrace();
2891             }
2892         }
2893
2894     }
2895
2896     public void fireStartWriteTransaction() {
2897
2898         if (DIAGNOSTICS)
2899             System.out.println("StartWriteTransaction");
2900
2901         for (SessionEventListener listener : eventListeners) {
2902             try {
2903                 listener.writeTransactionStarted();
2904             } catch (Throwable t) {
2905                 t.printStackTrace();
2906             }
2907         }
2908
2909     }
2910
2911     public void fireFinishWriteTransaction() {
2912
2913         if (DIAGNOSTICS)
2914             System.out.println("FinishWriteTransaction");
2915
2916         for (SessionEventListener listener : eventListeners) {
2917             try {
2918                 listener.writeTransactionFinished();
2919             } catch (Throwable t) {
2920                 t.printStackTrace();
2921             }
2922         }
2923
2924         Indexing.resetDependenciesIndexingDisabled();
2925
2926     }
2927
2928     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2929         throw new Error("Not supported at the moment.");
2930     }
2931
2932     State getState() {
2933         return state;
2934     }
2935
2936     ClusterTable getClusterTable() {
2937         return clusterTable;
2938     }
2939
2940     public GraphSession getGraphSession() {
2941         return graphSession;
2942     }
2943
2944     public QueryProcessor getQueryProvider2() {
2945         return queryProvider2;
2946     }
2947
2948     ClientChangesImpl getClientChanges() {
2949         return clientChanges;
2950     }
2951
2952     boolean getWriteOnly() {
2953         return writeOnly;
2954     }
2955
2956     static int counter = 0;
2957
2958     public void onClusterLoaded(long clusterId) {
2959
2960         clusterTable.updateSize();
2961
2962     }
2963
2964
2965
2966
2967     /*
2968      * Implementation of the interface RequestProcessor
2969      */
2970
2971     @Override
2972     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2973
2974         assertNotSession();
2975         assertAlive();
2976
2977         assert(request != null);
2978
2979         final DataContainer<T> result = new DataContainer<T>();
2980         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2981
2982         syncRequest(request, new AsyncProcedure<T>() {
2983
2984             @Override
2985             public void execute(AsyncReadGraph graph, T t) {
2986                 result.set(t);
2987             }
2988
2989             @Override
2990             public void exception(AsyncReadGraph graph, Throwable t) {
2991                 exception.set(t);
2992             }
2993
2994         });
2995
2996         Throwable t = exception.get();
2997         if(t != null) {
2998             if(t instanceof DatabaseException) throw (DatabaseException)t;
2999             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3000         }
3001
3002         return result.get();
3003
3004     }
3005
3006 //    @Override
3007 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3008 //        assertNotSession();
3009 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
3010 //    }
3011
3012     @Override
3013     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3014         assertNotSession();
3015         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3016     }
3017
3018     @Override
3019     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3020         assertNotSession();
3021         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3022     }
3023
3024     @Override
3025     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3026         assertNotSession();
3027         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3028     }
3029
3030     @Override
3031     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3032         assertNotSession();
3033         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3034     }
3035
3036     @Override
3037     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3038
3039         assertNotSession();
3040
3041         assert(request != null);
3042
3043         final DataContainer<T> result = new DataContainer<T>();
3044         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3045
3046         syncRequest(request, new AsyncProcedure<T>() {
3047
3048             @Override
3049             public void execute(AsyncReadGraph graph, T t) {
3050                 result.set(t);
3051             }
3052
3053             @Override
3054             public void exception(AsyncReadGraph graph, Throwable t) {
3055                 exception.set(t);
3056             }
3057
3058         });
3059
3060         Throwable t = exception.get();
3061         if(t != null) {
3062             if(t instanceof DatabaseException) throw (DatabaseException)t;
3063             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3064         }
3065
3066         return result.get();
3067
3068     }
3069
3070     @Override
3071     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3072         assertNotSession();
3073         return syncRequest(request, (AsyncProcedure<T>)procedure);
3074     }
3075
3076     @Override
3077     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3078         assertNotSession();
3079         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3080     }
3081
3082     @Override
3083     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3084         assertNotSession();
3085         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3086     }
3087
3088     @Override
3089     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3090         assertNotSession();
3091         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3092     }
3093
3094     @Override
3095     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3096         assertNotSession();
3097         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3098     }
3099
3100     @Override
3101     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3102
3103         assertNotSession();
3104
3105         assert(request != null);
3106
3107         final ArrayList<T> result = new ArrayList<T>();
3108         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3109
3110         syncRequest(request, new SyncMultiProcedure<T>() {
3111
3112             @Override
3113             public void execute(ReadGraph graph, T t) {
3114                 synchronized(result) {
3115                     result.add(t);
3116                 }
3117             }
3118
3119             @Override
3120             public void finished(ReadGraph graph) {
3121             }
3122
3123             @Override
3124             public void exception(ReadGraph graph, Throwable t) {
3125                 exception.set(t);
3126             }
3127
3128
3129         });
3130
3131         Throwable t = exception.get();
3132         if(t != null) {
3133             if(t instanceof DatabaseException) throw (DatabaseException)t;
3134             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3135         }
3136
3137         return result;
3138
3139     }
3140
3141     @Override
3142     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3143         assertNotSession();
3144         throw new Error("Not implemented!");
3145     }
3146
3147     @Override
3148     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3149         assertNotSession();
3150         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3151     }
3152
3153     @Override
3154     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3155         assertNotSession();
3156         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3157     }
3158
3159     @Override
3160     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3161         assertNotSession();
3162         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3163     }
3164
3165     @Override
3166     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3167
3168         assertNotSession();
3169
3170         assert(request != null);
3171
3172         final ArrayList<T> result = new ArrayList<T>();
3173         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3174
3175         syncRequest(request, new AsyncMultiProcedure<T>() {
3176
3177             @Override
3178             public void execute(AsyncReadGraph graph, T t) {
3179                 synchronized(result) {
3180                     result.add(t);
3181                 }
3182             }
3183
3184             @Override
3185             public void finished(AsyncReadGraph graph) {
3186             }
3187
3188             @Override
3189             public void exception(AsyncReadGraph graph, Throwable t) {
3190                 exception.set(t);
3191             }
3192
3193
3194         });
3195
3196         Throwable t = exception.get();
3197         if (t != null) {
3198             if (t instanceof DatabaseException)
3199                 throw (DatabaseException) t;
3200             else
3201                 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3202         }
3203
3204         return result;
3205
3206     }
3207
3208     @Override
3209     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3210         assertNotSession();
3211         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3212     }
3213
3214     @Override
3215     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3216         assertNotSession();
3217         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3218     }
3219
3220     @Override
3221     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3222         assertNotSession();
3223        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3224     }
3225
3226     @Override
3227     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3228         assertNotSession();
3229         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3230     }
3231
3232     @Override
3233     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3234         assertNotSession();
3235         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3236     }
3237
3238
3239     @Override
3240     public <T> void asyncRequest(Read<T> request) {
3241
3242         asyncRequest(request, new ProcedureAdapter<T>() {
3243             @Override
3244             public void exception(Throwable t) {
3245                 t.printStackTrace();
3246             }
3247         });
3248
3249     }
3250
3251     @Override
3252     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3253         asyncRequest(request, (AsyncProcedure<T>)procedure);
3254     }
3255
3256     @Override
3257     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3258         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3259     }
3260
3261     @Override
3262     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3263         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3264     }
3265
3266     @Override
3267     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3268         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3269     }
3270
3271     @Override
3272     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3273         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3274     }
3275
3276     @Override
3277     final public <T> void asyncRequest(final AsyncRead<T> request) {
3278
3279         assert(request != null);
3280
3281         asyncRequest(request, new ProcedureAdapter<T>() {
3282             @Override
3283             public void exception(Throwable t) {
3284                 t.printStackTrace();
3285             }
3286         });
3287
3288     }
3289
3290     @Override
3291     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3292         scheduleRequest(request, procedure, procedure, null);
3293     }
3294
3295     @Override
3296     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3297         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3302         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3303     }
3304
3305     @Override
3306     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3307         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3308     }
3309
3310     @Override
3311     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3312         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3313     }
3314
3315     @Override
3316     public <T> void asyncRequest(MultiRead<T> request) {
3317
3318         assert(request != null);
3319
3320         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3321             @Override
3322             public void exception(ReadGraph graph, Throwable t) {
3323                 t.printStackTrace();
3324             }
3325         });
3326
3327     }
3328
3329     @Override
3330     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3331         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3332     }
3333
3334     @Override
3335     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3336         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3337     }
3338
3339     @Override
3340     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3341         scheduleRequest(request, procedure, null);
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3346         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3347     }
3348
3349     @Override
3350     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3351
3352         assert(request != null);
3353
3354         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3355             @Override
3356             public void exception(AsyncReadGraph graph, Throwable t) {
3357                 t.printStackTrace();
3358             }
3359         });
3360
3361     }
3362
3363     @Override
3364     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3365         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3366     }
3367
3368     @Override
3369     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3370         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3371     }
3372
3373     @Override
3374     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3375         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3376     }
3377
3378     @Override
3379     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3380         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3381     }
3382
3383     @Override
3384     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3385         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3386     }
3387
3388     @Override
3389     final public <T> void asyncRequest(final ExternalRead<T> request) {
3390
3391         assert(request != null);
3392
3393         asyncRequest(request, new ProcedureAdapter<T>() {
3394             @Override
3395             public void exception(Throwable t) {
3396                 t.printStackTrace();
3397             }
3398         });
3399
3400     }
3401
3402     @Override
3403     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3404         asyncRequest(request, (Procedure<T>)procedure);
3405     }
3406
3407     boolean sameProvider(Write request) {
3408         if(writeState.getGraph().provider != null) {
3409             return writeState.getGraph().provider.equals(request.getProvider());
3410         } else {
3411             return request.getProvider() == null;
3412         }
3413     }
3414
3415     boolean plainWrite(WriteGraphImpl graph) {
3416         if(graph == null) return false;
3417         if(graph.writeSupport.writeOnly()) return false;
3418         return true;
3419     }
3420
3421
3422     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3423
3424     private void assertNotSession() throws DatabaseException {
3425         Thread current = Thread.currentThread();
3426         if (sessionThreads.contains(current))
3427             throw new ServiceException("Caller is already inside a transaction.");
3428     }
3429
3430     void assertAlive() {
3431         if (!state.isAlive())
3432             throw new RuntimeDatabaseException("Session has been shut down.");
3433     }
3434
3435     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3436         return querySupport.getValueStream(graph, querySupport.getId(resource));
3437     }
3438
3439     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3440         return querySupport.getValue(graph, querySupport.getId(resource));
3441     }
3442
3443     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3444         acquire(semaphore, request, null);
3445     }
3446
3447     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3448         assertAlive();
3449         try {
3450             long tryCount = 0;
3451             // Loop until the semaphore is acquired reporting requests that take a long time.
3452             while (true) {
3453
3454                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3455                     // Acquired OK.
3456                     return;
3457                 } else {
3458                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3459                         ++tryCount;
3460                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3461                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3462                                 * tryCount;
3463                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3464                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3465                     }
3466                 }
3467
3468                 assertAlive();
3469
3470             }
3471         } catch (InterruptedException e) {
3472             e.printStackTrace();
3473             // FIXME: Should perhaps do something else in this case ??
3474         }
3475     }
3476
3477
3478
3479 //    @Override
3480 //    public boolean holdOnToTransactionAfterCancel() {
3481 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3482 //    }
3483 //
3484 //    @Override
3485 //    public boolean holdOnToTransactionAfterCommit() {
3486 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3487 //    }
3488 //
3489 //    @Override
3490 //    public boolean holdOnToTransactionAfterRead() {
3491 //        return transactionPolicy.holdOnToTransactionAfterRead();
3492 //    }
3493 //
3494 //    @Override
3495 //    public void onRelinquish() {
3496 //        transactionPolicy.onRelinquish();
3497 //    }
3498 //
3499 //    @Override
3500 //    public void onRelinquishDone() {
3501 //        transactionPolicy.onRelinquishDone();
3502 //    }
3503 //
3504 //    @Override
3505 //    public void onRelinquishError() {
3506 //        transactionPolicy.onRelinquishError();
3507 //    }
3508
3509
3510     @Override
3511     public Session getSession() {
3512         return this;
3513     }
3514
3515
3516     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3517     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3518
3519     public void ceased(int thread) {
3520
3521         requestManager.ceased(thread);
3522
3523     }
3524
3525     public int getAmountOfQueryThreads() {
3526         // This must be a power of two
3527         return 1;
3528 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3529     }
3530
3531     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3532
3533         return new ResourceImpl(resourceSupport, key);
3534
3535     }
3536
3537     public void acquireWriteOnly() {
3538         writeOnly = true;
3539     }
3540
3541     public void releaseWriteOnly(ReadGraphImpl graph) {
3542         writeOnly = false;
3543         queryProvider2.releaseWrite(graph);
3544     }
3545
3546     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3547
3548         long start = System.nanoTime();
3549
3550         while(dirtyPrimitives) {
3551             dirtyPrimitives = false;
3552             getQueryProvider2().propagateChangesInQueryCache(writer);
3553             getQueryProvider2().listening.fireListeners(writer);
3554         }
3555
3556         fireMetadataListeners(writer, clientChanges);
3557
3558         if(DebugPolicy.PERFORMANCE_DATA) {
3559             long end = System.nanoTime();
3560             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3561         }
3562
3563     }
3564
3565     void removeTemporaryData() {
3566         File platform = Platform.getLocation().toFile();
3567         File tempFiles = new File(platform, "tempFiles");
3568         File temp = new File(tempFiles, "db");
3569         if(!temp.exists()) 
3570             return;
3571         try {
3572             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3573                 @Override
3574                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3575                     try {
3576                         Files.delete(file);
3577                     } catch (IOException e) {
3578                         Logger.defaultLogError(e);
3579                     }
3580                     return FileVisitResult.CONTINUE;
3581                 }
3582             });
3583         } catch (IOException e) {
3584             Logger.defaultLogError(e);
3585         }
3586     }
3587
3588     public void handleCreatedClusters() {
3589         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3590             createdClusters.forEach(new TLongProcedure() {
3591
3592                 @Override
3593                 public boolean execute(long value) {
3594                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3595                     cluster.setImmutable(true, clusterTranslator);
3596                     return true;
3597                 }
3598             });
3599         }
3600         createdClusters.clear();
3601     }
3602
3603     @Override
3604     public Object getModificationCounter() {
3605         return queryProvider2.modificationCounter;
3606     }
3607     
3608     @Override
3609     public void markUndoPoint() {
3610         state.setCombine(false);
3611     }
3612
3613 }