gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Yet another fixing commit
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator of this session. The session's built-in services
206      * are registered with it in the constructor.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
418
419         requestManager.scheduleWrite(new SessionTask(true) {
420
421             @Override
422             public void run(int thread) {
423
424                 if(Development.DEVELOPMENT) {
425                     try {
426                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
427                         System.err.println("perform write '" + request + "'");
428                     } catch (Throwable t) {
429                         Logger.defaultLogError(t);
430                     }
431                 }
432
433                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
434
435                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
436
437                 try {
438
439                     flushCounter = 0;
440                     Disposable.safeDispose(clientChanges);
441                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
442
443                     VirtualGraph vg = getProvider(request.getProvider());
444
445                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
446                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
447
448                         @Override
449                         public void execute(Object result) {
450                             if(callback != null) callback.accept(null);
451                         }
452
453                         @Override
454                         public void exception(Throwable t) {
455                             if(callback != null) callback.accept((DatabaseException)t);
456                         }
457
458                     });
459
460                     assert (null != writer);
461 //                    writer.state.barrier.inc();
462
463                     try {
464                         request.perform(writer);
465                         assert (null != writer);
466                     } catch (Throwable t) {
467                         if (!(t instanceof CancelTransactionException))
468                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
469                         writeState.except(t);
470                     } finally {
471 //                        writer.state.barrier.dec();
472 //                        writer.waitAsync(request);
473                     }
474
475
476                     assert(!queryProvider2.cache.dirty);
477
478                 } catch (Throwable e) {
479
480                     // Log it first, just to be safe that the error is always logged.
481                     if (!(e instanceof CancelTransactionException))
482                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
483
484 //                    writeState.getGraph().state.barrier.dec();
485 //                    writeState.getGraph().waitAsync(request);
486
487                     writeState.except(e);
488
489 //                    try {
490 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
491 //                        // All we can do here is to log those, can't really pass them anywhere.
492 //                        if (callback != null) {
493 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
494 //                            else callback.run(new DatabaseException(e));
495 //                        }
496 //                    } catch (Throwable e2) {
497 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
498 //                    }
499 //                    boolean empty = clusterStream.reallyFlush();
500 //                    int callerThread = -1;
501 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
502 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
503 //                    try {
504 //                        // Send all local changes to server so it can calculate correct reverse change set.
505 //                        // This will call clusterStream.accept().
506 //                        state.cancelCommit(context, clusterStream);
507 //                        if (!empty) {
508 //                            if (!context.isOk()) // this is a blocking operation
509 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
510 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
511 //                        }
512 //                        state.cancelCommit2(context, clusterStream);
513 //                    } catch (DatabaseException e3) {
514 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
515 //                    } finally {
516 //                        clusterStream.setOff(false);
517 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
518 //                    }
519
520                 } finally {
521
522                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
523
524                 }
525
526                 task.finish();
527
528                 if(Development.DEVELOPMENT) {
529                     try {
530                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
531                         System.err.println("finish write '" + request + "'");
532                     } catch (Throwable t) {
533                         Logger.defaultLogError(t);
534                     }
535                 }
536
537             }
538
539         }, combine);
540
541     }
542
543     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
544         scheduleRequest(request, procedure, notify, null);
545     }
546
547     /* (non-Javadoc)
548      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
549      */
550     @Override
551     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
552
553         assert (request != null);
554
555         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
556
557         requestManager.scheduleWrite(new SessionTask(true) {
558
559             @Override
560             public void run(int thread) {
561
562                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
563
564                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
565
566                 flushCounter = 0;
567                 Disposable.safeDispose(clientChanges);
568                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
569
570                 VirtualGraph vg = getProvider(request.getProvider());
571                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
572
573                 try {
574                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                     writeState = writeStateT;
576
577                     assert (null != writer);
578 //                    writer.state.barrier.inc();
579                     writeStateT.setResult(request.perform(writer));
580                     assert (null != writer);
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                 } catch (Throwable e) {
586
587 //                    writer.state.barrier.dec();
588 //                    writer.waitAsync(null);
589
590                     writeState.except(e);
591
592 //                  state.stopWriteTransaction(clusterStream);
593 //
594 //              } catch (Throwable e) {
595 //                  // Log it first, just to be safe that the error is always logged.
596 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
597 //
598 //                  try {
599 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
600 //                      // All we can do here is to log those, can't really pass them anywhere.
601 //                      if (procedure != null) {
602 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
603 //                          else procedure.exception(new DatabaseException(e));
604 //                      }
605 //                  } catch (Throwable e2) {
606 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
607 //                  }
608 //
609 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
610 //
611 //                  state.stopWriteTransaction(clusterStream);
612
613                 } finally {
614                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
615                 }
616
617 //              if(notify != null) notify.release();
618
619                 task.finish();
620
621             }
622
623         }, combine);
624
625     }
626
627     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
628         scheduleRequest(request, callback, notify, null);
629     }
630
631     /* (non-Javadoc)
632      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
633      */
634     @Override
635     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
636
637         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
638
639         assert (request != null);
640
641         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
642
643         requestManager.scheduleWrite(new SessionTask(true) {
644
645             @Override
646             public void run(int thread) {
647                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
648
649                 Procedure<Object> stateProcedure = new Procedure<Object>() {
650                     @Override
651                     public void execute(Object result) {
652                         if (callback != null)
653                             callback.accept(null);
654                     }
655                     @Override
656                     public void exception(Throwable t) {
657                         if (callback != null) {
658                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
659                             else callback.accept(new DatabaseException(t));
660                         } else
661                             Logger.defaultLogError("Unhandled exception", t);
662                     }
663                 };
664
665                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
666                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
667                 DelayedWriteGraph dwg = null;
668 //                newGraph.state.barrier.inc();
669
670                 try {
671                     dwg = new DelayedWriteGraph(newGraph);
672                     request.perform(dwg);
673                 } catch (Throwable e) {
674                     delayedWriteState.except(e);
675                     total.finish();
676                     dwg.close();
677                     return;
678                 } finally {
679 //                    newGraph.state.barrier.dec();
680 //                    newGraph.waitAsync(request);
681                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
682                 }
683
684                 delayedWriteState = null;
685
686                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
687                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
688
689                 flushCounter = 0;
690                 Disposable.safeDispose(clientChanges);
691                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
692
693                 acquireWriteOnly();
694
695                 VirtualGraph vg = getProvider(request.getProvider());
696                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
697
698                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
699
700                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
701
702                 assert (null != writer);
703 //                writer.state.barrier.inc();
704
705                 try {
706                     // Cannot cancel
707                     dwg.commit(writer, request);
708
709                         if(defaultClusterSet != null) {
710                                 XSupport xs = getService(XSupport.class);
711                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712                                 ClusteringSupport cs = getService(ClusteringSupport.class);
713                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
714                         }
715
716                     // This makes clusters available from server
717                     clusterStream.reallyFlush();
718
719                     releaseWriteOnly(writer);
720                     handleUpdatesAndMetadata(writer);
721
722                 } catch (ServiceException e) {
723 //                    writer.state.barrier.dec();
724 //                    writer.waitAsync(null);
725
726                     // These shall be requested from server
727                     clusterTable.removeWriteOnlyClusters();
728                     // This makes clusters available from server
729                     clusterStream.reallyFlush();
730
731                     releaseWriteOnly(writer);
732                     writeState.except(e);
733                 } finally {
734                     // Debugging & Profiling
735                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736                     task2.finish();
737                     total.finish();
738                 }
739             }
740
741         }, combine);
742
743     }
744
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746         scheduleRequest(request, procedure, notify, null);
747     }
748
749     /* (non-Javadoc)
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
751      */
752     @Override
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754         throw new Error("Not implemented");
755     }
756
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760             createdClusters.add(cluster.clusterId);
761         }
762         return cluster;
763     }
764
765     class WriteOnlySupport implements WriteSupport {
766
767         ClusterStream stream;
768         ClusterImpl currentCluster;
769
770         public WriteOnlySupport() {
771             this.stream = clusterStream;
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
777         }
778
779         @Override
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785                 if(s != queryProvider2.getRootLibrary())
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
787
788             try {
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790             } catch (DatabaseException e) {
791                 Logger.defaultLogError(e);
792                 //throw e;
793             }
794
795             clientChanges.invalidate(s);
796
797             if (cluster.isWriteOnly())
798                 return;
799             queryProvider2.updateStatements(s, p);
800
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
806         }
807
808         @Override
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812             try {
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         @Override
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
828
829             if(amount < 65536) {
830                 claimValue(provider,resource, reader.readBytes(null, amount));
831                 return;
832             }
833
834             byte[] bytes = new byte[65536];
835
836             int rid = ((ResourceImpl)resource).id;
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838             try {
839                 int left = amount;
840                 while(left > 0) {
841                     int block = Math.min(left, 65536);
842                     reader.readBytes(bytes, block);
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
844                     left -= block;
845                 }
846             } catch (DatabaseException e) {
847                 Logger.defaultLogError(e);
848             }
849
850             clientChanges.invalidate(rid);
851
852             if (cluster.isWriteOnly())
853                 return;
854             queryProvider2.updateValue(rid);
855
856         }
857
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {
859             if(after_ != null && after_ != before) {
860                 ClusterImpl after = (ClusterImpl)after_;
861                 if(currentCluster == before) {
862                     currentCluster = after;
863                 }
864                 clusterTable.replaceCluster(after);
865             }
866         }
867
868         public int createResourceKey(int foreignCounter) throws DatabaseException {
869             if(currentCluster == null) {
870                 currentCluster = getNewResourceCluster();
871             }
872             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
873                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
874                 newCluster.foreignLookup = new byte[foreignCounter];
875                 currentCluster = newCluster;
876                 if (DEBUG)
877                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
878             }
879             return currentCluster.createResource(clusterTranslator);
880         }
881
882         @Override
883         public Resource createResource(VirtualGraph provider) throws DatabaseException {
884             if(currentCluster == null) {
885                 if (null != defaultClusterSet) {
886                         ResourceImpl result = getNewResource(defaultClusterSet);
887                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
888                     return result;
889                 } else {
890                     currentCluster = getNewResourceCluster();
891                 }
892             }
893             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
894                 if (null != defaultClusterSet) {
895                         ResourceImpl result = getNewResource(defaultClusterSet);
896                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
897                     return result;
898                 } else {
899                     currentCluster = getNewResourceCluster();
900                 }
901             }
902             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, long clusterId)
907         throws DatabaseException {
908             return getNewResource(clusterId);
909         }
910
911         @Override
912         public Resource createResource(VirtualGraph provider, Resource clusterSet)
913         throws DatabaseException {
914             return getNewResource(clusterSet);
915         }
916
917         @Override
918         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
919                 throws DatabaseException {
920             getNewClusterSet(clusterSet);
921         }
922
923         @Override
924         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
925         throws ServiceException {
926             return containsClusterSet(clusterSet);
927         }
928
929         public void selectCluster(long cluster) {
930                 currentCluster = clusterTable.getClusterByClusterId(cluster);
931                 long setResourceId = clusterSetsSupport.getSet(cluster);
932                 clusterSetsSupport.put(setResourceId, cluster);
933         }
934
935         @Override
936         public Resource setDefaultClusterSet(Resource clusterSet)
937         throws ServiceException {
938                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
939                 if(clusterSet != null) {
940                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
941                         currentCluster = clusterTable.getClusterByClusterId(id);
942                         return result;
943                 } else {
944                         currentCluster = null;
945                         return null;
946                 }
947         }
948
949         @Override
950         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
951
952                  provider = getProvider(provider);
953              if (null == provider) {
954                  int key = ((ResourceImpl)resource).id;
955
956
957
958                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
959 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
960 //                      if(key != queryProvider2.getRootLibrary())
961 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
962
963 //                 try {
964 //                     cluster.removeValue(key, clusterTranslator);
965 //                 } catch (DatabaseException e) {
966 //                     Logger.defaultLogError(e);
967 //                     return;
968 //                 }
969
970                  clusterTable.writeOnlyInvalidate(cluster);
971
972                  try {
973                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
974                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
975                          clusterTranslator.removeValue(cluster);
976                  } catch (DatabaseException e) {
977                          Logger.defaultLogError(e);
978                  }
979
980                  queryProvider2.invalidateResource(key);
981                  clientChanges.invalidate(key);
982
983              } else {
984                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
985                  queryProvider2.updateValue(querySupport.getId(resource));
986                  clientChanges.claimValue(resource);
987              }
988
989
990         }
991
992         @Override
993         public void flush(boolean intermediate) {
994             throw new UnsupportedOperationException();
995         }
996
997         @Override
998         public void flushCluster() {
999             clusterTable.flushCluster(graphSession);
1000             if(defaultClusterSet != null) {
1001                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1002             }
1003             currentCluster = null;
1004         }
1005
1006         @Override
1007         public void flushCluster(Resource r) {
1008             throw new UnsupportedOperationException("flushCluster resource " + r);
1009         }
1010
1011         @Override
1012         public void gc() {
1013         }
1014
1015         @Override
1016         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1017                 Resource object) {
1018
1019                 int s = ((ResourceImpl)subject).id;
1020                 int p = ((ResourceImpl)predicate).id;
1021                 int o = ((ResourceImpl)object).id;
1022
1023                 provider = getProvider(provider);
1024                 if (null == provider) {
1025
1026                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1027                         clusterTable.writeOnlyInvalidate(cluster);
1028
1029                         try {
1030
1031                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1032
1033                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035
1036                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039                                 clusterTranslator.removeStatement(cluster);
1040
1041                                 queryProvider2.invalidateResource(s);
1042                                 clientChanges.invalidate(s);
1043
1044                         } catch (DatabaseException e) {
1045
1046                                 Logger.defaultLogError(e);
1047
1048                         }
1049
1050                         return true;
1051
1052                 } else {
1053                         
1054                         ((VirtualGraphImpl)provider).deny(s, p, o);
1055                         queryProvider2.invalidateResource(s);
1056                         clientChanges.invalidate(s);
1057
1058                         return true;
1059
1060                 }
1061
1062         }
1063
1064         @Override
1065         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066             throw new UnsupportedOperationException();
1067         }
1068
1069         @Override
1070         public boolean writeOnly() {
1071             return true;
1072         }
1073
1074         @Override
1075         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076             writeSupport.performWriteRequest(graph, request);
1077         }
1078
1079         @Override
1080         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086             throw new UnsupportedOperationException();
1087         }
1088
1089         @Override
1090         public <T> void addMetadata(Metadata data) throws ServiceException {
1091             writeSupport.addMetadata(data);
1092         }
1093
1094         @Override
1095         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096             return writeSupport.getMetadata(clazz);
1097         }
1098
1099         @Override
1100         public TreeMap<String, byte[]> getMetadata() {
1101             return writeSupport.getMetadata();
1102         }
1103
1104         @Override
1105         public void commitDone(WriteTraits writeTraits, long csid) {
1106             writeSupport.commitDone(writeTraits, csid);
1107         }
1108
1109         @Override
1110         public void clearUndoList(WriteTraits writeTraits) {
1111             writeSupport.clearUndoList(writeTraits);
1112         }
1113         @Override
1114         public int clearMetadata() {
1115             return writeSupport.clearMetadata();
1116         }
1117
1118                 @Override
1119                 public void startUndo() {
1120                         writeSupport.startUndo();
1121                 }
1122     }
1123
1124     class VirtualWriteOnlySupport implements WriteSupport {
1125
1126 //        @Override
1127 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 //                Resource object) {
1129 //            throw new UnsupportedOperationException();
1130 //        }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144
1145             TransientGraph impl = (TransientGraph)provider;
1146             impl.claim(subject, predicate, object);
1147             getQueryProvider2().updateStatements(subject, predicate);
1148             clientChanges.claim(subject, predicate, object);
1149
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160             getQueryProvider2().updateValue(resource);
1161             clientChanges.claimValue(resource);
1162         }
1163
1164         @Override
1165         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166             byte[] value = reader.readBytes(null, amount);
1167             claimValue(provider, resource, value);
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider) {
1172             TransientGraph impl = (TransientGraph)provider;
1173             return impl.getResource(impl.newResource(false));
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, long clusterId) {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws DatabaseException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195         throws ServiceException {
1196             throw new UnsupportedOperationException();
1197         }
1198
1199         @Override
1200         public Resource setDefaultClusterSet(Resource clusterSet)
1201         throws ServiceException {
1202                 return null;
1203         }
1204
1205         @Override
1206         public void denyValue(VirtualGraph provider, Resource resource) {
1207             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208             getQueryProvider2().updateValue(querySupport.getId(resource));
1209             // NOTE: this only keeps track of value changes by-resource.
1210             clientChanges.claimValue(resource);
1211         }
1212
1213         @Override
1214         public void flush(boolean intermediate) {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster() {
1220             throw new UnsupportedOperationException();
1221         }
1222
1223         @Override
1224         public void flushCluster(Resource r) {
1225             throw new UnsupportedOperationException("Resource " + r);
1226         }
1227
1228         @Override
1229         public void gc() {
1230         }
1231
1232         @Override
1233         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234             TransientGraph impl = (TransientGraph) provider;
1235             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237             clientChanges.deny(subject, predicate, object);
1238             return true;
1239         }
1240
1241         @Override
1242         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public boolean writeOnly() {
1248             return true;
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void addMetadata(Metadata data) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public TreeMap<String, byte[]> getMetadata() {
1278             throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public void commitDone(WriteTraits writeTraits, long csid) {
1283         }
1284
1285         @Override
1286         public void clearUndoList(WriteTraits writeTraits) {
1287         }
1288
1289         @Override
1290         public int clearMetadata() {
1291             return 0;
1292         }
1293
1294                 @Override
1295                 public void startUndo() {
1296                 }
1297     }
1298
1299     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300
1301         try {
1302
1303             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304
1305             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306
1307             flushCounter = 0;
1308             Disposable.safeDispose(clientChanges);
1309             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310
1311             acquireWriteOnly();
1312
1313             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314
1315             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316
1317             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1318             writeState = writeStateT;
1319
1320             assert (null != writer);
1321 //            writer.state.barrier.inc();
1322             long start = System.nanoTime();
1323             T result = request.perform(writer);
1324             long duration = System.nanoTime() - start;
1325             if (DEBUG) {
1326                 System.err.println("################");
1327                 System.err.println("WriteOnly duration " + 1e-9*duration);
1328             }
1329             writeStateT.setResult(result);
1330
1331             // This makes clusters available from server
1332             clusterStream.reallyFlush();
1333
1334             // This will trigger query updates
1335             releaseWriteOnly(writer);
1336             assert (null != writer);
1337
1338             handleUpdatesAndMetadata(writer);
1339
1340         } catch (CancelTransactionException e) {
1341
1342             releaseWriteOnly(writeState.getGraph());
1343
1344             clusterTable.removeWriteOnlyClusters();
1345             state.stopWriteTransaction(clusterStream);
1346
1347         } catch (Throwable e) {
1348
1349             e.printStackTrace();
1350
1351             releaseWriteOnly(writeState.getGraph());
1352
1353             clusterTable.removeWriteOnlyClusters();
1354
1355             if (callback != null)
1356                 callback.exception(new DatabaseException(e));
1357
1358             state.stopWriteTransaction(clusterStream);
1359             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360
1361         } finally  {
1362
1363             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1364
1365         }
1366
1367     }
1368
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1370         scheduleRequest(request, callback, notify, null);
1371     }
1372
1373     @Override
1374     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375
1376         assertAlive();
1377
1378         assert (request != null);
1379
1380         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1381
1382         requestManager.scheduleWrite(new SessionTask(true) {
1383
1384             @Override
1385             public void run(int thread) {
1386
1387                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388
1389                     try {
1390
1391                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392
1393                         flushCounter = 0;
1394                         Disposable.safeDispose(clientChanges);
1395                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396
1397                         acquireWriteOnly();
1398
1399                         VirtualGraph vg = getProvider(request.getProvider());
1400                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1401
1402                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1403
1404                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405
1406                             @Override
1407                             public void execute(Object result) {
1408                                 if(callback != null) callback.accept(null);
1409                             }
1410
1411                             @Override
1412                             public void exception(Throwable t) {
1413                                 if(callback != null) callback.accept((DatabaseException)t);
1414                             }
1415
1416                         });
1417
1418                         assert (null != writer);
1419 //                        writer.state.barrier.inc();
1420
1421                         try {
1422
1423                             request.perform(writer);
1424
1425                         } catch (Throwable e) {
1426
1427 //                            writer.state.barrier.dec();
1428 //                            writer.waitAsync(null);
1429
1430                             releaseWriteOnly(writer);
1431
1432                             clusterTable.removeWriteOnlyClusters();
1433
1434                             if(!(e instanceof CancelTransactionException)) {
1435                                 if (callback != null)
1436                                     callback.accept(new DatabaseException(e));
1437                             }
1438
1439                             writeState.except(e);
1440
1441                             return;
1442
1443                         }
1444
1445                         // This makes clusters available from server
1446                         boolean empty = clusterStream.reallyFlush();
1447                         // This was needed to make WO requests call metadata listeners.
1448                         // NOTE: the calling event does not contain clientChanges information.
1449                         if (!empty && clientChanges.isEmpty())
1450                             clientChanges.setNotEmpty(true);
1451                         releaseWriteOnly(writer);
1452                         assert (null != writer);
1453                     } finally  {
1454
1455                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1456
1457                     }
1458
1459
1460                 task.finish();
1461
1462             }
1463
1464         }, combine);
1465
1466     }
1467
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1469         scheduleRequest(request, callback, notify, null);
1470     }
1471
1472     /* (non-Javadoc)
1473      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474      */
1475     @Override
1476     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1477
1478         assert (request != null);
1479
1480         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1481
1482         requestManager.scheduleWrite(new SessionTask(true) {
1483
1484             @Override
1485             public void run(int thread) {
1486
1487                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1488
1489                 performWriteOnly(request, notify, callback);
1490
1491                 task.finish();
1492
1493             }
1494
1495         }, combine);
1496
1497     }
1498
1499     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1500
1501         assert (request != null);
1502         assert (procedure != null);
1503
1504         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1505
1506         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1507
1508             @Override
1509             public void run(int thread) {
1510
1511                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1512
1513                 ListenerBase listener = getListenerBase(procedure);
1514
1515                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516
1517                 try {
1518
1519                     if (listener != null) {
1520
1521                         try {
1522                                 
1523                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1524
1525                                         @Override
1526                                         public void exception(AsyncReadGraph graph, Throwable t) {
1527                                                 procedure.exception(graph, t);
1528                                                 if(throwable != null) {
1529                                                         throwable.set(t);
1530                                                 } else {
1531                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1532                                                 }
1533                                         }
1534
1535                                         @Override
1536                                         public void execute(AsyncReadGraph graph, T t) {
1537                                                 if(result != null) result.set(t);
1538                                                 procedure.execute(graph, t);
1539                                         }
1540
1541                                 };
1542                                 
1543                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1544                                 
1545                         } catch (Throwable t) {
1546                             // This is handled by the AsyncProcedure
1547                                 //Logger.defaultLogError("Internal error", t);
1548                         }
1549
1550                     } else {
1551
1552                         try {
1553
1554 //                            newGraph.state.barrier.inc();
1555
1556                             T t = request.perform(newGraph);
1557
1558                             try {
1559
1560                                 if(result != null) result.set(t);
1561                                 procedure.execute(newGraph, t);
1562
1563                             } catch (Throwable th) {
1564
1565                                 if(throwable != null) {
1566                                     throwable.set(th);
1567                                 } else {
1568                                     Logger.defaultLogError("Unhandled exception", th);
1569                                 }
1570
1571                             }
1572
1573                         } catch (Throwable t) {
1574
1575                             if (DEBUG)
1576                                 t.printStackTrace();
1577
1578                             if(throwable != null) {
1579                                 throwable.set(t);
1580                             } else {
1581                                 Logger.defaultLogError("Unhandled exception", t);
1582                             }
1583
1584                             try {
1585
1586                                 procedure.exception(newGraph, t);
1587
1588                             } catch (Throwable t2) {
1589
1590                                 if(throwable != null) {
1591                                     throwable.set(t2);
1592                                 } else {
1593                                     Logger.defaultLogError("Unhandled exception", t2);
1594                                 }
1595
1596                             }
1597
1598                         }
1599
1600 //                        newGraph.state.barrier.dec();
1601 //                        newGraph.waitAsync(request);
1602
1603                     }
1604
1605                 } finally {
1606
1607                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1608
1609                 }
1610
1611             }
1612
1613         });
1614
1615     }
1616
1617     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1618
1619         assert (request != null);
1620         assert (procedure != null);
1621
1622         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1623
1624         requestManager.scheduleRead(new SessionRead(null, notify) {
1625
1626             @Override
1627             public void run(int thread) {
1628
1629                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1630
1631                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1632
1633                 try {
1634
1635                     if (listener != null) {
1636
1637                         try {
1638                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1639                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1640                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1641                                                 } catch (DatabaseException e) {
1642                                                         Logger.defaultLogError(e);
1643                                                 }
1644
1645                     } else {
1646
1647 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1648 //                                procedure, "request");
1649
1650                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1651
1652                         try {
1653
1654                             request.perform(newGraph, wrap);
1655                             wrap.get();
1656
1657                         } catch (DatabaseException e) {
1658
1659                                                         Logger.defaultLogError(e);
1660
1661                         }
1662
1663                     }
1664
1665                 } finally {
1666
1667                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1668
1669                 }
1670
1671             }
1672
1673         });
1674
1675     }
1676
1677     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1678
1679         assert (request != null);
1680         assert (procedure != null);
1681
1682         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1683
1684         int sync = notify != null ? thread : -1;
1685
1686         requestManager.scheduleRead(new SessionRead(null, notify) {
1687
1688             @Override
1689             public void run(int thread) {
1690
1691                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1692
1693                 ListenerBase listener = getListenerBase(procedure);
1694
1695                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1696
1697                 try {
1698
1699                     if (listener != null) {
1700
1701                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1702
1703 //                        newGraph.waitAsync(request);
1704
1705                     } else {
1706
1707                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1708
1709                         try {
1710
1711                             request.perform(newGraph, wrapper);
1712
1713                         } catch (Throwable t) {
1714
1715                             t.printStackTrace();
1716
1717                         }
1718
1719                     }
1720
1721                 } finally {
1722
1723                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1724
1725                 }
1726
1727             }
1728
1729         });
1730
1731     }
1732     
1733     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1734
1735         assert (request != null);
1736         assert (procedure != null);
1737
1738         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1739
1740         int sync = notify != null ? thread : -1;
1741
1742         requestManager.scheduleRead(new SessionRead(null, notify) {
1743
1744             @Override
1745             public void run(int thread) {
1746
1747                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1748
1749                 ListenerBase listener = getListenerBase(procedure);
1750
1751                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1752
1753                 try {
1754
1755                     if (listener != null) {
1756
1757                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1758
1759 //                        newGraph.waitAsync(request);
1760
1761                     } else {
1762
1763                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1764
1765                         try {
1766
1767                             request.perform(newGraph, wrapper);
1768
1769                         } catch (Throwable t) {
1770
1771                             t.printStackTrace();
1772
1773                         }
1774
1775                     }
1776
1777                 } finally {
1778
1779                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1780
1781                 }
1782
1783             }
1784
1785         });
1786
1787     }
1788
1789     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1790
1791         assert (request != null);
1792         assert (procedure != null);
1793
1794         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1795
1796         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1797
1798             @Override
1799             public void run(int thread) {
1800
1801                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1802
1803                 ListenerBase listener = getListenerBase(procedure);
1804
1805                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1806
1807                 try {
1808
1809                     if (listener != null) {
1810
1811                         try {
1812                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1813                         } catch (DatabaseException e) {
1814                             Logger.defaultLogError(e);
1815                         }
1816                         
1817                         
1818 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1819 //
1820 //                            @Override
1821 //                            public void exception(Throwable t) {
1822 //                                procedure.exception(t);
1823 //                                if(throwable != null) {
1824 //                                    throwable.set(t);
1825 //                                }
1826 //                            }
1827 //
1828 //                            @Override
1829 //                            public void execute(T t) {
1830 //                                if(result != null) result.set(t);
1831 //                                procedure.execute(t);
1832 //                            }
1833 //
1834 //                        }, listener);
1835
1836 //                        newGraph.waitAsync(request);
1837
1838                     } else {
1839
1840 //                        newGraph.state.barrier.inc();
1841
1842                         request.register(newGraph, new Listener<T>() {
1843
1844                             @Override
1845                             public void exception(Throwable t) {
1846                                 if(throwable != null) throwable.set(t);
1847                                 procedure.exception(t);
1848 //                                newGraph.state.barrier.dec();
1849                             }
1850
1851                             @Override
1852                             public void execute(T t) {
1853                                 if(result != null) result.set(t);
1854                                 procedure.execute(t);
1855 //                                newGraph.state.barrier.dec();
1856                             }
1857
1858                             @Override
1859                             public boolean isDisposed() {
1860                                 return true;
1861                             }
1862
1863                         });
1864
1865 //                        newGraph.waitAsync(request);
1866
1867                     }
1868
1869                 } finally {
1870
1871                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1872
1873                 }
1874
1875             }
1876
1877         });
1878
1879     }
1880
1881
1882     @Override
1883     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1884
1885         scheduleRequest(request, procedure, null, null, null);
1886
1887     }
1888
1889     @Override
1890     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1891         assertNotSession();
1892         Semaphore notify = new Semaphore(0);
1893         DataContainer<Throwable> container = new DataContainer<Throwable>();
1894         DataContainer<T> result = new DataContainer<T>();
1895         scheduleRequest(request, procedure, notify, container, result);
1896         acquire(notify, request);
1897         Throwable throwable = container.get();
1898         if(throwable != null) {
1899             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1900             else throw new DatabaseException("Unexpected exception", throwable);
1901         }
1902         return result.get();
1903     }
1904
1905     @Override
1906     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1907         assertNotSession();
1908         Semaphore notify = new Semaphore(0);
1909         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1910         final DataContainer<T> resultContainer = new DataContainer<T>();
1911         scheduleRequest(request, new AsyncProcedure<T>() {
1912             @Override
1913             public void exception(AsyncReadGraph graph, Throwable throwable) {
1914                 exceptionContainer.set(throwable);
1915                 procedure.exception(graph, throwable);
1916             }
1917             @Override
1918             public void execute(AsyncReadGraph graph, T result) {
1919                 resultContainer.set(result);
1920                 procedure.execute(graph, result);
1921             }
1922         }, getListenerBase(procedure), notify);
1923         acquire(notify, request, procedure);
1924         Throwable throwable = exceptionContainer.get();
1925         if (throwable != null) {
1926             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1927             else throw new DatabaseException("Unexpected exception", throwable);
1928         }
1929         return resultContainer.get();
1930     }
1931
1932
1933     @Override
1934     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1935         assertNotSession();
1936         Semaphore notify = new Semaphore(0);
1937         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1938         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1939             @Override
1940             public void exception(AsyncReadGraph graph, Throwable throwable) {
1941                 exceptionContainer.set(throwable);
1942                 procedure.exception(graph, throwable);
1943             }
1944             @Override
1945             public void execute(AsyncReadGraph graph, T result) {
1946                 procedure.execute(graph, result);
1947             }
1948             @Override
1949             public void finished(AsyncReadGraph graph) {
1950                 procedure.finished(graph);
1951             }
1952         }, notify);
1953         acquire(notify, request, procedure);
1954         Throwable throwable = exceptionContainer.get();
1955         if (throwable != null) {
1956             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1957             else throw new DatabaseException("Unexpected exception", throwable);
1958         }
1959         // TODO: implement return value
1960         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1961         return null;
1962     }
1963
1964     @Override
1965     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1966         return syncRequest(request, new ProcedureAdapter<T>());
1967
1968
1969 //        assert(request != null);
1970 //
1971 //        final DataContainer<T> result = new DataContainer<T>();
1972 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1973 //
1974 //        syncRequest(request, new Procedure<T>() {
1975 //
1976 //            @Override
1977 //            public void execute(T t) {
1978 //                result.set(t);
1979 //            }
1980 //
1981 //            @Override
1982 //            public void exception(Throwable t) {
1983 //                exception.set(t);
1984 //            }
1985 //
1986 //        });
1987 //
1988 //        Throwable t = exception.get();
1989 //        if(t != null) {
1990 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1991 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1992 //        }
1993 //
1994 //        return result.get();
1995
1996     }
1997
1998     @Override
1999     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
2000         return syncRequest(request, (Procedure<T>)procedure);
2001     }
2002
2003
2004 //    @Override
2005 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
2006 //        assertNotSession();
2007 //        Semaphore notify = new Semaphore(0);
2008 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
2009 //        DataContainer<T> result = new DataContainer<T>();
2010 //        scheduleRequest(request, procedure, notify, container, result);
2011 //        acquire(notify, request);
2012 //        Throwable throwable = container.get();
2013 //        if(throwable != null) {
2014 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2015 //            else throw new DatabaseException("Unexpected exception", throwable);
2016 //        }
2017 //        return result.get();
2018 //    }
2019
2020
2021     @Override
2022     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2023         assertNotSession();
2024         Semaphore notify = new Semaphore(0);
2025         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2026         final DataContainer<T> result = new DataContainer<T>();
2027         scheduleRequest(request, procedure, notify, container, result);
2028         acquire(notify, request);
2029         Throwable throwable = container.get();
2030         if (throwable != null) {
2031             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2032             else throw new DatabaseException("Unexpected exception", throwable);
2033         }
2034         return result.get();
2035     }
2036
2037     @Override
2038     public void syncRequest(Write request) throws DatabaseException  {
2039         assertNotSession();
2040         assertAlive();
2041         Semaphore notify = new Semaphore(0);
2042         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2043         scheduleRequest(request, e -> exception.set(e), notify);
2044         acquire(notify, request);
2045         if(exception.get() != null) throw exception.get();
2046     }
2047
2048     @Override
2049     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2050         assertNotSession();
2051         Semaphore notify = new Semaphore(0);
2052         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2053         final DataContainer<T> result = new DataContainer<T>();
2054         scheduleRequest(request, new Procedure<T>() {
2055
2056             @Override
2057             public void exception(Throwable t) {
2058               exception.set(t);
2059             }
2060
2061             @Override
2062             public void execute(T t) {
2063               result.set(t);
2064             }
2065
2066         }, notify);
2067         acquire(notify, request);
2068         if(exception.get() != null) {
2069             Throwable t = exception.get();
2070             if(t instanceof DatabaseException) throw (DatabaseException)t;
2071             else throw new DatabaseException(t);
2072         }
2073         return result.get();
2074     }
2075
2076     @Override
2077     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2078         assertNotSession();
2079         Semaphore notify = new Semaphore(0);
2080         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2081         final DataContainer<T> result = new DataContainer<T>();
2082         scheduleRequest(request, new Procedure<T>() {
2083
2084             @Override
2085             public void exception(Throwable t) {
2086               exception.set(t);
2087             }
2088
2089             @Override
2090             public void execute(T t) {
2091               result.set(t);
2092             }
2093
2094         }, notify);
2095         acquire(notify, request);
2096         if(exception.get() != null) {
2097             Throwable t = exception.get();
2098             if(t instanceof DatabaseException) throw (DatabaseException)t;
2099             else throw new DatabaseException(t);
2100         }
2101         return result.get();
2102     }
2103
2104     @Override
2105     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2106         assertNotSession();
2107         Semaphore notify = new Semaphore(0);
2108         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2109         final DataContainer<T> result = new DataContainer<T>();
2110         scheduleRequest(request, new Procedure<T>() {
2111
2112             @Override
2113             public void exception(Throwable t) {
2114               exception.set(t);
2115             }
2116
2117             @Override
2118             public void execute(T t) {
2119               result.set(t);
2120             }
2121
2122         }, notify);
2123         acquire(notify, request);
2124         if(exception.get() != null) {
2125             Throwable t = exception.get();
2126             if(t instanceof DatabaseException) throw (DatabaseException)t;
2127             else throw new DatabaseException(t);
2128         }
2129         return result.get();
2130     }
2131
2132     @Override
2133     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2134         assertNotSession();
2135         Semaphore notify = new Semaphore(0);
2136         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2137         scheduleRequest(request, e -> exception.set(e), notify);
2138         acquire(notify, request);
2139         if(exception.get() != null) throw exception.get();
2140     }
2141
2142     @Override
2143     public void syncRequest(WriteOnly request) throws DatabaseException  {
2144         assertNotSession();
2145         assertAlive();
2146         Semaphore notify = new Semaphore(0);
2147         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2148         scheduleRequest(request, e -> exception.set(e), notify);
2149         acquire(notify, request);
2150         if(exception.get() != null) throw exception.get();
2151     }
2152
2153     /*
2154      *
2155      * GraphRequestProcessor interface
2156      */
2157
2158     @Override
2159     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2160
2161         scheduleRequest(request, procedure, null, null);
2162
2163     }
2164
2165     @Override
2166     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2167
2168         scheduleRequest(request, procedure, null);
2169
2170     }
2171
2172     @Override
2173     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2174
2175         scheduleRequest(request, procedure, null, null, null);
2176
2177     }
2178
2179     @Override
2180     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2181
2182         scheduleRequest(request, callback, null);
2183
2184     }
2185
2186     @Override
2187     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2188
2189         scheduleRequest(request, procedure, null);
2190
2191     }
2192
2193     @Override
2194     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2195
2196         scheduleRequest(request, procedure, null);
2197
2198     }
2199
2200     @Override
2201     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2202
2203         scheduleRequest(request, procedure, null);
2204
2205     }
2206
2207     @Override
2208     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2209
2210         scheduleRequest(request, callback, null);
2211
2212     }
2213
2214     @Override
2215     public void asyncRequest(final Write r) {
2216         asyncRequest(r, null);
2217     }
2218
2219     @Override
2220     public void asyncRequest(final DelayedWrite r) {
2221         asyncRequest(r, null);
2222     }
2223
2224     @Override
2225     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2226
2227         scheduleRequest(request, callback, null);
2228
2229     }
2230
2231     @Override
2232     public void asyncRequest(final WriteOnly request) {
2233
2234         asyncRequest(request, null);
2235
2236     }
2237
2238     @Override
2239     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2240         r.request(this, procedure);
2241     }
2242
2243     @Override
2244     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2245         r.request(this, procedure);
2246     }
2247
2248     @Override
2249     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2250         r.request(this, procedure);
2251     }
2252
2253     @Override
2254     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2255         r.request(this, procedure);
2256     }
2257
2258     @Override
2259     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2260         r.request(this, procedure);
2261     }
2262
2263     @Override
2264     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2265         r.request(this, procedure);
2266     }
2267
2268     @Override
2269     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2270         return r.request(this);
2271     }
2272
2273     @Override
2274     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2275         return r.request(this);
2276     }
2277
2278     @Override
2279     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2280         r.request(this, procedure);
2281     }
2282
2283     @Override
2284     public <T> void async(WriteInterface<T> r) {
2285         r.request(this, new ProcedureAdapter<T>());
2286     }
2287
2288     //@Override
2289     public void incAsync() {
2290         state.incAsync();
2291     }
2292
2293     //@Override
2294     public void decAsync() {
2295         state.decAsync();
2296     }
2297
2298     public long getCluster(ResourceImpl resource) {
2299         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2300         return cluster.getClusterId();
2301     }
2302
2303     public long getCluster(int id) {
2304         if (clusterTable == null)
2305             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2306         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2307     }
2308
2309     public ResourceImpl getResource(int id) {
2310         return new ResourceImpl(resourceSupport, id);
2311     }
2312
2313     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2314         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2315         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2316         int key = proxy.getClusterKey();
2317         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2318         return new ResourceImpl(resourceSupport, resourceKey);
2319     }
2320
2321     public ResourceImpl getResource2(int id) {
2322         assert (id != 0);
2323         return new ResourceImpl(resourceSupport, id);
2324     }
2325
2326     final public int getId(ResourceImpl impl) {
2327         return impl.id;
2328     }
2329
2330     public static final Charset UTF8 = Charset.forName("utf-8");
2331
2332
2333
2334
2335     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2336         for(TransientGraph g : support.providers) {
2337             if(g.isPending(subject)) return false;
2338         }
2339         return true;
2340     }
2341
2342     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2343         for(TransientGraph g : support.providers) {
2344             if(g.isPending(subject, predicate)) return false;
2345         }
2346         return true;
2347     }
2348
2349     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2350
2351         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2352
2353             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2354
2355             @Override
2356             public void accept(ReadGraphImpl graph) {
2357                 if(ready.decrementAndGet() == 0) {
2358                     runnable.accept(graph);
2359                 }
2360             }
2361
2362         };
2363
2364         for(TransientGraph g : support.providers) {
2365             if(g.isPending(subject)) {
2366                 try {
2367                     g.load(graph, subject, composite);
2368                 } catch (DatabaseException e) {
2369                     e.printStackTrace();
2370                 }
2371             } else {
2372                 composite.accept(graph);
2373             }
2374         }
2375
2376         composite.accept(graph);
2377
2378     }
2379
2380     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2381
2382         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2383
2384             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2385
2386             @Override
2387             public void accept(ReadGraphImpl graph) {
2388                 if(ready.decrementAndGet() == 0) {
2389                     runnable.accept(graph);
2390                 }
2391             }
2392
2393         };
2394
2395         for(TransientGraph g : support.providers) {
2396             if(g.isPending(subject, predicate)) {
2397                 try {
2398                     g.load(graph, subject, predicate, composite);
2399                 } catch (DatabaseException e) {
2400                     e.printStackTrace();
2401                 }
2402             } else {
2403                 composite.accept(graph);
2404             }
2405         }
2406
2407         composite.accept(graph);
2408
2409     }
2410
2411 //    void dumpHeap() {
2412 //
2413 //        try {
2414 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2415 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2416 //                    "d:/heap" + flushCounter + ".txt", true);
2417 //        } catch (IOException e) {
2418 //            e.printStackTrace();
2419 //        }
2420 //
2421 //    }
2422
2423     void fireReactionsToSynchronize(ChangeSet cs) {
2424
2425         // Do not fire empty events
2426         if (cs.isEmpty())
2427             return;
2428
2429         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2430 //        g.state.barrier.inc();
2431
2432         try {
2433
2434             if (!cs.isEmpty()) {
2435                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2436                 for (ChangeListener l : changeListeners2) {
2437                     try {
2438                         l.graphChanged(e2);
2439                     } catch (Exception ex) {
2440                         ex.printStackTrace();
2441                     }
2442                 }
2443             }
2444
2445         } finally {
2446
2447 //            g.state.barrier.dec();
2448 //            g.waitAsync(null);
2449
2450         }
2451
2452     }
2453
2454     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2455         try {
2456
2457             // Do not fire empty events
2458             if (cs2.isEmpty())
2459                 return;
2460
2461 //            graph.restart();
2462 //            graph.state.barrier.inc();
2463
2464             try {
2465
2466                 if (!cs2.isEmpty()) {
2467
2468                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2469                     for (ChangeListener l : changeListeners2) {
2470                         try {
2471                             l.graphChanged(e2);
2472 //                            System.out.println("changelistener " + l);
2473                         } catch (Exception ex) {
2474                             ex.printStackTrace();
2475                         }
2476                     }
2477
2478                 }
2479
2480             } finally {
2481
2482 //                graph.state.barrier.dec();
2483 //                graph.waitAsync(null);
2484
2485             }
2486         } catch (Throwable t) {
2487             t.printStackTrace();
2488
2489         }
2490     }
2491     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2492
2493         try {
2494
2495             // Do not fire empty events
2496             if (cs2.isEmpty())
2497                 return;
2498
2499             // Do not fire on virtual requests
2500             if(graph.getProvider() != null)
2501                 return;
2502
2503             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2504
2505             try {
2506
2507                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2508                 for (ChangeListener l : metadataListeners) {
2509                     try {
2510                         l.graphChanged(e2);
2511                     } catch (Throwable ex) {
2512                         ex.printStackTrace();
2513                     }
2514                 }
2515
2516             } finally {
2517
2518             }
2519
2520         } catch (Throwable t) {
2521             t.printStackTrace();
2522         }
2523
2524     }
2525
2526
2527     /*
2528      * (non-Javadoc)
2529      *
2530      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2531      */
2532     @Override
2533     public <T> T getService(Class<T> api) {
2534         T t = peekService(api);
2535         if (t == null) {
2536             if (state.isClosed())
2537                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2538             throw new ServiceNotFoundException(this, api);
2539         }
2540         return t;
2541     }
2542
    /**
     * Supplies the server information that {@link #peekService(Class)} hands out when asked
     * for {@code ServerInformation.class}.
     *
     * @return the server information as provided by the concrete subclass
     */
    protected abstract ServerInformation getCachedServerInformation();
