a18b66e6978a03d66e46b965848b0f9c956fa961
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169 import org.slf4j.LoggerFactory;
170
171 import gnu.trove.procedure.TLongProcedure;
172 import gnu.trove.set.hash.TLongHashSet;
173
174
175 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
176
177     private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
178
179     protected static final boolean DEBUG        = false;
180
181     private static final boolean DIAGNOSTICS       = false;
182
183     private TransactionPolicySupport transactionPolicy;
184
185     final private ClusterControl clusterControl;
186
187     final protected BuiltinSupportImpl builtinSupport;
188     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
189     final protected ClusterSetsSupport clusterSetsSupport;
190     final protected LifecycleSupportImpl lifecycleSupport;
191
192     protected QuerySupportImpl querySupport;
193     protected ResourceSupportImpl resourceSupport;
194     protected WriteSupport writeSupport;
195     public ClusterTranslator clusterTranslator;
196
197     boolean dirtyPrimitives = false;
198
199     public static final int SERVICE_MODE_CREATE = 2;
200     public static final int SERVICE_MODE_ALLOW = 1;
201     public int serviceMode = 0;
202     public boolean createdImmutableClusters = false;
203     public TLongHashSet createdClusters = new TLongHashSet();
204
205     private Layer0 L0;
206
    /**
     * The service locator maintained by this session. The built-in services
     * are registered into it in the constructor.
     */
211     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
212
213     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
214
215     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
216
217     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
218
219     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
220
221     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
222
223     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
224
225     final protected State                                        state              = new State();
226
227     protected GraphSession                                       graphSession       = null;
228
229     protected SessionManager                                     sessionManagerImpl = null;
230
231     protected UserAuthenticationAgent                            authAgent          = null;
232
233     protected UserAuthenticator                                  authenticator      = null;
234
235     protected Resource                                           user               = null;
236
237     protected ClusterStream                                      clusterStream      = null;
238
239     protected SessionRequestManager                         requestManager = null;
240
241     public ClusterTable                                       clusterTable       = null;
242
243     public QueryProcessor                                     queryProvider2 = null;
244
245     ClientChangesImpl                                  clientChanges      = null;
246
247     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
248
249 //    protected long                                               newClusterId       = Constants.NullClusterId;
250
251     protected int                                                flushCounter       = 0;
252
253     protected boolean                                            writeOnly          = false;
254
255     WriteState<?>                                                writeState = null;
256     WriteStateBase<?>                                            delayedWriteState = null;
257     protected Resource defaultClusterSet = null; // If not null then used for newResource().
258
    /**
     * Creates the session and registers its built-in services into
     * {@link #serviceLocator}.
     * <p>
     * The directory handed to the cluster table, the virtual graph server
     * support and the cluster sets support is
     * {@code StaticSessionProperties.virtualGraphStoragePath}, or the relative
     * path {@code "."} when that property is {@code null}.
     *
     * @param sessionManagerImpl session manager stored in {@link #sessionManagerImpl}
     * @param authAgent currently unused: the assignment to {@link #authAgent}
     *        is commented out below
     */
    public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {

//        if (authAgent == null)
//            throw new IllegalArgumentException("null authentication agent");

        // Storage directory shared by the cluster table, virtual graphs and cluster sets.
        File t = StaticSessionProperties.virtualGraphStoragePath;
        if (null == t)
            t = new File(".");
        this.clusterTable = new ClusterTable(this, t);
        this.builtinSupport = new BuiltinSupportImpl(this);
        this.sessionManagerImpl = sessionManagerImpl;
        this.user = null;
//        this.authAgent = authAgent;

        // Services that are only reachable through the service locator.
        serviceLocator.registerService(Session.class, this);
        serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
        serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
        serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
        serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
        serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
        serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
        serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
        serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
        serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
        serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
        serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
        serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
        serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
        serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
        serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
        serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
        serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
        serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
        serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
        serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
        serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
        ServiceActivityUpdaterForWriteTransactions.register(this);

        // Services that are additionally kept in final fields of this session.
        this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
        // The same instance is registered under both service interfaces.
        serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
        serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
        this.lifecycleSupport = new LifecycleSupportImpl(this);
        serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
        this.transactionPolicy = new TransactionPolicyRelease();
        serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
        this.clusterControl = new ClusterControlImpl(this);
        serviceLocator.registerService(ClusterControl.class, clusterControl);
        this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
        this.clusterSetsSupport.setReadDirectory(t.toPath());
        this.clusterSetsSupport.updateWriteDirectory(t.toPath());
        serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
    }
311
312     /*
313      *
314      * Session interface
315      */
316
317
    /**
     * Returns the root library resource as reported by the query processor
     * ({@link #queryProvider2}).
     * <p>
     * NOTE(review): {@code queryProvider2} is initialized to {@code null} in
     * its declaration, so this presumably must not be called before the
     * query processor has been set up - confirm against the initialization code.
     */
    @Override
    public Resource getRootLibrary() {
        return queryProvider2.getRootLibraryResource();
    }
322
323     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
324         if (!graphSession.dbSession.refreshEnabled())
325             return;
326         try {
327             getClusterTable().refresh(csid, this, clusterUID);
328         } catch (Throwable t) {
329             Logger.defaultLogError("Refesh failed.", t);
330         }
331     }
332
333     static final class TaskHelper {
334         private final String name;
335         private Object result;
336         final Semaphore sema = new Semaphore(0);
337         private Throwable throwable = null;
338         final Consumer<DatabaseException> callback = e -> {
339             synchronized (TaskHelper.this) {
340                 throwable = e;
341             }
342         };
343         final Procedure<Object> proc = new Procedure<Object>() {
344             @Override
345             public void execute(Object result) {
346                 callback.accept(null);
347             }
348             @Override
349             public void exception(Throwable t) {
350                 if (t instanceof DatabaseException)
351                     callback.accept((DatabaseException)t);
352                 else
353                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
354             }
355         };
356         final WriteTraits writeTraits = new WriteTraits() {};
357         TaskHelper(String name) {
358             this.name = name;
359         }
360         @SuppressWarnings("unchecked")
361         <T> T getResult() {
362             return (T)result;
363         }
364         void setResult(Object result) {
365             this.result = result;
366         }
367 //        Throwable throwableGet() {
368 //            return throwable;
369 //        }
370         synchronized void throwableSet(Throwable t) {
371             throwable = t;
372         }
373 //        void throwableSet(String t) {
374 //            throwable = new InternalException("" + name + " operation failed. " + t);
375 //        }
376         synchronized void throwableCheck()
377         throws DatabaseException {
378             if (null != throwable)
379                 if (throwable instanceof DatabaseException)
380                     throw (DatabaseException)throwable;
381                 else
382                     throw new DatabaseException("Undo operation failed.", throwable);
383         }
384         void throw_(String message)
385         throws DatabaseException {
386             throw new DatabaseException("" + name + " operation failed. " + message);
387         }
388     }
389
390
391
392     private ListenerBase getListenerBase(Object procedure) {
393         if (procedure instanceof ListenerBase)
394             return (ListenerBase) procedure;
395         else
396             return null;
397     }
398
    /**
     * Schedules a write request by delegating to the four-argument overload
     * with {@code null} as the {@code combine} argument.
     *
     * @param request the write request to schedule
     * @param callback passed on unchanged to the four-argument overload
     * @param notify passed on unchanged to the four-argument overload
     */
    public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
        scheduleRequest(request, callback, notify, null);
    }
