]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Working towards multiple readers.
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator maintained by the workbench. These services are
206      * initialized during workbench during the <code>init</code> method.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
418
419         requestManager.scheduleWrite(new SessionTask(true) {
420
421             @Override
422             public void run(int thread) {
423
424                 if(Development.DEVELOPMENT) {
425                     try {
426                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
427                         System.err.println("perform write '" + request + "'");
428                     } catch (Throwable t) {
429                         Logger.defaultLogError(t);
430                     }
431                 }
432
433                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
434
435                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
436
437                 try {
438
439                     flushCounter = 0;
440                     Disposable.safeDispose(clientChanges);
441                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
442
443                     VirtualGraph vg = getProvider(request.getProvider());
444
445                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
446                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
447
448                         @Override
449                         public void execute(Object result) {
450                             if(callback != null) callback.accept(null);
451                         }
452
453                         @Override
454                         public void exception(Throwable t) {
455                             if(callback != null) callback.accept((DatabaseException)t);
456                         }
457
458                     });
459
460                     assert (null != writer);
461 //                    writer.state.barrier.inc();
462
463                     try {
464                         request.perform(writer);
465                         assert (null != writer);
466                     } catch (Throwable t) {
467                         if (!(t instanceof CancelTransactionException))
468                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
469                         writeState.except(t);
470                     } finally {
471 //                        writer.state.barrier.dec();
472 //                        writer.waitAsync(request);
473                     }
474
475
476                     assert(!queryProvider2.cache.dirty);
477
478                 } catch (Throwable e) {
479
480                     // Log it first, just to be safe that the error is always logged.
481                     if (!(e instanceof CancelTransactionException))
482                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
483
484 //                    writeState.getGraph().state.barrier.dec();
485 //                    writeState.getGraph().waitAsync(request);
486
487                     writeState.except(e);
488
489 //                    try {
490 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
491 //                        // All we can do here is to log those, can't really pass them anywhere.
492 //                        if (callback != null) {
493 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
494 //                            else callback.run(new DatabaseException(e));
495 //                        }
496 //                    } catch (Throwable e2) {
497 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
498 //                    }
499 //                    boolean empty = clusterStream.reallyFlush();
500 //                    int callerThread = -1;
501 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
502 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
503 //                    try {
504 //                        // Send all local changes to server so it can calculate correct reverse change set.
505 //                        // This will call clusterStream.accept().
506 //                        state.cancelCommit(context, clusterStream);
507 //                        if (!empty) {
508 //                            if (!context.isOk()) // this is a blocking operation
509 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
510 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
511 //                        }
512 //                        state.cancelCommit2(context, clusterStream);
513 //                    } catch (DatabaseException e3) {
514 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
515 //                    } finally {
516 //                        clusterStream.setOff(false);
517 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
518 //                    }
519
520                 } finally {
521
522                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
523
524                 }
525
526                 task.finish();
527
528                 if(Development.DEVELOPMENT) {
529                     try {
530                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
531                         System.err.println("finish write '" + request + "'");
532                     } catch (Throwable t) {
533                         Logger.defaultLogError(t);
534                     }
535                 }
536
537             }
538
539         }, combine);
540
541     }
542
543     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
544         scheduleRequest(request, procedure, notify, null);
545     }
546
547     /* (non-Javadoc)
548      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
549      */
550     @Override
551     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
552
553         assert (request != null);
554
555         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
556
557         requestManager.scheduleWrite(new SessionTask(true) {
558
559             @Override
560             public void run(int thread) {
561
562                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
563
564                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
565
566                 flushCounter = 0;
567                 Disposable.safeDispose(clientChanges);
568                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
569
570                 VirtualGraph vg = getProvider(request.getProvider());
571                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
572
573                 try {
574                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                     writeState = writeStateT;
576
577                     assert (null != writer);
578 //                    writer.state.barrier.inc();
579                     writeStateT.setResult(request.perform(writer));
580                     assert (null != writer);
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                 } catch (Throwable e) {
586
587 //                    writer.state.barrier.dec();
588 //                    writer.waitAsync(null);
589
590                     writeState.except(e);
591
592 //                  state.stopWriteTransaction(clusterStream);
593 //
594 //              } catch (Throwable e) {
595 //                  // Log it first, just to be safe that the error is always logged.
596 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
597 //
598 //                  try {
599 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
600 //                      // All we can do here is to log those, can't really pass them anywhere.
601 //                      if (procedure != null) {
602 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
603 //                          else procedure.exception(new DatabaseException(e));
604 //                      }
605 //                  } catch (Throwable e2) {
606 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
607 //                  }
608 //
609 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
610 //
611 //                  state.stopWriteTransaction(clusterStream);
612
613                 } finally {
614                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
615                 }
616
617 //              if(notify != null) notify.release();
618
619                 task.finish();
620
621             }
622
623         }, combine);
624
625     }
626
627     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
628         scheduleRequest(request, callback, notify, null);
629     }
630
631     /* (non-Javadoc)
632      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
633      */
634     @Override
635     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
636
637         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
638
639         assert (request != null);
640
641         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
642
643         requestManager.scheduleWrite(new SessionTask(true) {
644
645             @Override
646             public void run(int thread) {
647                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
648
649                 Procedure<Object> stateProcedure = new Procedure<Object>() {
650                     @Override
651                     public void execute(Object result) {
652                         if (callback != null)
653                             callback.accept(null);
654                     }
655                     @Override
656                     public void exception(Throwable t) {
657                         if (callback != null) {
658                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
659                             else callback.accept(new DatabaseException(t));
660                         } else
661                             Logger.defaultLogError("Unhandled exception", t);
662                     }
663                 };
664
665                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
666                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
667                 DelayedWriteGraph dwg = null;
668 //                newGraph.state.barrier.inc();
669
670                 try {
671                     dwg = new DelayedWriteGraph(newGraph);
672                     request.perform(dwg);
673                 } catch (Throwable e) {
674                     delayedWriteState.except(e);
675                     total.finish();
676                     dwg.close();
677                     return;
678                 } finally {
679 //                    newGraph.state.barrier.dec();
680 //                    newGraph.waitAsync(request);
681                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
682                 }
683
684                 delayedWriteState = null;
685
686                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
687                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
688
689                 flushCounter = 0;
690                 Disposable.safeDispose(clientChanges);
691                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
692
693                 acquireWriteOnly();
694
695                 VirtualGraph vg = getProvider(request.getProvider());
696                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
697
698                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
699
700                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
701
702                 assert (null != writer);
703 //                writer.state.barrier.inc();
704
705                 try {
706                     // Cannot cancel
707                     dwg.commit(writer, request);
708
709                         if(defaultClusterSet != null) {
710                                 XSupport xs = getService(XSupport.class);
711                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712                                 ClusteringSupport cs = getService(ClusteringSupport.class);
713                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
714                         }
715
716                     // This makes clusters available from server
717                     clusterStream.reallyFlush();
718
719                     releaseWriteOnly(writer);
720                     handleUpdatesAndMetadata(writer);
721
722                 } catch (ServiceException e) {
723 //                    writer.state.barrier.dec();
724 //                    writer.waitAsync(null);
725
726                     // These shall be requested from server
727                     clusterTable.removeWriteOnlyClusters();
728                     // This makes clusters available from server
729                     clusterStream.reallyFlush();
730
731                     releaseWriteOnly(writer);
732                     writeState.except(e);
733                 } finally {
734                     // Debugging & Profiling
735                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736                     task2.finish();
737                     total.finish();
738                 }
739             }
740
741         }, combine);
742
743     }
744
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746         scheduleRequest(request, procedure, notify, null);
747     }
748
749     /* (non-Javadoc)
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
751      */
752     @Override
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754         throw new Error("Not implemented");
755     }
756
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760             createdClusters.add(cluster.clusterId);
761         }
762         return cluster;
763     }
764
765     class WriteOnlySupport implements WriteSupport {
766
767         ClusterStream stream;
768         ClusterImpl currentCluster;
769
770         public WriteOnlySupport() {
771             this.stream = clusterStream;
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
777         }
778
779         @Override
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785                 if(s != queryProvider2.getRootLibrary())
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
787
788             try {
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790             } catch (DatabaseException e) {
791                 Logger.defaultLogError(e);
792                 //throw e;
793             }
794
795             clientChanges.invalidate(s);
796
797             if (cluster.isWriteOnly())
798                 return;
799             queryProvider2.updateStatements(s, p);
800
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
806         }
807
808         @Override
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812             try {
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         @Override
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
828
829             if(amount < 65536) {
830                 claimValue(provider,resource, reader.readBytes(null, amount));
831                 return;
832             }
833
834             byte[] bytes = new byte[65536];
835
836             int rid = ((ResourceImpl)resource).id;
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838             try {
839                 int left = amount;
840                 while(left > 0) {
841                     int block = Math.min(left, 65536);
842                     reader.readBytes(bytes, block);
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
844                     left -= block;
845                 }
846             } catch (DatabaseException e) {
847                 Logger.defaultLogError(e);
848             }
849
850             clientChanges.invalidate(rid);
851
852             if (cluster.isWriteOnly())
853                 return;
854             queryProvider2.updateValue(rid);
855
856         }
857
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {
859             if(after_ != null && after_ != before) {
860                 ClusterImpl after = (ClusterImpl)after_;
861                 if(currentCluster == before) {
862                     currentCluster = after;
863                 }
864                 clusterTable.replaceCluster(after);
865             }
866         }
867
868         public int createResourceKey(int foreignCounter) throws DatabaseException {
869             if(currentCluster == null) {
870                 currentCluster = getNewResourceCluster();
871             }
872             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
873                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
874                 newCluster.foreignLookup = new byte[foreignCounter];
875                 currentCluster = newCluster;
876                 if (DEBUG)
877                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
878             }
879             return currentCluster.createResource(clusterTranslator);
880         }
881
882         @Override
883         public Resource createResource(VirtualGraph provider) throws DatabaseException {
884             if(currentCluster == null) {
885                 if (null != defaultClusterSet) {
886                         ResourceImpl result = getNewResource(defaultClusterSet);
887                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
888                     return result;
889                 } else {
890                     currentCluster = getNewResourceCluster();
891                 }
892             }
893             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
894                 if (null != defaultClusterSet) {
895                         ResourceImpl result = getNewResource(defaultClusterSet);
896                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
897                     return result;
898                 } else {
899                     currentCluster = getNewResourceCluster();
900                 }
901             }
902             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, long clusterId)
907         throws DatabaseException {
908             return getNewResource(clusterId);
909         }
910
911         @Override
912         public Resource createResource(VirtualGraph provider, Resource clusterSet)
913         throws DatabaseException {
914             return getNewResource(clusterSet);
915         }
916
917         @Override
918         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
919                 throws DatabaseException {
920             getNewClusterSet(clusterSet);
921         }
922
923         @Override
924         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
925         throws ServiceException {
926             return containsClusterSet(clusterSet);
927         }
928
929         public void selectCluster(long cluster) {
930                 currentCluster = clusterTable.getClusterByClusterId(cluster);
931                 long setResourceId = clusterSetsSupport.getSet(cluster);
932                 clusterSetsSupport.put(setResourceId, cluster);
933         }
934
935         @Override
936         public Resource setDefaultClusterSet(Resource clusterSet)
937         throws ServiceException {
938                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
939                 if(clusterSet != null) {
940                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
941                         currentCluster = clusterTable.getClusterByClusterId(id);
942                         return result;
943                 } else {
944                         currentCluster = null;
945                         return null;
946                 }
947         }
948
949         @Override
950         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
951
952                  provider = getProvider(provider);
953              if (null == provider) {
954                  int key = ((ResourceImpl)resource).id;
955
956
957
958                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
959 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
960 //                      if(key != queryProvider2.getRootLibrary())
961 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
962
963 //                 try {
964 //                     cluster.removeValue(key, clusterTranslator);
965 //                 } catch (DatabaseException e) {
966 //                     Logger.defaultLogError(e);
967 //                     return;
968 //                 }
969
970                  clusterTable.writeOnlyInvalidate(cluster);
971
972                  try {
973                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
974                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
975                          clusterTranslator.removeValue(cluster);
976                  } catch (DatabaseException e) {
977                          Logger.defaultLogError(e);
978                  }
979
980                  queryProvider2.invalidateResource(key);
981                  clientChanges.invalidate(key);
982
983              } else {
984                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
985                  queryProvider2.updateValue(querySupport.getId(resource));
986                  clientChanges.claimValue(resource);
987              }
988
989
990         }
991
992         @Override
993         public void flush(boolean intermediate) {
994             throw new UnsupportedOperationException();
995         }
996
997         @Override
998         public void flushCluster() {
999             clusterTable.flushCluster(graphSession);
1000             if(defaultClusterSet != null) {
1001                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1002             }
1003             currentCluster = null;
1004         }
1005
1006         @Override
1007         public void flushCluster(Resource r) {
1008             throw new UnsupportedOperationException("flushCluster resource " + r);
1009         }
1010
1011         @Override
1012         public void gc() {
1013         }
1014
1015         @Override
1016         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1017                 Resource object) {
1018
1019                 int s = ((ResourceImpl)subject).id;
1020                 int p = ((ResourceImpl)predicate).id;
1021                 int o = ((ResourceImpl)object).id;
1022
1023                 provider = getProvider(provider);
1024                 if (null == provider) {
1025
1026                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1027                         clusterTable.writeOnlyInvalidate(cluster);
1028
1029                         try {
1030
1031                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1032
1033                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035
1036                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039                                 clusterTranslator.removeStatement(cluster);
1040
1041                                 queryProvider2.invalidateResource(s);
1042                                 clientChanges.invalidate(s);
1043
1044                         } catch (DatabaseException e) {
1045
1046                                 Logger.defaultLogError(e);
1047
1048                         }
1049
1050                         return true;
1051
1052                 } else {
1053                         
1054                         ((VirtualGraphImpl)provider).deny(s, p, o);
1055                         queryProvider2.invalidateResource(s);
1056                         clientChanges.invalidate(s);
1057
1058                         return true;
1059
1060                 }
1061
1062         }
1063
1064         @Override
1065         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066             throw new UnsupportedOperationException();
1067         }
1068
1069         @Override
1070         public boolean writeOnly() {
1071             return true;
1072         }
1073
1074         @Override
1075         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076             writeSupport.performWriteRequest(graph, request);
1077         }
1078
1079         @Override
1080         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086             throw new UnsupportedOperationException();
1087         }
1088
1089         @Override
1090         public <T> void addMetadata(Metadata data) throws ServiceException {
1091             writeSupport.addMetadata(data);
1092         }
1093
1094         @Override
1095         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096             return writeSupport.getMetadata(clazz);
1097         }
1098
1099         @Override
1100         public TreeMap<String, byte[]> getMetadata() {
1101             return writeSupport.getMetadata();
1102         }
1103
1104         @Override
1105         public void commitDone(WriteTraits writeTraits, long csid) {
1106             writeSupport.commitDone(writeTraits, csid);
1107         }
1108
1109         @Override
1110         public void clearUndoList(WriteTraits writeTraits) {
1111             writeSupport.clearUndoList(writeTraits);
1112         }
1113         @Override
1114         public int clearMetadata() {
1115             return writeSupport.clearMetadata();
1116         }
1117
1118                 @Override
1119                 public void startUndo() {
1120                         writeSupport.startUndo();
1121                 }
1122     }
1123
1124     class VirtualWriteOnlySupport implements WriteSupport {
1125
1126 //        @Override
1127 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 //                Resource object) {
1129 //            throw new UnsupportedOperationException();
1130 //        }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144
1145             TransientGraph impl = (TransientGraph)provider;
1146             impl.claim(subject, predicate, object);
1147             getQueryProvider2().updateStatements(subject, predicate);
1148             clientChanges.claim(subject, predicate, object);
1149
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160             getQueryProvider2().updateValue(resource);
1161             clientChanges.claimValue(resource);
1162         }
1163
1164         @Override
1165         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166             byte[] value = reader.readBytes(null, amount);
1167             claimValue(provider, resource, value);
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider) {
1172             TransientGraph impl = (TransientGraph)provider;
1173             return impl.getResource(impl.newResource(false));
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, long clusterId) {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws DatabaseException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195         throws ServiceException {
1196             throw new UnsupportedOperationException();
1197         }
1198
1199         @Override
1200         public Resource setDefaultClusterSet(Resource clusterSet)
1201         throws ServiceException {
1202                 return null;
1203         }
1204
1205         @Override
1206         public void denyValue(VirtualGraph provider, Resource resource) {
1207             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208             getQueryProvider2().updateValue(querySupport.getId(resource));
1209             // NOTE: this only keeps track of value changes by-resource.
1210             clientChanges.claimValue(resource);
1211         }
1212
1213         @Override
1214         public void flush(boolean intermediate) {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster() {
1220             throw new UnsupportedOperationException();
1221         }
1222
1223         @Override
1224         public void flushCluster(Resource r) {
1225             throw new UnsupportedOperationException("Resource " + r);
1226         }
1227
1228         @Override
1229         public void gc() {
1230         }
1231
1232         @Override
1233         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234             TransientGraph impl = (TransientGraph) provider;
1235             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237             clientChanges.deny(subject, predicate, object);
1238             return true;
1239         }
1240
1241         @Override
1242         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public boolean writeOnly() {
1248             return true;
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void addMetadata(Metadata data) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public TreeMap<String, byte[]> getMetadata() {
1278             throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public void commitDone(WriteTraits writeTraits, long csid) {
1283         }
1284
1285         @Override
1286         public void clearUndoList(WriteTraits writeTraits) {
1287         }
1288
1289         @Override
1290         public int clearMetadata() {
1291             return 0;
1292         }
1293
1294                 @Override
1295                 public void startUndo() {
1296                 }
1297     }
1298
1299     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300
1301         try {
1302
1303             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304
1305             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306
1307             flushCounter = 0;
1308             Disposable.safeDispose(clientChanges);
1309             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310
1311             acquireWriteOnly();
1312
1313             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314
1315             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316
1317             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1318             writeState = writeStateT;
1319
1320             assert (null != writer);
1321 //            writer.state.barrier.inc();
1322             long start = System.nanoTime();
1323             T result = request.perform(writer);
1324             long duration = System.nanoTime() - start;
1325             if (DEBUG) {
1326                 System.err.println("################");
1327                 System.err.println("WriteOnly duration " + 1e-9*duration);
1328             }
1329             writeStateT.setResult(result);
1330
1331             // This makes clusters available from server
1332             clusterStream.reallyFlush();
1333
1334             // This will trigger query updates
1335             releaseWriteOnly(writer);
1336             assert (null != writer);
1337
1338             handleUpdatesAndMetadata(writer);
1339
1340         } catch (CancelTransactionException e) {
1341
1342             releaseWriteOnly(writeState.getGraph());
1343
1344             clusterTable.removeWriteOnlyClusters();
1345             state.stopWriteTransaction(clusterStream);
1346
1347         } catch (Throwable e) {
1348
1349             e.printStackTrace();
1350
1351             releaseWriteOnly(writeState.getGraph());
1352
1353             clusterTable.removeWriteOnlyClusters();
1354
1355             if (callback != null)
1356                 callback.exception(new DatabaseException(e));
1357
1358             state.stopWriteTransaction(clusterStream);
1359             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360
1361         } finally  {
1362
1363             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1364
1365         }
1366
1367     }
1368
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1370         scheduleRequest(request, callback, notify, null);
1371     }
1372
1373     @Override
1374     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375
1376         assertAlive();
1377
1378         assert (request != null);
1379
1380         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1381
1382         requestManager.scheduleWrite(new SessionTask(true) {
1383
1384             @Override
1385             public void run(int thread) {
1386
1387                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388
1389                     try {
1390
1391                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392
1393                         flushCounter = 0;
1394                         Disposable.safeDispose(clientChanges);
1395                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396
1397                         acquireWriteOnly();
1398
1399                         VirtualGraph vg = getProvider(request.getProvider());
1400                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1401
1402                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1403
1404                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405
1406                             @Override
1407                             public void execute(Object result) {
1408                                 if(callback != null) callback.accept(null);
1409                             }
1410
1411                             @Override
1412                             public void exception(Throwable t) {
1413                                 if(callback != null) callback.accept((DatabaseException)t);
1414                             }
1415
1416                         });
1417
1418                         assert (null != writer);
1419 //                        writer.state.barrier.inc();
1420
1421                         try {
1422
1423                             request.perform(writer);
1424
1425                         } catch (Throwable e) {
1426
1427 //                            writer.state.barrier.dec();
1428 //                            writer.waitAsync(null);
1429
1430                             releaseWriteOnly(writer);
1431
1432                             clusterTable.removeWriteOnlyClusters();
1433
1434                             if(!(e instanceof CancelTransactionException)) {
1435                                 if (callback != null)
1436                                     callback.accept(new DatabaseException(e));
1437                             }
1438
1439                             writeState.except(e);
1440
1441                             return;
1442
1443                         }
1444
1445                         // This makes clusters available from server
1446                         boolean empty = clusterStream.reallyFlush();
1447                         // This was needed to make WO requests call metadata listeners.
1448                         // NOTE: the calling event does not contain clientChanges information.
1449                         if (!empty && clientChanges.isEmpty())
1450                             clientChanges.setNotEmpty(true);
1451                         releaseWriteOnly(writer);
1452                         assert (null != writer);
1453                     } finally  {
1454
1455                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1456
1457                     }
1458
1459
1460                 task.finish();
1461
1462             }
1463
1464         }, combine);
1465
1466     }
1467
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1469         scheduleRequest(request, callback, notify, null);
1470     }
1471
1472     /* (non-Javadoc)
1473      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474      */
1475     @Override
1476     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1477
1478         assert (request != null);
1479
1480         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1481
1482         requestManager.scheduleWrite(new SessionTask(true) {
1483
1484             @Override
1485             public void run(int thread) {
1486
1487                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1488
1489                 performWriteOnly(request, notify, callback);
1490
1491                 task.finish();
1492
1493             }
1494
1495         }, combine);
1496
1497     }
1498
1499     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1500
1501         assert (request != null);
1502         assert (procedure != null);
1503
1504         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1505
1506         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1507
1508             @Override
1509             public void run(int thread) {
1510
1511                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1512
1513                 ListenerBase listener = getListenerBase(procedure);
1514
1515                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516
1517                 try {
1518
1519                     if (listener != null) {
1520
1521                         try {
1522                                 
1523                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1524
1525                                         @Override
1526                                         public void exception(AsyncReadGraph graph, Throwable t) {
1527                                                 procedure.exception(graph, t);
1528                                                 if(throwable != null) {
1529                                                         throwable.set(t);
1530                                                 } else {
1531                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1532                                                 }
1533                                         }
1534
1535                                         @Override
1536                                         public void execute(AsyncReadGraph graph, T t) {
1537                                                 if(result != null) result.set(t);
1538                                                 procedure.execute(graph, t);
1539                                         }
1540
1541                                 };
1542                                 
1543                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1544                                 
1545                         } catch (Throwable t) {
1546                             // This is handled by the AsyncProcedure
1547                                 //Logger.defaultLogError("Internal error", t);
1548                         }
1549
1550                     } else {
1551
1552                         try {
1553
1554 //                            newGraph.state.barrier.inc();
1555
1556                             T t = request.perform(newGraph);
1557
1558                             try {
1559
1560                                 if(result != null) result.set(t);
1561                                 procedure.execute(newGraph, t);
1562
1563                             } catch (Throwable th) {
1564
1565                                 if(throwable != null) {
1566                                     throwable.set(th);
1567                                 } else {
1568                                     Logger.defaultLogError("Unhandled exception", th);
1569                                 }
1570
1571                             }
1572
1573                         } catch (Throwable t) {
1574
1575                             if (DEBUG)
1576                                 t.printStackTrace();
1577
1578                             if(throwable != null) {
1579                                 throwable.set(t);
1580                             } else {
1581                                 Logger.defaultLogError("Unhandled exception", t);
1582                             }
1583
1584                             try {
1585
1586                                 procedure.exception(newGraph, t);
1587
1588                             } catch (Throwable t2) {
1589
1590                                 if(throwable != null) {
1591                                     throwable.set(t2);
1592                                 } else {
1593                                     Logger.defaultLogError("Unhandled exception", t2);
1594                                 }
1595
1596                             }
1597
1598                         }
1599
1600 //                        newGraph.state.barrier.dec();
1601 //                        newGraph.waitAsync(request);
1602
1603                     }
1604
1605                 } finally {
1606
1607                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1608
1609                 }
1610
1611             }
1612
1613         });
1614
1615     }
1616
1617     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1618
1619         assert (request != null);
1620         assert (procedure != null);
1621
1622         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1623
1624         requestManager.scheduleRead(new SessionRead(null, notify) {
1625
1626             @Override
1627             public void run(int thread) {
1628
1629                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1630
1631                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1632
1633                 try {
1634
1635                     if (listener != null) {
1636
1637                         try {
1638                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1639                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1640                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1641                                                 } catch (DatabaseException e) {
1642                                                         Logger.defaultLogError(e);
1643                                                 }
1644
1645                     } else {
1646
1647 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1648 //                                procedure, "request");
1649
1650                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1651
1652                         try {
1653
1654                             request.perform(newGraph, wrap);
1655                             wrap.get();
1656
1657                         } catch (Throwable t) {
1658
1659                                 wrap.exception(newGraph, t);
1660                                 
1661 //                            wrapper.exception(newGraph, t);
1662 //                            newGraph.waitAsync(request);
1663
1664
1665                         }
1666
1667                     }
1668
1669                 } finally {
1670
1671                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1672
1673                 }
1674
1675             }
1676
1677         });
1678
1679     }
1680
1681     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1682
1683         assert (request != null);
1684         assert (procedure != null);
1685
1686         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1687
1688         int sync = notify != null ? thread : -1;
1689
1690         requestManager.scheduleRead(new SessionRead(null, notify) {
1691
1692             @Override
1693             public void run(int thread) {
1694
1695                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1696
1697                 ListenerBase listener = getListenerBase(procedure);
1698
1699                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1700
1701                 try {
1702
1703                     if (listener != null) {
1704
1705                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1706
1707 //                        newGraph.waitAsync(request);
1708
1709                     } else {
1710
1711                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1712
1713                         try {
1714
1715                             request.perform(newGraph, wrapper);
1716
1717                         } catch (Throwable t) {
1718
1719                             t.printStackTrace();
1720
1721                         }
1722
1723                     }
1724
1725                 } finally {
1726
1727                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1728
1729                 }
1730
1731             }
1732
1733         });
1734
1735     }
1736     
1737     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1738
1739         assert (request != null);
1740         assert (procedure != null);
1741
1742         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1743
1744         int sync = notify != null ? thread : -1;
1745
1746         requestManager.scheduleRead(new SessionRead(null, notify) {
1747
1748             @Override
1749             public void run(int thread) {
1750
1751                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1752
1753                 ListenerBase listener = getListenerBase(procedure);
1754
1755                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1756
1757                 try {
1758
1759                     if (listener != null) {
1760
1761                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1762
1763 //                        newGraph.waitAsync(request);
1764
1765                     } else {
1766
1767                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1768
1769                         try {
1770
1771                             request.perform(newGraph, wrapper);
1772
1773                         } catch (Throwable t) {
1774
1775                             t.printStackTrace();
1776
1777                         }
1778
1779                     }
1780
1781                 } finally {
1782
1783                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1784
1785                 }
1786
1787             }
1788
1789         });
1790
1791     }
1792
1793     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1794
1795         assert (request != null);
1796         assert (procedure != null);
1797
1798         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1799
1800         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1801
1802             @Override
1803             public void run(int thread) {
1804
1805                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1806
1807                 ListenerBase listener = getListenerBase(procedure);
1808
1809                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1810
1811                 try {
1812
1813                     if (listener != null) {
1814
1815                         try {
1816                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1817                         } catch (DatabaseException e) {
1818                             Logger.defaultLogError(e);
1819                         }
1820                         
1821                         
1822 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1823 //
1824 //                            @Override
1825 //                            public void exception(Throwable t) {
1826 //                                procedure.exception(t);
1827 //                                if(throwable != null) {
1828 //                                    throwable.set(t);
1829 //                                }
1830 //                            }
1831 //
1832 //                            @Override
1833 //                            public void execute(T t) {
1834 //                                if(result != null) result.set(t);
1835 //                                procedure.execute(t);
1836 //                            }
1837 //
1838 //                        }, listener);
1839
1840 //                        newGraph.waitAsync(request);
1841
1842                     } else {
1843
1844 //                        newGraph.state.barrier.inc();
1845
1846                         request.register(newGraph, new Listener<T>() {
1847
1848                             @Override
1849                             public void exception(Throwable t) {
1850                                 if(throwable != null) throwable.set(t);
1851                                 procedure.exception(t);
1852 //                                newGraph.state.barrier.dec();
1853                             }
1854
1855                             @Override
1856                             public void execute(T t) {
1857                                 if(result != null) result.set(t);
1858                                 procedure.execute(t);
1859 //                                newGraph.state.barrier.dec();
1860                             }
1861
1862                             @Override
1863                             public boolean isDisposed() {
1864                                 return true;
1865                             }
1866
1867                         });
1868
1869 //                        newGraph.waitAsync(request);
1870
1871                     }
1872
1873                 } finally {
1874
1875                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1876
1877                 }
1878
1879             }
1880
1881         });
1882
1883     }
1884
1885
1886     @Override
1887     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1888
1889         scheduleRequest(request, procedure, null, null, null);
1890
1891     }
1892
1893     @Override
1894     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1895         assertNotSession();
1896         Semaphore notify = new Semaphore(0);
1897         DataContainer<Throwable> container = new DataContainer<Throwable>();
1898         DataContainer<T> result = new DataContainer<T>();
1899         scheduleRequest(request, procedure, notify, container, result);
1900         acquire(notify, request);
1901         Throwable throwable = container.get();
1902         if(throwable != null) {
1903             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1904             else throw new DatabaseException("Unexpected exception", throwable);
1905         }
1906         return result.get();
1907     }
1908
1909     @Override
1910     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1911         assertNotSession();
1912         Semaphore notify = new Semaphore(0);
1913         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1914         final DataContainer<T> resultContainer = new DataContainer<T>();
1915         scheduleRequest(request, new AsyncProcedure<T>() {
1916             @Override
1917             public void exception(AsyncReadGraph graph, Throwable throwable) {
1918                 exceptionContainer.set(throwable);
1919                 procedure.exception(graph, throwable);
1920             }
1921             @Override
1922             public void execute(AsyncReadGraph graph, T result) {
1923                 resultContainer.set(result);
1924                 procedure.execute(graph, result);
1925             }
1926         }, getListenerBase(procedure), notify);
1927         acquire(notify, request, procedure);
1928         Throwable throwable = exceptionContainer.get();
1929         if (throwable != null) {
1930             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1931             else throw new DatabaseException("Unexpected exception", throwable);
1932         }
1933         return resultContainer.get();
1934     }
1935
1936
1937     @Override
1938     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1939         assertNotSession();
1940         Semaphore notify = new Semaphore(0);
1941         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1942         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1943             @Override
1944             public void exception(AsyncReadGraph graph, Throwable throwable) {
1945                 exceptionContainer.set(throwable);
1946                 procedure.exception(graph, throwable);
1947             }
1948             @Override
1949             public void execute(AsyncReadGraph graph, T result) {
1950                 procedure.execute(graph, result);
1951             }
1952             @Override
1953             public void finished(AsyncReadGraph graph) {
1954                 procedure.finished(graph);
1955             }
1956         }, notify);
1957         acquire(notify, request, procedure);
1958         Throwable throwable = exceptionContainer.get();
1959         if (throwable != null) {
1960             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961             else throw new DatabaseException("Unexpected exception", throwable);
1962         }
1963         // TODO: implement return value
1964         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1965         return null;
1966     }
1967
1968     @Override
1969     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1970         return syncRequest(request, new ProcedureAdapter<T>());
1971
1972
1973 //        assert(request != null);
1974 //
1975 //        final DataContainer<T> result = new DataContainer<T>();
1976 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1977 //
1978 //        syncRequest(request, new Procedure<T>() {
1979 //
1980 //            @Override
1981 //            public void execute(T t) {
1982 //                result.set(t);
1983 //            }
1984 //
1985 //            @Override
1986 //            public void exception(Throwable t) {
1987 //                exception.set(t);
1988 //            }
1989 //
1990 //        });
1991 //
1992 //        Throwable t = exception.get();
1993 //        if(t != null) {
1994 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1995 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1996 //        }
1997 //
1998 //        return result.get();
1999
2000     }
2001
2002     @Override
2003     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
2004         return syncRequest(request, (Procedure<T>)procedure);
2005     }
2006
2007
2008 //    @Override
2009 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
2010 //        assertNotSession();
2011 //        Semaphore notify = new Semaphore(0);
2012 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
2013 //        DataContainer<T> result = new DataContainer<T>();
2014 //        scheduleRequest(request, procedure, notify, container, result);
2015 //        acquire(notify, request);
2016 //        Throwable throwable = container.get();
2017 //        if(throwable != null) {
2018 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2019 //            else throw new DatabaseException("Unexpected exception", throwable);
2020 //        }
2021 //        return result.get();
2022 //    }
2023
2024
2025     @Override
2026     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2027         assertNotSession();
2028         Semaphore notify = new Semaphore(0);
2029         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2030         final DataContainer<T> result = new DataContainer<T>();
2031         scheduleRequest(request, procedure, notify, container, result);
2032         acquire(notify, request);
2033         Throwable throwable = container.get();
2034         if (throwable != null) {
2035             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2036             else throw new DatabaseException("Unexpected exception", throwable);
2037         }
2038         return result.get();
2039     }
2040
2041     @Override
2042     public void syncRequest(Write request) throws DatabaseException  {
2043         assertNotSession();
2044         assertAlive();
2045         Semaphore notify = new Semaphore(0);
2046         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2047         scheduleRequest(request, e -> exception.set(e), notify);
2048         acquire(notify, request);
2049         if(exception.get() != null) throw exception.get();
2050     }
2051
2052     @Override
2053     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2054         assertNotSession();
2055         Semaphore notify = new Semaphore(0);
2056         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2057         final DataContainer<T> result = new DataContainer<T>();
2058         scheduleRequest(request, new Procedure<T>() {
2059
2060             @Override
2061             public void exception(Throwable t) {
2062               exception.set(t);
2063             }
2064
2065             @Override
2066             public void execute(T t) {
2067               result.set(t);
2068             }
2069
2070         }, notify);
2071         acquire(notify, request);
2072         if(exception.get() != null) {
2073             Throwable t = exception.get();
2074             if(t instanceof DatabaseException) throw (DatabaseException)t;
2075             else throw new DatabaseException(t);
2076         }
2077         return result.get();
2078     }
2079
2080     @Override
2081     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2082         assertNotSession();
2083         Semaphore notify = new Semaphore(0);
2084         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2085         final DataContainer<T> result = new DataContainer<T>();
2086         scheduleRequest(request, new Procedure<T>() {
2087
2088             @Override
2089             public void exception(Throwable t) {
2090               exception.set(t);
2091             }
2092
2093             @Override
2094             public void execute(T t) {
2095               result.set(t);
2096             }
2097
2098         }, notify);
2099         acquire(notify, request);
2100         if(exception.get() != null) {
2101             Throwable t = exception.get();
2102             if(t instanceof DatabaseException) throw (DatabaseException)t;
2103             else throw new DatabaseException(t);
2104         }
2105         return result.get();
2106     }
2107
2108     @Override
2109     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2110         assertNotSession();
2111         Semaphore notify = new Semaphore(0);
2112         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2113         final DataContainer<T> result = new DataContainer<T>();
2114         scheduleRequest(request, new Procedure<T>() {
2115
2116             @Override
2117             public void exception(Throwable t) {
2118               exception.set(t);
2119             }
2120
2121             @Override
2122             public void execute(T t) {
2123               result.set(t);
2124             }
2125
2126         }, notify);
2127         acquire(notify, request);
2128         if(exception.get() != null) {
2129             Throwable t = exception.get();
2130             if(t instanceof DatabaseException) throw (DatabaseException)t;
2131             else throw new DatabaseException(t);
2132         }
2133         return result.get();
2134     }
2135
2136     @Override
2137     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2138         assertNotSession();
2139         Semaphore notify = new Semaphore(0);
2140         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2141         scheduleRequest(request, e -> exception.set(e), notify);
2142         acquire(notify, request);
2143         if(exception.get() != null) throw exception.get();
2144     }
2145
2146     @Override
2147     public void syncRequest(WriteOnly request) throws DatabaseException  {
2148         assertNotSession();
2149         assertAlive();
2150         Semaphore notify = new Semaphore(0);
2151         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2152         scheduleRequest(request, e -> exception.set(e), notify);
2153         acquire(notify, request);
2154         if(exception.get() != null) throw exception.get();
2155     }
2156
2157     /*
2158      *
2159      * GraphRequestProcessor interface
2160      */
2161
2162     @Override
2163     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2164
2165         scheduleRequest(request, procedure, null, null);
2166
2167     }
2168
2169     @Override
2170     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2171
2172         scheduleRequest(request, procedure, null);
2173
2174     }
2175
2176     @Override
2177     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2178
2179         scheduleRequest(request, procedure, null, null, null);
2180
2181     }
2182
2183     @Override
2184     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2185
2186         scheduleRequest(request, callback, null);
2187
2188     }
2189
2190     @Override
2191     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2192
2193         scheduleRequest(request, procedure, null);
2194
2195     }
2196
2197     @Override
2198     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2199
2200         scheduleRequest(request, procedure, null);
2201
2202     }
2203
2204     @Override
2205     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2206
2207         scheduleRequest(request, procedure, null);
2208
2209     }
2210
2211     @Override
2212     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2213
2214         scheduleRequest(request, callback, null);
2215
2216     }
2217
2218     @Override
2219     public void asyncRequest(final Write r) {
2220         asyncRequest(r, null);
2221     }
2222
2223     @Override
2224     public void asyncRequest(final DelayedWrite r) {
2225         asyncRequest(r, null);
2226     }
2227
2228     @Override
2229     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2230
2231         scheduleRequest(request, callback, null);
2232
2233     }
2234
2235     @Override
2236     public void asyncRequest(final WriteOnly request) {
2237
2238         asyncRequest(request, null);
2239
2240     }
2241
2242     @Override
2243     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2244         r.request(this, procedure);
2245     }
2246
2247     @Override
2248     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2249         r.request(this, procedure);
2250     }
2251
2252     @Override
2253     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2254         r.request(this, procedure);
2255     }
2256
2257     @Override
2258     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2259         r.request(this, procedure);
2260     }
2261
2262     @Override
2263     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2264         r.request(this, procedure);
2265     }
2266
2267     @Override
2268     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2269         r.request(this, procedure);
2270     }
2271
2272     @Override
2273     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2274         return r.request(this);
2275     }
2276
2277     @Override
2278     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2279         return r.request(this);
2280     }
2281
2282     @Override
2283     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2284         r.request(this, procedure);
2285     }
2286
2287     @Override
2288     public <T> void async(WriteInterface<T> r) {
2289         r.request(this, new ProcedureAdapter<T>());
2290     }
2291
2292     //@Override
2293     public void incAsync() {
2294         state.incAsync();
2295     }
2296
2297     //@Override
2298     public void decAsync() {
2299         state.decAsync();
2300     }
2301
2302     public long getCluster(ResourceImpl resource) {
2303         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2304         return cluster.getClusterId();
2305     }
2306
2307     public long getCluster(int id) {
2308         if (clusterTable == null)
2309             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2310         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2311     }
2312
2313     public ResourceImpl getResource(int id) {
2314         return new ResourceImpl(resourceSupport, id);
2315     }
2316
2317     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2318         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2319         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2320         int key = proxy.getClusterKey();
2321         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2322         return new ResourceImpl(resourceSupport, resourceKey);
2323     }
2324
2325     public ResourceImpl getResource2(int id) {
2326         assert (id != 0);
2327         return new ResourceImpl(resourceSupport, id);
2328     }
2329
2330     final public int getId(ResourceImpl impl) {
2331         return impl.id;
2332     }
2333
2334     public static final Charset UTF8 = Charset.forName("utf-8");
2335
2336
2337
2338
2339     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2340         for(TransientGraph g : support.providers) {
2341             if(g.isPending(subject)) return false;
2342         }
2343         return true;
2344     }
2345
2346     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2347         for(TransientGraph g : support.providers) {
2348             if(g.isPending(subject, predicate)) return false;
2349         }
2350         return true;
2351     }
2352
2353     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2354
2355         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2356
2357             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2358
2359             @Override
2360             public void accept(ReadGraphImpl graph) {
2361                 if(ready.decrementAndGet() == 0) {
2362                     runnable.accept(graph);
2363                 }
2364             }
2365
2366         };
2367
2368         for(TransientGraph g : support.providers) {
2369             if(g.isPending(subject)) {
2370                 try {
2371                     g.load(graph, subject, composite);
2372                 } catch (DatabaseException e) {
2373                     e.printStackTrace();
2374                 }
2375             } else {
2376                 composite.accept(graph);
2377             }
2378         }
2379
2380         composite.accept(graph);
2381
2382     }
2383
2384     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2385
2386         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2387
2388             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2389
2390             @Override
2391             public void accept(ReadGraphImpl graph) {
2392                 if(ready.decrementAndGet() == 0) {
2393                     runnable.accept(graph);
2394                 }
2395             }
2396
2397         };
2398
2399         for(TransientGraph g : support.providers) {
2400             if(g.isPending(subject, predicate)) {
2401                 try {
2402                     g.load(graph, subject, predicate, composite);
2403                 } catch (DatabaseException e) {
2404                     e.printStackTrace();
2405                 }
2406             } else {
2407                 composite.accept(graph);
2408             }
2409         }
2410
2411         composite.accept(graph);
2412
2413     }
2414
2415 //    void dumpHeap() {
2416 //
2417 //        try {
2418 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2419 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2420 //                    "d:/heap" + flushCounter + ".txt", true);
2421 //        } catch (IOException e) {
2422 //            e.printStackTrace();
2423 //        }
2424 //
2425 //    }
2426
2427     void fireReactionsToSynchronize(ChangeSet cs) {
2428
2429         // Do not fire empty events
2430         if (cs.isEmpty())
2431             return;
2432
2433         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2434 //        g.state.barrier.inc();
2435
2436         try {
2437
2438             if (!cs.isEmpty()) {
2439                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2440                 for (ChangeListener l : changeListeners2) {
2441                     try {
2442                         l.graphChanged(e2);
2443                     } catch (Exception ex) {
2444                         ex.printStackTrace();
2445                     }
2446                 }
2447             }
2448
2449         } finally {
2450
2451 //            g.state.barrier.dec();
2452 //            g.waitAsync(null);
2453
2454         }
2455
2456     }
2457
2458     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2459         try {
2460
2461             // Do not fire empty events
2462             if (cs2.isEmpty())
2463                 return;
2464
2465 //            graph.restart();
2466 //            graph.state.barrier.inc();
2467
2468             try {
2469
2470                 if (!cs2.isEmpty()) {
2471
2472                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2473                     for (ChangeListener l : changeListeners2) {
2474                         try {
2475                             l.graphChanged(e2);
2476 //                            System.out.println("changelistener " + l);
2477                         } catch (Exception ex) {
2478                             ex.printStackTrace();
2479                         }
2480                     }
2481
2482                 }
2483
2484             } finally {
2485
2486 //                graph.state.barrier.dec();
2487 //                graph.waitAsync(null);
2488
2489             }
2490         } catch (Throwable t) {
2491             t.printStackTrace();
2492
2493         }
2494     }
2495     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2496
2497         try {
2498
2499             // Do not fire empty events
2500             if (cs2.isEmpty())
2501                 return;
2502
2503             // Do not fire on virtual requests
2504             if(graph.getProvider() != null)
2505                 return;
2506
2507             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2508
2509             try {
2510
2511                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2512                 for (ChangeListener l : metadataListeners) {
2513                     try {
2514                         l.graphChanged(e2);
2515                     } catch (Throwable ex) {
2516                         ex.printStackTrace();
2517                     }
2518                 }
2519
2520             } finally {
2521
2522             }
2523
2524         } catch (Throwable t) {
2525             t.printStackTrace();
2526         }
2527
2528     }
2529
2530
2531     /*
2532      * (non-Javadoc)
2533      *
2534      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2535      */
2536     @Override
2537     public <T> T getService(Class<T> api) {
2538         T t = peekService(api);
2539         if (t == null) {
2540             if (state.isClosed())
2541                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2542             throw new ServiceNotFoundException(this, api);
2543         }
2544         return t;
2545     }
2546
    /**
     * Supplies the {@link ServerInformation} that {@link #peekService(Class)} returns when it
     * is asked for {@code ServerInformation.class}.
     *
     * @return the server information provided by the concrete session implementation
     */
    protected abstract ServerInformation getCachedServerInformation();