2547
        // Two-slot lookup cache used by peekService: slot 1 holds the most recently
        // requested key/service pair, slot 2 the one requested before it.
        // registerService refreshes a slot when its key is registered again.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2552
2553     /*
2554      * (non-Javadoc)
2555      *
2556      * @see
2557      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2558      */
2559     @SuppressWarnings("unchecked")
2560     @Override
2561     public synchronized <T> T peekService(Class<T> api) {
2562
2563                 if(serviceKey1 == api) {
2564                         return (T)service1;
2565                 } else if (serviceKey2 == api) {
2566                         // Promote this key
2567                         Object result = service2;
2568                         service2 = service1;
2569                         serviceKey2 = serviceKey1;
2570                         service1 = result;
2571                         serviceKey1 = api;
2572                         return (T)result;
2573                 }
2574
2575         if (Layer0.class == api)
2576                 return (T) L0;
2577         if (ServerInformation.class == api)
2578             return (T) getCachedServerInformation();
2579         else if (WriteGraphImpl.class == api)
2580             return (T) writeState.getGraph();
2581         else if (ClusterBuilder.class == api)
2582             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2583         else if (ClusterBuilderFactory.class == api)
2584             return (T)new ClusterBuilderFactoryImpl(this);
2585
2586                 service2 = service1;
2587                 serviceKey2 = serviceKey1;
2588
2589         service1 = serviceLocator.peekService(api);
2590         serviceKey1 = api;
2591
2592         return (T)service1;
2593
2594     }
2595
2596     /*
2597      * (non-Javadoc)
2598      *
2599      * @see
2600      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2601      */
2602     @Override
2603     public boolean hasService(Class<?> api) {
2604         return serviceLocator.hasService(api);
2605     }
2606
2607     /**
2608      * @param api the api that must be implemented by the specified service
2609      * @param service the service implementation
2610      */
2611     @Override
2612     public <T> void registerService(Class<T> api, T service) {
2613         if(Layer0.class == api) {
2614                 L0 = (Layer0)service;
2615                 return;
2616         }
2617         serviceLocator.registerService(api, service);
2618         if (TransactionPolicySupport.class == api) {
2619             transactionPolicy = (TransactionPolicySupport)service;
2620             state.resetTransactionPolicy();
2621         }
2622         if (api == serviceKey1)
2623                 service1 = service;
2624         else if (api == serviceKey2)
2625                 service2 = service;
2626     }
2627
2628     // ----------------
2629     // SessionMonitor
2630     // ----------------
2631
2632     void fireSessionVariableChange(String variable) {
2633         for (MonitorHandler h : monitorHandlers) {
2634             MonitorContext ctx = monitorContexts.getLeft(h);
2635             assert ctx != null;
2636             // SafeRunner functionality repeated here to avoid dependency.
2637             try {
2638                 h.valuesChanged(ctx);
2639             } catch (Exception e) {
2640                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2641             } catch (LinkageError e) {
2642                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2643             }
2644         }
2645     }
2646
2647
2648     class ResourceSerializerImpl implements ResourceSerializer {
2649
2650         public long createRandomAccessId(int id) throws DatabaseException {
2651             if(id < 0)
2652                 return id;
2653             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2654             long cluster = getCluster(id);
2655             if (0 == cluster)
2656                 return 0; // Better to return 0 then invalid id.
2657             long result = ClusterTraitsBase.createResourceId(cluster, index);
2658             return result;
2659         }
2660
2661         @Override
2662         public long getRandomAccessId(Resource resource) throws DatabaseException {
2663
2664             ResourceImpl resourceImpl = (ResourceImpl) resource;
2665             return createRandomAccessId(resourceImpl.id);
2666
2667         }
2668
2669         @Override
2670         public int getTransientId(Resource resource) throws DatabaseException {
2671
2672             ResourceImpl resourceImpl = (ResourceImpl) resource;
2673             return resourceImpl.id;
2674
2675         }
2676
2677         @Override
2678         public String createRandomAccessId(Resource resource)
2679         throws InvalidResourceReferenceException {
2680
2681             if(resource == null) throw new IllegalArgumentException();
2682
2683             ResourceImpl resourceImpl = (ResourceImpl) resource;
2684
2685             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2686
2687             int r;
2688             try {
2689                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2690             } catch (DatabaseException e1) {
2691                 throw new InvalidResourceReferenceException(e1);
2692             }
2693             try {
2694                 // Serialize as '<resource index>_<cluster id>'
2695                 return "" + r + "_" + getCluster(resourceImpl);
2696             } catch (Throwable e) {
2697                 e.printStackTrace();
2698                 throw new InvalidResourceReferenceException(e);
2699             } finally {
2700             }
2701         }
2702
2703         public int getTransientId(long serialized) throws DatabaseException {
2704             if (serialized <= 0)
2705                 return (int)serialized;
2706             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2707             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2708             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2709             if (cluster == null)
2710                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2711             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2712             return key;
2713         }
2714
2715         @Override
2716         public Resource getResource(long randomAccessId) throws DatabaseException {
2717             return getResourceByKey(getTransientId(randomAccessId));
2718         }
2719
2720         @Override
2721         public Resource getResource(int transientId) throws DatabaseException {
2722             return getResourceByKey(transientId);
2723         }
2724
2725         @Override
2726         public Resource getResource(String randomAccessId)
2727         throws InvalidResourceReferenceException {
2728             try {
2729                 int i = randomAccessId.indexOf('_');
2730                 if (i == -1)
2731                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2732                             + randomAccessId + "'");
2733                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2734                 if(r < 0) return getResourceByKey(r);
2735                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2736                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2737                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2738                 if (cluster.hasResource(key, clusterTranslator))
2739                     return getResourceByKey(key);
2740             } catch (InvalidResourceReferenceException e) {
2741                 throw e;
2742             } catch (NumberFormatException e) {
2743                 throw new InvalidResourceReferenceException(e);
2744             } catch (Throwable e) {
2745                 e.printStackTrace();
2746                 throw new InvalidResourceReferenceException(e);
2747             } finally {
2748             }
2749             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2750         }
2751
2752         @Override
2753         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2754             try {
2755                 return true;
2756             } catch (Throwable e) {
2757                 e.printStackTrace();
2758                 throw new InvalidResourceReferenceException(e);
2759             } finally {
2760             }
2761         }
2762     }
2763
2764     // Copied from old SessionImpl
2765
2766     void check() {
2767         if (state.isClosed())
2768             throw new Error("Session closed.");
2769     }
2770
2771
2772
2773     public ResourceImpl getNewResource(long clusterId) {
2774         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2775         int newId;
2776         try {
2777             newId = cluster.createResource(clusterTranslator);
2778         } catch (DatabaseException e) {
2779             Logger.defaultLogError(e);
2780             return null;
2781         }
2782         return new ResourceImpl(resourceSupport, newId);
2783     }
2784
2785     public ResourceImpl getNewResource(Resource clusterSet)
2786     throws DatabaseException {
2787         long resourceId = clusterSet.getResourceId();
2788         Long clusterId = clusterSetsSupport.get(resourceId);
2789         if (null == clusterId)
2790             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2791         if (Constants.NewClusterId == clusterId) {
2792             clusterId = getService(ClusteringSupport.class).createCluster();
2793             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2794                 createdClusters.add(clusterId);
2795             clusterSetsSupport.put(resourceId, clusterId);
2796             return getNewResource(clusterId);
2797         } else {
2798             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2799             ResourceImpl result;
2800             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2801                 clusterId = getService(ClusteringSupport.class).createCluster();
2802                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2803                     createdClusters.add(clusterId);
2804                 clusterSetsSupport.put(resourceId, clusterId);
2805                 return getNewResource(clusterId);
2806             } else {
2807                 result = getNewResource(clusterId);
2808                 int resultKey = querySupport.getId(result);
2809                 long resultCluster = querySupport.getClusterId(resultKey);
2810                 if (clusterId != resultCluster)
2811                     clusterSetsSupport.put(resourceId, resultCluster);
2812                 return result;
2813             }
2814         }
2815     }
2816
2817     public void getNewClusterSet(Resource clusterSet)
2818     throws DatabaseException {
2819         if (DEBUG)
2820             System.out.println("new cluster set=" + clusterSet);
2821         long resourceId = clusterSet.getResourceId();
2822         if (clusterSetsSupport.containsKey(resourceId))
2823             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2824         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2825     }
2826     public boolean containsClusterSet(Resource clusterSet)
2827     throws ServiceException {
2828         long resourceId = clusterSet.getResourceId();
2829         return clusterSetsSupport.containsKey(resourceId);
2830     }
2831     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2832         Resource r = defaultClusterSet;
2833         defaultClusterSet = clusterSet;
2834         return r;
2835     }
2836     void printDiagnostics() {
2837
2838         if (DIAGNOSTICS) {
2839
2840             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2841             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2842             for(ClusterI cluster : clusterTable.getClusters()) {
2843                 try {
2844                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2845                         residentClusters.set(residentClusters.get() + 1);
2846                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2847                     }
2848                 } catch (DatabaseException e) {
2849                     Logger.defaultLogError(e);
2850                 }
2851             }
2852
2853             System.out.println("--------------------------------");
2854             System.out.println("Cluster information:");
2855             System.out.println("-amount of resident clusters=" + residentClusters.get());
2856             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2857
2858             for(ClusterI cluster : clusterTable.getClusters()) {
2859                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2860                 try {
2861                     if (!cluster.isLoaded())
2862                         System.out.println("not loaded]");
2863                     else if (cluster.isEmpty())
2864                         System.out.println("is empty]");
2865                     else {
2866                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2867                         System.out.print(",references=" + cluster.getReferenceCount());
2868                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2869                     }
2870                 } catch (DatabaseException e) {
2871                     Logger.defaultLogError(e);
2872                     System.out.println("is corrupted]");
2873                 }
2874             }
2875
2876             queryProvider2.printDiagnostics();
2877             System.out.println("--------------------------------");
2878         }
2879
2880     }
2881
2882     public void fireStartReadTransaction() {
2883
2884         if (DIAGNOSTICS)
2885             System.out.println("StartReadTransaction");
2886
2887         for (SessionEventListener listener : eventListeners) {
2888             try {
2889                 listener.readTransactionStarted();
2890             } catch (Throwable t) {
2891                 t.printStackTrace();
2892             }
2893         }
2894
2895     }
2896
2897     public void fireFinishReadTransaction() {
2898
2899         if (DIAGNOSTICS)
2900             System.out.println("FinishReadTransaction");
2901
2902         for (SessionEventListener listener : eventListeners) {
2903             try {
2904                 listener.readTransactionFinished();
2905             } catch (Throwable t) {
2906                 t.printStackTrace();
2907             }
2908         }
2909
2910     }
2911
2912     public void fireStartWriteTransaction() {
2913
2914         if (DIAGNOSTICS)
2915             System.out.println("StartWriteTransaction");
2916
2917         for (SessionEventListener listener : eventListeners) {
2918             try {
2919                 listener.writeTransactionStarted();
2920             } catch (Throwable t) {
2921                 t.printStackTrace();
2922             }
2923         }
2924
2925     }
2926
2927     public void fireFinishWriteTransaction() {
2928
2929         if (DIAGNOSTICS)
2930             System.out.println("FinishWriteTransaction");
2931
2932         for (SessionEventListener listener : eventListeners) {
2933             try {
2934                 listener.writeTransactionFinished();
2935             } catch (Throwable t) {
2936                 t.printStackTrace();
2937             }
2938         }
2939
2940         Indexing.resetDependenciesIndexingDisabled();
2941
2942     }
2943
2944     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2945         throw new Error("Not supported at the moment.");
2946     }
2947
2948     State getState() {
2949         return state;
2950     }
2951
2952     ClusterTable getClusterTable() {
2953         return clusterTable;
2954     }
2955
2956     public GraphSession getGraphSession() {
2957         return graphSession;
2958     }
2959
2960     public QueryProcessor getQueryProvider2() {
2961         return queryProvider2;
2962     }
2963
2964     ClientChangesImpl getClientChanges() {
2965         return clientChanges;
2966     }
2967
2968     boolean getWriteOnly() {
2969         return writeOnly;
2970     }
2971
    // NOTE(review): no use of this counter is visible in the surrounding code; confirm it is
    // still referenced elsewhere before relying on it.
    static int counter = 0;
