Removed contact application support prints
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator maintained by the workbench. These services are
206      * initialized during workbench during the <code>init</code> method.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
418
419         requestManager.scheduleWrite(new SessionTask(true) {
420
421             @Override
422             public void run(int thread) {
423
424                 if(Development.DEVELOPMENT) {
425                     try {
426                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
427                         System.err.println("perform write '" + request + "'");
428                     } catch (Throwable t) {
429                         Logger.defaultLogError(t);
430                     }
431                 }
432
433                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
434
435                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
436
437                 try {
438
439                     flushCounter = 0;
440                     Disposable.safeDispose(clientChanges);
441                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
442
443                     VirtualGraph vg = getProvider(request.getProvider());
444
445                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
446                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
447
448                         @Override
449                         public void execute(Object result) {
450                             if(callback != null) callback.accept(null);
451                         }
452
453                         @Override
454                         public void exception(Throwable t) {
455                             if(callback != null) callback.accept((DatabaseException)t);
456                         }
457
458                     });
459
460                     assert (null != writer);
461 //                    writer.state.barrier.inc();
462
463                     try {
464                         request.perform(writer);
465                         assert (null != writer);
466                     } catch (Throwable t) {
467                         if (!(t instanceof CancelTransactionException))
468                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
469                         writeState.except(t);
470                     } finally {
471 //                        writer.state.barrier.dec();
472 //                        writer.waitAsync(request);
473                     }
474
475
476                     assert(!queryProvider2.cache.dirty);
477
478                 } catch (Throwable e) {
479
480                     // Log it first, just to be safe that the error is always logged.
481                     if (!(e instanceof CancelTransactionException))
482                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
483
484 //                    writeState.getGraph().state.barrier.dec();
485 //                    writeState.getGraph().waitAsync(request);
486
487                     writeState.except(e);
488
489 //                    try {
490 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
491 //                        // All we can do here is to log those, can't really pass them anywhere.
492 //                        if (callback != null) {
493 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
494 //                            else callback.run(new DatabaseException(e));
495 //                        }
496 //                    } catch (Throwable e2) {
497 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
498 //                    }
499 //                    boolean empty = clusterStream.reallyFlush();
500 //                    int callerThread = -1;
501 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
502 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
503 //                    try {
504 //                        // Send all local changes to server so it can calculate correct reverse change set.
505 //                        // This will call clusterStream.accept().
506 //                        state.cancelCommit(context, clusterStream);
507 //                        if (!empty) {
508 //                            if (!context.isOk()) // this is a blocking operation
509 //                                throw new InternalException("Cancel failed. This should never happen.");
510 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
511 //                        }
512 //                        state.cancelCommit2(context, clusterStream);
513 //                    } catch (DatabaseException e3) {
514 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
515 //                    } finally {
516 //                        clusterStream.setOff(false);
517 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
518 //                    }
519
520                 } finally {
521
522                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
523
524                 }
525
526                 task.finish();
527
528                 if(Development.DEVELOPMENT) {
529                     try {
530                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
531                         System.err.println("finish write '" + request + "'");
532                     } catch (Throwable t) {
533                         Logger.defaultLogError(t);
534                     }
535                 }
536
537             }
538
539         }, combine);
540
541     }
542
543     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
544         scheduleRequest(request, procedure, notify, null);
545     }
546
547     /* (non-Javadoc)
548      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
549      */
550     @Override
551     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
552
553         assert (request != null);
554
555         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
556
557         requestManager.scheduleWrite(new SessionTask(true) {
558
559             @Override
560             public void run(int thread) {
561
562                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
563
564                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
565
566                 flushCounter = 0;
567                 Disposable.safeDispose(clientChanges);
568                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
569
570                 VirtualGraph vg = getProvider(request.getProvider());
571                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
572
573                 try {
574                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                     writeState = writeStateT;
576
577                     assert (null != writer);
578 //                    writer.state.barrier.inc();
579                     writeStateT.setResult(request.perform(writer));
580                     assert (null != writer);
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                 } catch (Throwable e) {
586
587 //                    writer.state.barrier.dec();
588 //                    writer.waitAsync(null);
589
590                     writeState.except(e);
591
592 //                  state.stopWriteTransaction(clusterStream);
593 //
594 //              } catch (Throwable e) {
595 //                  // Log it first, just to be safe that the error is always logged.
596 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
597 //
598 //                  try {
599 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
600 //                      // All we can do here is to log those, can't really pass them anywhere.
601 //                      if (procedure != null) {
602 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
603 //                          else procedure.exception(new DatabaseException(e));
604 //                      }
605 //                  } catch (Throwable e2) {
606 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
607 //                  }
608 //
609 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
610 //
611 //                  state.stopWriteTransaction(clusterStream);
612
613                 } finally {
614                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
615                 }
616
617 //              if(notify != null) notify.release();
618
619                 task.finish();
620
621             }
622
623         }, combine);
624
625     }
626
627     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
628         scheduleRequest(request, callback, notify, null);
629     }
630
631     /* (non-Javadoc)
632      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
633      */
634     @Override
635     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
636
637         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
638
639         assert (request != null);
640
641         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
642
643         requestManager.scheduleWrite(new SessionTask(true) {
644
645             @Override
646             public void run(int thread) {
647                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
648
649                 Procedure<Object> stateProcedure = new Procedure<Object>() {
650                     @Override
651                     public void execute(Object result) {
652                         if (callback != null)
653                             callback.accept(null);
654                     }
655                     @Override
656                     public void exception(Throwable t) {
657                         if (callback != null) {
658                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
659                             else callback.accept(new DatabaseException(t));
660                         } else
661                             Logger.defaultLogError("Unhandled exception", t);
662                     }
663                 };
664
665                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
666                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
667                 DelayedWriteGraph dwg = null;
668 //                newGraph.state.barrier.inc();
669
670                 try {
671                     dwg = new DelayedWriteGraph(newGraph);
672                     request.perform(dwg);
673                 } catch (Throwable e) {
674                     delayedWriteState.except(e);
675                     total.finish();
676                     dwg.close();
677                     return;
678                 } finally {
679 //                    newGraph.state.barrier.dec();
680 //                    newGraph.waitAsync(request);
681                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
682                 }
683
684                 delayedWriteState = null;
685
686                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
687                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
688
689                 flushCounter = 0;
690                 Disposable.safeDispose(clientChanges);
691                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
692
693                 acquireWriteOnly();
694
695                 VirtualGraph vg = getProvider(request.getProvider());
696                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
697
698                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
699
700                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
701
702                 assert (null != writer);
703 //                writer.state.barrier.inc();
704
705                 try {
706                     // Cannot cancel
707                     dwg.commit(writer, request);
708
709                         if(defaultClusterSet != null) {
710                                 XSupport xs = getService(XSupport.class);
711                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712                                 ClusteringSupport cs = getService(ClusteringSupport.class);
713                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
714                         }
715
716                     // This makes clusters available from server
717                     clusterStream.reallyFlush();
718
719                     releaseWriteOnly(writer);
720                     handleUpdatesAndMetadata(writer);
721
722                 } catch (ServiceException e) {
723 //                    writer.state.barrier.dec();
724 //                    writer.waitAsync(null);
725
726                     // These shall be requested from server
727                     clusterTable.removeWriteOnlyClusters();
728                     // This makes clusters available from server
729                     clusterStream.reallyFlush();
730
731                     releaseWriteOnly(writer);
732                     writeState.except(e);
733                 } finally {
734                     // Debugging & Profiling
735                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736                     task2.finish();
737                     total.finish();
738                 }
739             }
740
741         }, combine);
742
743     }
744
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746         scheduleRequest(request, procedure, notify, null);
747     }
748
749     /* (non-Javadoc)
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
751      */
752     @Override
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754         throw new Error("Not implemented");
755     }
756
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760             createdClusters.add(cluster.clusterId);
761         }
762         return cluster;
763     }
764
765     class WriteOnlySupport implements WriteSupport {
766
767         ClusterStream stream;
768         ClusterImpl currentCluster;
769
770         public WriteOnlySupport() {
771             this.stream = clusterStream;
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
777         }
778
779         @Override
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785                 if(s != queryProvider2.getRootLibrary())
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
787
788             try {
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790             } catch (DatabaseException e) {
791                 Logger.defaultLogError(e);
792                 //throw e;
793             }
794
795             clientChanges.invalidate(s);
796
797             if (cluster.isWriteOnly())
798                 return;
799             queryProvider2.updateStatements(s, p);
800
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
806         }
807
808         @Override
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812             try {
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         @Override
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
828
829             if(amount < 65536) {
830                 claimValue(provider,resource, reader.readBytes(null, amount));
831                 return;
832             }
833
834             byte[] bytes = new byte[65536];
835
836             int rid = ((ResourceImpl)resource).id;
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838             try {
839                 int left = amount;
840                 while(left > 0) {
841                     int block = Math.min(left, 65536);
842                     reader.readBytes(bytes, block);
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
844                     left -= block;
845                 }
846             } catch (DatabaseException e) {
847                 Logger.defaultLogError(e);
848             }
849
850             clientChanges.invalidate(rid);
851
852             if (cluster.isWriteOnly())
853                 return;
854             queryProvider2.updateValue(rid);
855
856         }
857
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {
859             if(after_ != null && after_ != before) {
860                 ClusterImpl after = (ClusterImpl)after_;
861                 if(currentCluster == before) {
862                     currentCluster = after;
863                 }
864                 clusterTable.replaceCluster(after);
865             }
866         }
867
868         public int createResourceKey(int foreignCounter) throws DatabaseException {
869             if(currentCluster == null) {
870                 currentCluster = getNewResourceCluster();
871             }
872             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
873                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
874                 newCluster.foreignLookup = new byte[foreignCounter];
875                 currentCluster = newCluster;
876                 if (DEBUG)
877                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
878             }
879             return currentCluster.createResource(clusterTranslator);
880         }
881
882         @Override
883         public Resource createResource(VirtualGraph provider) throws DatabaseException {
884             if(currentCluster == null) {
885                 if (null != defaultClusterSet) {
886                         ResourceImpl result = getNewResource(defaultClusterSet);
887                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
888                     return result;
889                 } else {
890                     currentCluster = getNewResourceCluster();
891                 }
892             }
893             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
894                 if (null != defaultClusterSet) {
895                         ResourceImpl result = getNewResource(defaultClusterSet);
896                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
897                     return result;
898                 } else {
899                     currentCluster = getNewResourceCluster();
900                 }
901             }
902             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, long clusterId)
907         throws DatabaseException {
908             return getNewResource(clusterId);
909         }
910
911         @Override
912         public Resource createResource(VirtualGraph provider, Resource clusterSet)
913         throws DatabaseException {
914             return getNewResource(clusterSet);
915         }
916
917         @Override
918         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
919                 throws DatabaseException {
920             getNewClusterSet(clusterSet);
921         }
922
923         @Override
924         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
925         throws ServiceException {
926             return containsClusterSet(clusterSet);
927         }
928
929         public void selectCluster(long cluster) {
930                 currentCluster = clusterTable.getClusterByClusterId(cluster);
931                 long setResourceId = clusterSetsSupport.getSet(cluster);
932                 clusterSetsSupport.put(setResourceId, cluster);
933         }
934
935         @Override
936         public Resource setDefaultClusterSet(Resource clusterSet)
937         throws ServiceException {
938                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
939                 if(clusterSet != null) {
940                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
941                         currentCluster = clusterTable.getClusterByClusterId(id);
942                         return result;
943                 } else {
944                         currentCluster = null;
945                         return null;
946                 }
947         }
948
949         @Override
950         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
951
952                  provider = getProvider(provider);
953              if (null == provider) {
954                  int key = ((ResourceImpl)resource).id;
955
956
957
958                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
959 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
960 //                      if(key != queryProvider2.getRootLibrary())
961 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
962
963 //                 try {
964 //                     cluster.removeValue(key, clusterTranslator);
965 //                 } catch (DatabaseException e) {
966 //                     Logger.defaultLogError(e);
967 //                     return;
968 //                 }
969
970                  clusterTable.writeOnlyInvalidate(cluster);
971
972                  try {
973                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
974                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
975                          clusterTranslator.removeValue(cluster);
976                  } catch (DatabaseException e) {
977                          Logger.defaultLogError(e);
978                  }
979
980                  queryProvider2.invalidateResource(key);
981                  clientChanges.invalidate(key);
982
983              } else {
984                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
985                  queryProvider2.updateValue(querySupport.getId(resource));
986                  clientChanges.claimValue(resource);
987              }
988
989
990         }
991
992         @Override
993         public void flush(boolean intermediate) {
994             throw new UnsupportedOperationException();
995         }
996
997         @Override
998         public void flushCluster() {
999             clusterTable.flushCluster(graphSession);
1000             if(defaultClusterSet != null) {
1001                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1002             }
1003             currentCluster = null;
1004         }
1005
1006         @Override
1007         public void flushCluster(Resource r) {
1008             throw new UnsupportedOperationException("flushCluster resource " + r);
1009         }
1010
1011         @Override
1012         public void gc() {
1013         }
1014
1015         @Override
1016         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1017                 Resource object) {
1018
1019                 int s = ((ResourceImpl)subject).id;
1020                 int p = ((ResourceImpl)predicate).id;
1021                 int o = ((ResourceImpl)object).id;
1022
1023                 provider = getProvider(provider);
1024                 if (null == provider) {
1025
1026                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1027                         clusterTable.writeOnlyInvalidate(cluster);
1028
1029                         try {
1030
1031                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1032
1033                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035
1036                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039                                 clusterTranslator.removeStatement(cluster);
1040
1041                                 queryProvider2.invalidateResource(s);
1042                                 clientChanges.invalidate(s);
1043
1044                         } catch (DatabaseException e) {
1045
1046                                 Logger.defaultLogError(e);
1047
1048                         }
1049
1050                         return true;
1051
1052                 } else {
1053                         
1054                         ((VirtualGraphImpl)provider).deny(s, p, o);
1055                         queryProvider2.invalidateResource(s);
1056                         clientChanges.invalidate(s);
1057
1058                         return true;
1059
1060                 }
1061
1062         }
1063
1064         @Override
1065         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066             throw new UnsupportedOperationException();
1067         }
1068
1069         @Override
1070         public boolean writeOnly() {
1071             return true;
1072         }
1073
1074         @Override
1075         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076             writeSupport.performWriteRequest(graph, request);
1077         }
1078
1079         @Override
1080         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086             throw new UnsupportedOperationException();
1087         }
1088
1089         @Override
1090         public <T> void addMetadata(Metadata data) throws ServiceException {
1091             writeSupport.addMetadata(data);
1092         }
1093
1094         @Override
1095         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096             return writeSupport.getMetadata(clazz);
1097         }
1098
1099         @Override
1100         public TreeMap<String, byte[]> getMetadata() {
1101             return writeSupport.getMetadata();
1102         }
1103
1104         @Override
1105         public void commitDone(WriteTraits writeTraits, long csid) {
1106             writeSupport.commitDone(writeTraits, csid);
1107         }
1108
1109         @Override
1110         public void clearUndoList(WriteTraits writeTraits) {
1111             writeSupport.clearUndoList(writeTraits);
1112         }
1113         @Override
1114         public int clearMetadata() {
1115             return writeSupport.clearMetadata();
1116         }
1117
1118                 @Override
1119                 public void startUndo() {
1120                         writeSupport.startUndo();
1121                 }
1122     }
1123
1124     class VirtualWriteOnlySupport implements WriteSupport {
1125
1126 //        @Override
1127 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 //                Resource object) {
1129 //            throw new UnsupportedOperationException();
1130 //        }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144
1145             TransientGraph impl = (TransientGraph)provider;
1146             impl.claim(subject, predicate, object);
1147             getQueryProvider2().updateStatements(subject, predicate);
1148             clientChanges.claim(subject, predicate, object);
1149
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160             getQueryProvider2().updateValue(resource);
1161             clientChanges.claimValue(resource);
1162         }
1163
1164         @Override
1165         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166             byte[] value = reader.readBytes(null, amount);
1167             claimValue(provider, resource, value);
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider) {
1172             TransientGraph impl = (TransientGraph)provider;
1173             return impl.getResource(impl.newResource(false));
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, long clusterId) {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws DatabaseException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195         throws ServiceException {
1196             throw new UnsupportedOperationException();
1197         }
1198
1199         @Override
1200         public Resource setDefaultClusterSet(Resource clusterSet)
1201         throws ServiceException {
1202                 return null;
1203         }
1204
1205         @Override
1206         public void denyValue(VirtualGraph provider, Resource resource) {
1207             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208             getQueryProvider2().updateValue(querySupport.getId(resource));
1209             // NOTE: this only keeps track of value changes by-resource.
1210             clientChanges.claimValue(resource);
1211         }
1212
1213         @Override
1214         public void flush(boolean intermediate) {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster() {
1220             throw new UnsupportedOperationException();
1221         }
1222
1223         @Override
1224         public void flushCluster(Resource r) {
1225             throw new UnsupportedOperationException("Resource " + r);
1226         }
1227
1228         @Override
1229         public void gc() {
1230         }
1231
1232         @Override
1233         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234             TransientGraph impl = (TransientGraph) provider;
1235             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237             clientChanges.deny(subject, predicate, object);
1238             return true;
1239         }
1240
1241         @Override
1242         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public boolean writeOnly() {
1248             return true;
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void addMetadata(Metadata data) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public TreeMap<String, byte[]> getMetadata() {
1278             throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public void commitDone(WriteTraits writeTraits, long csid) {
1283         }
1284
1285         @Override
1286         public void clearUndoList(WriteTraits writeTraits) {
1287         }
1288
1289         @Override
1290         public int clearMetadata() {
1291             return 0;
1292         }
1293
1294                 @Override
1295                 public void startUndo() {
1296                 }
1297     }
1298
1299     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300
1301         try {
1302
1303             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304
1305             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306
1307             flushCounter = 0;
1308             Disposable.safeDispose(clientChanges);
1309             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310
1311             acquireWriteOnly();
1312
1313             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314
1315             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316
1317             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1318             writeState = writeStateT;
1319
1320             assert (null != writer);
1321 //            writer.state.barrier.inc();
1322             long start = System.nanoTime();
1323             T result = request.perform(writer);
1324             long duration = System.nanoTime() - start;
1325             if (DEBUG) {
1326                 System.err.println("################");
1327                 System.err.println("WriteOnly duration " + 1e-9*duration);
1328             }
1329             writeStateT.setResult(result);
1330
1331             // This makes clusters available from server
1332             clusterStream.reallyFlush();
1333
1334             // This will trigger query updates
1335             releaseWriteOnly(writer);
1336             assert (null != writer);
1337
1338             handleUpdatesAndMetadata(writer);
1339
1340         } catch (CancelTransactionException e) {
1341
1342             releaseWriteOnly(writeState.getGraph());
1343
1344             clusterTable.removeWriteOnlyClusters();
1345             state.stopWriteTransaction(clusterStream);
1346
1347         } catch (Throwable e) {
1348
1349             e.printStackTrace();
1350
1351             releaseWriteOnly(writeState.getGraph());
1352
1353             clusterTable.removeWriteOnlyClusters();
1354
1355             if (callback != null)
1356                 callback.exception(new DatabaseException(e));
1357
1358             state.stopWriteTransaction(clusterStream);
1359             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360
1361         } finally  {
1362
1363             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1364
1365         }
1366
1367     }
1368
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1370         scheduleRequest(request, callback, notify, null);
1371     }
1372
1373     @Override
1374     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375
1376         assertAlive();
1377
1378         assert (request != null);
1379
1380         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1381
1382         requestManager.scheduleWrite(new SessionTask(true) {
1383
1384             @Override
1385             public void run(int thread) {
1386
1387                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388
1389                     try {
1390
1391                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392
1393                         flushCounter = 0;
1394                         Disposable.safeDispose(clientChanges);
1395                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396
1397                         acquireWriteOnly();
1398
1399                         VirtualGraph vg = getProvider(request.getProvider());
1400                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1401
1402                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1403
1404                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405
1406                             @Override
1407                             public void execute(Object result) {
1408                                 if(callback != null) callback.accept(null);
1409                             }
1410
1411                             @Override
1412                             public void exception(Throwable t) {
1413                                 if(callback != null) callback.accept((DatabaseException)t);
1414                             }
1415
1416                         });
1417
1418                         assert (null != writer);
1419 //                        writer.state.barrier.inc();
1420
1421                         try {
1422
1423                             request.perform(writer);
1424
1425                         } catch (Throwable e) {
1426
1427 //                            writer.state.barrier.dec();
1428 //                            writer.waitAsync(null);
1429
1430                             releaseWriteOnly(writer);
1431
1432                             clusterTable.removeWriteOnlyClusters();
1433
1434                             if(!(e instanceof CancelTransactionException)) {
1435                                 if (callback != null)
1436                                     callback.accept(new DatabaseException(e));
1437                             }
1438
1439                             writeState.except(e);
1440
1441                             return;
1442
1443                         }
1444
1445                         // This makes clusters available from server
1446                         boolean empty = clusterStream.reallyFlush();
1447                         // This was needed to make WO requests call metadata listeners.
1448                         // NOTE: the calling event does not contain clientChanges information.
1449                         if (!empty && clientChanges.isEmpty())
1450                             clientChanges.setNotEmpty(true);
1451                         releaseWriteOnly(writer);
1452                         assert (null != writer);
1453                     } finally  {
1454
1455                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1456
1457                     }
1458
1459
1460                 task.finish();
1461
1462             }
1463
1464         }, combine);
1465
1466     }
1467
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1469         scheduleRequest(request, callback, notify, null);
1470     }
1471
1472     /* (non-Javadoc)
1473      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474      */
1475     @Override
1476     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1477
1478         assert (request != null);
1479
1480         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1481
1482         requestManager.scheduleWrite(new SessionTask(true) {
1483
1484             @Override
1485             public void run(int thread) {
1486
1487                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1488
1489                 performWriteOnly(request, notify, callback);
1490
1491                 task.finish();
1492
1493             }
1494
1495         }, combine);
1496
1497     }
1498
1499     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1500
1501         assert (request != null);
1502         assert (procedure != null);
1503
1504         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1505
1506         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1507
1508             @Override
1509             public void run(int thread) {
1510
1511                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1512
1513                 ListenerBase listener = getListenerBase(procedure);
1514
1515                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516
1517                 try {
1518
1519                     if (listener != null) {
1520
1521                         try {
1522                                 
1523                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1524
1525                                         @Override
1526                                         public void exception(AsyncReadGraph graph, Throwable t) {
1527                                                 procedure.exception(graph, t);
1528                                                 if(throwable != null) {
1529                                                         throwable.set(t);
1530                                                 } else {
1531                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1532                                                 }
1533                                         }
1534
1535                                         @Override
1536                                         public void execute(AsyncReadGraph graph, T t) {
1537                                                 if(result != null) result.set(t);
1538                                                 procedure.execute(graph, t);
1539                                         }
1540
1541                                 };
1542                                 
1543                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1544                                 
1545                         } catch (Throwable t) {
1546                             // This is handled by the AsyncProcedure
1547                                 //Logger.defaultLogError("Internal error", t);
1548                         }
1549
1550                     } else {
1551
1552                         try {
1553
1554 //                            newGraph.state.barrier.inc();
1555
1556                             T t = request.perform(newGraph);
1557
1558                             try {
1559
1560                                 if(result != null) result.set(t);
1561                                 procedure.execute(newGraph, t);
1562
1563                             } catch (Throwable th) {
1564
1565                                 if(throwable != null) {
1566                                     throwable.set(th);
1567                                 } else {
1568                                     Logger.defaultLogError("Unhandled exception", th);
1569                                 }
1570
1571                             }
1572
1573                         } catch (Throwable t) {
1574
1575                             if (DEBUG)
1576                                 t.printStackTrace();
1577
1578                             if(throwable != null) {
1579                                 throwable.set(t);
1580                             } else {
1581                                 Logger.defaultLogError("Unhandled exception", t);
1582                             }
1583
1584                             try {
1585
1586                                 procedure.exception(newGraph, t);
1587
1588                             } catch (Throwable t2) {
1589
1590                                 if(throwable != null) {
1591                                     throwable.set(t2);
1592                                 } else {
1593                                     Logger.defaultLogError("Unhandled exception", t2);
1594                                 }
1595
1596                             }
1597
1598                         }
1599
1600 //                        newGraph.state.barrier.dec();
1601 //                        newGraph.waitAsync(request);
1602
1603                     }
1604
1605                 } finally {
1606
1607                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1608
1609                 }
1610
1611             }
1612
1613         });
1614
1615     }
1616
1617     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1618
1619         assert (request != null);
1620         assert (procedure != null);
1621
1622         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1623
1624         requestManager.scheduleRead(new SessionRead(null, notify) {
1625
1626             @Override
1627             public void run(int thread) {
1628
1629                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1630
1631                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1632
1633                 try {
1634
1635                     if (listener != null) {
1636
1637                         try {
1638                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1639                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1640                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1641                                                 } catch (DatabaseException e) {
1642                                                         Logger.defaultLogError(e);
1643                                                 }
1644
1645                     } else {
1646
1647 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1648 //                                procedure, "request");
1649
1650                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1651
1652                         try {
1653
1654                             request.perform(newGraph, wrap);
1655                             wrap.get();
1656
1657                         } catch (DatabaseException e) {
1658
1659                                                         Logger.defaultLogError(e);
1660
1661                         }
1662
1663                     }
1664
1665                 } finally {
1666
1667                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1668
1669                 }
1670
1671             }
1672
1673         });
1674
1675     }
1676
1677     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1678
1679         assert (request != null);
1680         assert (procedure != null);
1681
1682         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1683
1684         int sync = notify != null ? thread : -1;
1685
1686         requestManager.scheduleRead(new SessionRead(null, notify) {
1687
1688             @Override
1689             public void run(int thread) {
1690
1691                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1692
1693                 ListenerBase listener = getListenerBase(procedure);
1694
1695                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1696
1697                 try {
1698
1699                     if (listener != null) {
1700
1701                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1702
1703 //                        newGraph.waitAsync(request);
1704
1705                     } else {
1706
1707                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1708
1709                         try {
1710
1711                             request.perform(newGraph, wrapper);
1712
1713                         } catch (Throwable t) {
1714
1715                             t.printStackTrace();
1716
1717                         }
1718
1719                     }
1720
1721                 } finally {
1722
1723                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1724
1725                 }
1726
1727             }
1728
1729         });
1730
1731     }
1732     
1733     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1734
1735         assert (request != null);
1736         assert (procedure != null);
1737
1738         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1739
1740         int sync = notify != null ? thread : -1;
1741
1742         requestManager.scheduleRead(new SessionRead(null, notify) {
1743
1744             @Override
1745             public void run(int thread) {
1746
1747                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1748
1749                 ListenerBase listener = getListenerBase(procedure);
1750
1751                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1752
1753                 try {
1754
1755                     if (listener != null) {
1756
1757                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1758
1759 //                        newGraph.waitAsync(request);
1760
1761                     } else {
1762
1763                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1764
1765                         try {
1766
1767                             request.perform(newGraph, wrapper);
1768
1769                         } catch (Throwable t) {
1770
1771                             t.printStackTrace();
1772
1773                         }
1774
1775                     }
1776
1777                 } finally {
1778
1779                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1780
1781                 }
1782
1783             }
1784
1785         });
1786
1787     }
1788
1789     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1790
1791         assert (request != null);
1792         assert (procedure != null);
1793
1794         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1795
1796         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1797
1798             @Override
1799             public void run(int thread) {
1800
1801                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1802
1803                 ListenerBase listener = getListenerBase(procedure);
1804
1805                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1806
1807                 try {
1808
1809                     if (listener != null) {
1810
1811                         try {
1812                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1813                         } catch (DatabaseException e) {
1814                             Logger.defaultLogError(e);
1815                         }
1816
1817                     } else {
1818
1819 //                        newGraph.state.barrier.inc();
1820
1821                         request.register(newGraph, new Listener<T>() {
1822
1823                             @Override
1824                             public void exception(Throwable t) {
1825                                 if(throwable != null) throwable.set(t);
1826                                 procedure.exception(t);
1827 //                                newGraph.state.barrier.dec();
1828                             }
1829
1830                             @Override
1831                             public void execute(T t) {
1832                                 if(result != null) result.set(t);
1833                                 procedure.execute(t);
1834 //                                newGraph.state.barrier.dec();
1835                             }
1836
1837                             @Override
1838                             public boolean isDisposed() {
1839                                 return true;
1840                             }
1841
1842                         });
1843
1844 //                        newGraph.waitAsync(request);
1845
1846                     }
1847
1848                 } finally {
1849
1850                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1851
1852                 }
1853
1854             }
1855
1856         });
1857
1858     }
1859
1860
1861     @Override
1862     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1863
1864         scheduleRequest(request, procedure, null, null, null);
1865
1866     }
1867
1868     @Override
1869     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1870         assertNotSession();
1871         Semaphore notify = new Semaphore(0);
1872         DataContainer<Throwable> container = new DataContainer<Throwable>();
1873         DataContainer<T> result = new DataContainer<T>();
1874         scheduleRequest(request, procedure, notify, container, result);
1875         acquire(notify, request);
1876         Throwable throwable = container.get();
1877         if(throwable != null) {
1878             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1879             else throw new DatabaseException("Unexpected exception", throwable);
1880         }
1881         return result.get();
1882     }
1883
1884     @Override
1885     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1886         assertNotSession();
1887         Semaphore notify = new Semaphore(0);
1888         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1889         final DataContainer<T> resultContainer = new DataContainer<T>();
1890         scheduleRequest(request, new AsyncProcedure<T>() {
1891             @Override
1892             public void exception(AsyncReadGraph graph, Throwable throwable) {
1893                 exceptionContainer.set(throwable);
1894                 procedure.exception(graph, throwable);
1895             }
1896             @Override
1897             public void execute(AsyncReadGraph graph, T result) {
1898                 resultContainer.set(result);
1899                 procedure.execute(graph, result);
1900             }
1901         }, getListenerBase(procedure), notify);
1902         acquire(notify, request, procedure);
1903         Throwable throwable = exceptionContainer.get();
1904         if (throwable != null) {
1905             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1906             else throw new DatabaseException("Unexpected exception", throwable);
1907         }
1908         return resultContainer.get();
1909     }
1910
1911
1912     @Override
1913     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1914         assertNotSession();
1915         Semaphore notify = new Semaphore(0);
1916         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1917         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1918             @Override
1919             public void exception(AsyncReadGraph graph, Throwable throwable) {
1920                 exceptionContainer.set(throwable);
1921                 procedure.exception(graph, throwable);
1922             }
1923             @Override
1924             public void execute(AsyncReadGraph graph, T result) {
1925                 procedure.execute(graph, result);
1926             }
1927             @Override
1928             public void finished(AsyncReadGraph graph) {
1929                 procedure.finished(graph);
1930             }
1931         }, notify);
1932         acquire(notify, request, procedure);
1933         Throwable throwable = exceptionContainer.get();
1934         if (throwable != null) {
1935             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1936             else throw new DatabaseException("Unexpected exception", throwable);
1937         }
1938         // TODO: implement return value
1939         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1940         return null;
1941     }
1942
1943     @Override
1944     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1945         return syncRequest(request, new ProcedureAdapter<T>());
1946
1947
1948 //        assert(request != null);
1949 //
1950 //        final DataContainer<T> result = new DataContainer<T>();
1951 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1952 //
1953 //        syncRequest(request, new Procedure<T>() {
1954 //
1955 //            @Override
1956 //            public void execute(T t) {
1957 //                result.set(t);
1958 //            }
1959 //
1960 //            @Override
1961 //            public void exception(Throwable t) {
1962 //                exception.set(t);
1963 //            }
1964 //
1965 //        });
1966 //
1967 //        Throwable t = exception.get();
1968 //        if(t != null) {
1969 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1970 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1971 //        }
1972 //
1973 //        return result.get();
1974
1975     }
1976
1977     @Override
1978     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1979         return syncRequest(request, (Procedure<T>)procedure);
1980     }
1981
1982
1983 //    @Override
1984 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1985 //        assertNotSession();
1986 //        Semaphore notify = new Semaphore(0);
1987 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1988 //        DataContainer<T> result = new DataContainer<T>();
1989 //        scheduleRequest(request, procedure, notify, container, result);
1990 //        acquire(notify, request);
1991 //        Throwable throwable = container.get();
1992 //        if(throwable != null) {
1993 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1994 //            else throw new DatabaseException("Unexpected exception", throwable);
1995 //        }
1996 //        return result.get();
1997 //    }
1998
1999
2000     @Override
2001     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2002         assertNotSession();
2003         Semaphore notify = new Semaphore(0);
2004         final DataContainer<Throwable> container = new DataContainer<Throwable>();
2005         final DataContainer<T> result = new DataContainer<T>();
2006         scheduleRequest(request, procedure, notify, container, result);
2007         acquire(notify, request);
2008         Throwable throwable = container.get();
2009         if (throwable != null) {
2010             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2011             else throw new DatabaseException("Unexpected exception", throwable);
2012         }
2013         return result.get();
2014     }
2015
2016     @Override
2017     public void syncRequest(Write request) throws DatabaseException  {
2018         assertNotSession();
2019         assertAlive();
2020         Semaphore notify = new Semaphore(0);
2021         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2022         scheduleRequest(request, e -> exception.set(e), notify);
2023         acquire(notify, request);
2024         if(exception.get() != null) throw exception.get();
2025     }
2026
2027     @Override
2028     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2029         assertNotSession();
2030         Semaphore notify = new Semaphore(0);
2031         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2032         final DataContainer<T> result = new DataContainer<T>();
2033         scheduleRequest(request, new Procedure<T>() {
2034
2035             @Override
2036             public void exception(Throwable t) {
2037               exception.set(t);
2038             }
2039
2040             @Override
2041             public void execute(T t) {
2042               result.set(t);
2043             }
2044
2045         }, notify);
2046         acquire(notify, request);
2047         if(exception.get() != null) {
2048             Throwable t = exception.get();
2049             if(t instanceof DatabaseException) throw (DatabaseException)t;
2050             else throw new DatabaseException(t);
2051         }
2052         return result.get();
2053     }
2054
2055     @Override
2056     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2057         assertNotSession();
2058         Semaphore notify = new Semaphore(0);
2059         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2060         final DataContainer<T> result = new DataContainer<T>();
2061         scheduleRequest(request, new Procedure<T>() {
2062
2063             @Override
2064             public void exception(Throwable t) {
2065               exception.set(t);
2066             }
2067
2068             @Override
2069             public void execute(T t) {
2070               result.set(t);
2071             }
2072
2073         }, notify);
2074         acquire(notify, request);
2075         if(exception.get() != null) {
2076             Throwable t = exception.get();
2077             if(t instanceof DatabaseException) throw (DatabaseException)t;
2078             else throw new DatabaseException(t);
2079         }
2080         return result.get();
2081     }
2082
2083     @Override
2084     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2085         assertNotSession();
2086         Semaphore notify = new Semaphore(0);
2087         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2088         final DataContainer<T> result = new DataContainer<T>();
2089         scheduleRequest(request, new Procedure<T>() {
2090
2091             @Override
2092             public void exception(Throwable t) {
2093               exception.set(t);
2094             }
2095
2096             @Override
2097             public void execute(T t) {
2098               result.set(t);
2099             }
2100
2101         }, notify);
2102         acquire(notify, request);
2103         if(exception.get() != null) {
2104             Throwable t = exception.get();
2105             if(t instanceof DatabaseException) throw (DatabaseException)t;
2106             else throw new DatabaseException(t);
2107         }
2108         return result.get();
2109     }
2110
2111     @Override
2112     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2113         assertNotSession();
2114         Semaphore notify = new Semaphore(0);
2115         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2116         scheduleRequest(request, e -> exception.set(e), notify);
2117         acquire(notify, request);
2118         if(exception.get() != null) throw exception.get();
2119     }
2120
2121     @Override
2122     public void syncRequest(WriteOnly request) throws DatabaseException  {
2123         assertNotSession();
2124         assertAlive();
2125         Semaphore notify = new Semaphore(0);
2126         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2127         scheduleRequest(request, e -> exception.set(e), notify);
2128         acquire(notify, request);
2129         if(exception.get() != null) throw exception.get();
2130     }
2131
2132     /*
2133      *
2134      * GraphRequestProcessor interface
2135      */
2136
2137     @Override
2138     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2139
2140         scheduleRequest(request, procedure, null, null);
2141
2142     }
2143
2144     @Override
2145     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2146
2147         scheduleRequest(request, procedure, null);
2148
2149     }
2150
2151     @Override
2152     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2153
2154         scheduleRequest(request, procedure, null, null, null);
2155
2156     }
2157
2158     @Override
2159     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2160
2161         scheduleRequest(request, callback, null);
2162
2163     }
2164
2165     @Override
2166     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2167
2168         scheduleRequest(request, procedure, null);
2169
2170     }
2171
2172     @Override
2173     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2174
2175         scheduleRequest(request, procedure, null);
2176
2177     }
2178
2179     @Override
2180     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2181
2182         scheduleRequest(request, procedure, null);
2183
2184     }
2185
2186     @Override
2187     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2188
2189         scheduleRequest(request, callback, null);
2190
2191     }
2192
2193     @Override
2194     public void asyncRequest(final Write r) {
2195         asyncRequest(r, null);
2196     }
2197
2198     @Override
2199     public void asyncRequest(final DelayedWrite r) {
2200         asyncRequest(r, null);
2201     }
2202
2203     @Override
2204     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2205
2206         scheduleRequest(request, callback, null);
2207
2208     }
2209
2210     @Override
2211     public void asyncRequest(final WriteOnly request) {
2212
2213         asyncRequest(request, null);
2214
2215     }
2216
2217     @Override
2218     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2219         r.request(this, procedure);
2220     }
2221
2222     @Override
2223     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2224         r.request(this, procedure);
2225     }
2226
2227     @Override
2228     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2229         r.request(this, procedure);
2230     }
2231
2232     @Override
2233     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2234         r.request(this, procedure);
2235     }
2236
2237     @Override
2238     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2239         r.request(this, procedure);
2240     }
2241
2242     @Override
2243     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2244         r.request(this, procedure);
2245     }
2246
2247     @Override
2248     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2249         return r.request(this);
2250     }
2251
2252     @Override
2253     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2254         return r.request(this);
2255     }
2256
2257     @Override
2258     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2259         r.request(this, procedure);
2260     }
2261
2262     @Override
2263     public <T> void async(WriteInterface<T> r) {
2264         r.request(this, new ProcedureAdapter<T>());
2265     }
2266
2267     //@Override
2268     public void incAsync() {
2269         state.incAsync();
2270     }
2271
2272     //@Override
2273     public void decAsync() {
2274         state.decAsync();
2275     }
2276
2277     public long getCluster(ResourceImpl resource) {
2278         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2279         return cluster.getClusterId();
2280     }
2281
2282     public long getCluster(int id) {
2283         if (clusterTable == null)
2284             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2285         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2286     }
2287
2288     public ResourceImpl getResource(int id) {
2289         return new ResourceImpl(resourceSupport, id);
2290     }
2291
2292     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2293         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2294         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2295         int key = proxy.getClusterKey();
2296         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2297         return new ResourceImpl(resourceSupport, resourceKey);
2298     }
2299
2300     public ResourceImpl getResource2(int id) {
2301         assert (id != 0);
2302         return new ResourceImpl(resourceSupport, id);
2303     }
2304
2305     final public int getId(ResourceImpl impl) {
2306         return impl.id;
2307     }
2308
2309     public static final Charset UTF8 = Charset.forName("utf-8");
2310
2311
2312
2313
2314     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2315         for(TransientGraph g : support.providers) {
2316             if(g.isPending(subject)) return false;
2317         }
2318         return true;
2319     }
2320
2321     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2322         for(TransientGraph g : support.providers) {
2323             if(g.isPending(subject, predicate)) return false;
2324         }
2325         return true;
2326     }
2327
2328     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2329
2330         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2331
2332             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2333
2334             @Override
2335             public void accept(ReadGraphImpl graph) {
2336                 if(ready.decrementAndGet() == 0) {
2337                     runnable.accept(graph);
2338                 }
2339             }
2340
2341         };
2342
2343         for(TransientGraph g : support.providers) {
2344             if(g.isPending(subject)) {
2345                 try {
2346                     g.load(graph, subject, composite);
2347                 } catch (DatabaseException e) {
2348                     e.printStackTrace();
2349                 }
2350             } else {
2351                 composite.accept(graph);
2352             }
2353         }
2354
2355         composite.accept(graph);
2356
2357     }
2358
2359     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2360
2361         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2362
2363             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2364
2365             @Override
2366             public void accept(ReadGraphImpl graph) {
2367                 if(ready.decrementAndGet() == 0) {
2368                     runnable.accept(graph);
2369                 }
2370             }
2371
2372         };
2373
2374         for(TransientGraph g : support.providers) {
2375             if(g.isPending(subject, predicate)) {
2376                 try {
2377                     g.load(graph, subject, predicate, composite);
2378                 } catch (DatabaseException e) {
2379                     e.printStackTrace();
2380                 }
2381             } else {
2382                 composite.accept(graph);
2383             }
2384         }
2385
2386         composite.accept(graph);
2387
2388     }
2389
2390 //    void dumpHeap() {
2391 //
2392 //        try {
2393 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2394 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2395 //                    "d:/heap" + flushCounter + ".txt", true);
2396 //        } catch (IOException e) {
2397 //            e.printStackTrace();
2398 //        }
2399 //
2400 //    }
2401
2402     void fireReactionsToSynchronize(ChangeSet cs) {
2403
2404         // Do not fire empty events
2405         if (cs.isEmpty())
2406             return;
2407
2408         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2409 //        g.state.barrier.inc();
2410
2411         try {
2412
2413             if (!cs.isEmpty()) {
2414                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2415                 for (ChangeListener l : changeListeners2) {
2416                     try {
2417                         l.graphChanged(e2);
2418                     } catch (Exception ex) {
2419                         ex.printStackTrace();
2420                     }
2421                 }
2422             }
2423
2424         } finally {
2425
2426 //            g.state.barrier.dec();
2427 //            g.waitAsync(null);
2428
2429         }
2430
2431     }
2432
2433     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2434         try {
2435
2436             // Do not fire empty events
2437             if (cs2.isEmpty())
2438                 return;
2439
2440 //            graph.restart();
2441 //            graph.state.barrier.inc();
2442
2443             try {
2444
2445                 if (!cs2.isEmpty()) {
2446
2447                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2448                     for (ChangeListener l : changeListeners2) {
2449                         try {
2450                             l.graphChanged(e2);
2451 //                            System.out.println("changelistener " + l);
2452                         } catch (Exception ex) {
2453                             ex.printStackTrace();
2454                         }
2455                     }
2456
2457                 }
2458
2459             } finally {
2460
2461 //                graph.state.barrier.dec();
2462 //                graph.waitAsync(null);
2463
2464             }
2465         } catch (Throwable t) {
2466             t.printStackTrace();
2467
2468         }
2469     }
2470     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2471
2472         try {
2473
2474             // Do not fire empty events
2475             if (cs2.isEmpty())
2476                 return;
2477
2478             // Do not fire on virtual requests
2479             if(graph.getProvider() != null)
2480                 return;
2481
2482             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2483
2484             try {
2485
2486                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2487                 for (ChangeListener l : metadataListeners) {
2488                     try {
2489                         l.graphChanged(e2);
2490                     } catch (Throwable ex) {
2491                         ex.printStackTrace();
2492                     }
2493                 }
2494
2495             } finally {
2496
2497             }
2498
2499         } catch (Throwable t) {
2500             t.printStackTrace();
2501         }
2502
2503     }
2504
2505
2506     /*
2507      * (non-Javadoc)
2508      *
2509      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2510      */
2511     @Override
2512     public <T> T getService(Class<T> api) {
2513         T t = peekService(api);
2514         if (t == null) {
2515             if (state.isClosed())
2516                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2517             throw new ServiceNotFoundException(this, api);
2518         }
2519         return t;
2520     }
2521
2522     /**
2523      * @return
2524      */
2525     protected abstract ServerInformation getCachedServerInformation();
2526
2527         private Class<?> serviceKey1 = null;
2528         private Class<?> serviceKey2 = null;
2529         private Object service1 = null;
2530         private Object service2 = null;
2531
2532     /*
2533      * (non-Javadoc)
2534      *
2535      * @see
2536      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2537      */
2538     @SuppressWarnings("unchecked")
2539     @Override
2540     public synchronized <T> T peekService(Class<T> api) {
2541
2542                 if(serviceKey1 == api) {
2543                         return (T)service1;
2544                 } else if (serviceKey2 == api) {
2545                         // Promote this key
2546                         Object result = service2;
2547                         service2 = service1;
2548                         serviceKey2 = serviceKey1;
2549                         service1 = result;
2550                         serviceKey1 = api;
2551                         return (T)result;
2552                 }
2553
2554         if (Layer0.class == api)
2555                 return (T) L0;
2556         if (ServerInformation.class == api)
2557             return (T) getCachedServerInformation();
2558         else if (WriteGraphImpl.class == api)
2559             return (T) writeState.getGraph();
2560         else if (ClusterBuilder.class == api)
2561             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2562         else if (ClusterBuilderFactory.class == api)
2563             return (T)new ClusterBuilderFactoryImpl(this);
2564
2565                 service2 = service1;
2566                 serviceKey2 = serviceKey1;
2567
2568         service1 = serviceLocator.peekService(api);
2569         serviceKey1 = api;
2570
2571         return (T)service1;
2572
2573     }
2574
2575     /*
2576      * (non-Javadoc)
2577      *
2578      * @see
2579      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2580      */
2581     @Override
2582     public boolean hasService(Class<?> api) {
2583         return serviceLocator.hasService(api);
2584     }
2585
2586     /**
2587      * @param api the api that must be implemented by the specified service
2588      * @param service the service implementation
2589      */
2590     @Override
2591     public <T> void registerService(Class<T> api, T service) {
2592         if(Layer0.class == api) {
2593                 L0 = (Layer0)service;
2594                 return;
2595         }
2596         serviceLocator.registerService(api, service);
2597         if (TransactionPolicySupport.class == api) {
2598             transactionPolicy = (TransactionPolicySupport)service;
2599             state.resetTransactionPolicy();
2600         }
2601         if (api == serviceKey1)
2602                 service1 = service;
2603         else if (api == serviceKey2)
2604                 service2 = service;
2605     }
2606
2607     // ----------------
2608     // SessionMonitor
2609     // ----------------
2610
2611     void fireSessionVariableChange(String variable) {
2612         for (MonitorHandler h : monitorHandlers) {
2613             MonitorContext ctx = monitorContexts.getLeft(h);
2614             assert ctx != null;
2615             // SafeRunner functionality repeated here to avoid dependency.
2616             try {
2617                 h.valuesChanged(ctx);
2618             } catch (Exception e) {
2619                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2620             } catch (LinkageError e) {
2621                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2622             }
2623         }
2624     }
2625
2626
2627     class ResourceSerializerImpl implements ResourceSerializer {
2628
2629         public long createRandomAccessId(int id) throws DatabaseException {
2630             if(id < 0)
2631                 return id;
2632             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2633             long cluster = getCluster(id);
2634             if (0 == cluster)
2635                 return 0; // Better to return 0 then invalid id.
2636             long result = ClusterTraitsBase.createResourceId(cluster, index);
2637             return result;
2638         }
2639
2640         @Override
2641         public long getRandomAccessId(Resource resource) throws DatabaseException {
2642
2643             ResourceImpl resourceImpl = (ResourceImpl) resource;
2644             return createRandomAccessId(resourceImpl.id);
2645
2646         }
2647
2648         @Override
2649         public int getTransientId(Resource resource) throws DatabaseException {
2650
2651             ResourceImpl resourceImpl = (ResourceImpl) resource;
2652             return resourceImpl.id;
2653
2654         }
2655
2656         @Override
2657         public String createRandomAccessId(Resource resource)
2658         throws InvalidResourceReferenceException {
2659
2660             if(resource == null) throw new IllegalArgumentException();
2661
2662             ResourceImpl resourceImpl = (ResourceImpl) resource;
2663
2664             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2665