2551
        // Two-entry cache of the most recent lookups delegated to serviceLocator by peekService.
        // Slot 1 holds the most recently used key/service pair, slot 2 the one before it.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2556
2557     /*
2558      * (non-Javadoc)
2559      *
2560      * @see
2561      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2562      */
2563     @SuppressWarnings("unchecked")
2564     @Override
2565     public synchronized <T> T peekService(Class<T> api) {
2566
2567                 if(serviceKey1 == api) {
2568                         return (T)service1;
2569                 } else if (serviceKey2 == api) {
2570                         // Promote this key
2571                         Object result = service2;
2572                         service2 = service1;
2573                         serviceKey2 = serviceKey1;
2574                         service1 = result;
2575                         serviceKey1 = api;
2576                         return (T)result;
2577                 }
2578
2579         if (Layer0.class == api)
2580                 return (T) L0;
2581         if (ServerInformation.class == api)
2582             return (T) getCachedServerInformation();
2583         else if (WriteGraphImpl.class == api)
2584             return (T) writeState.getGraph();
2585         else if (ClusterBuilder.class == api)
2586             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2587         else if (ClusterBuilderFactory.class == api)
2588             return (T)new ClusterBuilderFactoryImpl(this);
2589
2590                 service2 = service1;
2591                 serviceKey2 = serviceKey1;
2592
2593         service1 = serviceLocator.peekService(api);
2594         serviceKey1 = api;
2595
2596         return (T)service1;
2597
2598     }
2599
2600     /*
2601      * (non-Javadoc)
2602      *
2603      * @see
2604      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2605      */
2606     @Override
2607     public boolean hasService(Class<?> api) {
2608         return serviceLocator.hasService(api);
2609     }
2610
2611     /**
2612      * @param api the api that must be implemented by the specified service
2613      * @param service the service implementation
2614      */
2615     @Override
2616     public <T> void registerService(Class<T> api, T service) {
2617         if(Layer0.class == api) {
2618                 L0 = (Layer0)service;
2619                 return;
2620         }
2621         serviceLocator.registerService(api, service);
2622         if (TransactionPolicySupport.class == api) {
2623             transactionPolicy = (TransactionPolicySupport)service;
2624             state.resetTransactionPolicy();
2625         }
2626         if (api == serviceKey1)
2627                 service1 = service;
2628         else if (api == serviceKey2)
2629                 service2 = service;
2630     }
2631
2632     // ----------------
2633     // SessionMonitor
2634     // ----------------
2635
2636     void fireSessionVariableChange(String variable) {
2637         for (MonitorHandler h : monitorHandlers) {
2638             MonitorContext ctx = monitorContexts.getLeft(h);
2639             assert ctx != null;
2640             // SafeRunner functionality repeated here to avoid dependency.
2641             try {
2642                 h.valuesChanged(ctx);
2643             } catch (Exception e) {
2644                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2645             } catch (LinkageError e) {
2646                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2647             }
2648         }
2649     }
2650
2651
2652     class ResourceSerializerImpl implements ResourceSerializer {
2653
2654         public long createRandomAccessId(int id) throws DatabaseException {
2655             if(id < 0)
2656                 return id;
2657             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2658             long cluster = getCluster(id);
2659             if (0 == cluster)
2660                 return 0; // Better to return 0 then invalid id.
2661             long result = ClusterTraitsBase.createResourceId(cluster, index);
2662             return result;
2663         }
2664
2665         @Override
2666         public long getRandomAccessId(Resource resource) throws DatabaseException {
2667
2668             ResourceImpl resourceImpl = (ResourceImpl) resource;
2669             return createRandomAccessId(resourceImpl.id);
2670
2671         }
2672
2673         @Override
2674         public int getTransientId(Resource resource) throws DatabaseException {
2675
2676             ResourceImpl resourceImpl = (ResourceImpl) resource;
2677             return resourceImpl.id;
2678
2679         }
2680
2681         @Override
2682         public String createRandomAccessId(Resource resource)
2683         throws InvalidResourceReferenceException {
2684
2685             if(resource == null) throw new IllegalArgumentException();
2686
2687             ResourceImpl resourceImpl = (ResourceImpl) resource;
2688
2689             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2690
2691             int r;
2692             try {
2693                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2694             } catch (DatabaseException e1) {
2695                 throw new InvalidResourceReferenceException(e1);
2696             }
2697             try {
2698                 // Serialize as '<resource index>_<cluster id>'
2699                 return "" + r + "_" + getCluster(resourceImpl);
2700             } catch (Throwable e) {
2701                 e.printStackTrace();
2702                 throw new InvalidResourceReferenceException(e);
2703             } finally {
2704             }
2705         }
2706
2707         public int getTransientId(long serialized) throws DatabaseException {
2708             if (serialized <= 0)
2709                 return (int)serialized;
2710             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2711             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2712             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2713             if (cluster == null)
2714                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2715             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2716             return key;
2717         }
2718
2719         @Override
2720         public Resource getResource(long randomAccessId) throws DatabaseException {
2721             return getResourceByKey(getTransientId(randomAccessId));
2722         }
2723
2724         @Override
2725         public Resource getResource(int transientId) throws DatabaseException {
2726             return getResourceByKey(transientId);
2727         }
2728
2729         @Override
2730         public Resource getResource(String randomAccessId)
2731         throws InvalidResourceReferenceException {
2732             try {
2733                 int i = randomAccessId.indexOf('_');
2734                 if (i == -1)
2735                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2736                             + randomAccessId + "'");
2737                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2738                 if(r < 0) return getResourceByKey(r);
2739                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2740                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2741                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2742                 if (cluster.hasResource(key, clusterTranslator))
2743                     return getResourceByKey(key);
2744             } catch (InvalidResourceReferenceException e) {
2745                 throw e;
2746             } catch (NumberFormatException e) {
2747                 throw new InvalidResourceReferenceException(e);
2748             } catch (Throwable e) {
2749                 e.printStackTrace();
2750                 throw new InvalidResourceReferenceException(e);
2751             } finally {
2752             }
2753             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2754         }
2755
2756         @Override
2757         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2758             try {
2759                 return true;
2760             } catch (Throwable e) {
2761                 e.printStackTrace();
2762                 throw new InvalidResourceReferenceException(e);
2763             } finally {
2764             }
2765         }
2766     }
2767
2768     // Copied from old SessionImpl
2769
2770     void check() {
2771         if (state.isClosed())
2772             throw new Error("Session closed.");
2773     }
2774
2775
2776
2777     public ResourceImpl getNewResource(long clusterId) {
2778         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2779         int newId;
2780         try {
2781             newId = cluster.createResource(clusterTranslator);
2782         } catch (DatabaseException e) {
2783             Logger.defaultLogError(e);
2784             return null;
2785         }
2786         return new ResourceImpl(resourceSupport, newId);
2787     }
2788
2789     public ResourceImpl getNewResource(Resource clusterSet)
2790     throws DatabaseException {
2791         long resourceId = clusterSet.getResourceId();
2792         Long clusterId = clusterSetsSupport.get(resourceId);
2793         if (null == clusterId)
2794             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2795         if (Constants.NewClusterId == clusterId) {
2796             clusterId = getService(ClusteringSupport.class).createCluster();
2797             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2798                 createdClusters.add(clusterId);
2799             clusterSetsSupport.put(resourceId, clusterId);
2800             return getNewResource(clusterId);
2801         } else {
2802             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2803             ResourceImpl result;
2804             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2805                 clusterId = getService(ClusteringSupport.class).createCluster();
2806                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2807                     createdClusters.add(clusterId);
2808                 clusterSetsSupport.put(resourceId, clusterId);
2809                 return getNewResource(clusterId);
2810             } else {
2811                 result = getNewResource(clusterId);
2812                 int resultKey = querySupport.getId(result);
2813                 long resultCluster = querySupport.getClusterId(resultKey);
2814                 if (clusterId != resultCluster)
2815                     clusterSetsSupport.put(resourceId, resultCluster);
2816                 return result;
2817             }
2818         }
2819     }
2820
2821     public void getNewClusterSet(Resource clusterSet)
2822     throws DatabaseException {
2823         if (DEBUG)
2824             System.out.println("new cluster set=" + clusterSet);
2825         long resourceId = clusterSet.getResourceId();
2826         if (clusterSetsSupport.containsKey(resourceId))
2827             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2828         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2829     }
2830     public boolean containsClusterSet(Resource clusterSet)
2831     throws ServiceException {
2832         long resourceId = clusterSet.getResourceId();
2833         return clusterSetsSupport.containsKey(resourceId);
2834     }
2835     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2836         Resource r = defaultClusterSet;
2837         defaultClusterSet = clusterSet;
2838         return r;
2839     }
2840     void printDiagnostics() {
2841
2842         if (DIAGNOSTICS) {
2843
2844             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2845             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2846             for(ClusterI cluster : clusterTable.getClusters()) {
2847                 try {
2848                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2849                         residentClusters.set(residentClusters.get() + 1);
2850                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2851                     }
2852                 } catch (DatabaseException e) {
2853                     Logger.defaultLogError(e);
2854                 }
2855             }
2856
2857             System.out.println("--------------------------------");
2858             System.out.println("Cluster information:");
2859             System.out.println("-amount of resident clusters=" + residentClusters.get());
2860             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2861
2862             for(ClusterI cluster : clusterTable.getClusters()) {
2863                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2864                 try {
2865                     if (!cluster.isLoaded())
2866                         System.out.println("not loaded]");
2867                     else if (cluster.isEmpty())
2868                         System.out.println("is empty]");
2869                     else {
2870                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2871                         System.out.print(",references=" + cluster.getReferenceCount());
2872                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2873                     }
2874                 } catch (DatabaseException e) {
2875                     Logger.defaultLogError(e);
2876                     System.out.println("is corrupted]");
2877                 }
2878             }
2879
2880             queryProvider2.printDiagnostics();
2881             System.out.println("--------------------------------");
2882         }
2883
2884     }
2885
2886     public void fireStartReadTransaction() {
2887
2888         if (DIAGNOSTICS)
2889             System.out.println("StartReadTransaction");
2890
2891         for (SessionEventListener listener : eventListeners) {
2892             try {
2893                 listener.readTransactionStarted();
2894             } catch (Throwable t) {
2895                 t.printStackTrace();
2896             }
2897         }
2898
2899     }
2900
2901     public void fireFinishReadTransaction() {
2902
2903         if (DIAGNOSTICS)
2904             System.out.println("FinishReadTransaction");
2905
2906         for (SessionEventListener listener : eventListeners) {
2907             try {
2908                 listener.readTransactionFinished();
2909             } catch (Throwable t) {
2910                 t.printStackTrace();
2911             }
2912         }
2913
2914     }
2915
2916     public void fireStartWriteTransaction() {
2917
2918         if (DIAGNOSTICS)
2919             System.out.println("StartWriteTransaction");
2920
2921         for (SessionEventListener listener : eventListeners) {
2922             try {
2923                 listener.writeTransactionStarted();
2924             } catch (Throwable t) {
2925                 t.printStackTrace();
2926             }
2927         }
2928
2929     }
2930
2931     public void fireFinishWriteTransaction() {
2932
2933         if (DIAGNOSTICS)
2934             System.out.println("FinishWriteTransaction");
2935
2936         for (SessionEventListener listener : eventListeners) {
2937             try {
2938                 listener.writeTransactionFinished();
2939             } catch (Throwable t) {
2940                 t.printStackTrace();
2941             }
2942         }
2943
2944         Indexing.resetDependenciesIndexingDisabled();
2945
2946     }
2947
2948     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2949         throw new Error("Not supported at the moment.");
2950     }
2951
2952     State getState() {
2953         return state;
2954     }
2955
2956     ClusterTable getClusterTable() {
2957         return clusterTable;
2958     }
2959
2960     public GraphSession getGraphSession() {
2961         return graphSession;
2962     }
2963
2964     public QueryProcessor getQueryProvider2() {
2965         return queryProvider2;
2966     }
2967
2968     ClientChangesImpl getClientChanges() {
2969         return clientChanges;
2970     }
2971
2972     boolean getWriteOnly() {
2973         return writeOnly;
2974     }
2975
    // Class-wide counter. NOTE(review): no use is visible in this part of the file; confirm
    // whether it is still needed.
    static int counter = 0;