402
403     /* (non-Javadoc)
404      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
405      */
406     @Override
407     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
408
409         assert (request != null);
410
411         if(Development.DEVELOPMENT) {
412             try {
413             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
414                 System.err.println("schedule write '" + request + "'");
415             } catch (Throwable t) {
416                 Logger.defaultLogError(t);
417             }
418         }
419
420         requestManager.scheduleWrite(new SessionTask(null) {
421
422             @Override
423             public void run0(int thread) {
424
425                 if(Development.DEVELOPMENT) {
426                     try {
427                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
428                         System.err.println("perform write '" + request + "'");
429                     } catch (Throwable t) {
430                         Logger.defaultLogError(t);
431                     }
432                 }
433
434                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
435
436                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
437
438                 try {
439
440                     flushCounter = 0;
441                     Disposable.safeDispose(clientChanges);
442                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
443
444                     VirtualGraph vg = getProvider(request.getProvider());
445
446                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
447                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
448
449                         @Override
450                         public void execute(Object result) {
451                             if(callback != null) callback.accept(null);
452                         }
453
454                         @Override
455                         public void exception(Throwable t) {
456                             if(callback != null) callback.accept((DatabaseException)t);
457                         }
458
459                     });
460
461                     assert (null != writer);
462 //                    writer.state.barrier.inc();
463
464                     try {
465                         request.perform(writer);
466                         assert (null != writer);
467                     } catch (Throwable t) {
468                         if (!(t instanceof CancelTransactionException))
469                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
470                         writeState.except(t);
471                     } finally {
472 //                        writer.state.barrier.dec();
473 //                        writer.waitAsync(request);
474                     }
475
476
477                     assert(!queryProvider2.cache.dirty);
478
479                 } catch (Throwable e) {
480
481                     // Log it first, just to be safe that the error is always logged.
482                     if (!(e instanceof CancelTransactionException))
483                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
484
485 //                    writeState.getGraph().state.barrier.dec();
486 //                    writeState.getGraph().waitAsync(request);
487
488                     writeState.except(e);
489
490 //                    try {
491 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
492 //                        // All we can do here is to log those, can't really pass them anywhere.
493 //                        if (callback != null) {
494 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
495 //                            else callback.run(new DatabaseException(e));
496 //                        }
497 //                    } catch (Throwable e2) {
498 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
499 //                    }
500 //                    boolean empty = clusterStream.reallyFlush();
501 //                    int callerThread = -1;
502 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
503 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
504 //                    try {
505 //                        // Send all local changes to server so it can calculate correct reverse change set.
506 //                        // This will call clusterStream.accept().
507 //                        state.cancelCommit(context, clusterStream);
508 //                        if (!empty) {
509 //                            if (!context.isOk()) // this is a blocking operation
510 //                                throw new InternalException("Cancel failed. This should never happen.");
511 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
512 //                        }
513 //                        state.cancelCommit2(context, clusterStream);
514 //                    } catch (DatabaseException e3) {
515 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
516 //                    } finally {
517 //                        clusterStream.setOff(false);
518 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
519 //                    }
520
521                 } finally {
522
523                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
524
525                 }
526
527                 task.finish();
528
529                 if(Development.DEVELOPMENT) {
530                     try {
531                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
532                         System.err.println("finish write '" + request + "'");
533                     } catch (Throwable t) {
534                         Logger.defaultLogError(t);
535                     }
536                 }
537
538             }
539
540         }, combine);
541
542     }
543
    /**
     * Schedules a result-producing write request by delegating to the
     * four-argument overload with {@code null} as the {@code combine} argument.
     *
     * @param request the write request to schedule
     * @param procedure passed on unchanged to the four-argument overload
     * @param notify passed on unchanged to the four-argument overload
     */
    public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
        scheduleRequest(request, procedure, notify, null);
    }
547
    /**
     * Schedules a {@link WriteResult} request on the request manager.
     * <p>
     * When the scheduled task runs it resets {@link #flushCounter}, replaces
     * {@link #clientChanges} with a fresh instance (disposing the previous
     * one), creates a write graph for the request's provider, performs the
     * request and stores its return value into the newly created write state.
     * A {@code Throwable} thrown while creating the write state or performing
     * the request is handed to {@code writeState.except(...)}. A
     * {@code SessionVariables.QUEUED_WRITES} change is fired before the work
     * and again in the {@code finally} clause.
     *
     * @param request the write request, must not be {@code null}
     * @param procedure handed to the created write state
     * @param notify handed to the created write state
     * @param combine handed to {@code requestManager.scheduleWrite}
     */
    @Override
    public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {

        assert (request != null);

        requestManager.scheduleWrite(new SessionTask(null) {

            @Override
            public void run0(int thread) {

                ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);

                fireSessionVariableChange(SessionVariables.QUEUED_WRITES);

                // Start the transaction with a clean change set.
                flushCounter = 0;
                Disposable.safeDispose(clientChanges);
                clientChanges = new ClientChangesImpl(SessionImplSocket.this);

                // Note: failures in these two calls happen outside the try block below.
                VirtualGraph vg = getProvider(request.getProvider());
                WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);

                try {
                    // Typed local is needed for setResult; the field is declared as WriteState<?>.
                    WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
                    writeState = writeStateT;

                    assert (null != writer);
//                    writer.state.barrier.inc();
                    writeStateT.setResult(request.perform(writer));
                    assert (null != writer);

//                    writer.state.barrier.dec();
//                    writer.waitAsync(null);

                } catch (Throwable e) {

//                    writer.state.barrier.dec();
//                    writer.waitAsync(null);

                    // NOTE(review): if the WriteState constructor itself threw, this refers
                    // to whatever writeState held before this task - confirm.
                    writeState.except(e);

//                  state.stopWriteTransaction(clusterStream);
//
//              } catch (Throwable e) {
//                  // Log it first, just to be safe that the error is always logged.
//                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
//
//                  try {
//                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
//                      // All we can do here is to log those, can't really pass them anywhere.
//                      if (procedure != null) {
//                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
//                          else procedure.exception(new DatabaseException(e));
//                      }
//                  } catch (Throwable e2) {
//                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
//                  }
//
//                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
//
//                  state.stopWriteTransaction(clusterStream);

                } finally {
                    fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                }

//              if(notify != null) notify.release();

                task.finish();

            }

        }, combine);

    }
625
    /**
     * Schedules a delayed write request by delegating to the four-argument
     * overload with {@code null} as the {@code combine} argument.
     *
     * @param request the delayed write request to schedule
     * @param callback passed on unchanged to the four-argument overload
     * @param notify passed on unchanged to the four-argument overload
     */
    public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
        scheduleRequest(request, callback, notify, null);
    }
629
630     /* (non-Javadoc)
631      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
632      */
633     @Override
634     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
635
636         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
637
638         assert (request != null);
639
640         requestManager.scheduleWrite(new SessionTask(null) {
641
642             @Override
643             public void run0(int thread) {
644                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
645
646                 Procedure<Object> stateProcedure = new Procedure<Object>() {
647                     @Override
648                     public void execute(Object result) {
649                         if (callback != null)
650                             callback.accept(null);
651                     }
652                     @Override
653                     public void exception(Throwable t) {
654                         if (callback != null) {
655                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
656                             else callback.accept(new DatabaseException(t));
657                         } else
658                             Logger.defaultLogError("Unhandled exception", t);
659                     }
660                 };
661
662                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
663                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
664                 DelayedWriteGraph dwg = null;
665 //                newGraph.state.barrier.inc();
666
667                 try {
668                     dwg = new DelayedWriteGraph(newGraph);
669                     request.perform(dwg);
670                 } catch (Throwable e) {
671                     delayedWriteState.except(e);
672                     total.finish();
673                     dwg.close();
674                     return;
675                 } finally {
676 //                    newGraph.state.barrier.dec();
677 //                    newGraph.waitAsync(request);
678                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
679                 }
680
681                 delayedWriteState = null;
682
683                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
684                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
685
686                 flushCounter = 0;
687                 Disposable.safeDispose(clientChanges);
688                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
689
690                 acquireWriteOnly();
691
692                 VirtualGraph vg = getProvider(request.getProvider());
693                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
694
695                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
696
697                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
698
699                 assert (null != writer);
700 //                writer.state.barrier.inc();
701
702                 try {
703                     // Cannot cancel
704                     dwg.commit(writer, request);
705
706                         if(defaultClusterSet != null) {
707                                 XSupport xs = getService(XSupport.class);
708                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
709                                 ClusteringSupport cs = getService(ClusteringSupport.class);
710                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
711                         }
712
713                     // This makes clusters available from server
714                     clusterStream.reallyFlush();
715
716                     releaseWriteOnly(writer);
717                     handleUpdatesAndMetadata(writer);
718
719                 } catch (ServiceException e) {
720 //                    writer.state.barrier.dec();
721 //                    writer.waitAsync(null);
722
723                     // These shall be requested from server
724                     clusterTable.removeWriteOnlyClusters();
725                     // This makes clusters available from server
726                     clusterStream.reallyFlush();
727
728                     releaseWriteOnly(writer);
729                     writeState.except(e);
730                 } finally {
731                     // Debugging & Profiling
732                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
733                     task2.finish();
734                     total.finish();
735                 }
736             }
737
738         }, combine);
739
740     }
741
    /**
     * Schedules a delayed write request with a result, without an explicit combine hint.
     * Delegates to the four-argument overload, passing {@code null} as {@code combine}.
     *
     * @param request   the delayed write request to schedule
     * @param procedure forwarded unchanged to the four-argument overload
     * @param notify    forwarded unchanged to the four-argument overload
     */
    public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
        scheduleRequest(request, procedure, notify, null);
    }
745
    /**
     * Not implemented: every call fails immediately.
     *
     * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
     * @throws Error always, with the message "Not implemented"
     */
    @Override
    public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
        // NOTE(review): throws java.lang.Error rather than UnsupportedOperationException;
        // callers catching Exception will not intercept this.
        throw new Error("Not implemented");
    }
