]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Thread count was wrong
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.BlockingAsyncProcedure;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
64 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
65 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
66 import org.simantics.db.common.utils.Logger;
67 import org.simantics.db.event.ChangeEvent;
68 import org.simantics.db.event.ChangeListener;
69 import org.simantics.db.event.SessionEventListener;
70 import org.simantics.db.exception.CancelTransactionException;
71 import org.simantics.db.exception.ClusterSetExistException;
72 import org.simantics.db.exception.DatabaseException;
73 import org.simantics.db.exception.ImmutableException;
74 import org.simantics.db.exception.InvalidResourceReferenceException;
75 import org.simantics.db.exception.ResourceNotFoundException;
76 import org.simantics.db.exception.RuntimeDatabaseException;
77 import org.simantics.db.exception.ServiceException;
78 import org.simantics.db.exception.ServiceNotFoundException;
79 import org.simantics.db.impl.ClusterBase;
80 import org.simantics.db.impl.ClusterI;
81 import org.simantics.db.impl.ClusterTraitsBase;
82 import org.simantics.db.impl.ClusterTranslator;
83 import org.simantics.db.impl.ResourceImpl;
84 import org.simantics.db.impl.TransientGraph;
85 import org.simantics.db.impl.VirtualGraphImpl;
86 import org.simantics.db.impl.graph.DelayedWriteGraph;
87 import org.simantics.db.impl.graph.ReadGraphImpl;
88 import org.simantics.db.impl.graph.WriteGraphImpl;
89 import org.simantics.db.impl.graph.WriteSupport;
90 import org.simantics.db.impl.internal.RandomAccessValueSupport;
91 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
92 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
93 import org.simantics.db.impl.query.QueryCache;
94 import org.simantics.db.impl.query.QueryCacheBase;
95 import org.simantics.db.impl.query.QueryProcessor;
96 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
97 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
98 import org.simantics.db.impl.service.QueryDebug;
99 import org.simantics.db.impl.support.VirtualGraphServerSupport;
100 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
101 import org.simantics.db.procedure.AsyncListener;
102 import org.simantics.db.procedure.AsyncMultiListener;
103 import org.simantics.db.procedure.AsyncMultiProcedure;
104 import org.simantics.db.procedure.AsyncProcedure;
105 import org.simantics.db.procedure.Listener;
106 import org.simantics.db.procedure.ListenerBase;
107 import org.simantics.db.procedure.MultiListener;
108 import org.simantics.db.procedure.MultiProcedure;
109 import org.simantics.db.procedure.Procedure;
110 import org.simantics.db.procedure.SyncListener;
111 import org.simantics.db.procedure.SyncMultiListener;
112 import org.simantics.db.procedure.SyncMultiProcedure;
113 import org.simantics.db.procedure.SyncProcedure;
114 import org.simantics.db.procore.cluster.ClusterImpl;
115 import org.simantics.db.procore.cluster.ClusterTraits;
116 import org.simantics.db.procore.protocol.Constants;
117 import org.simantics.db.procore.protocol.DebugPolicy;
118 import org.simantics.db.request.AsyncMultiRead;
119 import org.simantics.db.request.AsyncRead;
120 import org.simantics.db.request.DelayedWrite;
121 import org.simantics.db.request.DelayedWriteResult;
122 import org.simantics.db.request.ExternalRead;
123 import org.simantics.db.request.MultiRead;
124 import org.simantics.db.request.Read;
125 import org.simantics.db.request.ReadInterface;
126 import org.simantics.db.request.Write;
127 import org.simantics.db.request.WriteInterface;
128 import org.simantics.db.request.WriteOnly;
129 import org.simantics.db.request.WriteOnlyResult;
130 import org.simantics.db.request.WriteResult;
131 import org.simantics.db.request.WriteTraits;
132 import org.simantics.db.service.ByteReader;
133 import org.simantics.db.service.ClusterBuilder;
134 import org.simantics.db.service.ClusterBuilderFactory;
135 import org.simantics.db.service.ClusterControl;
136 import org.simantics.db.service.ClusterSetsSupport;
137 import org.simantics.db.service.ClusterUID;
138 import org.simantics.db.service.ClusteringSupport;
139 import org.simantics.db.service.CollectionSupport;
140 import org.simantics.db.service.DebugSupport;
141 import org.simantics.db.service.DirectQuerySupport;
142 import org.simantics.db.service.GraphChangeListenerSupport;
143 import org.simantics.db.service.InitSupport;
144 import org.simantics.db.service.LifecycleSupport;
145 import org.simantics.db.service.ManagementSupport;
146 import org.simantics.db.service.QueryControl;
147 import org.simantics.db.service.SerialisationSupport;
148 import org.simantics.db.service.ServerInformation;
149 import org.simantics.db.service.ServiceActivityMonitor;
150 import org.simantics.db.service.SessionEventSupport;
151 import org.simantics.db.service.SessionMonitorSupport;
152 import org.simantics.db.service.SessionUserSupport;
153 import org.simantics.db.service.StatementSupport;
154 import org.simantics.db.service.TransactionPolicySupport;
155 import org.simantics.db.service.TransactionSupport;
156 import org.simantics.db.service.TransferableGraphSupport;
157 import org.simantics.db.service.UndoRedoSupport;
158 import org.simantics.db.service.VirtualGraphSupport;
159 import org.simantics.db.service.XSupport;
160 import org.simantics.layer0.Layer0;
161 import org.simantics.utils.DataContainer;
162 import org.simantics.utils.Development;
163 import org.simantics.utils.threads.logger.ITask;
164 import org.simantics.utils.threads.logger.ThreadLogger;
165
166 import gnu.trove.procedure.TLongProcedure;
167 import gnu.trove.set.hash.TLongHashSet;
168
169
170 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
171
172     protected static final boolean DEBUG        = false;
173
174     private static final boolean DIAGNOSTICS       = false;
175
176     private TransactionPolicySupport transactionPolicy;
177
178     final private ClusterControl clusterControl;
179
180     final protected BuiltinSupportImpl builtinSupport;
181     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
182     final protected ClusterSetsSupport clusterSetsSupport;
183     final protected LifecycleSupportImpl lifecycleSupport;
184
185     protected QuerySupportImpl querySupport;
186     protected ResourceSupportImpl resourceSupport;
187     protected WriteSupport writeSupport;
188     public ClusterTranslator clusterTranslator;
189
190     boolean dirtyPrimitives = false;
191
192     public static final int SERVICE_MODE_CREATE = 2;
193     public static final int SERVICE_MODE_ALLOW = 1;
194     public int serviceMode = 0;
195     public boolean createdImmutableClusters = false;
196     public TLongHashSet createdClusters = new TLongHashSet();
197
198     private Layer0 L0;
199
200     /**
201      * The service locator maintained by the workbench. These services are
202      * initialized during workbench during the <code>init</code> method.
203      */
204     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
205
206     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
207
208     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
209
210     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
211
212     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
213
214     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
215
216     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
217
218     final protected State                                        state              = new State();
219
220     protected GraphSession                                       graphSession       = null;
221
222     protected SessionManager                                     sessionManagerImpl = null;
223
224     protected UserAuthenticationAgent                            authAgent          = null;
225
226     protected UserAuthenticator                                  authenticator      = null;
227
228     protected Resource                                           user               = null;
229
230     protected ClusterStream                                      clusterStream      = null;
231
232     protected SessionRequestManager                         requestManager = null;
233
234     public ClusterTable                                       clusterTable       = null;
235
236     public QueryProcessor                                     queryProvider2 = null;
237
238     ClientChangesImpl                                  clientChanges      = null;
239
240     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
241
242 //    protected long                                               newClusterId       = Constants.NullClusterId;
243
244     protected int                                                flushCounter       = 0;
245
246     protected boolean                                            writeOnly          = false;
247
248     WriteState<?>                                                writeState = null;
249     WriteStateBase<?>                                            delayedWriteState = null;
250     protected Resource defaultClusterSet = null; // If not null then used for newResource().
251
252     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
253
254 //        if (authAgent == null)
255 //            throw new IllegalArgumentException("null authentication agent");
256
257         File t = StaticSessionProperties.virtualGraphStoragePath;
258         if (null == t)
259             t = new File(".");
260         this.clusterTable = new ClusterTable(this, t);
261         this.builtinSupport = new BuiltinSupportImpl(this);
262         this.sessionManagerImpl = sessionManagerImpl;
263         this.user = null;
264 //        this.authAgent = authAgent;
265
266         serviceLocator.registerService(Session.class, this);
267         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
268         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
269         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
270         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
271         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
272         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
273         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
274         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
275         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
276         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
277         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
278         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
279         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
280         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
281         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
282         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
283         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
284         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
285         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
286         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
287         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
288         ServiceActivityUpdaterForWriteTransactions.register(this);
289
290         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
291         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
292         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
293         this.lifecycleSupport = new LifecycleSupportImpl(this);
294         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
295         this.transactionPolicy = new TransactionPolicyRelease();
296         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
297         this.clusterControl = new ClusterControlImpl(this);
298         serviceLocator.registerService(ClusterControl.class, clusterControl);
299         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
300         this.clusterSetsSupport.setReadDirectory(t.toPath());
301         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
302         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
303     }
304
305     /*
306      *
307      * Session interface
308      */
309
310
311     @Override
312     public Resource getRootLibrary() {
313         return queryProvider2.getRootLibraryResource();
314     }
315
316     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
317         if (!graphSession.dbSession.refreshEnabled())
318             return;
319         try {
320             getClusterTable().refresh(csid, this, clusterUID);
321         } catch (Throwable t) {
322             Logger.defaultLogError("Refesh failed.", t);
323         }
324     }
325
326     static final class TaskHelper {
327         private final String name;
328         private Object result;
329         final Semaphore sema = new Semaphore(0);
330         private Throwable throwable = null;
331         final Consumer<DatabaseException> callback = e -> {
332             synchronized (TaskHelper.this) {
333                 throwable = e;
334             }
335         };
336         final Procedure<Object> proc = new Procedure<Object>() {
337             @Override
338             public void execute(Object result) {
339                 callback.accept(null);
340             }
341             @Override
342             public void exception(Throwable t) {
343                 if (t instanceof DatabaseException)
344                     callback.accept((DatabaseException)t);
345                 else
346                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
347             }
348         };
349         final WriteTraits writeTraits = new WriteTraits() {};
350         TaskHelper(String name) {
351             this.name = name;
352         }
353         @SuppressWarnings("unchecked")
354         <T> T getResult() {
355             return (T)result;
356         }
357         void setResult(Object result) {
358             this.result = result;
359         }
360 //        Throwable throwableGet() {
361 //            return throwable;
362 //        }
363         synchronized void throwableSet(Throwable t) {
364             throwable = t;
365         }
366 //        void throwableSet(String t) {
367 //            throwable = new InternalException("" + name + " operation failed. " + t);
368 //        }
369         synchronized void throwableCheck()
370         throws DatabaseException {
371             if (null != throwable)
372                 if (throwable instanceof DatabaseException)
373                     throw (DatabaseException)throwable;
374                 else
375                     throw new DatabaseException("Undo operation failed.", throwable);
376         }
377         void throw_(String message)
378         throws DatabaseException {
379             throw new DatabaseException("" + name + " operation failed. " + message);
380         }
381     }
382
383
384
385     private ListenerBase getListenerBase(Object procedure) {
386         if (procedure instanceof ListenerBase)
387             return (ListenerBase) procedure;
388         else
389             return null;
390     }
391
392     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
393         scheduleRequest(request, callback, notify, null);
394     }
395
396     /* (non-Javadoc)
397      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
398      */
399     @Override
400     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
401
402         assert (request != null);
403
404         if(Development.DEVELOPMENT) {
405             try {
406             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
407                 System.err.println("schedule write '" + request + "'");
408             } catch (Throwable t) {
409                 Logger.defaultLogError(t);
410             }
411         }
412
413         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
414
415         requestManager.scheduleWrite(new SessionTask(true) {
416
417             @Override
418             public void run(int thread) {
419
420                 if(Development.DEVELOPMENT) {
421                     try {
422                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
423                         System.err.println("perform write '" + request + "'");
424                     } catch (Throwable t) {
425                         Logger.defaultLogError(t);
426                     }
427                 }
428
429                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
430
431                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
432
433                 try {
434
435                     flushCounter = 0;
436                     Disposable.safeDispose(clientChanges);
437                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
438
439                     VirtualGraph vg = getProvider(request.getProvider());
440
441                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
442                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
443
444                         @Override
445                         public void execute(Object result) {
446                             if(callback != null) callback.accept(null);
447                         }
448
449                         @Override
450                         public void exception(Throwable t) {
451                             if(callback != null) callback.accept((DatabaseException)t);
452                         }
453
454                     });
455
456                     assert (null != writer);
457 //                    writer.state.barrier.inc();
458
459                     try {
460                         request.perform(writer);
461                         assert (null != writer);
462                     } catch (Throwable t) {
463                         if (!(t instanceof CancelTransactionException))
464                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
465                         writeState.except(t);
466                     } finally {
467 //                        writer.state.barrier.dec();
468 //                        writer.waitAsync(request);
469                     }
470
471
472                     assert(!queryProvider2.cache.dirty);
473
474                 } catch (Throwable e) {
475
476                     // Log it first, just to be safe that the error is always logged.
477                     if (!(e instanceof CancelTransactionException))
478                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
479
480 //                    writeState.getGraph().state.barrier.dec();
481 //                    writeState.getGraph().waitAsync(request);
482
483                     writeState.except(e);
484
485 //                    try {
486 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
487 //                        // All we can do here is to log those, can't really pass them anywhere.
488 //                        if (callback != null) {
489 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
490 //                            else callback.run(new DatabaseException(e));
491 //                        }
492 //                    } catch (Throwable e2) {
493 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
494 //                    }
495 //                    boolean empty = clusterStream.reallyFlush();
496 //                    int callerThread = -1;
497 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
498 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
499 //                    try {
500 //                        // Send all local changes to server so it can calculate correct reverse change set.
501 //                        // This will call clusterStream.accept().
502 //                        state.cancelCommit(context, clusterStream);
503 //                        if (!empty) {
504 //                            if (!context.isOk()) // this is a blocking operation
505 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
506 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
507 //                        }
508 //                        state.cancelCommit2(context, clusterStream);
509 //                    } catch (DatabaseException e3) {
510 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
511 //                    } finally {
512 //                        clusterStream.setOff(false);
513 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
514 //                    }
515
516                 } finally {
517
518                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
519
520                 }
521
522                 task.finish();
523
524                 if(Development.DEVELOPMENT) {
525                     try {
526                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
527                         System.err.println("finish write '" + request + "'");
528                     } catch (Throwable t) {
529                         Logger.defaultLogError(t);
530                     }
531                 }
532
533             }
534
535         }, combine);
536
537     }
538
539     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
540         scheduleRequest(request, procedure, notify, null);
541     }
542
543     /* (non-Javadoc)
544      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
545      */
546     @Override
547     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
548
549         assert (request != null);
550
551         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
552
553         requestManager.scheduleWrite(new SessionTask(true) {
554
555             @Override
556             public void run(int thread) {
557
558                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
559
560                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
561
562                 flushCounter = 0;
563                 Disposable.safeDispose(clientChanges);
564                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
565
566                 VirtualGraph vg = getProvider(request.getProvider());
567                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
568
569                 try {
570                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571                     writeState = writeStateT;
572
573                     assert (null != writer);
574 //                    writer.state.barrier.inc();
575                     writeStateT.setResult(request.perform(writer));
576                     assert (null != writer);
577
578 //                    writer.state.barrier.dec();
579 //                    writer.waitAsync(null);
580
581                 } catch (Throwable e) {
582
583 //                    writer.state.barrier.dec();
584 //                    writer.waitAsync(null);
585
586                     writeState.except(e);
587
588 //                  state.stopWriteTransaction(clusterStream);
589 //
590 //              } catch (Throwable e) {
591 //                  // Log it first, just to be safe that the error is always logged.
592 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
593 //
594 //                  try {
595 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 //                      // All we can do here is to log those, can't really pass them anywhere.
597 //                      if (procedure != null) {
598 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 //                          else procedure.exception(new DatabaseException(e));
600 //                      }
601 //                  } catch (Throwable e2) {
602 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
603 //                  }
604 //
605 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
606 //
607 //                  state.stopWriteTransaction(clusterStream);
608
609                 } finally {
610                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
611                 }
612
613 //              if(notify != null) notify.release();
614
615                 task.finish();
616
617             }
618
619         }, combine);
620
621     }
622
623     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624         scheduleRequest(request, callback, notify, null);
625     }
626
627     /* (non-Javadoc)
628      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
629      */
630     @Override
631     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
632
633         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
634
635         assert (request != null);
636
637         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
638
639         requestManager.scheduleWrite(new SessionTask(true) {
640
641             @Override
642             public void run(int thread) {
643                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
644
645                 Procedure<Object> stateProcedure = new Procedure<Object>() {
646                     @Override
647                     public void execute(Object result) {
648                         if (callback != null)
649                             callback.accept(null);
650                     }
651                     @Override
652                     public void exception(Throwable t) {
653                         if (callback != null) {
654                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
655                             else callback.accept(new DatabaseException(t));
656                         } else
657                             Logger.defaultLogError("Unhandled exception", t);
658                     }
659                 };
660
661                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
662                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
663                 DelayedWriteGraph dwg = null;
664 //                newGraph.state.barrier.inc();
665
666                 try {
667                     dwg = new DelayedWriteGraph(newGraph);
668                     request.perform(dwg);
669                 } catch (Throwable e) {
670                     delayedWriteState.except(e);
671                     total.finish();
672                     dwg.close();
673                     return;
674                 } finally {
675 //                    newGraph.state.barrier.dec();
676 //                    newGraph.waitAsync(request);
677                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
678                 }
679
680                 delayedWriteState = null;
681
682                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
683                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
684
685                 flushCounter = 0;
686                 Disposable.safeDispose(clientChanges);
687                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
688
689                 acquireWriteOnly();
690
691                 VirtualGraph vg = getProvider(request.getProvider());
692                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
693
694                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
695
696                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
697
698                 assert (null != writer);
699 //                writer.state.barrier.inc();
700
701                 try {
702                     // Cannot cancel
703                     dwg.commit(writer, request);
704
705                         if(defaultClusterSet != null) {
706                                 XSupport xs = getService(XSupport.class);
707                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
708                                 ClusteringSupport cs = getService(ClusteringSupport.class);
709                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
710                         }
711
712                     // This makes clusters available from server
713                     clusterStream.reallyFlush();
714
715                     releaseWriteOnly(writer);
716                     handleUpdatesAndMetadata(writer);
717
718                 } catch (ServiceException e) {
719 //                    writer.state.barrier.dec();
720 //                    writer.waitAsync(null);
721
722                     // These shall be requested from server
723                     clusterTable.removeWriteOnlyClusters();
724                     // This makes clusters available from server
725                     clusterStream.reallyFlush();
726
727                     releaseWriteOnly(writer);
728                     writeState.except(e);
729                 } finally {
730                     // Debugging & Profiling
731                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
732                     task2.finish();
733                     total.finish();
734                 }
735             }
736
737         }, combine);
738
739     }
740
741     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
742         scheduleRequest(request, procedure, notify, null);
743     }
744
745     /* (non-Javadoc)
746      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
747      */
748     @Override
749     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
750         throw new Error("Not implemented");
751     }
752
753     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
754         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
755         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
756             createdClusters.add(cluster.clusterId);
757         }
758         return cluster;
759     }
760
761     class WriteOnlySupport implements WriteSupport {
762
763         ClusterStream stream;
764         ClusterImpl currentCluster;
765
766         public WriteOnlySupport() {
767             this.stream = clusterStream;
768         }
769
770         @Override
771         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
772             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
773         }
774
775         @Override
776         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
777
778             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
779
780             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
781                 if(s != queryProvider2.getRootLibrary())
782                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
783
784             try {
785                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
786             } catch (DatabaseException e) {
787                 Logger.defaultLogError(e);
788                 //throw e;
789             }
790
791             clientChanges.invalidate(s);
792
793             if (cluster.isWriteOnly())
794                 return;
795             queryProvider2.updateStatements(s, p);
796
797         }
798
799         @Override
800         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
801             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
802         }
803
804         @Override
805         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
806
807             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
808             try {
809                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
810             } catch (DatabaseException e) {
811                 Logger.defaultLogError(e);
812             }
813
814             clientChanges.invalidate(rid);
815
816             if (cluster.isWriteOnly())
817                 return;
818             queryProvider2.updateValue(rid);
819
820         }
821
822         @Override
823         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
824
825             if(amount < 65536) {
826                 claimValue(provider,resource, reader.readBytes(null, amount));
827                 return;
828             }
829
830             byte[] bytes = new byte[65536];
831
832             int rid = ((ResourceImpl)resource).id;
833             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
834             try {
835                 int left = amount;
836                 while(left > 0) {
837                     int block = Math.min(left, 65536);
838                     reader.readBytes(bytes, block);
839                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
840                     left -= block;
841                 }
842             } catch (DatabaseException e) {
843                 Logger.defaultLogError(e);
844             }
845
846             clientChanges.invalidate(rid);
847
848             if (cluster.isWriteOnly())
849                 return;
850             queryProvider2.updateValue(rid);
851
852         }
853
854         private void maintainCluster(ClusterImpl before, ClusterI after_) {
855             if(after_ != null && after_ != before) {
856                 ClusterImpl after = (ClusterImpl)after_;
857                 if(currentCluster == before) {
858                     currentCluster = after;
859                 }
860                 clusterTable.replaceCluster(after);
861             }
862         }
863
864         public int createResourceKey(int foreignCounter) throws DatabaseException {
865             if(currentCluster == null) {
866                 currentCluster = getNewResourceCluster();
867             }
868             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
869                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
870                 newCluster.foreignLookup = new byte[foreignCounter];
871                 currentCluster = newCluster;
872                 if (DEBUG)
873                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
874             }
875             return currentCluster.createResource(clusterTranslator);
876         }
877
878         @Override
879         public Resource createResource(VirtualGraph provider) throws DatabaseException {
880             if(currentCluster == null) {
881                 if (null != defaultClusterSet) {
882                         ResourceImpl result = getNewResource(defaultClusterSet);
883                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
884                     return result;
885                 } else {
886                     currentCluster = getNewResourceCluster();
887                 }
888             }
889             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
890                 if (null != defaultClusterSet) {
891                         ResourceImpl result = getNewResource(defaultClusterSet);
892                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
893                     return result;
894                 } else {
895                     currentCluster = getNewResourceCluster();
896                 }
897             }
898             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
899         }
900
901         @Override
902         public Resource createResource(VirtualGraph provider, long clusterId)
903         throws DatabaseException {
904             return getNewResource(clusterId);
905         }
906
907         @Override
908         public Resource createResource(VirtualGraph provider, Resource clusterSet)
909         throws DatabaseException {
910             return getNewResource(clusterSet);
911         }
912
913         @Override
914         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
915                 throws DatabaseException {
916             getNewClusterSet(clusterSet);
917         }
918
919         @Override
920         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
921         throws ServiceException {
922             return containsClusterSet(clusterSet);
923         }
924
925         public void selectCluster(long cluster) {
926                 currentCluster = clusterTable.getClusterByClusterId(cluster);
927                 long setResourceId = clusterSetsSupport.getSet(cluster);
928                 clusterSetsSupport.put(setResourceId, cluster);
929         }
930
931         @Override
932         public Resource setDefaultClusterSet(Resource clusterSet)
933         throws ServiceException {
934                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
935                 if(clusterSet != null) {
936                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
937                         currentCluster = clusterTable.getClusterByClusterId(id);
938                         return result;
939                 } else {
940                         currentCluster = null;
941                         return null;
942                 }
943         }
944
945         @Override
946         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
947
948                  provider = getProvider(provider);
949              if (null == provider) {
950                  int key = ((ResourceImpl)resource).id;
951
952
953
954                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
955 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
956 //                      if(key != queryProvider2.getRootLibrary())
957 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
958
959 //                 try {
960 //                     cluster.removeValue(key, clusterTranslator);
961 //                 } catch (DatabaseException e) {
962 //                     Logger.defaultLogError(e);
963 //                     return;
964 //                 }
965
966                  clusterTable.writeOnlyInvalidate(cluster);
967
968                  try {
969                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
970                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
971                          clusterTranslator.removeValue(cluster);
972                  } catch (DatabaseException e) {
973                          Logger.defaultLogError(e);
974                  }
975
976                  queryProvider2.invalidateResource(key);
977                  clientChanges.invalidate(key);
978
979              } else {
980                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
981                  queryProvider2.updateValue(querySupport.getId(resource));
982                  clientChanges.claimValue(resource);
983              }
984
985
986         }
987
988         @Override
989         public void flush(boolean intermediate) {
990             throw new UnsupportedOperationException();
991         }
992
993         @Override
994         public void flushCluster() {
995             clusterTable.flushCluster(graphSession);
996             if(defaultClusterSet != null) {
997                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
998             }
999             currentCluster = null;
1000         }
1001
1002         @Override
1003         public void flushCluster(Resource r) {
1004             throw new UnsupportedOperationException("flushCluster resource " + r);
1005         }
1006
1007         @Override
1008         public void gc() {
1009         }
1010
1011         @Override
1012         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1013                 Resource object) {
1014
1015                 int s = ((ResourceImpl)subject).id;
1016                 int p = ((ResourceImpl)predicate).id;
1017                 int o = ((ResourceImpl)object).id;
1018
1019                 provider = getProvider(provider);
1020                 if (null == provider) {
1021
1022                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1023                         clusterTable.writeOnlyInvalidate(cluster);
1024
1025                         try {
1026
1027                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1028
1029                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1030                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1031
1032                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1033                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1034                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035                                 clusterTranslator.removeStatement(cluster);
1036
1037                                 queryProvider2.invalidateResource(s);
1038                                 clientChanges.invalidate(s);
1039
1040                         } catch (DatabaseException e) {
1041
1042                                 Logger.defaultLogError(e);
1043
1044                         }
1045
1046                         return true;
1047
1048                 } else {
1049                         
1050                         ((VirtualGraphImpl)provider).deny(s, p, o);
1051                         queryProvider2.invalidateResource(s);
1052                         clientChanges.invalidate(s);
1053
1054                         return true;
1055
1056                 }
1057
1058         }
1059
1060         @Override
1061         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1062             throw new UnsupportedOperationException();
1063         }
1064
1065         @Override
1066         public boolean writeOnly() {
1067             return true;
1068         }
1069
1070         @Override
1071         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1072             writeSupport.performWriteRequest(graph, request);
1073         }
1074
1075         @Override
1076         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1077             throw new UnsupportedOperationException();
1078         }
1079
1080         @Override
1081         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1082             throw new UnsupportedOperationException();
1083         }
1084
1085         @Override
1086         public <T> void addMetadata(Metadata data) throws ServiceException {
1087             writeSupport.addMetadata(data);
1088         }
1089
1090         @Override
1091         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1092             return writeSupport.getMetadata(clazz);
1093         }
1094
1095         @Override
1096         public TreeMap<String, byte[]> getMetadata() {
1097             return writeSupport.getMetadata();
1098         }
1099
1100         @Override
1101         public void commitDone(WriteTraits writeTraits, long csid) {
1102             writeSupport.commitDone(writeTraits, csid);
1103         }
1104
1105         @Override
1106         public void clearUndoList(WriteTraits writeTraits) {
1107             writeSupport.clearUndoList(writeTraits);
1108         }
1109         @Override
1110         public int clearMetadata() {
1111             return writeSupport.clearMetadata();
1112         }
1113
1114                 @Override
1115                 public void startUndo() {
1116                         writeSupport.startUndo();
1117                 }
1118     }
1119
1120     class VirtualWriteOnlySupport implements WriteSupport {
1121
1122 //        @Override
1123 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1124 //                Resource object) {
1125 //            throw new UnsupportedOperationException();
1126 //        }
1127
1128         @Override
1129         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1130
1131             TransientGraph impl = (TransientGraph)provider;
1132             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1133             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1134             clientChanges.claim(subject, predicate, object);
1135
1136         }
1137
1138         @Override
1139         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1140
1141             TransientGraph impl = (TransientGraph)provider;
1142             impl.claim(subject, predicate, object);
1143             getQueryProvider2().updateStatements(subject, predicate);
1144             clientChanges.claim(subject, predicate, object);
1145
1146         }
1147
1148         @Override
1149         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1150             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1151         }
1152
1153         @Override
1154         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1155             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1156             getQueryProvider2().updateValue(resource);
1157             clientChanges.claimValue(resource);
1158         }
1159
1160         @Override
1161         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1162             byte[] value = reader.readBytes(null, amount);
1163             claimValue(provider, resource, value);
1164         }
1165
1166         @Override
1167         public Resource createResource(VirtualGraph provider) {
1168             TransientGraph impl = (TransientGraph)provider;
1169             return impl.getResource(impl.newResource(false));
1170         }
1171
1172         @Override
1173         public Resource createResource(VirtualGraph provider, long clusterId) {
1174             throw new UnsupportedOperationException();
1175         }
1176
1177         @Override
1178         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1179         throws DatabaseException {
1180             throw new UnsupportedOperationException();
1181         }
1182
1183         @Override
1184         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1185         throws DatabaseException {
1186             throw new UnsupportedOperationException();
1187         }
1188
1189         @Override
1190         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1191         throws ServiceException {
1192             throw new UnsupportedOperationException();
1193         }
1194
1195         @Override
1196         public Resource setDefaultClusterSet(Resource clusterSet)
1197         throws ServiceException {
1198                 return null;
1199         }
1200
1201         @Override
1202         public void denyValue(VirtualGraph provider, Resource resource) {
1203             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1204             getQueryProvider2().updateValue(querySupport.getId(resource));
1205             // NOTE: this only keeps track of value changes by-resource.
1206             clientChanges.claimValue(resource);
1207         }
1208
1209         @Override
1210         public void flush(boolean intermediate) {
1211             throw new UnsupportedOperationException();
1212         }
1213
1214         @Override
1215         public void flushCluster() {
1216             throw new UnsupportedOperationException();
1217         }
1218
1219         @Override
1220         public void flushCluster(Resource r) {
1221             throw new UnsupportedOperationException("Resource " + r);
1222         }
1223
1224         @Override
1225         public void gc() {
1226         }
1227
1228         @Override
1229         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1230             TransientGraph impl = (TransientGraph) provider;
1231             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1232             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1233             clientChanges.deny(subject, predicate, object);
1234             return true;
1235         }
1236
1237         @Override
1238         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1239             throw new UnsupportedOperationException();
1240         }
1241
1242         @Override
1243         public boolean writeOnly() {
1244             return true;
1245         }
1246
1247         @Override
1248         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1249             throw new UnsupportedOperationException();
1250         }
1251
1252         @Override
1253         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1254             throw new UnsupportedOperationException();
1255         }
1256
1257         @Override
1258         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1259             throw new UnsupportedOperationException();
1260         }
1261
1262         @Override
1263         public <T> void addMetadata(Metadata data) throws ServiceException {
1264             throw new UnsupportedOperationException();
1265         }
1266
1267         @Override
1268         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1269             throw new UnsupportedOperationException();
1270         }
1271
1272         @Override
1273         public TreeMap<String, byte[]> getMetadata() {
1274             throw new UnsupportedOperationException();
1275         }
1276
1277         @Override
1278         public void commitDone(WriteTraits writeTraits, long csid) {
1279         }
1280
1281         @Override
1282         public void clearUndoList(WriteTraits writeTraits) {
1283         }
1284
1285         @Override
1286         public int clearMetadata() {
1287             return 0;
1288         }
1289
1290                 @Override
1291                 public void startUndo() {
1292                 }
1293     }
1294
1295     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1296
1297         try {
1298
1299             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1300
1301             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1302
1303             flushCounter = 0;
1304             Disposable.safeDispose(clientChanges);
1305             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1306
1307             acquireWriteOnly();
1308
1309             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1310
1311             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1312
1313             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1314             writeState = writeStateT;
1315
1316             assert (null != writer);
1317 //            writer.state.barrier.inc();
1318             long start = System.nanoTime();
1319             T result = request.perform(writer);
1320             long duration = System.nanoTime() - start;
1321             if (DEBUG) {
1322                 System.err.println("################");
1323                 System.err.println("WriteOnly duration " + 1e-9*duration);
1324             }
1325             writeStateT.setResult(result);
1326
1327             // This makes clusters available from server
1328             clusterStream.reallyFlush();
1329
1330             // This will trigger query updates
1331             releaseWriteOnly(writer);
1332             assert (null != writer);
1333
1334             handleUpdatesAndMetadata(writer);
1335
1336         } catch (CancelTransactionException e) {
1337
1338             releaseWriteOnly(writeState.getGraph());
1339
1340             clusterTable.removeWriteOnlyClusters();
1341             state.stopWriteTransaction(clusterStream);
1342
1343         } catch (Throwable e) {
1344
1345             e.printStackTrace();
1346
1347             releaseWriteOnly(writeState.getGraph());
1348
1349             clusterTable.removeWriteOnlyClusters();
1350
1351             if (callback != null)
1352                 callback.exception(new DatabaseException(e));
1353
1354             state.stopWriteTransaction(clusterStream);
1355             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1356
1357         } finally  {
1358
1359             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1360
1361         }
1362
1363     }
1364
1365     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1366         scheduleRequest(request, callback, notify, null);
1367     }
1368
1369     @Override
1370     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1371
1372         assertAlive();
1373
1374         assert (request != null);
1375
1376         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1377
1378         requestManager.scheduleWrite(new SessionTask(true) {
1379
1380             @Override
1381             public void run(int thread) {
1382
1383                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1384
1385                     try {
1386
1387                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1388
1389                         flushCounter = 0;
1390                         Disposable.safeDispose(clientChanges);
1391                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1392
1393                         acquireWriteOnly();
1394
1395                         VirtualGraph vg = getProvider(request.getProvider());
1396                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1397
1398                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1399
1400                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1401
1402                             @Override
1403                             public void execute(Object result) {
1404                                 if(callback != null) callback.accept(null);
1405                             }
1406
1407                             @Override
1408                             public void exception(Throwable t) {
1409                                 if(callback != null) callback.accept((DatabaseException)t);
1410                             }
1411
1412                         });
1413
1414                         assert (null != writer);
1415 //                        writer.state.barrier.inc();
1416
1417                         try {
1418
1419                             request.perform(writer);
1420
1421                         } catch (Throwable e) {
1422
1423 //                            writer.state.barrier.dec();
1424 //                            writer.waitAsync(null);
1425
1426                             releaseWriteOnly(writer);
1427
1428                             clusterTable.removeWriteOnlyClusters();
1429
1430                             if(!(e instanceof CancelTransactionException)) {
1431                                 if (callback != null)
1432                                     callback.accept(new DatabaseException(e));
1433                             }
1434
1435                             writeState.except(e);
1436
1437                             return;
1438
1439                         }
1440
1441                         // This makes clusters available from server
1442                         boolean empty = clusterStream.reallyFlush();
1443                         // This was needed to make WO requests call metadata listeners.
1444                         // NOTE: the calling event does not contain clientChanges information.
1445                         if (!empty && clientChanges.isEmpty())
1446                             clientChanges.setNotEmpty(true);
1447                         releaseWriteOnly(writer);
1448                         assert (null != writer);
1449                     } finally  {
1450
1451                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1452
1453                     }
1454
1455
1456                 task.finish();
1457
1458             }
1459
1460         }, combine);
1461
1462     }
1463
1464     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1465         scheduleRequest(request, callback, notify, null);
1466     }
1467
1468     /* (non-Javadoc)
1469      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1470      */
1471     @Override
1472     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1473
1474         assert (request != null);
1475
1476         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1477
1478         requestManager.scheduleWrite(new SessionTask(true) {
1479
1480             @Override
1481             public void run(int thread) {
1482
1483                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1484
1485                 performWriteOnly(request, notify, callback);
1486
1487                 task.finish();
1488
1489             }
1490
1491         }, combine);
1492
1493     }
1494
1495     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1496
1497         assert (request != null);
1498         assert (procedure != null);
1499
1500         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1501
1502         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1503
1504             @Override
1505             public void run(int thread) {
1506
1507                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1508
1509                 ListenerBase listener = getListenerBase(procedure);
1510
1511                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1512
1513                 try {
1514
1515                     if (listener != null) {
1516
1517                         try {
1518                                 
1519                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1520
1521                                         @Override
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {
1523                                                 procedure.exception(graph, t);
1524                                                 if(throwable != null) {
1525                                                         throwable.set(t);
1526                                                 } else {
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1528                                                 }
1529                                         }
1530
1531                                         @Override
1532                                         public void execute(AsyncReadGraph graph, T t) {
1533                                                 if(result != null) result.set(t);
1534                                                 procedure.execute(graph, t);
1535                                         }
1536
1537                                 };
1538                                 
1539                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1540                                 
1541                         } catch (Throwable t) {
1542                             // This is handled by the AsyncProcedure
1543                                 //Logger.defaultLogError("Internal error", t);
1544                         }
1545
1546                     } else {
1547
1548                         try {
1549
1550 //                            newGraph.state.barrier.inc();
1551
1552                             T t = request.perform(newGraph);
1553
1554                             try {
1555
1556                                 if(result != null) result.set(t);
1557                                 procedure.execute(newGraph, t);
1558
1559                             } catch (Throwable th) {
1560
1561                                 if(throwable != null) {
1562                                     throwable.set(th);
1563                                 } else {
1564                                     Logger.defaultLogError("Unhandled exception", th);
1565                                 }
1566
1567                             }
1568
1569                         } catch (Throwable t) {
1570
1571                             if (DEBUG)
1572                                 t.printStackTrace();
1573
1574                             if(throwable != null) {
1575                                 throwable.set(t);
1576                             } else {
1577                                 Logger.defaultLogError("Unhandled exception", t);
1578                             }
1579
1580                             try {
1581
1582                                 procedure.exception(newGraph, t);
1583
1584                             } catch (Throwable t2) {
1585
1586                                 if(throwable != null) {
1587                                     throwable.set(t2);
1588                                 } else {
1589                                     Logger.defaultLogError("Unhandled exception", t2);
1590                                 }
1591
1592                             }
1593
1594                         }
1595
1596 //                        newGraph.state.barrier.dec();
1597 //                        newGraph.waitAsync(request);
1598
1599                     }
1600
1601                 } finally {
1602
1603                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1604
1605                 }
1606
1607             }
1608
1609         });
1610
1611     }
1612
1613     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1614
1615         assert (request != null);
1616         assert (procedure != null);
1617
1618         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1619
1620         requestManager.scheduleRead(new SessionRead(null, notify) {
1621
1622             @Override
1623             public void run(int thread) {
1624
1625                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1626
1627                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1628
1629                 try {
1630
1631                     if (listener != null) {
1632
1633                         try {
1634                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1635                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1636                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1637                                                 } catch (DatabaseException e) {
1638                                                         Logger.defaultLogError(e);
1639                                                 }
1640
1641                     } else {
1642
1643 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1644 //                                procedure, "request");
1645
1646                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1647
1648                         try {
1649
1650                             request.perform(newGraph, wrap);
1651                             wrap.get();
1652
1653                         } catch (Throwable t) {
1654
1655                                 wrap.exception(newGraph, t);
1656                                 
1657 //                            wrapper.exception(newGraph, t);
1658 //                            newGraph.waitAsync(request);
1659
1660
1661                         }
1662
1663                     }
1664
1665                 } finally {
1666
1667                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1668
1669                 }
1670
1671             }
1672
1673         });
1674
1675     }
1676
1677     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1678
1679         assert (request != null);
1680         assert (procedure != null);
1681
1682         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1683
1684         int sync = notify != null ? thread : -1;
1685
1686         requestManager.scheduleRead(new SessionRead(null, notify) {
1687
1688             @Override
1689             public void run(int thread) {
1690
1691                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1692
1693                 ListenerBase listener = getListenerBase(procedure);
1694
1695                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1696
1697                 try {
1698
1699                     if (listener != null) {
1700
1701                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1702
1703 //                        newGraph.waitAsync(request);
1704
1705                     } else {
1706
1707                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1708
1709                         try {
1710
1711                             request.perform(newGraph, wrapper);
1712
1713                         } catch (Throwable t) {
1714
1715                             t.printStackTrace();
1716
1717                         }
1718
1719                     }
1720
1721                 } finally {
1722
1723                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1724
1725                 }
1726
1727             }
1728
1729         });
1730
1731     }
1732
1733     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1734
1735         assert (request != null);
1736         assert (procedure != null);
1737
1738         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1739
1740         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1741
1742             @Override
1743             public void run(int thread) {
1744
1745                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1746
1747                 ListenerBase listener = getListenerBase(procedure);
1748
1749                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1750
1751                 try {
1752
1753                     if (listener != null) {
1754
1755                         try {
1756                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1757                         } catch (DatabaseException e) {
1758                             Logger.defaultLogError(e);
1759                         }
1760                         
1761                         
1762 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1763 //
1764 //                            @Override
1765 //                            public void exception(Throwable t) {
1766 //                                procedure.exception(t);
1767 //                                if(throwable != null) {
1768 //                                    throwable.set(t);
1769 //                                }
1770 //                            }
1771 //
1772 //                            @Override
1773 //                            public void execute(T t) {
1774 //                                if(result != null) result.set(t);
1775 //                                procedure.execute(t);
1776 //                            }
1777 //
1778 //                        }, listener);
1779
1780 //                        newGraph.waitAsync(request);
1781
1782                     } else {
1783
1784 //                        newGraph.state.barrier.inc();
1785
1786                         request.register(newGraph, new Listener<T>() {
1787
1788                             @Override
1789                             public void exception(Throwable t) {
1790                                 if(throwable != null) throwable.set(t);
1791                                 procedure.exception(t);
1792 //                                newGraph.state.barrier.dec();
1793                             }
1794
1795                             @Override
1796                             public void execute(T t) {
1797                                 if(result != null) result.set(t);
1798                                 procedure.execute(t);
1799 //                                newGraph.state.barrier.dec();
1800                             }
1801
1802                             @Override
1803                             public boolean isDisposed() {
1804                                 return true;
1805                             }
1806
1807                         });
1808
1809 //                        newGraph.waitAsync(request);
1810
1811                     }
1812
1813                 } finally {
1814
1815                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1816
1817                 }
1818
1819             }
1820
1821         });
1822
1823     }
1824
1825
1826     @Override
1827     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1828
1829         scheduleRequest(request, procedure, null, null, null);
1830
1831     }
1832
1833     @Override
1834     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1835         assertNotSession();
1836         Semaphore notify = new Semaphore(0);
1837         DataContainer<Throwable> container = new DataContainer<Throwable>();
1838         DataContainer<T> result = new DataContainer<T>();
1839         scheduleRequest(request, procedure, notify, container, result);
1840         acquire(notify, request);
1841         Throwable throwable = container.get();
1842         if(throwable != null) {
1843             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1844             else throw new DatabaseException("Unexpected exception", throwable);
1845         }
1846         return result.get();
1847     }
1848
1849     @Override
1850     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1851         assertNotSession();
1852         Semaphore notify = new Semaphore(0);
1853         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1854         final DataContainer<T> resultContainer = new DataContainer<T>();
1855         scheduleRequest(request, new AsyncProcedure<T>() {
1856             @Override
1857             public void exception(AsyncReadGraph graph, Throwable throwable) {
1858                 exceptionContainer.set(throwable);
1859                 procedure.exception(graph, throwable);
1860             }
1861             @Override
1862             public void execute(AsyncReadGraph graph, T result) {
1863                 resultContainer.set(result);
1864                 procedure.execute(graph, result);
1865             }
1866         }, getListenerBase(procedure), notify);
1867         acquire(notify, request, procedure);
1868         Throwable throwable = exceptionContainer.get();
1869         if (throwable != null) {
1870             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1871             else throw new DatabaseException("Unexpected exception", throwable);
1872         }
1873         return resultContainer.get();
1874     }
1875
1876
1877     @Override
1878     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1879         assertNotSession();
1880         Semaphore notify = new Semaphore(0);
1881         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1882         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1883             @Override
1884             public void exception(AsyncReadGraph graph, Throwable throwable) {
1885                 exceptionContainer.set(throwable);
1886                 procedure.exception(graph, throwable);
1887             }
1888             @Override
1889             public void execute(AsyncReadGraph graph, T result) {
1890                 procedure.execute(graph, result);
1891             }
1892             @Override
1893             public void finished(AsyncReadGraph graph) {
1894                 procedure.finished(graph);
1895             }
1896         }, notify);
1897         acquire(notify, request, procedure);
1898         Throwable throwable = exceptionContainer.get();
1899         if (throwable != null) {
1900             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1901             else throw new DatabaseException("Unexpected exception", throwable);
1902         }
1903         // TODO: implement return value
1904         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1905         return null;
1906     }
1907
1908     @Override
1909     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1910         return syncRequest(request, new ProcedureAdapter<T>());
1911
1912
1913 //        assert(request != null);
1914 //
1915 //        final DataContainer<T> result = new DataContainer<T>();
1916 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1917 //
1918 //        syncRequest(request, new Procedure<T>() {
1919 //
1920 //            @Override
1921 //            public void execute(T t) {
1922 //                result.set(t);
1923 //            }
1924 //
1925 //            @Override
1926 //            public void exception(Throwable t) {
1927 //                exception.set(t);
1928 //            }
1929 //
1930 //        });
1931 //
1932 //        Throwable t = exception.get();
1933 //        if(t != null) {
1934 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1935 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1936 //        }
1937 //
1938 //        return result.get();
1939
1940     }
1941
1942     @Override
1943     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1944         return syncRequest(request, (Procedure<T>)procedure);
1945     }
1946
1947
1948 //    @Override
1949 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1950 //        assertNotSession();
1951 //        Semaphore notify = new Semaphore(0);
1952 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1953 //        DataContainer<T> result = new DataContainer<T>();
1954 //        scheduleRequest(request, procedure, notify, container, result);
1955 //        acquire(notify, request);
1956 //        Throwable throwable = container.get();
1957 //        if(throwable != null) {
1958 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1959 //            else throw new DatabaseException("Unexpected exception", throwable);
1960 //        }
1961 //        return result.get();
1962 //    }
1963
1964
1965     @Override
1966     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1967         assertNotSession();
1968         Semaphore notify = new Semaphore(0);
1969         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1970         final DataContainer<T> result = new DataContainer<T>();
1971         scheduleRequest(request, procedure, notify, container, result);
1972         acquire(notify, request);
1973         Throwable throwable = container.get();
1974         if (throwable != null) {
1975             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1976             else throw new DatabaseException("Unexpected exception", throwable);
1977         }
1978         return result.get();
1979     }
1980
1981     @Override
1982     public void syncRequest(Write request) throws DatabaseException  {
1983         assertNotSession();
1984         assertAlive();
1985         Semaphore notify = new Semaphore(0);
1986         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1987         scheduleRequest(request, e -> exception.set(e), notify);
1988         acquire(notify, request);
1989         if(exception.get() != null) throw exception.get();
1990     }
1991
1992     @Override
1993     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1994         assertNotSession();
1995         Semaphore notify = new Semaphore(0);
1996         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1997         final DataContainer<T> result = new DataContainer<T>();
1998         scheduleRequest(request, new Procedure<T>() {
1999
2000             @Override
2001             public void exception(Throwable t) {
2002               exception.set(t);
2003             }
2004
2005             @Override
2006             public void execute(T t) {
2007               result.set(t);
2008             }
2009
2010         }, notify);
2011         acquire(notify, request);
2012         if(exception.get() != null) {
2013             Throwable t = exception.get();
2014             if(t instanceof DatabaseException) throw (DatabaseException)t;
2015             else throw new DatabaseException(t);
2016         }
2017         return result.get();
2018     }
2019
2020     @Override
2021     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2022         assertNotSession();
2023         Semaphore notify = new Semaphore(0);
2024         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2025         final DataContainer<T> result = new DataContainer<T>();
2026         scheduleRequest(request, new Procedure<T>() {
2027
2028             @Override
2029             public void exception(Throwable t) {
2030               exception.set(t);
2031             }
2032
2033             @Override
2034             public void execute(T t) {
2035               result.set(t);
2036             }
2037
2038         }, notify);
2039         acquire(notify, request);
2040         if(exception.get() != null) {
2041             Throwable t = exception.get();
2042             if(t instanceof DatabaseException) throw (DatabaseException)t;
2043             else throw new DatabaseException(t);
2044         }
2045         return result.get();
2046     }
2047
2048     @Override
2049     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2050         assertNotSession();
2051         Semaphore notify = new Semaphore(0);
2052         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2053         final DataContainer<T> result = new DataContainer<T>();
2054         scheduleRequest(request, new Procedure<T>() {
2055
2056             @Override
2057             public void exception(Throwable t) {
2058               exception.set(t);
2059             }
2060
2061             @Override
2062             public void execute(T t) {
2063               result.set(t);
2064             }
2065
2066         }, notify);
2067         acquire(notify, request);
2068         if(exception.get() != null) {
2069             Throwable t = exception.get();
2070             if(t instanceof DatabaseException) throw (DatabaseException)t;
2071             else throw new DatabaseException(t);
2072         }
2073         return result.get();
2074     }
2075
2076     @Override
2077     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2078         assertNotSession();
2079         Semaphore notify = new Semaphore(0);
2080         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2081         scheduleRequest(request, e -> exception.set(e), notify);
2082         acquire(notify, request);
2083         if(exception.get() != null) throw exception.get();
2084     }
2085
2086     @Override
2087     public void syncRequest(WriteOnly request) throws DatabaseException  {
2088         assertNotSession();
2089         assertAlive();
2090         Semaphore notify = new Semaphore(0);
2091         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2092         scheduleRequest(request, e -> exception.set(e), notify);
2093         acquire(notify, request);
2094         if(exception.get() != null) throw exception.get();
2095     }
2096
2097     /*
2098      *
2099      * GraphRequestProcessor interface
2100      */
2101
2102     @Override
2103     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2104
2105         scheduleRequest(request, procedure, null, null);
2106
2107     }
2108
2109     @Override
2110     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2111
2112         scheduleRequest(request, procedure, null);
2113
2114     }
2115
2116     @Override
2117     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2118
2119         scheduleRequest(request, procedure, null, null, null);
2120
2121     }
2122
2123     @Override
2124     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2125
2126         scheduleRequest(request, callback, null);
2127
2128     }
2129
2130     @Override
2131     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2132
2133         scheduleRequest(request, procedure, null);
2134
2135     }
2136
2137     @Override
2138     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2139
2140         scheduleRequest(request, procedure, null);
2141
2142     }
2143
2144     @Override
2145     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2146
2147         scheduleRequest(request, procedure, null);
2148
2149     }
2150
2151     @Override
2152     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2153
2154         scheduleRequest(request, callback, null);
2155
2156     }
2157
2158     @Override
2159     public void asyncRequest(final Write r) {
2160         asyncRequest(r, null);
2161     }
2162
2163     @Override
2164     public void asyncRequest(final DelayedWrite r) {
2165         asyncRequest(r, null);
2166     }
2167
2168     @Override
2169     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2170
2171         scheduleRequest(request, callback, null);
2172
2173     }
2174
2175     @Override
2176     public void asyncRequest(final WriteOnly request) {
2177
2178         asyncRequest(request, null);
2179
2180     }
2181
2182     @Override
2183     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2184         r.request(this, procedure);
2185     }
2186
2187     @Override
2188     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2189         r.request(this, procedure);
2190     }
2191
2192     @Override
2193     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2194         r.request(this, procedure);
2195     }
2196
2197     @Override
2198     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2199         r.request(this, procedure);
2200     }
2201
2202     @Override
2203     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2204         r.request(this, procedure);
2205     }
2206
2207     @Override
2208     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2209         r.request(this, procedure);
2210     }
2211
2212     @Override
2213     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2214         return r.request(this);
2215     }
2216
2217     @Override
2218     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2219         return r.request(this);
2220     }
2221
2222     @Override
2223     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2224         r.request(this, procedure);
2225     }
2226
2227     @Override
2228     public <T> void async(WriteInterface<T> r) {
2229         r.request(this, new ProcedureAdapter<T>());
2230     }
2231
2232     //@Override
2233     public void incAsync() {
2234         state.incAsync();
2235     }
2236
2237     //@Override
2238     public void decAsync() {
2239         state.decAsync();
2240     }
2241
2242     public long getCluster(ResourceImpl resource) {
2243         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2244         return cluster.getClusterId();
2245     }
2246
2247     public long getCluster(int id) {
2248         if (clusterTable == null)
2249             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2250         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2251     }
2252
2253     public ResourceImpl getResource(int id) {
2254         return new ResourceImpl(resourceSupport, id);
2255     }
2256
2257     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2258         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2259         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2260         int key = proxy.getClusterKey();
2261         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2262         return new ResourceImpl(resourceSupport, resourceKey);
2263     }
2264
2265     public ResourceImpl getResource2(int id) {
2266         assert (id != 0);
2267         return new ResourceImpl(resourceSupport, id);
2268     }
2269
2270     final public int getId(ResourceImpl impl) {
2271         return impl.id;
2272     }
2273
2274     public static final Charset UTF8 = Charset.forName("utf-8");
2275
2276
2277
2278
2279     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2280         for(TransientGraph g : support.providers) {
2281             if(g.isPending(subject)) return false;
2282         }
2283         return true;
2284     }
2285
2286     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2287         for(TransientGraph g : support.providers) {
2288             if(g.isPending(subject, predicate)) return false;
2289         }
2290         return true;
2291     }
2292
2293     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2294
2295         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2296
2297             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2298
2299             @Override
2300             public void accept(ReadGraphImpl graph) {
2301                 if(ready.decrementAndGet() == 0) {
2302                     runnable.accept(graph);
2303                 }
2304             }
2305
2306         };
2307
2308         for(TransientGraph g : support.providers) {
2309             if(g.isPending(subject)) {
2310                 try {
2311                     g.load(graph, subject, composite);
2312                 } catch (DatabaseException e) {
2313                     e.printStackTrace();
2314                 }
2315             } else {
2316                 composite.accept(graph);
2317             }
2318         }
2319
2320         composite.accept(graph);
2321
2322     }
2323
2324     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2325
2326         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2327
2328             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2329
2330             @Override
2331             public void accept(ReadGraphImpl graph) {
2332                 if(ready.decrementAndGet() == 0) {
2333                     runnable.accept(graph);
2334                 }
2335             }
2336
2337         };
2338
2339         for(TransientGraph g : support.providers) {
2340             if(g.isPending(subject, predicate)) {
2341                 try {
2342                     g.load(graph, subject, predicate, composite);
2343                 } catch (DatabaseException e) {
2344                     e.printStackTrace();
2345                 }
2346             } else {
2347                 composite.accept(graph);
2348             }
2349         }
2350
2351         composite.accept(graph);
2352
2353     }
2354
2355 //    void dumpHeap() {
2356 //
2357 //        try {
2358 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2359 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2360 //                    "d:/heap" + flushCounter + ".txt", true);
2361 //        } catch (IOException e) {
2362 //            e.printStackTrace();
2363 //        }
2364 //
2365 //    }
2366
2367     void fireReactionsToSynchronize(ChangeSet cs) {
2368
2369         // Do not fire empty events
2370         if (cs.isEmpty())
2371             return;
2372
2373         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2374 //        g.state.barrier.inc();
2375
2376         try {
2377
2378             if (!cs.isEmpty()) {
2379                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2380                 for (ChangeListener l : changeListeners2) {
2381                     try {
2382                         l.graphChanged(e2);
2383                     } catch (Exception ex) {
2384                         ex.printStackTrace();
2385                     }
2386                 }
2387             }
2388
2389         } finally {
2390
2391 //            g.state.barrier.dec();
2392 //            g.waitAsync(null);
2393
2394         }
2395
2396     }
2397
2398     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2399         try {
2400
2401             // Do not fire empty events
2402             if (cs2.isEmpty())
2403                 return;
2404
2405 //            graph.restart();
2406 //            graph.state.barrier.inc();
2407
2408             try {
2409
2410                 if (!cs2.isEmpty()) {
2411
2412                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2413                     for (ChangeListener l : changeListeners2) {
2414                         try {
2415                             l.graphChanged(e2);
2416 //                            System.out.println("changelistener " + l);
2417                         } catch (Exception ex) {
2418                             ex.printStackTrace();
2419                         }
2420                     }
2421
2422                 }
2423
2424             } finally {
2425
2426 //                graph.state.barrier.dec();
2427 //                graph.waitAsync(null);
2428
2429             }
2430         } catch (Throwable t) {
2431             t.printStackTrace();
2432
2433         }
2434     }
2435     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2436
2437         try {
2438
2439             // Do not fire empty events
2440             if (cs2.isEmpty())
2441                 return;
2442
2443             // Do not fire on virtual requests
2444             if(graph.getProvider() != null)
2445                 return;
2446
2447             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2448
2449             try {
2450
2451                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2452                 for (ChangeListener l : metadataListeners) {
2453                     try {
2454                         l.graphChanged(e2);
2455                     } catch (Throwable ex) {
2456                         ex.printStackTrace();
2457                     }
2458                 }
2459
2460             } finally {
2461
2462             }
2463
2464         } catch (Throwable t) {
2465             t.printStackTrace();
2466         }
2467
2468     }
2469
2470
2471     /*
2472      * (non-Javadoc)
2473      *
2474      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2475      */
2476     @Override
2477     public <T> T getService(Class<T> api) {
2478         T t = peekService(api);
2479         if (t == null) {
2480             if (state.isClosed())
2481                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2482             throw new ServiceNotFoundException(this, api);
2483         }
2484         return t;
2485     }
2486
2487     /**
2488      * @return
2489      */
2490     protected abstract ServerInformation getCachedServerInformation();
2491
2492         private Class<?> serviceKey1 = null;
2493         private Class<?> serviceKey2 = null;
2494         private Object service1 = null;
2495         private Object service2 = null;
2496
2497     /*
2498      * (non-Javadoc)
2499      *
2500      * @see
2501      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2502      */
2503     @SuppressWarnings("unchecked")
2504     @Override
2505     public synchronized <T> T peekService(Class<T> api) {
2506
2507                 if(serviceKey1 == api) {
2508                         return (T)service1;
2509                 } else if (serviceKey2 == api) {
2510                         // Promote this key
2511                         Object result = service2;
2512                         service2 = service1;
2513                         serviceKey2 = serviceKey1;
2514                         service1 = result;
2515                         serviceKey1 = api;
2516                         return (T)result;
2517                 }
2518
2519         if (Layer0.class == api)
2520                 return (T) L0;
2521         if (ServerInformation.class == api)
2522             return (T) getCachedServerInformation();
2523         else if (WriteGraphImpl.class == api)
2524             return (T) writeState.getGraph();
2525         else if (ClusterBuilder.class == api)
2526             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2527         else if (ClusterBuilderFactory.class == api)
2528             return (T)new ClusterBuilderFactoryImpl(this);
2529
2530                 service2 = service1;
2531                 serviceKey2 = serviceKey1;
2532
2533         service1 = serviceLocator.peekService(api);
2534         serviceKey1 = api;
2535
2536         return (T)service1;
2537
2538     }
2539
2540     /*
2541      * (non-Javadoc)
2542      *
2543      * @see
2544      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2545      */
2546     @Override
2547     public boolean hasService(Class<?> api) {
2548         return serviceLocator.hasService(api);
2549     }
2550
2551     /**
2552      * @param api the api that must be implemented by the specified service
2553      * @param service the service implementation
2554      */
2555     @Override
2556     public <T> void registerService(Class<T> api, T service) {
2557         if(Layer0.class == api) {
2558                 L0 = (Layer0)service;
2559                 return;
2560         }
2561         serviceLocator.registerService(api, service);
2562         if (TransactionPolicySupport.class == api) {
2563             transactionPolicy = (TransactionPolicySupport)service;
2564             state.resetTransactionPolicy();
2565         }
2566         if (api == serviceKey1)
2567                 service1 = service;
2568         else if (api == serviceKey2)
2569                 service2 = service;
2570     }
2571
2572     // ----------------
2573     // SessionMonitor
2574     // ----------------
2575
2576     void fireSessionVariableChange(String variable) {
2577         for (MonitorHandler h : monitorHandlers) {
2578             MonitorContext ctx = monitorContexts.getLeft(h);
2579             assert ctx != null;
2580             // SafeRunner functionality repeated here to avoid dependency.
2581             try {
2582                 h.valuesChanged(ctx);
2583             } catch (Exception e) {
2584                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2585             } catch (LinkageError e) {
2586                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2587             }
2588         }
2589     }
2590
2591
2592     class ResourceSerializerImpl implements ResourceSerializer {
2593
2594         public long createRandomAccessId(int id) throws DatabaseException {
2595             if(id < 0)
2596                 return id;
2597             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2598             long cluster = getCluster(id);
2599             if (0 == cluster)
2600                 return 0; // Better to return 0 then invalid id.
2601             long result = ClusterTraitsBase.createResourceId(cluster, index);
2602             return result;
2603         }
2604
2605         @Override
2606         public long getRandomAccessId(Resource resource) throws DatabaseException {
2607
2608             ResourceImpl resourceImpl = (ResourceImpl) resource;
2609             return createRandomAccessId(resourceImpl.id);
2610
2611         }
2612
2613         @Override
2614         public int getTransientId(Resource resource) throws DatabaseException {
2615
2616             ResourceImpl resourceImpl = (ResourceImpl) resource;
2617             return resourceImpl.id;
2618
2619         }
2620
2621         @Override
2622         public String createRandomAccessId(Resource resource)
2623         throws InvalidResourceReferenceException {
2624
2625             if(resource == null) throw new IllegalArgumentException();
2626
2627             ResourceImpl resourceImpl = (ResourceImpl) resource;
2628
2629             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2630
2631             int r;
2632             try {
2633                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2634             } catch (DatabaseException e1) {
2635                 throw new InvalidResourceReferenceException(e1);
2636             }
2637             try {
2638                 // Serialize as '<resource index>_<cluster id>'
2639                 return "" + r + "_" + getCluster(resourceImpl);
2640             } catch (Throwable e) {
2641                 e.printStackTrace();
2642                 throw new InvalidResourceReferenceException(e);
2643             } finally {
2644             }
2645         }
2646
2647         public int getTransientId(long serialized) throws DatabaseException {
2648             if (serialized <= 0)
2649                 return (int)serialized;
2650             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2651             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2652             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2653             if (cluster == null)
2654                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2655             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2656             return key;
2657         }
2658
2659         @Override
2660         public Resource getResource(long randomAccessId) throws DatabaseException {
2661             return getResourceByKey(getTransientId(randomAccessId));
2662         }
2663
2664         @Override
2665         public Resource getResource(int transientId) throws DatabaseException {
2666             return getResourceByKey(transientId);
2667         }
2668
2669         @Override
2670         public Resource getResource(String randomAccessId)
2671         throws InvalidResourceReferenceException {
2672             try {
2673                 int i = randomAccessId.indexOf('_');
2674                 if (i == -1)
2675                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2676                             + randomAccessId + "'");
2677                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2678                 if(r < 0) return getResourceByKey(r);
2679                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2680                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2681                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2682                 if (cluster.hasResource(key, clusterTranslator))
2683                     return getResourceByKey(key);
2684             } catch (InvalidResourceReferenceException e) {
2685                 throw e;
2686             } catch (NumberFormatException e) {
2687                 throw new InvalidResourceReferenceException(e);
2688             } catch (Throwable e) {
2689                 e.printStackTrace();
2690                 throw new InvalidResourceReferenceException(e);
2691             } finally {
2692             }
2693             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2694         }
2695
2696         @Override
2697         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2698             try {
2699                 return true;
2700             } catch (Throwable e) {
2701                 e.printStackTrace();
2702                 throw new InvalidResourceReferenceException(e);
2703             } finally {
2704             }
2705         }
2706     }
2707
2708     // Copied from old SessionImpl
2709
2710     void check() {
2711         if (state.isClosed())
2712             throw new Error("Session closed.");
2713     }
2714
2715
2716
2717     public ResourceImpl getNewResource(long clusterId) {
2718         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2719         int newId;
2720         try {
2721             newId = cluster.createResource(clusterTranslator);
2722         } catch (DatabaseException e) {
2723             Logger.defaultLogError(e);
2724             return null;
2725         }
2726         return new ResourceImpl(resourceSupport, newId);
2727     }
2728
2729     public ResourceImpl getNewResource(Resource clusterSet)
2730     throws DatabaseException {
2731         long resourceId = clusterSet.getResourceId();
2732         Long clusterId = clusterSetsSupport.get(resourceId);
2733         if (null == clusterId)
2734             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2735         if (Constants.NewClusterId == clusterId) {
2736             clusterId = getService(ClusteringSupport.class).createCluster();
2737             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2738                 createdClusters.add(clusterId);
2739             clusterSetsSupport.put(resourceId, clusterId);
2740             return getNewResource(clusterId);
2741         } else {
2742             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2743             ResourceImpl result;
2744             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2745                 clusterId = getService(ClusteringSupport.class).createCluster();
2746                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2747                     createdClusters.add(clusterId);
2748                 clusterSetsSupport.put(resourceId, clusterId);
2749                 return getNewResource(clusterId);
2750             } else {
2751                 result = getNewResource(clusterId);
2752                 int resultKey = querySupport.getId(result);
2753                 long resultCluster = querySupport.getClusterId(resultKey);
2754                 if (clusterId != resultCluster)
2755                     clusterSetsSupport.put(resourceId, resultCluster);
2756                 return result;
2757             }
2758         }
2759     }
2760
2761     public void getNewClusterSet(Resource clusterSet)
2762     throws DatabaseException {
2763         if (DEBUG)
2764             System.out.println("new cluster set=" + clusterSet);
2765         long resourceId = clusterSet.getResourceId();
2766         if (clusterSetsSupport.containsKey(resourceId))
2767             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2768         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2769     }
2770     public boolean containsClusterSet(Resource clusterSet)
2771     throws ServiceException {
2772         long resourceId = clusterSet.getResourceId();
2773         return clusterSetsSupport.containsKey(resourceId);
2774     }
2775     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2776         Resource r = defaultClusterSet;
2777         defaultClusterSet = clusterSet;
2778         return r;
2779     }
2780     void printDiagnostics() {
2781
2782         if (DIAGNOSTICS) {
2783
2784             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2785             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2786             for(ClusterI cluster : clusterTable.getClusters()) {
2787                 try {
2788                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2789                         residentClusters.set(residentClusters.get() + 1);
2790                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2791                     }
2792                 } catch (DatabaseException e) {
2793                     Logger.defaultLogError(e);
2794                 }
2795             }
2796
2797             System.out.println("--------------------------------");
2798             System.out.println("Cluster information:");
2799             System.out.println("-amount of resident clusters=" + residentClusters.get());
2800             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2801
2802             for(ClusterI cluster : clusterTable.getClusters()) {
2803                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2804                 try {
2805                     if (!cluster.isLoaded())
2806                         System.out.println("not loaded]");
2807                     else if (cluster.isEmpty())
2808                         System.out.println("is empty]");
2809                     else {
2810                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2811                         System.out.print(",references=" + cluster.getReferenceCount());
2812                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2813                     }
2814                 } catch (DatabaseException e) {
2815                     Logger.defaultLogError(e);
2816                     System.out.println("is corrupted]");
2817                 }
2818             }
2819
2820             queryProvider2.printDiagnostics();
2821             System.out.println("--------------------------------");
2822         }
2823
2824     }
2825
2826     public void fireStartReadTransaction() {
2827
2828         if (DIAGNOSTICS)
2829             System.out.println("StartReadTransaction");
2830
2831         for (SessionEventListener listener : eventListeners) {
2832             try {
2833                 listener.readTransactionStarted();
2834             } catch (Throwable t) {
2835                 t.printStackTrace();
2836             }
2837         }
2838
2839     }
2840
2841     public void fireFinishReadTransaction() {
2842
2843         if (DIAGNOSTICS)
2844             System.out.println("FinishReadTransaction");
2845
2846         for (SessionEventListener listener : eventListeners) {
2847             try {
2848                 listener.readTransactionFinished();
2849             } catch (Throwable t) {
2850                 t.printStackTrace();
2851             }
2852         }
2853
2854     }
2855
2856     public void fireStartWriteTransaction() {
2857
2858         if (DIAGNOSTICS)
2859             System.out.println("StartWriteTransaction");
2860
2861         for (SessionEventListener listener : eventListeners) {
2862             try {
2863                 listener.writeTransactionStarted();
2864             } catch (Throwable t) {
2865                 t.printStackTrace();
2866             }
2867         }
2868
2869     }
2870
2871     public void fireFinishWriteTransaction() {
2872
2873         if (DIAGNOSTICS)
2874             System.out.println("FinishWriteTransaction");
2875
2876         for (SessionEventListener listener : eventListeners) {
2877             try {
2878                 listener.writeTransactionFinished();
2879             } catch (Throwable t) {
2880                 t.printStackTrace();
2881             }
2882         }
2883
2884         Indexing.resetDependenciesIndexingDisabled();
2885
2886     }
2887
2888     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2889         throw new Error("Not supported at the moment.");
2890     }
2891
2892     State getState() {
2893         return state;
2894     }
2895
2896     ClusterTable getClusterTable() {
2897         return clusterTable;
2898     }
2899
2900     public GraphSession getGraphSession() {
2901         return graphSession;
2902     }
2903
2904     public QueryProcessor getQueryProvider2() {
2905         return queryProvider2;
2906     }
2907
2908     ClientChangesImpl getClientChanges() {
2909         return clientChanges;
2910     }
2911
2912     boolean getWriteOnly() {
2913         return writeOnly;
2914     }
2915
2916     static int counter = 0;
2917
2918     public void onClusterLoaded(long clusterId) {
2919
2920         clusterTable.updateSize();
2921
2922     }
2923
2924
2925
2926
2927     /*
2928      * Implementation of the interface RequestProcessor
2929      */
2930
2931     @Override
2932     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2933
2934         assertNotSession();
2935         assertAlive();
2936
2937         assert(request != null);
2938
2939         final DataContainer<T> result = new DataContainer<T>();
2940         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2941
2942         syncRequest(request, new AsyncProcedure<T>() {
2943
2944             @Override
2945             public void execute(AsyncReadGraph graph, T t) {
2946                 result.set(t);
2947             }
2948
2949             @Override
2950             public void exception(AsyncReadGraph graph, Throwable t) {
2951                 exception.set(t);
2952             }
2953
2954         });
2955
2956         Throwable t = exception.get();
2957         if(t != null) {
2958             if(t instanceof DatabaseException) throw (DatabaseException)t;
2959             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2960         }
2961
2962         return result.get();
2963
2964     }
2965
2966 //    @Override
2967 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2968 //        assertNotSession();
2969 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2970 //    }
2971
2972     @Override
2973     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2974         assertNotSession();
2975         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2976     }
2977
2978     @Override
2979     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2980         assertNotSession();
2981         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2982     }
2983
2984     @Override
2985     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2986         assertNotSession();
2987         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2988     }
2989
2990     @Override
2991     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2992         assertNotSession();
2993         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2994     }
2995
2996     @Override
2997     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2998
2999         assertNotSession();
3000
3001         assert(request != null);
3002
3003         final DataContainer<T> result = new DataContainer<T>();
3004         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3005
3006         syncRequest(request, new AsyncProcedure<T>() {
3007
3008             @Override
3009             public void execute(AsyncReadGraph graph, T t) {
3010                 result.set(t);
3011             }
3012
3013             @Override
3014             public void exception(AsyncReadGraph graph, Throwable t) {
3015                 exception.set(t);
3016             }
3017
3018         });
3019
3020         Throwable t = exception.get();
3021         if(t != null) {
3022             if(t instanceof DatabaseException) throw (DatabaseException)t;
3023             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3024         }
3025
3026         return result.get();
3027
3028     }
3029
3030     @Override
3031     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3032         assertNotSession();
3033         return syncRequest(request, (AsyncProcedure<T>)procedure);
3034     }
3035
3036     @Override
3037     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3038         assertNotSession();
3039         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3040     }
3041
3042     @Override
3043     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3044         assertNotSession();
3045         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3046     }
3047
3048     @Override
3049     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3050         assertNotSession();
3051         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3052     }
3053
3054     @Override
3055     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3056         assertNotSession();
3057         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3058     }
3059
3060     @Override
3061     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3062
3063         assertNotSession();
3064
3065         assert(request != null);
3066
3067         final ArrayList<T> result = new ArrayList<T>();
3068         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3069
3070         syncRequest(request, new AsyncMultiProcedure<T>() {
3071
3072             @Override
3073             public void execute(AsyncReadGraph graph, T t) {
3074                 synchronized(result) {
3075                     result.add(t);
3076                 }
3077             }
3078
3079             @Override
3080             public void finished(AsyncReadGraph graph) {
3081             }
3082
3083             @Override
3084             public void exception(AsyncReadGraph graph, Throwable t) {
3085                 exception.set(t);
3086             }
3087
3088
3089         });
3090
3091         Throwable t = exception.get();
3092         if(t != null) {
3093             if(t instanceof DatabaseException) throw (DatabaseException)t;
3094             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3095         }
3096
3097         return result;
3098
3099     }
3100
3101     @Override
3102     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3103         assertNotSession();
3104         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3105     }
3106
3107     @Override
3108     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3109         assertNotSession();
3110         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3111     }
3112
3113     @Override
3114     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3115         assertNotSession();
3116         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3117     }
3118
3119     @Override
3120     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3121         assertNotSession();
3122         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3123     }
3124
3125     @Override
3126     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3127         assertNotSession();
3128         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3129     }
3130
3131     @Override
3132     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3133
3134         assertNotSession();
3135
3136         assert(request != null);
3137
3138         final ArrayList<T> result = new ArrayList<T>();
3139         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3140
3141         syncRequest(request, new AsyncMultiProcedure<T>() {
3142
3143             @Override
3144             public void execute(AsyncReadGraph graph, T t) {
3145                 synchronized(result) {
3146                     result.add(t);
3147                 }
3148             }
3149
3150             @Override
3151             public void finished(AsyncReadGraph graph) {
3152             }
3153
3154             @Override
3155             public void exception(AsyncReadGraph graph, Throwable t) {
3156                 exception.set(t);
3157             }
3158
3159
3160         });
3161
3162         Throwable t = exception.get();
3163         if(t != null) {
3164             if(t instanceof DatabaseException) throw (DatabaseException)t;
3165             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3166         }
3167
3168         return result;
3169
3170     }
3171
3172     @Override
3173     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3174         assertNotSession();
3175         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3176     }
3177
3178     @Override
3179     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3180         assertNotSession();
3181         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3182     }
3183
3184     @Override
3185     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3186         assertNotSession();
3187        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3188     }
3189
3190     @Override
3191     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3192         assertNotSession();
3193         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3194     }
3195
3196     @Override
3197     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3198         assertNotSession();
3199         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3200     }
3201
3202
3203     @Override
3204     public <T> void asyncRequest(Read<T> request) {
3205
3206         asyncRequest(request, new ProcedureAdapter<T>() {
3207             @Override
3208             public void exception(Throwable t) {
3209                 t.printStackTrace();
3210             }
3211         });
3212
3213     }
3214
3215     @Override
3216     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3217         asyncRequest(request, (AsyncProcedure<T>)procedure);
3218     }
3219
3220     @Override
3221     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3222         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3223     }
3224
3225     @Override
3226     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3227         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3228     }
3229
3230     @Override
3231     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3232         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3233     }
3234
3235     @Override
3236     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3237         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3238     }
3239
3240     @Override
3241     final public <T> void asyncRequest(final AsyncRead<T> request) {
3242
3243         assert(request != null);
3244
3245         asyncRequest(request, new ProcedureAdapter<T>() {
3246             @Override
3247             public void exception(Throwable t) {
3248                 t.printStackTrace();
3249             }
3250         });
3251
3252     }
3253
3254     @Override
3255     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3256         scheduleRequest(request, procedure, procedure, null);
3257     }
3258
3259     @Override
3260     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3261         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3262     }
3263
3264     @Override
3265     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3266         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3267     }
3268
3269     @Override
3270     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3271         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3272     }
3273
3274     @Override
3275     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3276         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3277     }
3278
3279     @Override
3280     public <T> void asyncRequest(MultiRead<T> request) {
3281
3282         assert(request != null);
3283
3284         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3285             @Override
3286             public void exception(AsyncReadGraph graph, Throwable t) {
3287                 t.printStackTrace();
3288             }
3289         });
3290
3291     }
3292
3293     @Override
3294     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3295         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3296     }
3297
3298     @Override
3299     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3300         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3301     }
3302
3303     @Override
3304     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3305         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3306     }
3307
3308     @Override
3309     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3310         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3311     }
3312
3313     @Override
3314     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3315         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3316     }
3317
3318     @Override
3319     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3320
3321         assert(request != null);
3322
3323         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3324             @Override
3325             public void exception(AsyncReadGraph graph, Throwable t) {
3326                 t.printStackTrace();
3327             }
3328         });
3329
3330     }
3331
3332     @Override
3333     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3334         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3335     }
3336
3337     @Override
3338     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3339         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3340     }
3341
3342     @Override
3343     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3344         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3345     }
3346
3347     @Override
3348     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3349         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3350     }
3351
3352     @Override
3353     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3354         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3355     }
3356
3357     @Override
3358     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3359         assertNotSession();
3360         throw new Error("Not implemented!");
3361     }
3362
3363     @Override
3364     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3365         throw new Error("Not implemented!");
3366     }
3367
3368     @Override
3369     final public <T> void asyncRequest(final ExternalRead<T> request) {
3370
3371         assert(request != null);
3372
3373         asyncRequest(request, new ProcedureAdapter<T>() {
3374             @Override
3375             public void exception(Throwable t) {
3376                 t.printStackTrace();
3377             }
3378         });
3379
3380     }
3381
3382     @Override
3383     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3384         asyncRequest(request, (Procedure<T>)procedure);
3385     }
3386
3387
3388
3389     void check(Throwable t) throws DatabaseException {
3390         if(t != null) {
3391             if(t instanceof DatabaseException) throw (DatabaseException)t;
3392             else throw new DatabaseException("Unexpected exception", t);
3393         }
3394     }
3395
3396     void check(DataContainer<Throwable> container) throws DatabaseException {
3397         Throwable t = container.get();
3398         if(t != null) {
3399             if(t instanceof DatabaseException) throw (DatabaseException)t;
3400             else throw new DatabaseException("Unexpected exception", t);
3401         }
3402     }
3403
3404
3405
3406
3407
3408
3409     boolean sameProvider(Write request) {
3410         if(writeState.getGraph().provider != null) {
3411             return writeState.getGraph().provider.equals(request.getProvider());
3412         } else {
3413             return request.getProvider() == null;
3414         }
3415     }
3416
3417     boolean plainWrite(WriteGraphImpl graph) {
3418         if(graph == null) return false;
3419         if(graph.writeSupport.writeOnly()) return false;
3420         return true;
3421     }
3422
3423
3424     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3425     private void assertNotSession() throws DatabaseException {
3426         Thread current = Thread.currentThread();
3427         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3428     }
3429
3430     void assertAlive() {
3431         if (!state.isAlive())
3432             throw new RuntimeDatabaseException("Session has been shut down.");
3433     }
3434
3435     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3436         return querySupport.getValueStream(graph, querySupport.getId(resource));
3437     }
3438
3439     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3440         return querySupport.getValue(graph, querySupport.getId(resource));
3441     }
3442
3443     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3444         acquire(semaphore, request, null);
3445     }
3446
3447     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3448         assertAlive();
3449         try {
3450             long tryCount = 0;
3451             // Loop until the semaphore is acquired reporting requests that take a long time.
3452             while (true) {
3453
3454                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3455                     // Acquired OK.
3456                     return;
3457                 } else {
3458                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3459                         ++tryCount;
3460                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3461                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3462                                 * tryCount;
3463                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3464                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3465                     }
3466                 }
3467
3468                 assertAlive();
3469
3470             }
3471         } catch (InterruptedException e) {
3472             e.printStackTrace();
3473             // FIXME: Should perhaps do something else in this case ??
3474         }
3475     }
3476
3477
3478
3479 //    @Override
3480 //    public boolean holdOnToTransactionAfterCancel() {
3481 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3482 //    }
3483 //
3484 //    @Override
3485 //    public boolean holdOnToTransactionAfterCommit() {
3486 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3487 //    }
3488 //
3489 //    @Override
3490 //    public boolean holdOnToTransactionAfterRead() {
3491 //        return transactionPolicy.holdOnToTransactionAfterRead();
3492 //    }
3493 //
3494 //    @Override
3495 //    public void onRelinquish() {
3496 //        transactionPolicy.onRelinquish();
3497 //    }
3498 //
3499 //    @Override
3500 //    public void onRelinquishDone() {
3501 //        transactionPolicy.onRelinquishDone();
3502 //    }
3503 //
3504 //    @Override
3505 //    public void onRelinquishError() {
3506 //        transactionPolicy.onRelinquishError();
3507 //    }
3508
3509
3510     @Override
3511     public Session getSession() {
3512         return this;
3513     }
3514
3515
3516     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3517     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3518
3519     public void ceased(int thread) {
3520
3521         requestManager.ceased(thread);
3522
3523     }
3524
3525     public int getAmountOfQueryThreads() {
3526         // This must be a power of two
3527         return 16;
3528 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3529     }
3530
3531     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3532
3533         return new ResourceImpl(resourceSupport, key);
3534
3535     }
3536
3537     public void acquireWriteOnly() {
3538         writeOnly = true;
3539     }
3540
3541     public void releaseWriteOnly(ReadGraphImpl graph) {
3542         writeOnly = false;
3543         queryProvider2.releaseWrite(graph);
3544     }
3545
3546     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3547
3548         long start = System.nanoTime();
3549
3550         while(dirtyPrimitives) {
3551             dirtyPrimitives = false;
3552             getQueryProvider2().performDirtyUpdates(writer);
3553             getQueryProvider2().performScheduledUpdates(writer);
3554         }
3555
3556         fireMetadataListeners(writer, clientChanges);
3557
3558         if(DebugPolicy.PERFORMANCE_DATA) {
3559             long end = System.nanoTime();
3560             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3561         }
3562
3563     }
3564
3565     void removeTemporaryData() {
3566         File platform = Platform.getLocation().toFile();
3567         File tempFiles = new File(platform, "tempFiles");
3568         File temp = new File(tempFiles, "db");
3569         if(!temp.exists()) 
3570             return;
3571         try {
3572             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3573                 @Override
3574                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3575                     try {
3576                         Files.delete(file);
3577                     } catch (IOException e) {
3578                         Logger.defaultLogError(e);
3579                     }
3580                     return FileVisitResult.CONTINUE;
3581                 }
3582             });
3583         } catch (IOException e) {
3584             Logger.defaultLogError(e);
3585         }
3586     }
3587
3588     public void handleCreatedClusters() {
3589         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3590             createdClusters.forEach(new TLongProcedure() {
3591
3592                 @Override
3593                 public boolean execute(long value) {
3594                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3595                     cluster.setImmutable(true, clusterTranslator);
3596                     return true;
3597                 }
3598             });
3599         }
3600         createdClusters.clear();
3601     }
3602
3603     @Override
3604     public Object getModificationCounter() {
3605         return queryProvider2.modificationCounter;
3606     }
3607     
3608     @Override
3609     public void markUndoPoint() {
3610         state.setCombine(false);
3611     }
3612
3613 }