2977
2978     public void onClusterLoaded(long clusterId) {
2979
2980         clusterTable.updateSize();
2981
2982     }
2983
2984
2985
2986
2987     /*
2988      * Implementation of the interface RequestProcessor
2989      */
2990
2991     @Override
2992     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2993
2994         assertNotSession();
2995         assertAlive();
2996
2997         assert(request != null);
2998
2999         final DataContainer<T> result = new DataContainer<T>();
3000         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3001
3002         syncRequest(request, new AsyncProcedure<T>() {
3003
3004             @Override
3005             public void execute(AsyncReadGraph graph, T t) {
3006                 result.set(t);
3007             }
3008
3009             @Override
3010             public void exception(AsyncReadGraph graph, Throwable t) {
3011                 exception.set(t);
3012             }
3013
3014         });
3015
3016         Throwable t = exception.get();
3017         if(t != null) {
3018             if(t instanceof DatabaseException) throw (DatabaseException)t;
3019             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3020         }
3021
3022         return result.get();
3023
3024     }
3025
3026 //    @Override
3027 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3028 //        assertNotSession();
3029 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
3030 //    }
3031
3032     @Override
3033     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3034         assertNotSession();
3035         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3036     }
3037
3038     @Override
3039     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3040         assertNotSession();
3041         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3042     }
3043
3044     @Override
3045     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3046         assertNotSession();
3047         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3048     }
3049
3050     @Override
3051     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3052         assertNotSession();
3053         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3054     }
3055
3056     @Override
3057     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3058
3059         assertNotSession();
3060
3061         assert(request != null);
3062
3063         final DataContainer<T> result = new DataContainer<T>();
3064         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3065
3066         syncRequest(request, new AsyncProcedure<T>() {
3067
3068             @Override
3069             public void execute(AsyncReadGraph graph, T t) {
3070                 result.set(t);
3071             }
3072
3073             @Override
3074             public void exception(AsyncReadGraph graph, Throwable t) {
3075                 exception.set(t);
3076             }
3077
3078         });
3079
3080         Throwable t = exception.get();
3081         if(t != null) {
3082             if(t instanceof DatabaseException) throw (DatabaseException)t;
3083             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3084         }
3085
3086         return result.get();
3087
3088     }
3089
3090     @Override
3091     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3092         assertNotSession();
3093         return syncRequest(request, (AsyncProcedure<T>)procedure);
3094     }
3095
3096     @Override
3097     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3098         assertNotSession();
3099         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3100     }
3101
3102     @Override
3103     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3104         assertNotSession();
3105         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3106     }
3107
3108     @Override
3109     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3110         assertNotSession();
3111         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3112     }
3113
3114     @Override
3115     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3116         assertNotSession();
3117         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3118     }
3119
3120     @Override
3121     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3122
3123         assertNotSession();
3124
3125         assert(request != null);
3126
3127         final ArrayList<T> result = new ArrayList<T>();
3128         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3129
3130         syncRequest(request, new SyncMultiProcedure<T>() {
3131
3132             @Override
3133             public void execute(ReadGraph graph, T t) {
3134                 synchronized(result) {
3135                     result.add(t);
3136                 }
3137             }
3138
3139             @Override
3140             public void finished(ReadGraph graph) {
3141             }
3142
3143             @Override
3144             public void exception(ReadGraph graph, Throwable t) {
3145                 exception.set(t);
3146             }
3147
3148
3149         });
3150
3151         Throwable t = exception.get();
3152         if(t != null) {
3153             if(t instanceof DatabaseException) throw (DatabaseException)t;
3154             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3155         }
3156
3157         return result;
3158
3159     }
3160
    /**
     * Synchronous {@link MultiRead} request reporting to a {@link SyncMultiProcedure}.
     * <p>
     * Not implemented: after the session-thread check this method unconditionally
     * throws {@link Error}. The other {@code syncRequest(MultiRead, ...)} overloads
     * visible in this class pass their procedure on as (or adapted to) a
     * {@code SyncMultiProcedure}; NOTE(review): they therefore presumably end up here
     * as well - confirm.
     *
     * @param arg0 the request (unused)
     * @param arg1 the procedure (unused)
     * @return never returns normally
     * @throws DatabaseException if called from a session thread
     */
    @Override
    public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
        assertNotSession();
        throw new Error("Not implemented!");
    }