753
754     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
755         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
756         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
757             createdClusters.add(cluster.clusterId);
758         }
759         return cluster;
760     }
761
    /**
     * {@link WriteSupport} used for write-only transactions that target the persistent graph
     * (it is instantiated by the write-only scheduling code when no virtual graph is selected).
     * <p>
     * Writes go directly into clusters obtained from {@code clusterTable}; changed resources are
     * invalidated in {@code clientChanges}, and query caches are only updated when the touched
     * cluster is not write-only. Metadata, undo and commit bookkeeping are delegated to the
     * {@code writeSupport} visible from the enclosing scope.
     */
    class WriteOnlySupport implements WriteSupport {

        // Cluster stream captured at construction time; not read anywhere inside this class.
        ClusterStream stream;
        // Cluster that currently receives newly created resources; null until one is selected.
        ClusterImpl currentCluster;

        public WriteOnlySupport() {
            this.stream = clusterStream;
        }

        /**
         * Resource-based variant of {@link #claim(VirtualGraph, int, int, int)}.
         * All three resources are cast to {@code ResourceImpl} to obtain their keys.
         */
        @Override
        public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
            claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
        }

        /**
         * Adds the statement (s, p, o) to the cluster that owns {@code s}.
         *
         * @param provider not used by this implementation
         * @throws ImmutableException if the cluster is immutable, the SERVICE_MODE_ALLOW bit is not
         *         set in {@code serviceMode} and {@code s} is not the root library
         */
        @Override
        public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);

            if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
                if(s != queryProvider2.getRootLibrary())
                    throw new ImmutableException("Trying to modify immutable resource key=" + s);

            try {
                maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
            } catch (DatabaseException e) {
                // Failure to add the relation is logged only; processing continues below.
                Logger.defaultLogError(e);
                //throw e;
            }

            clientChanges.invalidate(s);

            // Query caches are only refreshed for clusters that are not write-only.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateStatements(s, p);

        }

        /**
         * Resource-based variant of {@link #claimValue(VirtualGraph, int, byte[], int)};
         * writes the whole array ({@code value.length} bytes).
         */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
            claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
        }

        /**
         * Sets the value of resource {@code rid} in its owning cluster.
         * A {@link DatabaseException} from the cluster is logged and not propagated.
         *
         * @param provider not used by this implementation
         * @param length   number of bytes passed on to {@code setValue}
         */
        @Override
        public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Sets a value read from {@code reader}. Values shorter than 65536 bytes are read in one
         * go and handed to the byte-array overload; larger values are written in blocks of at
         * most 65536 bytes via {@code modiValueEx}, each at offset {@code amount - left}.
         */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {

            if(amount < 65536) {
                claimValue(provider,resource, reader.readBytes(null, amount));
                return;
            }

            // Reused transfer buffer for the block-wise write.
            byte[] bytes = new byte[65536];

            int rid = ((ResourceImpl)resource).id;
            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                int left = amount;
                while(left > 0) {
                    int block = Math.min(left, 65536);
                    reader.readBytes(bytes, block);
                    // NOTE(review): the local 'cluster' is not refreshed when maintainCluster
                    // registers a replacement cluster; later iterations keep using the
                    // original instance - confirm this is intended.
                    maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
                    left -= block;
                }
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Registers {@code after_} in the cluster table when a cluster operation returned a
         * different (non-null) cluster instance than the one it was invoked on, and keeps
         * {@code currentCluster} pointing at the replacement if it referred to {@code before}.
         */
        private void maintainCluster(ClusterImpl before, ClusterI after_) {
            if(after_ != null && after_ != before) {
                ClusterImpl after = (ClusterImpl)after_;
                if(currentCluster == before) {
                    currentCluster = after;
                }
                clusterTable.replaceCluster(after);
            }
        }

        /**
         * Creates a new resource key in the current cluster, obtaining a new cluster first when
         * there is none or when the current one holds exactly CLUSTER_FILL_SIZE resources.
         *
         * @param foreignCounter size of the {@code foreignLookup} array allocated for a newly
         *        obtained replacement cluster
         */
        public int createResourceKey(int foreignCounter) throws DatabaseException {
            if(currentCluster == null) {
                currentCluster = getNewResourceCluster();
            }
            // NOTE(review): uses '==' here while createResource(VirtualGraph) uses '>=' for the
            // same fill check - confirm the difference is intended.
            if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
                ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
                newCluster.foreignLookup = new byte[foreignCounter];
                currentCluster = newCluster;
                if (DEBUG)
                    System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
            }
            return currentCluster.createResource(clusterTranslator);
        }

        /**
         * Creates a new resource. When there is no current cluster, or the current cluster has
         * reached CLUSTER_FILL_SIZE, the resource is taken from the default cluster set if one is
         * defined (and its cluster becomes current); otherwise a new cluster is obtained and the
         * resource is created in it.
         *
         * @param provider not used by this implementation
         */
        @Override
        public Resource createResource(VirtualGraph provider) throws DatabaseException {
            if(currentCluster == null) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
        }

        /** Delegates to {@code getNewResource(clusterId)}; {@code provider} is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, long clusterId)
        throws DatabaseException {
            return getNewResource(clusterId);
        }

        /** Delegates to {@code getNewResource(clusterSet)}; {@code provider} is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, Resource clusterSet)
        throws DatabaseException {
            return getNewResource(clusterSet);
        }

        /** Delegates to {@code getNewClusterSet(clusterSet)}; {@code provider} is not used. */
        @Override
        public void createClusterSet(VirtualGraph provider, Resource clusterSet)
                throws DatabaseException {
            getNewClusterSet(clusterSet);
        }

        /** Delegates to {@code containsClusterSet(clusterSet)}; {@code dummy} is not used. */
        @Override
        public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
        throws ServiceException {
            return containsClusterSet(clusterSet);
        }

        /**
         * Makes the cluster with the given id the current cluster and re-registers it under the
         * set resource id that {@code clusterSetsSupport.getSet} reports for it.
         */
        public void selectCluster(long cluster) {
                currentCluster = clusterTable.getClusterByClusterId(cluster);
                long setResourceId = clusterSetsSupport.getSet(cluster);
                clusterSetsSupport.put(setResourceId, cluster);
        }

        /**
         * Sets the default cluster set for new resources and switches {@code currentCluster}
         * to the cluster registered for that set, or clears it when {@code clusterSet} is null.
         *
         * @return the value returned by {@code setDefaultClusterSet4NewResource} when
         *         {@code clusterSet} is non-null; {@code null} otherwise
         */
        @Override
        public Resource setDefaultClusterSet(Resource clusterSet)
        throws ServiceException {
                Resource result = setDefaultClusterSet4NewResource(clusterSet);
                if(clusterSet != null) {
                        long id = clusterSetsSupport.get(clusterSet.getResourceId());
                        currentCluster = clusterTable.getClusterByClusterId(id);
                        return result;
                } else {
                        currentCluster = null;
                        return null;
                }
        }

        /**
         * Removes the value of {@code resource}. With no resolved virtual graph the removal is
         * recorded through {@code clusterTranslator} and the resource is invalidated; otherwise
         * the value is denied in the virtual graph.
         */
        @Override
        public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {

                 provider = getProvider(provider);
             if (null == provider) {
                 int key = ((ResourceImpl)resource).id;



                 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
//                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
//                      if(key != queryProvider2.getRootLibrary())
//                              throw new ImmutableException("Trying to modify immutable resource key=" + key);

//                 try {
//                     cluster.removeValue(key, clusterTranslator);
//                 } catch (DatabaseException e) {
//                     Logger.defaultLogError(e);
//                     return;
//                 }

                 clusterTable.writeOnlyInvalidate(cluster);

                 try {
                         int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
                         clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
                         clusterTranslator.removeValue(cluster);
                 } catch (DatabaseException e) {
                         // Logged only; invalidation below still happens.
                         Logger.defaultLogError(e);
                 }

                 queryProvider2.invalidateResource(key);
                 clientChanges.invalidate(key);

             } else {
                 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
                 queryProvider2.updateValue(querySupport.getId(resource));
                 clientChanges.claimValue(resource);
             }


        }

        /** Not supported by this implementation. */
        @Override
        public void flush(boolean intermediate) {
            throw new UnsupportedOperationException();
        }

        /**
         * Flushes the cluster table's cluster, maps the default cluster set (if any) to
         * {@code Constants.NewClusterId} and forgets the current cluster.
         */
        @Override
        public void flushCluster() {
            clusterTable.flushCluster(graphSession);
            if(defaultClusterSet != null) {
                clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
            }
            currentCluster = null;
        }

        /** Not supported by this implementation. */
        @Override
        public void flushCluster(Resource r) {
            throw new UnsupportedOperationException("flushCluster resource " + r);
        }

        /** No-op. */
        @Override
        public void gc() {
        }

        /**
         * Removes the statement (subject, predicate, object). With no resolved virtual graph the
         * removal is recorded through {@code clusterTranslator}; otherwise the statement is denied
         * in the virtual graph. In both cases the subject is invalidated.
         *
         * @return always {@code true}, also when a {@link DatabaseException} was logged
         */
        @Override
        public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
                Resource object) {

                int s = ((ResourceImpl)subject).id;
                int p = ((ResourceImpl)predicate).id;
                int o = ((ResourceImpl)object).id;

                provider = getProvider(provider);
                if (null == provider) {

                        ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
                        clusterTable.writeOnlyInvalidate(cluster);

                        try {

                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);

                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);

                                // Subject is registered with its resource index; predicate and
                                // object are registered with their full keys.
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);

                                queryProvider2.invalidateResource(s);
                                clientChanges.invalidate(s);

                        } catch (DatabaseException e) {

                                Logger.defaultLogError(e);

                        }

                        return true;

                } else {

                        ((VirtualGraphImpl)provider).deny(s, p, o);
                        queryProvider2.invalidateResource(s);
                        clientChanges.invalidate(s);

                        return true;

                }

        }

        /** Not supported by this implementation. */
        @Override
        public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
            throw new UnsupportedOperationException();
        }

        /** @return always {@code true} */
        @Override
        public boolean writeOnly() {
            return true;
        }

        // The delegations below use the 'writeSupport' visible from the enclosing scope
        // (presumably the session's regular write support - confirm against the outer class).

        @Override
        public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
            writeSupport.performWriteRequest(graph, request);
        }

        /** Not supported by this implementation. */
        @Override
        public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this implementation. */
        @Override
        public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        @Override
        public <T> void addMetadata(Metadata data) throws ServiceException {
            writeSupport.addMetadata(data);
        }

        @Override
        public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
            return writeSupport.getMetadata(clazz);
        }

        @Override
        public TreeMap<String, byte[]> getMetadata() {
            return writeSupport.getMetadata();
        }

        @Override
        public void commitDone(WriteTraits writeTraits, long csid) {
            writeSupport.commitDone(writeTraits, csid);
        }

        @Override
        public void clearUndoList(WriteTraits writeTraits) {
            writeSupport.clearUndoList(writeTraits);
        }
        @Override
        public int clearMetadata() {
            return writeSupport.clearMetadata();
        }

                @Override
                public void startUndo() {
                        writeSupport.startUndo();
                }
    }