2973
2974     public void onClusterLoaded(long clusterId) {
2975
2976         clusterTable.updateSize();
2977
2978     }
2979
2980
2981
2982
2983     /*
2984      * Implementation of the interface RequestProcessor
2985      */
2986
2987     @Override
2988     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2989
2990         assertNotSession();
2991         assertAlive();
2992
2993         assert(request != null);
2994
2995         final DataContainer<T> result = new DataContainer<T>();
2996         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2997
2998         syncRequest(request, new AsyncProcedure<T>() {
2999
3000             @Override
3001             public void execute(AsyncReadGraph graph, T t) {
3002                 result.set(t);
3003             }
3004
3005             @Override
3006             public void exception(AsyncReadGraph graph, Throwable t) {
3007                 exception.set(t);
3008             }
3009
3010         });
3011
3012         Throwable t = exception.get();
3013         if(t != null) {
3014             if(t instanceof DatabaseException) throw (DatabaseException)t;
3015             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3016         }
3017
3018         return result.get();
3019
3020     }
3021
3022 //    @Override
3023 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3024 //        assertNotSession();
3025 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
3026 //    }
3027
3028     @Override
3029     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3030         assertNotSession();
3031         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3032     }
3033
3034     @Override
3035     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3036         assertNotSession();
3037         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3038     }
3039
3040     @Override
3041     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3042         assertNotSession();
3043         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3044     }
3045
3046     @Override
3047     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3048         assertNotSession();
3049         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3050     }
3051
3052     @Override
3053     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3054
3055         assertNotSession();
3056
3057         assert(request != null);
3058
3059         final DataContainer<T> result = new DataContainer<T>();
3060         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3061
3062         syncRequest(request, new AsyncProcedure<T>() {
3063
3064             @Override
3065             public void execute(AsyncReadGraph graph, T t) {
3066                 result.set(t);
3067             }
3068
3069             @Override
3070             public void exception(AsyncReadGraph graph, Throwable t) {
3071                 exception.set(t);
3072             }
3073
3074         });
3075
3076         Throwable t = exception.get();
3077         if(t != null) {
3078             if(t instanceof DatabaseException) throw (DatabaseException)t;
3079             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3080         }
3081
3082         return result.get();
3083
3084     }
3085
3086     @Override
3087     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3088         assertNotSession();
3089         return syncRequest(request, (AsyncProcedure<T>)procedure);
3090     }
3091
3092     @Override
3093     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3094         assertNotSession();
3095         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3096     }
3097
3098     @Override
3099     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3100         assertNotSession();
3101         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3102     }
3103
3104     @Override
3105     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3108     }
3109
3110     @Override
3111     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3114     }
3115
3116     @Override
3117     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3118
3119         assertNotSession();
3120
3121         assert(request != null);
3122
3123         final ArrayList<T> result = new ArrayList<T>();
3124         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3125
3126         syncRequest(request, new SyncMultiProcedure<T>() {
3127
3128             @Override
3129             public void execute(ReadGraph graph, T t) {
3130                 synchronized(result) {
3131                     result.add(t);
3132                 }
3133             }
3134
3135             @Override
3136             public void finished(ReadGraph graph) {
3137             }
3138
3139             @Override
3140             public void exception(ReadGraph graph, Throwable t) {
3141                 exception.set(t);
3142             }
3143
3144
3145         });
3146
3147         Throwable t = exception.get();
3148         if(t != null) {
3149             if(t instanceof DatabaseException) throw (DatabaseException)t;
3150             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3151         }
3152
3153         return result;
3154
3155     }
3156
3157     @Override
3158     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3159         assertNotSession();
3160         throw new Error("Not implemented!");
3161     }
3162
3163     @Override
3164     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3165         assertNotSession();
3166         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3167     }
3168
3169     @Override
3170     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3171         assertNotSession();
3172         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3177         assertNotSession();
3178         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3179     }
3180
3181     @Override
3182     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3183
3184         assertNotSession();
3185
3186         assert(request != null);
3187
3188         final ArrayList<T> result = new ArrayList<T>();
3189         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3190
3191         syncRequest(request, new AsyncMultiProcedure<T>() {
3192
3193             @Override
3194             public void execute(AsyncReadGraph graph, T t) {
3195                 synchronized(result) {
3196                     result.add(t);
3197                 }
3198             }
3199
3200             @Override
3201             public void finished(AsyncReadGraph graph) {
3202             }
3203
3204             @Override
3205             public void exception(AsyncReadGraph graph, Throwable t) {
3206                 exception.set(t);
3207             }
3208
3209
3210         });
3211
3212         Throwable t = exception.get();
3213         if(t != null) {
3214             if(t instanceof DatabaseException) throw (DatabaseException)t;
3215             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3216         }
3217
3218         return result;
3219
3220     }
3221
3222     @Override
3223     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3224         assertNotSession();
3225         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3226     }
3227
3228     @Override
3229     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3230         assertNotSession();
3231         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3232     }
3233
3234     @Override
3235     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3236         assertNotSession();
3237        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3238     }
3239
3240     @Override
3241     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3242         assertNotSession();
3243         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3244     }
3245
3246     @Override
3247     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3248         assertNotSession();
3249         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3250     }
3251
3252
3253     @Override
3254     public <T> void asyncRequest(Read<T> request) {
3255
3256         asyncRequest(request, new ProcedureAdapter<T>() {
3257             @Override
3258             public void exception(Throwable t) {
3259                 t.printStackTrace();
3260             }
3261         });
3262
3263     }
3264
3265     @Override
3266     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3267         asyncRequest(request, (AsyncProcedure<T>)procedure);
3268     }
3269
3270     @Override
3271     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3272         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3273     }
3274
3275     @Override
3276     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3277         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3278     }
3279
3280     @Override
3281     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3282         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3283     }
3284
3285     @Override
3286     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3287         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3288     }
3289
3290     @Override
3291     final public <T> void asyncRequest(final AsyncRead<T> request) {
3292
3293         assert(request != null);
3294
3295         asyncRequest(request, new ProcedureAdapter<T>() {
3296             @Override
3297             public void exception(Throwable t) {
3298                 t.printStackTrace();
3299             }
3300         });
3301
3302     }
3303
3304     @Override
3305     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3306         scheduleRequest(request, procedure, procedure, null);
3307     }
3308
3309     @Override
3310     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3311         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3312     }
3313
3314     @Override
3315     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3316         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3317     }
3318
3319     @Override
3320     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3321         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3322     }
3323
3324     @Override
3325     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3326         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3327     }
3328
3329     @Override
3330     public <T> void asyncRequest(MultiRead<T> request) {
3331
3332         assert(request != null);
3333
3334         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3335             @Override
3336             public void exception(ReadGraph graph, Throwable t) {
3337                 t.printStackTrace();
3338             }
3339         });
3340
3341     }
3342
3343     @Override
3344     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3345         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3346     }
3347
3348     @Override
3349     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3350         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3351     }
3352
3353     @Override
3354     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3355         scheduleRequest(request, procedure, null);
3356     }
3357
3358     @Override
3359     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3360         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3361     }
3362
3363     @Override
3364     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3365
3366         assert(request != null);
3367
3368         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3369             @Override
3370             public void exception(AsyncReadGraph graph, Throwable t) {
3371                 t.printStackTrace();
3372             }
3373         });
3374
3375     }
3376
3377     @Override
3378     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3379         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3380     }
3381
3382     @Override
3383     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3384         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3385     }
3386
3387     @Override
3388     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3389         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3390     }
3391
3392     @Override
3393     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3394         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3395     }
3396
3397     @Override
3398     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3399         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3400     }
3401
3402     @Override
3403     final public <T> void asyncRequest(final ExternalRead<T> request) {
3404
3405         assert(request != null);
3406
3407         asyncRequest(request, new ProcedureAdapter<T>() {
3408             @Override
3409             public void exception(Throwable t) {
3410                 t.printStackTrace();
3411             }
3412         });
3413
3414     }
3415
3416     @Override
3417     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3418         asyncRequest(request, (Procedure<T>)procedure);
3419     }
3420
3421
3422
3423     void check(Throwable t) throws DatabaseException {
3424         if(t != null) {
3425             if(t instanceof DatabaseException) throw (DatabaseException)t;
3426             else throw new DatabaseException("Unexpected exception", t);
3427         }
3428     }
3429
3430     void check(DataContainer<Throwable> container) throws DatabaseException {
3431         Throwable t = container.get();
3432         if(t != null) {
3433             if(t instanceof DatabaseException) throw (DatabaseException)t;
3434             else throw new DatabaseException("Unexpected exception", t);
3435         }
3436     }
3437
3438
3439
3440
3441
3442
3443     boolean sameProvider(Write request) {
3444         if(writeState.getGraph().provider != null) {
3445             return writeState.getGraph().provider.equals(request.getProvider());
3446         } else {
3447             return request.getProvider() == null;
3448         }
3449     }
3450
3451     boolean plainWrite(WriteGraphImpl graph) {
3452         if(graph == null) return false;
3453         if(graph.writeSupport.writeOnly()) return false;
3454         return true;
3455     }
3456
3457
    /** Shared thread group, created with the name "Session Thread Group". */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3459     private void assertNotSession() throws DatabaseException {
3460         Thread current = Thread.currentThread();
3461         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3462     }
3463
3464     void assertAlive() {
3465         if (!state.isAlive())
3466             throw new RuntimeDatabaseException("Session has been shut down.");
3467     }
3468
3469     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3470         return querySupport.getValueStream(graph, querySupport.getId(resource));
3471     }
3472
3473     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3474         return querySupport.getValue(graph, querySupport.getId(resource));
3475     }
3476
3477     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3478         acquire(semaphore, request, null);
3479     }
3480
3481     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3482         assertAlive();
3483         try {
3484             long tryCount = 0;
3485             // Loop until the semaphore is acquired reporting requests that take a long time.
3486             while (true) {
3487
3488                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3489                     // Acquired OK.
3490                     return;
3491                 } else {
3492                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3493                         ++tryCount;
3494                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3495                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3496                                 * tryCount;
3497                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3498                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3499                     }
3500                 }
3501
3502                 assertAlive();
3503
3504             }
3505         } catch (InterruptedException e) {
3506             e.printStackTrace();
3507             // FIXME: Should perhaps do something else in this case ??
3508         }
3509     }
3510
3511
3512
3513 //    @Override
3514 //    public boolean holdOnToTransactionAfterCancel() {
3515 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3516 //    }
3517 //
3518 //    @Override
3519 //    public boolean holdOnToTransactionAfterCommit() {
3520 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3521 //    }
3522 //
3523 //    @Override
3524 //    public boolean holdOnToTransactionAfterRead() {
3525 //        return transactionPolicy.holdOnToTransactionAfterRead();
3526 //    }
3527 //
3528 //    @Override
3529 //    public void onRelinquish() {
3530 //        transactionPolicy.onRelinquish();
3531 //    }
3532 //
3533 //    @Override
3534 //    public void onRelinquishDone() {
3535 //        transactionPolicy.onRelinquishDone();
3536 //    }
3537 //
3538 //    @Override
3539 //    public void onRelinquishError() {
3540 //        transactionPolicy.onRelinquishError();
3541 //    }
3542
3543
3544     @Override
3545     public Session getSession() {
3546         return this;
3547     }
3548
3549
    /**
     * Supplied by concrete subclasses.
     * NOTE(review): presumably resolves the given virtual graph to the provider
     * instance this session uses - confirm against the implementations.
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /**
     * Supplied by concrete subclasses; yields a {@link ResourceImpl}.
     * NOTE(review): the name suggests a newly allocated resource - confirm
     * against the implementations.
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3552
3553     public void ceased(int thread) {
3554
3555         requestManager.ceased(thread);
3556
3557     }
3558
3559     public int getAmountOfQueryThreads() {
3560         // This must be a power of two
3561         return 1;
3562 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3563     }
3564
3565     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3566
3567         return new ResourceImpl(resourceSupport, key);
3568
3569     }
3570
3571     public void acquireWriteOnly() {
3572         writeOnly = true;
3573     }
3574
3575     public void releaseWriteOnly(ReadGraphImpl graph) {
3576         writeOnly = false;
3577         queryProvider2.releaseWrite(graph);
3578     }
3579
3580     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3581
3582         long start = System.nanoTime();
3583
3584         while(dirtyPrimitives) {
3585             dirtyPrimitives = false;
3586             getQueryProvider2().performDirtyUpdates(writer);
3587             getQueryProvider2().performScheduledUpdates(writer);
3588         }
3589
3590         fireMetadataListeners(writer, clientChanges);
3591
3592         if(DebugPolicy.PERFORMANCE_DATA) {
3593             long end = System.nanoTime();
3594             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3595         }
3596
3597     }
3598
3599     void removeTemporaryData() {
3600         File platform = Platform.getLocation().toFile();
3601         File tempFiles = new File(platform, "tempFiles");
3602         File temp = new File(tempFiles, "db");
3603         if(!temp.exists()) 
3604             return;
3605         try {
3606             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3607                 @Override
3608                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3609                     try {
3610                         Files.delete(file);
3611                     } catch (IOException e) {
3612                         Logger.defaultLogError(e);
3613                     }
3614                     return FileVisitResult.CONTINUE;
3615                 }
3616             });
3617         } catch (IOException e) {
3618             Logger.defaultLogError(e);
3619         }
3620     }
3621
3622     public void handleCreatedClusters() {
3623         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3624             createdClusters.forEach(new TLongProcedure() {
3625
3626                 @Override
3627                 public boolean execute(long value) {
3628                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3629                     cluster.setImmutable(true, clusterTranslator);
3630                     return true;
3631                 }
3632             });
3633         }
3634         createdClusters.clear();
3635     }
3636
3637     @Override
3638     public Object getModificationCounter() {
3639         return queryProvider2.modificationCounter;
3640     }
3641     
3642     @Override
3643     public void markUndoPoint() {
3644         state.setCombine(false);
3645     }
3646
3647 }