3166
3167     @Override
3168     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3169         assertNotSession();
3170         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3171     }
3172
3173     @Override
3174     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3175         assertNotSession();
3176         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3177     }
3178
3179     @Override
3180     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3181         assertNotSession();
3182         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3183     }
3184
3185     @Override
3186     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3187
3188         assertNotSession();
3189
3190         assert(request != null);
3191
3192         final ArrayList<T> result = new ArrayList<T>();
3193         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3194
3195         syncRequest(request, new AsyncMultiProcedure<T>() {
3196
3197             @Override
3198             public void execute(AsyncReadGraph graph, T t) {
3199                 synchronized(result) {
3200                     result.add(t);
3201                 }
3202             }
3203
3204             @Override
3205             public void finished(AsyncReadGraph graph) {
3206             }
3207
3208             @Override
3209             public void exception(AsyncReadGraph graph, Throwable t) {
3210                 exception.set(t);
3211             }
3212
3213
3214         });
3215
3216         Throwable t = exception.get();
3217         if(t != null) {
3218             if(t instanceof DatabaseException) throw (DatabaseException)t;
3219             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3220         }
3221
3222         return result;
3223
3224     }
3225
3226     @Override
3227     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3228         assertNotSession();
3229         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3230     }
3231
3232     @Override
3233     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3234         assertNotSession();
3235         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3236     }
3237
3238     @Override
3239     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3240         assertNotSession();
3241        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3242     }
3243
3244     @Override
3245     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3246         assertNotSession();
3247         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3248     }
3249
3250     @Override
3251     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3252         assertNotSession();
3253         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3254     }
3255
3256
3257     @Override
3258     public <T> void asyncRequest(Read<T> request) {
3259
3260         asyncRequest(request, new ProcedureAdapter<T>() {
3261             @Override
3262             public void exception(Throwable t) {
3263                 t.printStackTrace();
3264             }
3265         });
3266
3267     }
3268
3269     @Override
3270     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3271         asyncRequest(request, (AsyncProcedure<T>)procedure);
3272     }
3273
3274     @Override
3275     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3276         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3277     }
3278
3279     @Override
3280     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3281         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3282     }
3283
3284     @Override
3285     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3286         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3287     }
3288
3289     @Override
3290     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3291         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3292     }
3293
3294     @Override
3295     final public <T> void asyncRequest(final AsyncRead<T> request) {
3296
3297         assert(request != null);
3298
3299         asyncRequest(request, new ProcedureAdapter<T>() {
3300             @Override
3301             public void exception(Throwable t) {
3302                 t.printStackTrace();
3303             }
3304         });
3305
3306     }
3307
3308     @Override
3309     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3310         scheduleRequest(request, procedure, procedure, null);
3311     }
3312
3313     @Override
3314     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3315         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3316     }
3317
3318     @Override
3319     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3320         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3321     }
3322
3323     @Override
3324     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3325         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3326     }
3327
3328     @Override
3329     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3330         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3331     }
3332
3333     @Override
3334     public <T> void asyncRequest(MultiRead<T> request) {
3335
3336         assert(request != null);
3337
3338         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3339             @Override
3340             public void exception(ReadGraph graph, Throwable t) {
3341                 t.printStackTrace();
3342             }
3343         });
3344
3345     }
3346
3347     @Override
3348     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3349         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3350     }
3351
3352     @Override
3353     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3354         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3355     }
3356
3357     @Override
3358     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3359         scheduleRequest(request, procedure, null);
3360     }
3361
3362     @Override
3363     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3364         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3365     }
3366
3367     @Override
3368     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3369
3370         assert(request != null);
3371
3372         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3373             @Override
3374             public void exception(AsyncReadGraph graph, Throwable t) {
3375                 t.printStackTrace();
3376             }
3377         });
3378
3379     }
3380
3381     @Override
3382     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3383         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3384     }
3385
3386     @Override
3387     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3388         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3389     }
3390
3391     @Override
3392     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3393         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3394     }
3395
3396     @Override
3397     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3398         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3399     }
3400
3401     @Override
3402     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3403         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3404     }
3405
3406     @Override
3407     final public <T> void asyncRequest(final ExternalRead<T> request) {
3408
3409         assert(request != null);
3410
3411         asyncRequest(request, new ProcedureAdapter<T>() {
3412             @Override
3413             public void exception(Throwable t) {
3414                 t.printStackTrace();
3415             }
3416         });
3417
3418     }
3419
3420     @Override
3421     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3422         asyncRequest(request, (Procedure<T>)procedure);
3423     }
3424
3425
3426
3427     void check(Throwable t) throws DatabaseException {
3428         if(t != null) {
3429             if(t instanceof DatabaseException) throw (DatabaseException)t;
3430             else throw new DatabaseException("Unexpected exception", t);
3431         }
3432     }
3433
3434     void check(DataContainer<Throwable> container) throws DatabaseException {
3435         Throwable t = container.get();
3436         if(t != null) {
3437             if(t instanceof DatabaseException) throw (DatabaseException)t;
3438             else throw new DatabaseException("Unexpected exception", t);
3439         }
3440     }
3441
3442
3443
3444
3445
3446
3447     boolean sameProvider(Write request) {
3448         if(writeState.getGraph().provider != null) {
3449             return writeState.getGraph().provider.equals(request.getProvider());
3450         } else {
3451             return request.getProvider() == null;
3452         }
3453     }
3454
3455     boolean plainWrite(WriteGraphImpl graph) {
3456         if(graph == null) return false;
3457         if(graph.writeSupport.writeOnly()) return false;
3458         return true;
3459     }
3460
3461
    /** Shared thread group named "Session Thread Group". */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3463     private void assertNotSession() throws DatabaseException {
3464         Thread current = Thread.currentThread();
3465         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3466     }
3467
3468     void assertAlive() {
3469         if (!state.isAlive())
3470             throw new RuntimeDatabaseException("Session has been shut down.");
3471     }
3472
3473     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3474         return querySupport.getValueStream(graph, querySupport.getId(resource));
3475     }
3476
3477     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3478         return querySupport.getValue(graph, querySupport.getId(resource));
3479     }
3480
3481     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3482         acquire(semaphore, request, null);
3483     }
3484
3485     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3486         assertAlive();
3487         try {
3488             long tryCount = 0;
3489             // Loop until the semaphore is acquired reporting requests that take a long time.
3490             while (true) {
3491
3492                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3493                     // Acquired OK.
3494                     return;
3495                 } else {
3496                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3497                         ++tryCount;
3498                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3499                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3500                                 * tryCount;
3501                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3502                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3503                     }
3504                 }
3505
3506                 assertAlive();
3507
3508             }
3509         } catch (InterruptedException e) {
3510             e.printStackTrace();
3511             // FIXME: Should perhaps do something else in this case ??
3512         }
3513     }
3514
3515
3516
3517 //    @Override
3518 //    public boolean holdOnToTransactionAfterCancel() {
3519 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3520 //    }
3521 //
3522 //    @Override
3523 //    public boolean holdOnToTransactionAfterCommit() {
3524 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3525 //    }
3526 //
3527 //    @Override
3528 //    public boolean holdOnToTransactionAfterRead() {
3529 //        return transactionPolicy.holdOnToTransactionAfterRead();
3530 //    }
3531 //
3532 //    @Override
3533 //    public void onRelinquish() {
3534 //        transactionPolicy.onRelinquish();
3535 //    }
3536 //
3537 //    @Override
3538 //    public void onRelinquishDone() {
3539 //        transactionPolicy.onRelinquishDone();
3540 //    }
3541 //
3542 //    @Override
3543 //    public void onRelinquishError() {
3544 //        transactionPolicy.onRelinquishError();
3545 //    }
3546
3547
3548     @Override
3549     public Session getSession() {
3550         return this;
3551     }
3552
3553
    /**
     * Supplied by subclasses.
     * NOTE(review): presumably resolves the virtual graph to use as provider for the
     * given one - the exact contract is not visible here; confirm against implementations.
     *
     * @param virtualGraph the virtual graph to resolve
     * @return a virtual graph chosen by the subclass
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /**
     * Supplied by subclasses.
     * NOTE(review): presumably allocates a fresh resource - the exact contract is not
     * visible here; confirm against implementations.
     *
     * @return a resource produced by the subclass
     * @throws DatabaseException if the subclass fails to provide one
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3556
3557     public void ceased(int thread) {
3558
3559         requestManager.ceased(thread);
3560
3561     }
3562
3563     public int getAmountOfQueryThreads() {
3564         // This must be a power of two
3565         return 1;
3566 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3567     }
3568
3569     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3570
3571         return new ResourceImpl(resourceSupport, key);
3572
3573     }
3574
3575     public void acquireWriteOnly() {
3576         writeOnly = true;
3577     }
3578
3579     public void releaseWriteOnly(ReadGraphImpl graph) {
3580         writeOnly = false;
3581         queryProvider2.releaseWrite(graph);
3582     }
3583
3584     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3585
3586         long start = System.nanoTime();
3587
3588         while(dirtyPrimitives) {
3589             dirtyPrimitives = false;
3590             getQueryProvider2().performDirtyUpdates(writer);
3591             getQueryProvider2().performScheduledUpdates(writer);
3592         }
3593
3594         fireMetadataListeners(writer, clientChanges);
3595
3596         if(DebugPolicy.PERFORMANCE_DATA) {
3597             long end = System.nanoTime();
3598             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3599         }
3600
3601     }
3602
3603     void removeTemporaryData() {
3604         File platform = Platform.getLocation().toFile();
3605         File tempFiles = new File(platform, "tempFiles");
3606         File temp = new File(tempFiles, "db");
3607         if(!temp.exists()) 
3608             return;
3609         try {
3610             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3611                 @Override
3612                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3613                     try {
3614                         Files.delete(file);
3615                     } catch (IOException e) {
3616                         Logger.defaultLogError(e);
3617                     }
3618                     return FileVisitResult.CONTINUE;
3619                 }
3620             });
3621         } catch (IOException e) {
3622             Logger.defaultLogError(e);
3623         }
3624     }
3625
3626     public void handleCreatedClusters() {
3627         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3628             createdClusters.forEach(new TLongProcedure() {
3629
3630                 @Override
3631                 public boolean execute(long value) {
3632                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3633                     cluster.setImmutable(true, clusterTranslator);
3634                     return true;
3635                 }
3636             });
3637         }
3638         createdClusters.clear();
3639     }
3640
3641     @Override
3642     public Object getModificationCounter() {
3643         return queryProvider2.modificationCounter;
3644     }
3645     
3646     @Override
3647     public void markUndoPoint() {
3648         state.setCombine(false);
3649     }
3650
3651 }