1120
1121     class VirtualWriteOnlySupport implements WriteSupport {
1122
1123 //        @Override
1124 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1125 //                Resource object) {
1126 //            throw new UnsupportedOperationException();
1127 //        }
1128
1129         @Override
1130         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1131
1132             TransientGraph impl = (TransientGraph)provider;
1133             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1134             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1135             clientChanges.claim(subject, predicate, object);
1136
1137         }
1138
1139         @Override
1140         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1141
1142             TransientGraph impl = (TransientGraph)provider;
1143             impl.claim(subject, predicate, object);
1144             getQueryProvider2().updateStatements(subject, predicate);
1145             clientChanges.claim(subject, predicate, object);
1146
1147         }
1148
1149         @Override
1150         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1151             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1152         }
1153
1154         @Override
1155         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1156             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1157             getQueryProvider2().updateValue(resource);
1158             clientChanges.claimValue(resource);
1159         }
1160
1161         @Override
1162         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1163             byte[] value = reader.readBytes(null, amount);
1164             claimValue(provider, resource, value);
1165         }
1166
1167         @Override
1168         public Resource createResource(VirtualGraph provider) {
1169             TransientGraph impl = (TransientGraph)provider;
1170             return impl.getResource(impl.newResource(false));
1171         }
1172
1173         @Override
1174         public Resource createResource(VirtualGraph provider, long clusterId) {
1175             throw new UnsupportedOperationException();
1176         }
1177
1178         @Override
1179         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1180         throws DatabaseException {
1181             throw new UnsupportedOperationException();
1182         }
1183
1184         @Override
1185         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1186         throws DatabaseException {
1187             throw new UnsupportedOperationException();
1188         }
1189
1190         @Override
1191         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1192         throws ServiceException {
1193             throw new UnsupportedOperationException();
1194         }
1195
1196         @Override
1197         public Resource setDefaultClusterSet(Resource clusterSet)
1198         throws ServiceException {
1199                 return null;
1200         }
1201
1202         @Override
1203         public void denyValue(VirtualGraph provider, Resource resource) {
1204             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1205             getQueryProvider2().updateValue(querySupport.getId(resource));
1206             // NOTE: this only keeps track of value changes by-resource.
1207             clientChanges.claimValue(resource);
1208         }
1209
1210         @Override
1211         public void flush(boolean intermediate) {
1212             throw new UnsupportedOperationException();
1213         }
1214
1215         @Override
1216         public void flushCluster() {
1217             throw new UnsupportedOperationException();
1218         }
1219
1220         @Override
1221         public void flushCluster(Resource r) {
1222             throw new UnsupportedOperationException("Resource " + r);
1223         }
1224
1225         @Override
1226         public void gc() {
1227         }
1228
1229         @Override
1230         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1231             TransientGraph impl = (TransientGraph) provider;
1232             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1233             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1234             clientChanges.deny(subject, predicate, object);
1235             return true;
1236         }
1237
1238         @Override
1239         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1240             throw new UnsupportedOperationException();
1241         }
1242
1243         @Override
1244         public boolean writeOnly() {
1245             return true;
1246         }
1247
1248         @Override
1249         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1250             throw new UnsupportedOperationException();
1251         }
1252
1253         @Override
1254         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1255             throw new UnsupportedOperationException();
1256         }
1257
1258         @Override
1259         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1260             throw new UnsupportedOperationException();
1261         }
1262
1263         @Override
1264         public <T> void addMetadata(Metadata data) throws ServiceException {
1265             throw new UnsupportedOperationException();
1266         }
1267
1268         @Override
1269         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1270             throw new UnsupportedOperationException();
1271         }
1272
1273         @Override
1274         public TreeMap<String, byte[]> getMetadata() {
1275             throw new UnsupportedOperationException();
1276         }
1277
1278         @Override
1279         public void commitDone(WriteTraits writeTraits, long csid) {
1280         }
1281
1282         @Override
1283         public void clearUndoList(WriteTraits writeTraits) {
1284         }
1285
1286         @Override
1287         public int clearMetadata() {
1288             return 0;
1289         }
1290
1291                 @Override
1292                 public void startUndo() {
1293                 }
1294     }
1295
1296     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1297
1298         try {
1299
1300             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1301
1302             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1303
1304             flushCounter = 0;
1305             Disposable.safeDispose(clientChanges);
1306             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1307
1308             acquireWriteOnly();
1309
1310             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1311
1312             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1313
1314             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1315             writeState = writeStateT;
1316
1317             assert (null != writer);
1318 //            writer.state.barrier.inc();
1319             long start = System.nanoTime();
1320             T result = request.perform(writer);
1321             long duration = System.nanoTime() - start;
1322             if (DEBUG) {
1323                 System.err.println("################");
1324                 System.err.println("WriteOnly duration " + 1e-9*duration);
1325             }
1326             writeStateT.setResult(result);
1327
1328             // This makes clusters available from server
1329             clusterStream.reallyFlush();
1330
1331             // This will trigger query updates
1332             releaseWriteOnly(writer);
1333             assert (null != writer);
1334
1335             handleUpdatesAndMetadata(writer);
1336
1337         } catch (CancelTransactionException e) {
1338
1339             releaseWriteOnly(writeState.getGraph());
1340
1341             clusterTable.removeWriteOnlyClusters();
1342             state.stopWriteTransaction(clusterStream);
1343
1344         } catch (Throwable e) {
1345
1346             e.printStackTrace();
1347
1348             releaseWriteOnly(writeState.getGraph());
1349
1350             clusterTable.removeWriteOnlyClusters();
1351
1352             if (callback != null)
1353                 callback.exception(new DatabaseException(e));
1354
1355             state.stopWriteTransaction(clusterStream);
1356             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1357
1358         } finally  {
1359
1360             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1361
1362         }
1363
1364     }
1365
    /**
     * Schedules a write-only request without an explicit combine hint.
     * Delegates to the four-argument overload, passing {@code null} as {@code combine}.
     *
     * @param request  the write-only request to schedule
     * @param callback forwarded unchanged to the four-argument overload
     * @param notify   forwarded unchanged to the four-argument overload
     */
    public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
        scheduleRequest(request, callback, notify, null);
    }
1369
1370     @Override
1371     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1372
1373         assertAlive();
1374
1375         assert (request != null);
1376
1377         requestManager.scheduleWrite(new SessionTask(null) {
1378
1379             @Override
1380             public void run0(int thread) {
1381
1382                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1383
1384                     try {
1385
1386                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1387
1388                         flushCounter = 0;
1389                         Disposable.safeDispose(clientChanges);
1390                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1391
1392                         acquireWriteOnly();
1393
1394                         VirtualGraph vg = getProvider(request.getProvider());
1395                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1396
1397                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1398
1399                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1400
1401                             @Override
1402                             public void execute(Object result) {
1403                                 if(callback != null) callback.accept(null);
1404                             }
1405
1406                             @Override
1407                             public void exception(Throwable t) {
1408                                 if(callback != null) callback.accept((DatabaseException)t);
1409                             }
1410
1411                         });
1412
1413                         assert (null != writer);
1414 //                        writer.state.barrier.inc();
1415
1416                         try {
1417
1418                             request.perform(writer);
1419
1420                         } catch (Throwable e) {
1421
1422 //                            writer.state.barrier.dec();
1423 //                            writer.waitAsync(null);
1424
1425                             releaseWriteOnly(writer);
1426
1427                             clusterTable.removeWriteOnlyClusters();
1428
1429                             if(!(e instanceof CancelTransactionException)) {
1430                                 if (callback != null)
1431                                     callback.accept(new DatabaseException(e));
1432                             }
1433
1434                             writeState.except(e);
1435
1436                             return;
1437
1438                         }
1439
1440                         // This makes clusters available from server
1441                         boolean empty = clusterStream.reallyFlush();
1442                         // This was needed to make WO requests call metadata listeners.
1443                         // NOTE: the calling event does not contain clientChanges information.
1444                         if (!empty && clientChanges.isEmpty())
1445                             clientChanges.setNotEmpty(true);
1446                         releaseWriteOnly(writer);
1447                         assert (null != writer);
1448                     } finally  {
1449
1450                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1451
1452                     }
1453
1454
1455                 task.finish();
1456
1457             }
1458
1459         }, combine);
1460
1461     }
1462
1463     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1464         scheduleRequest(request, callback, notify, null);
1465     }
1466
1467     /* (non-Javadoc)
1468      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1469      */
1470     @Override
1471     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1472
1473         assert (request != null);
1474
1475         requestManager.scheduleWrite(new SessionTask(null) {
1476
1477             @Override
1478             public void run0(int thread) {
1479
1480                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1481
1482                 performWriteOnly(request, notify, callback);
1483
1484                 task.finish();
1485
1486             }
1487
1488         }, combine);
1489
1490     }
1491
1492     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1493
1494         assert (request != null);
1495         assert (procedure != null);
1496
1497         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1498
1499         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1500
1501             @Override
1502             public void run0(int thread) {
1503
1504                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1505
1506                 ListenerBase listener = getListenerBase(procedure);
1507
1508                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1509
1510                 // This is never synced but increase to prevent it from visiting 0
1511                 newGraph.asyncBarrier.inc();
1512
1513                 try {
1514
1515                     if (listener != null) {
1516
1517                         try {
1518                                 
1519                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1520
1521                                         @Override
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {
1523                                                 procedure.exception(graph, t);
1524                                                 if(throwable != null) {
1525                                                         throwable.set(t);
1526                                                 } else {
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1528                                                 }
1529                                         }
1530
1531                                         @Override
1532                                         public void execute(AsyncReadGraph graph, T t) {
1533                                                 if(result != null) result.set(t);
1534                                                 procedure.execute(graph, t);
1535                                         }
1536
1537                                 };
1538                                 
1539                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1540                                 
1541                         } catch (Throwable t) {
1542                             // This is handled by the AsyncProcedure
1543                                 //Logger.defaultLogError("Internal error", t);
1544                         }
1545
1546                     } else {
1547
1548                         try {
1549
1550 //                            newGraph.state.barrier.inc();
1551
1552                             T t = request.perform(newGraph);
1553
1554                             try {
1555
1556                                 if(result != null) result.set(t);
1557                                 procedure.execute(newGraph, t);
1558
1559                             } catch (Throwable th) {
1560
1561                                 if(throwable != null) {
1562                                     throwable.set(th);
1563                                 } else {
1564                                     Logger.defaultLogError("Unhandled exception", th);
1565                                 }
1566
1567                             }
1568
1569                         } catch (Throwable t) {
1570
1571                             if (DEBUG)
1572                                 t.printStackTrace();
1573
1574                             if(throwable != null) {
1575                                 throwable.set(t);
1576                             } else {
1577                                 Logger.defaultLogError("Unhandled exception", t);
1578                             }
1579
1580                             try {
1581
1582                                 procedure.exception(newGraph, t);
1583
1584                             } catch (Throwable t2) {
1585
1586                                 if(throwable != null) {
1587                                     throwable.set(t2);
1588                                 } else {
1589                                     Logger.defaultLogError("Unhandled exception", t2);
1590                                 }
1591
1592                             }
1593
1594                         }
1595
1596 //                        newGraph.state.barrier.dec();
1597 //                        newGraph.waitAsync(request);
1598
1599                     }
1600
1601                 } finally {
1602
1603                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1604
1605                 }
1606
1607             }
1608
1609         });
1610
1611     }
1612
1613     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1614
1615         assert (request != null);
1616         assert (procedure != null);
1617
1618         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1619
1620         requestManager.scheduleRead(new SessionRead(null, notify) {
1621
1622             @Override
1623             public void run0(int thread) {
1624
1625                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1626
1627                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1628
1629                 try {
1630
1631                     if (listener != null) {
1632
1633                         try {
1634                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1635                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1636                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1637                                                 } catch (DatabaseException e) {
1638                                                         Logger.defaultLogError(e);
1639                                                 }
1640
1641                     } else {
1642
1643 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1644 //                                procedure, "request");
1645
1646                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1647
1648                         try {
1649
1650                             request.perform(newGraph, wrap);
1651                             wrap.get();
1652
1653                         } catch (DatabaseException e) {
1654
1655                                                         Logger.defaultLogError(e);
1656
1657                         }
1658
1659                     }
1660
1661                 } finally {
1662
1663                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1664
1665                 }
1666
1667             }
1668
1669         });
1670
1671     }
1672
1673     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1674
1675         assert (request != null);
1676         assert (procedure != null);
1677
1678         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1679
1680         int sync = notify != null ? thread : -1;
1681
1682         requestManager.scheduleRead(new SessionRead(null, notify) {
1683
1684             @Override
1685             public void run0(int thread) {
1686
1687                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1688
1689                 ListenerBase listener = getListenerBase(procedure);
1690
1691                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1692
1693                 try {
1694
1695                     if (listener != null) {
1696
1697                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1698
1699 //                        newGraph.waitAsync(request);
1700
1701                     } else {
1702
1703                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1704
1705                         try {
1706
1707                             request.perform(newGraph, wrapper);
1708
1709                         } catch (Throwable t) {
1710
1711                             t.printStackTrace();
1712
1713                         }
1714
1715                     }
1716
1717                 } finally {
1718
1719                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1720
1721                 }
1722
1723             }
1724
1725         });
1726
1727     }
1728     
1729     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1730
1731         assert (request != null);
1732         assert (procedure != null);
1733
1734         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1735
1736         int sync = notify != null ? thread : -1;
1737
1738         requestManager.scheduleRead(new SessionRead(null, notify) {
1739
1740             @Override
1741             public void run0(int thread) {
1742
1743                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1744
1745                 ListenerBase listener = getListenerBase(procedure);
1746
1747                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1748
1749                 try {
1750
1751                     if (listener != null) {
1752
1753                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1754
1755 //                        newGraph.waitAsync(request);
1756
1757                     } else {
1758
1759                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1760
1761                         try {
1762
1763                             request.perform(newGraph, wrapper);
1764
1765                         } catch (Throwable t) {
1766
1767                             t.printStackTrace();
1768
1769                         }
1770
1771                     }
1772
1773                 } finally {
1774
1775                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1776
1777                 }
1778
1779             }
1780
1781         });
1782
1783     }
1784
1785     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1786
1787         assert (request != null);
1788         assert (procedure != null);
1789
1790         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1791
1792         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1793
1794             @Override
1795             public void run0(int thread) {
1796
1797                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1798
1799                 ListenerBase listener = getListenerBase(procedure);
1800
1801                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1802
1803                 try {
1804
1805                     if (listener != null) {
1806
1807                         try {
1808                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1809                         } catch (DatabaseException e) {
1810                             Logger.defaultLogError(e);
1811                         }
1812
1813                     } else {
1814
1815 //                        newGraph.state.barrier.inc();
1816
1817                         request.register(newGraph, new Listener<T>() {
1818
1819                             @Override
1820                             public void exception(Throwable t) {
1821                                 if(throwable != null) throwable.set(t);
1822                                 procedure.exception(t);
1823 //                                newGraph.state.barrier.dec();
1824                             }
1825
1826                             @Override
1827                             public void execute(T t) {
1828                                 if(result != null) result.set(t);
1829                                 procedure.execute(t);
1830 //                                newGraph.state.barrier.dec();
1831                             }
1832
1833                             @Override
1834                             public boolean isDisposed() {
1835                                 return true;
1836                             }
1837
1838                         });
1839
1840 //                        newGraph.waitAsync(request);
1841
1842                     }
1843
1844                 } finally {
1845
1846                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1847
1848                 }
1849
1850             }
1851
1852         });
1853
1854     }
1855
1856
1857     @Override
1858     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1859
1860         scheduleRequest(request, procedure, null, null, null);
1861
1862     }
1863
1864     @Override
1865     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1866         assertNotSession();
1867         Semaphore notify = new Semaphore(0);
1868         DataContainer<Throwable> container = new DataContainer<Throwable>();
1869         DataContainer<T> result = new DataContainer<T>();
1870         scheduleRequest(request, procedure, notify, container, result);
1871         acquire(notify, request);
1872         Throwable throwable = container.get();
1873         if(throwable != null) {
1874             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1875             else throw new DatabaseException("Unexpected exception", throwable);
1876         }
1877         return result.get();
1878     }
1879
1880     @Override
1881     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1882         assertNotSession();
1883         Semaphore notify = new Semaphore(0);
1884         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1885         final DataContainer<T> resultContainer = new DataContainer<T>();
1886         scheduleRequest(request, new AsyncProcedure<T>() {
1887             @Override
1888             public void exception(AsyncReadGraph graph, Throwable throwable) {
1889                 exceptionContainer.set(throwable);
1890                 procedure.exception(graph, throwable);
1891             }
1892             @Override
1893             public void execute(AsyncReadGraph graph, T result) {
1894                 resultContainer.set(result);
1895                 procedure.execute(graph, result);
1896             }
1897         }, getListenerBase(procedure), notify);
1898         acquire(notify, request, procedure);
1899         Throwable throwable = exceptionContainer.get();
1900         if (throwable != null) {
1901             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1902             else throw new DatabaseException("Unexpected exception", throwable);
1903         }
1904         return resultContainer.get();
1905     }
1906
1907
1908     @Override
1909     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1910         assertNotSession();
1911         Semaphore notify = new Semaphore(0);
1912         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1913         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1914             @Override
1915             public void exception(AsyncReadGraph graph, Throwable throwable) {
1916                 exceptionContainer.set(throwable);
1917                 procedure.exception(graph, throwable);
1918             }
1919             @Override
1920             public void execute(AsyncReadGraph graph, T result) {
1921                 procedure.execute(graph, result);
1922             }
1923             @Override
1924             public void finished(AsyncReadGraph graph) {
1925                 procedure.finished(graph);
1926             }
1927         }, notify);
1928         acquire(notify, request, procedure);
1929         Throwable throwable = exceptionContainer.get();
1930         if (throwable != null) {
1931             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1932             else throw new DatabaseException("Unexpected exception", throwable);
1933         }
1934         // TODO: implement return value
1935         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1936         return null;
1937     }
1938
1939     @Override
1940     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1941         return syncRequest(request, new ProcedureAdapter<T>());
1942
1943
1944 //        assert(request != null);
1945 //
1946 //        final DataContainer<T> result = new DataContainer<T>();
1947 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1948 //
1949 //        syncRequest(request, new Procedure<T>() {
1950 //
1951 //            @Override
1952 //            public void execute(T t) {
1953 //                result.set(t);
1954 //            }
1955 //
1956 //            @Override
1957 //            public void exception(Throwable t) {
1958 //                exception.set(t);
1959 //            }
1960 //
1961 //        });
1962 //
1963 //        Throwable t = exception.get();
1964 //        if(t != null) {
1965 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1966 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1967 //        }
1968 //
1969 //        return result.get();
1970
1971     }
1972
1973     @Override
1974     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1975         return syncRequest(request, (Procedure<T>)procedure);
1976     }
1977
1978
1979 //    @Override
1980 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1981 //        assertNotSession();
1982 //        Semaphore notify = new Semaphore(0);
1983 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1984 //        DataContainer<T> result = new DataContainer<T>();
1985 //        scheduleRequest(request, procedure, notify, container, result);
1986 //        acquire(notify, request);
1987 //        Throwable throwable = container.get();
1988 //        if(throwable != null) {
1989 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1990 //            else throw new DatabaseException("Unexpected exception", throwable);
1991 //        }
1992 //        return result.get();
1993 //    }
1994
1995
1996     @Override
1997     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1998         assertNotSession();
1999         Semaphore notify = new Semaphore(0);
2000         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2001         final DataContainer<T> result = new DataContainer<T>();
2002         scheduleRequest(request, procedure, notify, container, result);
2003         acquire(notify, request);
2004         Throwable throwable = container.get();
2005         if (throwable != null) {
2006             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2007             else throw new DatabaseException("Unexpected exception", throwable);
2008         }
2009         return result.get();
2010     }
2011
2012     @Override
2013     public void syncRequest(Write request) throws DatabaseException  {
2014         assertNotSession();
2015         assertAlive();
2016         Semaphore notify = new Semaphore(0);
2017         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2018         scheduleRequest(request, e -> exception.set(e), notify);
2019         acquire(notify, request);
2020         if(exception.get() != null) throw exception.get();
2021     }
2022
2023     @Override
2024     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2025         assertNotSession();
2026         Semaphore notify = new Semaphore(0);
2027         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2028         final DataContainer<T> result = new DataContainer<T>();
2029         scheduleRequest(request, new Procedure<T>() {
2030
2031             @Override
2032             public void exception(Throwable t) {
2033               exception.set(t);
2034             }
2035
2036             @Override
2037             public void execute(T t) {
2038               result.set(t);
2039             }
2040
2041         }, notify);
2042         acquire(notify, request);
2043         if(exception.get() != null) {
2044             Throwable t = exception.get();
2045             if(t instanceof DatabaseException) throw (DatabaseException)t;
2046             else throw new DatabaseException(t);
2047         }
2048         return result.get();
2049     }
2050
2051     @Override
2052     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2053         assertNotSession();
2054         Semaphore notify = new Semaphore(0);
2055         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2056         final DataContainer<T> result = new DataContainer<T>();
2057         scheduleRequest(request, new Procedure<T>() {
2058
2059             @Override
2060             public void exception(Throwable t) {
2061               exception.set(t);
2062             }
2063
2064             @Override
2065             public void execute(T t) {
2066               result.set(t);
2067             }
2068
2069         }, notify);
2070         acquire(notify, request);
2071         if(exception.get() != null) {
2072             Throwable t = exception.get();
2073             if(t instanceof DatabaseException) throw (DatabaseException)t;
2074             else throw new DatabaseException(t);
2075         }
2076         return result.get();
2077     }
2078
2079     @Override
2080     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2081         assertNotSession();
2082         Semaphore notify = new Semaphore(0);
2083         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2084         final DataContainer<T> result = new DataContainer<T>();
2085         scheduleRequest(request, new Procedure<T>() {
2086
2087             @Override
2088             public void exception(Throwable t) {
2089               exception.set(t);
2090             }
2091
2092             @Override
2093             public void execute(T t) {
2094               result.set(t);
2095             }
2096
2097         }, notify);
2098         acquire(notify, request);
2099         if(exception.get() != null) {
2100             Throwable t = exception.get();
2101             if(t instanceof DatabaseException) throw (DatabaseException)t;
2102             else throw new DatabaseException(t);
2103         }
2104         return result.get();
2105     }
2106
2107     @Override
2108     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2109         assertNotSession();
2110         Semaphore notify = new Semaphore(0);
2111         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2112         scheduleRequest(request, e -> exception.set(e), notify);
2113         acquire(notify, request);
2114         if(exception.get() != null) throw exception.get();
2115     }
2116
2117     @Override
2118     public void syncRequest(WriteOnly request) throws DatabaseException  {
2119         assertNotSession();
2120         assertAlive();
2121         Semaphore notify = new Semaphore(0);
2122         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2123         scheduleRequest(request, e -> exception.set(e), notify);
2124         acquire(notify, request);
2125         if(exception.get() != null) throw exception.get();
2126     }
2127
2128     /*
2129      *
2130      * GraphRequestProcessor interface
2131      */
2132
2133     @Override
2134     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2135
2136         scheduleRequest(request, procedure, null, null);
2137
2138     }
2139
2140     @Override
2141     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2142
2143         scheduleRequest(request, procedure, null);
2144
2145     }
2146
2147     @Override
2148     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2149
2150         scheduleRequest(request, procedure, null, null, null);
2151
2152     }
2153
2154     @Override
2155     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2156
2157         scheduleRequest(request, callback, null);
2158
2159     }
2160
2161     @Override
2162     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2163
2164         scheduleRequest(request, procedure, null);
2165
2166     }
2167
2168     @Override
2169     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2170
2171         scheduleRequest(request, procedure, null);
2172
2173     }
2174
2175     @Override
2176     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2177
2178         scheduleRequest(request, procedure, null);
2179
2180     }
2181
2182     @Override
2183     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2184
2185         scheduleRequest(request, callback, null);
2186
2187     }
2188
2189     @Override
2190     public void asyncRequest(final Write r) {
2191         asyncRequest(r, null);
2192     }
2193
2194     @Override
2195     public void asyncRequest(final DelayedWrite r) {
2196         asyncRequest(r, null);
2197     }
2198
2199     @Override
2200     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2201
2202         scheduleRequest(request, callback, null);
2203
2204     }
2205
2206     @Override
2207     public void asyncRequest(final WriteOnly request) {
2208
2209         asyncRequest(request, null);
2210
2211     }
2212
2213     @Override
2214     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2215         r.request(this, procedure);
2216     }
2217
2218     @Override
2219     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2220         r.request(this, procedure);
2221     }
2222
2223     @Override
2224     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2225         r.request(this, procedure);
2226     }
2227
2228     @Override
2229     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2230         r.request(this, procedure);
2231     }
2232
2233     @Override
2234     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2235         r.request(this, procedure);
2236     }
2237
2238     @Override
2239     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2240         r.request(this, procedure);
2241     }
2242
2243     @Override
2244     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2245         return r.request(this);
2246     }
2247
2248     @Override
2249     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2250         return r.request(this);
2251     }
2252
2253     @Override
2254     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2255         r.request(this, procedure);
2256     }
2257
2258     @Override
2259     public <T> void async(WriteInterface<T> r) {
2260         r.request(this, new ProcedureAdapter<T>());
2261     }
2262
2263     //@Override
2264     public void incAsync() {
2265         state.incAsync();
2266     }
2267
2268     //@Override
2269     public void decAsync() {
2270         state.decAsync();
2271     }
2272
2273     public long getCluster(ResourceImpl resource) {
2274         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2275         return cluster.getClusterId();
2276     }
2277
2278     public long getCluster(int id) {
2279         if (clusterTable == null)
2280             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2281         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2282     }
2283
2284     public ResourceImpl getResource(int id) {
2285         return new ResourceImpl(resourceSupport, id);
2286     }
2287
2288     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2289         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2290         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2291         int key = proxy.getClusterKey();
2292         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2293         return new ResourceImpl(resourceSupport, resourceKey);
2294     }
2295
2296     public ResourceImpl getResource2(int id) {
2297         assert (id != 0);
2298         return new ResourceImpl(resourceSupport, id);
2299     }
2300
2301     final public int getId(ResourceImpl impl) {
2302         return impl.id;
2303     }
2304
2305     public static final Charset UTF8 = Charset.forName("utf-8");
2306
2307
2308
2309
2310     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2311         for(TransientGraph g : support.providers) {
2312             if(g.isPending(subject)) return false;
2313         }
2314         return true;
2315     }
2316
2317     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2318         for(TransientGraph g : support.providers) {
2319             if(g.isPending(subject, predicate)) return false;
2320         }
2321         return true;
2322     }
2323
2324     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2325
2326         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2327
2328             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2329
2330             @Override
2331             public void accept(ReadGraphImpl graph) {
2332                 if(ready.decrementAndGet() == 0) {
2333                     runnable.accept(graph);
2334                 }
2335             }
2336
2337         };
2338
2339         for(TransientGraph g : support.providers) {
2340             if(g.isPending(subject)) {
2341                 try {
2342                     g.load(graph, subject, composite);
2343                 } catch (DatabaseException e) {
2344                     e.printStackTrace();
2345                 }
2346             } else {
2347                 composite.accept(graph);
2348             }
2349         }
2350
2351         composite.accept(graph);
2352
2353     }
2354
2355     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2356
2357         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2358
2359             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2360
2361             @Override
2362             public void accept(ReadGraphImpl graph) {
2363                 if(ready.decrementAndGet() == 0) {
2364                     runnable.accept(graph);
2365                 }
2366             }
2367
2368         };
2369
2370         for(TransientGraph g : support.providers) {
2371             if(g.isPending(subject, predicate)) {
2372                 try {
2373                     g.load(graph, subject, predicate, composite);
2374                 } catch (DatabaseException e) {
2375                     e.printStackTrace();
2376                 }
2377             } else {
2378                 composite.accept(graph);
2379             }
2380         }
2381
2382         composite.accept(graph);
2383
2384     }
2385
2386 //    void dumpHeap() {
2387 //
2388 //        try {
2389 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2390 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2391 //                    "d:/heap" + flushCounter + ".txt", true);
2392 //        } catch (IOException e) {
2393 //            e.printStackTrace();
2394 //        }
2395 //
2396 //    }
2397
2398     void fireReactionsToSynchronize(ChangeSet cs) {
2399
2400         // Do not fire empty events
2401         if (cs.isEmpty())
2402             return;
2403
2404         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2405 //        g.state.barrier.inc();
2406
2407         try {
2408
2409             if (!cs.isEmpty()) {
2410                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2411                 for (ChangeListener l : changeListeners2) {
2412                     try {
2413                         l.graphChanged(e2);
2414                     } catch (Exception ex) {
2415                         ex.printStackTrace();
2416                     }
2417                 }
2418             }
2419
2420         } finally {
2421
2422 //            g.state.barrier.dec();
2423 //            g.waitAsync(null);
2424
2425         }
2426
2427     }
2428
2429     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2430         try {
2431
2432             // Do not fire empty events
2433             if (cs2.isEmpty())
2434                 return;
2435
2436 //            graph.restart();
2437 //            graph.state.barrier.inc();
2438
2439             try {
2440
2441                 if (!cs2.isEmpty()) {
2442
2443                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2444                     for (ChangeListener l : changeListeners2) {
2445                         try {
2446                             l.graphChanged(e2);
2447 //                            System.out.println("changelistener " + l);
2448                         } catch (Exception ex) {
2449                             ex.printStackTrace();
2450                         }
2451                     }
2452
2453                 }
2454
2455             } finally {
2456
2457 //                graph.state.barrier.dec();
2458 //                graph.waitAsync(null);
2459
2460             }
2461         } catch (Throwable t) {
2462             t.printStackTrace();
2463
2464         }
2465     }
2466     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2467
2468         try {
2469
2470             // Do not fire empty events
2471             if (cs2.isEmpty())
2472                 return;
2473
2474             // Do not fire on virtual requests
2475             if(graph.getProvider() != null)
2476                 return;
2477
2478             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2479
2480             try {
2481
2482                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2483                 for (ChangeListener l : metadataListeners) {
2484                     try {
2485                         l.graphChanged(e2);
2486                     } catch (Throwable ex) {
2487                         LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2488                     }
2489                 }
2490
2491             } finally {
2492
2493             }
2494
2495         } catch (Throwable t) {
2496             LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2497         }
2498
2499     }
2500
2501
2502     /*
2503      * (non-Javadoc)
2504      *
2505      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2506      */
2507     @Override
2508     public <T> T getService(Class<T> api) {
2509         T t = peekService(api);
2510         if (t == null) {
2511             if (state.isClosed())
2512                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2513             throw new ServiceNotFoundException(this, api);
2514         }
2515         return t;
2516     }
2517
2518     /**
2519      * @return
2520      */
2521     protected abstract ServerInformation getCachedServerInformation();
2522
2523         private Class<?> serviceKey1 = null;
2524         private Class<?> serviceKey2 = null;
2525         private Object service1 = null;
2526         private Object service2 = null;
2527
2528     /*
2529      * (non-Javadoc)
2530      *
2531      * @see
2532      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2533      */
2534     @SuppressWarnings("unchecked")
2535     @Override
2536     public synchronized <T> T peekService(Class<T> api) {
2537
2538                 if(serviceKey1 == api) {
2539                         return (T)service1;
2540                 } else if (serviceKey2 == api) {
2541                         // Promote this key
2542                         Object result = service2;
2543                         service2 = service1;
2544                         serviceKey2 = serviceKey1;
2545                         service1 = result;
2546                         serviceKey1 = api;
2547                         return (T)result;
2548                 }
2549
2550         if (Layer0.class == api)
2551                 return (T) L0;
2552         if (ServerInformation.class == api)
2553             return (T) getCachedServerInformation();
2554         else if (WriteGraphImpl.class == api)
2555             return (T) writeState.getGraph();
2556         else if (ClusterBuilder.class == api)
2557             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2558         else if (ClusterBuilderFactory.class == api)
2559             return (T)new ClusterBuilderFactoryImpl(this);
2560
2561                 service2 = service1;
2562                 serviceKey2 = serviceKey1;
2563
2564         service1 = serviceLocator.peekService(api);
2565         serviceKey1 = api;
2566
2567         return (T)service1;
2568
2569     }
2570
2571     /*
2572      * (non-Javadoc)
2573      *
2574      * @see
2575      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2576      */
2577     @Override
2578     public boolean hasService(Class<?> api) {
2579         return serviceLocator.hasService(api);
2580     }
2581
2582     /**
2583      * @param api the api that must be implemented by the specified service
2584      * @param service the service implementation
2585      */
2586     @Override
2587     public <T> void registerService(Class<T> api, T service) {
2588         if(Layer0.class == api) {
2589                 L0 = (Layer0)service;
2590                 return;
2591         }
2592         serviceLocator.registerService(api, service);
2593         if (TransactionPolicySupport.class == api) {
2594             transactionPolicy = (TransactionPolicySupport)service;
2595             state.resetTransactionPolicy();
2596         }
2597         if (api == serviceKey1)
2598                 service1 = service;
2599         else if (api == serviceKey2)
2600                 service2 = service;
2601     }
2602
2603     // ----------------
2604     // SessionMonitor
2605     // ----------------
2606
2607     void fireSessionVariableChange(String variable) {
2608         for (MonitorHandler h : monitorHandlers) {
2609             MonitorContext ctx = monitorContexts.getLeft(h);
2610             assert ctx != null;
2611             // SafeRunner functionality repeated here to avoid dependency.
2612             try {
2613                 h.valuesChanged(ctx);
2614             } catch (Exception e) {
2615                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2616             } catch (LinkageError e) {
2617                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2618             }
2619         }
2620     }
2621
2622
2623     class ResourceSerializerImpl implements ResourceSerializer {
2624
2625         public long createRandomAccessId(int id) throws DatabaseException {
2626             if(id < 0)
2627                 return id;
2628             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2629             long cluster = getCluster(id);
2630             if (0 == cluster)
2631                 return 0; // Better to return 0 then invalid id.
2632             long result = ClusterTraitsBase.createResourceId(cluster, index);
2633             return result;
2634         }
2635
2636         @Override
2637         public long getRandomAccessId(Resource resource) throws DatabaseException {
2638
2639             ResourceImpl resourceImpl = (ResourceImpl) resource;
2640             return createRandomAccessId(resourceImpl.id);
2641
2642         }
2643
2644         @Override
2645         public int getTransientId(Resource resource) throws DatabaseException {
2646
2647             ResourceImpl resourceImpl = (ResourceImpl) resource;
2648             return resourceImpl.id;
2649
2650         }
2651
2652         @Override
2653         public String createRandomAccessId(Resource resource)
2654         throws InvalidResourceReferenceException {
2655
2656             if(resource == null) throw new IllegalArgumentException();
2657
2658             ResourceImpl resourceImpl = (ResourceImpl) resource;
2659
2660             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2661
2662             int r;
2663             try {
2664                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2665             } catch (DatabaseException e1) {
2666                 throw new InvalidResourceReferenceException(e1);
2667             }
2668             try {
2669                 // Serialize as '<resource index>_<cluster id>'
2670                 return "" + r + "_" + getCluster(resourceImpl);
2671             } catch (Throwable e) {
2672                 e.printStackTrace();
2673                 throw new InvalidResourceReferenceException(e);
2674             } finally {
2675             }
2676         }
2677
2678         public int getTransientId(long serialized) throws DatabaseException {
2679             if (serialized <= 0)
2680                 return (int)serialized;
2681             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2682             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2683             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2684             if (cluster == null)
2685                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2686             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2687             return key;
2688         }
2689
2690         @Override
2691         public Resource getResource(long randomAccessId) throws DatabaseException {
2692             return getResourceByKey(getTransientId(randomAccessId));
2693         }
2694
2695         @Override
2696         public Resource getResource(int transientId) throws DatabaseException {
2697             return getResourceByKey(transientId);
2698         }
2699
2700         @Override
2701         public Resource getResource(String randomAccessId)
2702         throws InvalidResourceReferenceException {
2703             try {
2704                 int i = randomAccessId.indexOf('_');
2705                 if (i == -1)
2706                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2707                             + randomAccessId + "'");
2708                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2709                 if(r < 0) return getResourceByKey(r);
2710                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2711                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2712                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2713                 if (cluster.hasResource(key, clusterTranslator))
2714                     return getResourceByKey(key);
2715             } catch (InvalidResourceReferenceException e) {
2716                 throw e;
2717             } catch (NumberFormatException e) {
2718                 throw new InvalidResourceReferenceException(e);
2719             } catch (Throwable e) {
2720                 e.printStackTrace();
2721                 throw new InvalidResourceReferenceException(e);
2722             } finally {
2723             }
2724             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2725         }
2726
2727         @Override
2728         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2729             try {
2730                 return true;
2731             } catch (Throwable e) {
2732                 e.printStackTrace();
2733                 throw new InvalidResourceReferenceException(e);
2734             } finally {
2735             }
2736         }
2737     }
2738
2739     // Copied from old SessionImpl
2740
2741     void check() {
2742         if (state.isClosed())
2743             throw new Error("Session closed.");
2744     }
2745
2746
2747
2748     public ResourceImpl getNewResource(long clusterId) {
2749         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2750         int newId;
2751         try {
2752             newId = cluster.createResource(clusterTranslator);
2753         } catch (DatabaseException e) {
2754             Logger.defaultLogError(e);
2755             return null;
2756         }
2757         return new ResourceImpl(resourceSupport, newId);
2758     }
2759
2760     public ResourceImpl getNewResource(Resource clusterSet)
2761     throws DatabaseException {
2762         long resourceId = clusterSet.getResourceId();
2763         Long clusterId = clusterSetsSupport.get(resourceId);
2764         if (null == clusterId)
2765             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2766         if (Constants.NewClusterId == clusterId) {
2767             clusterId = getService(ClusteringSupport.class).createCluster();
2768             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2769                 createdClusters.add(clusterId);
2770             clusterSetsSupport.put(resourceId, clusterId);
2771             return getNewResource(clusterId);
2772         } else {
2773             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2774             ResourceImpl result;
2775             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2776                 clusterId = getService(ClusteringSupport.class).createCluster();
2777                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2778                     createdClusters.add(clusterId);
2779                 clusterSetsSupport.put(resourceId, clusterId);
2780                 return getNewResource(clusterId);
2781             } else {
2782                 result = getNewResource(clusterId);
2783                 int resultKey = querySupport.getId(result);
2784                 long resultCluster = querySupport.getClusterId(resultKey);
2785                 if (clusterId != resultCluster)
2786                     clusterSetsSupport.put(resourceId, resultCluster);
2787                 return result;
2788             }
2789         }
2790     }
2791
2792     public void getNewClusterSet(Resource clusterSet)
2793     throws DatabaseException {
2794         if (DEBUG)
2795             System.out.println("new cluster set=" + clusterSet);
2796         long resourceId = clusterSet.getResourceId();
2797         if (clusterSetsSupport.containsKey(resourceId))
2798             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2799         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2800     }
2801     public boolean containsClusterSet(Resource clusterSet)
2802     throws ServiceException {
2803         long resourceId = clusterSet.getResourceId();
2804         return clusterSetsSupport.containsKey(resourceId);
2805     }
2806     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2807         Resource r = defaultClusterSet;
2808         defaultClusterSet = clusterSet;
2809         return r;
2810     }
2811     void printDiagnostics() {
2812
2813         if (DIAGNOSTICS) {
2814
2815             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2816             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2817             for(ClusterI cluster : clusterTable.getClusters()) {
2818                 try {
2819                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2820                         residentClusters.set(residentClusters.get() + 1);
2821                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2822                     }
2823                 } catch (DatabaseException e) {
2824                     Logger.defaultLogError(e);
2825                 }
2826             }
2827
2828             System.out.println("--------------------------------");
2829             System.out.println("Cluster information:");
2830             System.out.println("-amount of resident clusters=" + residentClusters.get());
2831             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2832
2833             for(ClusterI cluster : clusterTable.getClusters()) {
2834                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2835                 try {
2836                     if (!cluster.isLoaded())
2837                         System.out.println("not loaded]");
2838                     else if (cluster.isEmpty())
2839                         System.out.println("is empty]");
2840                     else {
2841                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2842                         System.out.print(",references=" + cluster.getReferenceCount());
2843                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2844                     }
2845                 } catch (DatabaseException e) {
2846                     Logger.defaultLogError(e);
2847                     System.out.println("is corrupted]");
2848                 }
2849             }
2850
2851             queryProvider2.printDiagnostics();
2852             System.out.println("--------------------------------");
2853         }
2854
2855     }
2856
2857     public void fireStartReadTransaction() {
2858
2859         if (DIAGNOSTICS)
2860             System.out.println("StartReadTransaction");
2861
2862         for (SessionEventListener listener : eventListeners) {
2863             try {
2864                 listener.readTransactionStarted();
2865             } catch (Throwable t) {
2866                 t.printStackTrace();
2867             }
2868         }
2869
2870     }
2871
2872     public void fireFinishReadTransaction() {
2873
2874         if (DIAGNOSTICS)
2875             System.out.println("FinishReadTransaction");
2876
2877         for (SessionEventListener listener : eventListeners) {
2878             try {
2879                 listener.readTransactionFinished();
2880             } catch (Throwable t) {
2881                 t.printStackTrace();
2882             }
2883         }
2884
2885     }
2886
2887     public void fireStartWriteTransaction() {
2888
2889         if (DIAGNOSTICS)
2890             System.out.println("StartWriteTransaction");
2891
2892         for (SessionEventListener listener : eventListeners) {
2893             try {
2894                 listener.writeTransactionStarted();
2895             } catch (Throwable t) {
2896                 t.printStackTrace();
2897             }
2898         }
2899
2900     }
2901
2902     public void fireFinishWriteTransaction() {
2903
2904         if (DIAGNOSTICS)
2905             System.out.println("FinishWriteTransaction");
2906
2907         for (SessionEventListener listener : eventListeners) {
2908             try {
2909                 listener.writeTransactionFinished();
2910             } catch (Throwable t) {
2911                 t.printStackTrace();
2912             }
2913         }
2914
2915         Indexing.resetDependenciesIndexingDisabled();
2916
2917     }
2918
2919     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2920         throw new Error("Not supported at the moment.");
2921     }
2922
2923     State getState() {
2924         return state;
2925     }
2926
2927     ClusterTable getClusterTable() {
2928         return clusterTable;
2929     }
2930
2931     public GraphSession getGraphSession() {
2932         return graphSession;
2933     }
2934
2935     public QueryProcessor getQueryProvider2() {
2936         return queryProvider2;
2937     }
2938
2939     ClientChangesImpl getClientChanges() {
2940         return clientChanges;
2941     }
2942
2943     boolean getWriteOnly() {
2944         return writeOnly;
2945     }
2946
2947     static int counter = 0;
2948
2949     public void onClusterLoaded(long clusterId) {
2950
2951         clusterTable.updateSize();
2952
2953     }
2954
2955
2956
2957
2958     /*
2959      * Implementation of the interface RequestProcessor
2960      */
2961
2962     @Override
2963     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2964
2965         assertNotSession();
2966         assertAlive();
2967
2968         assert(request != null);
2969
2970         final DataContainer<T> result = new DataContainer<T>();
2971         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2972
2973         syncRequest(request, new AsyncProcedure<T>() {
2974
2975             @Override
2976             public void execute(AsyncReadGraph graph, T t) {
2977                 result.set(t);
2978             }
2979
2980             @Override
2981             public void exception(AsyncReadGraph graph, Throwable t) {
2982                 exception.set(t);
2983             }
2984
2985         });
2986
2987         Throwable t = exception.get();
2988         if(t != null) {
2989             if(t instanceof DatabaseException) throw (DatabaseException)t;
2990             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2991         }
2992
2993         return result.get();
2994
2995     }
2996
2997 //    @Override
2998 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2999 //        assertNotSession();
3000 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
3001 //    }
3002
3003     @Override
3004     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3005         assertNotSession();
3006         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3007     }
3008
3009     @Override
3010     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3011