Cluster loading problem fix made by Antti
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
161
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
164
165
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
167
168     protected static final boolean DEBUG        = false;
169
170     private static final boolean DIAGNOSTICS       = false;
171
172     private TransactionPolicySupport transactionPolicy;
173
174     final private ClusterControl clusterControl;
175
176     final protected BuiltinSupportImpl builtinSupport;
177     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178     final protected ClusterSetsSupport clusterSetsSupport;
179     final protected LifecycleSupportImpl lifecycleSupport;
180
181     protected QuerySupportImpl querySupport;
182     protected ResourceSupportImpl resourceSupport;
183     protected WriteSupport writeSupport;
184     public ClusterTranslator clusterTranslator;
185
186     boolean dirtyPrimitives = false;
187
188     public static final int SERVICE_MODE_CREATE = 2;
189     public static final int SERVICE_MODE_ALLOW = 1;
190     public int serviceMode = 0;
191     public boolean createdImmutableClusters = false;
192     public TLongHashSet createdClusters = new TLongHashSet();
193
194     private Layer0 L0;
195
196     /**
197      * The service locator maintained by the workbench. These services are
198      * initialized during workbench during the <code>init</code> method.
199      */
200     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
201
202     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
203
204     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
205
206     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
209
210     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
211
212     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
213
214     final protected State                                        state              = new State();
215
216     protected GraphSession                                       graphSession       = null;
217
218     protected SessionManager                                     sessionManagerImpl = null;
219
220     protected UserAuthenticationAgent                            authAgent          = null;
221
222     protected UserAuthenticator                                  authenticator      = null;
223
224     protected Resource                                           user               = null;
225
226     protected ClusterStream                                      clusterStream      = null;
227
228     protected SessionRequestManager                         requestManager = null;
229
230     public ClusterTable                                       clusterTable       = null;
231
232     public QueryProcessor                                     queryProvider2 = null;
233
234     ClientChangesImpl                                  clientChanges      = null;
235
236     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
237
238 //    protected long                                               newClusterId       = Constants.NullClusterId;
239
240     protected int                                                flushCounter       = 0;
241
242     protected boolean                                            writeOnly          = false;
243
244     WriteState<?>                                                writeState = null;
245     WriteStateBase<?>                                            delayedWriteState = null;
246     protected Resource defaultClusterSet = null; // If not null then used for newResource().
247
248     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
249
250 //        if (authAgent == null)
251 //            throw new IllegalArgumentException("null authentication agent");
252
253         File t = StaticSessionProperties.virtualGraphStoragePath;
254         if (null == t)
255             t = new File(".");
256         this.clusterTable = new ClusterTable(this, t);
257         this.builtinSupport = new BuiltinSupportImpl(this);
258         this.sessionManagerImpl = sessionManagerImpl;
259         this.user = null;
260 //        this.authAgent = authAgent;
261
262         serviceLocator.registerService(Session.class, this);
263         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284         ServiceActivityUpdaterForWriteTransactions.register(this);
285
286         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289         this.lifecycleSupport = new LifecycleSupportImpl(this);
290         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291         this.transactionPolicy = new TransactionPolicyRelease();
292         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293         this.clusterControl = new ClusterControlImpl(this);
294         serviceLocator.registerService(ClusterControl.class, clusterControl);
295         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296         this.clusterSetsSupport.setReadDirectory(t.toPath());
297         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
299     }
300
301     /*
302      *
303      * Session interface
304      */
305
306
307     @Override
308     public Resource getRootLibrary() {
309         return queryProvider2.getRootLibraryResource();
310     }
311
312     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313         if (!graphSession.dbSession.refreshEnabled())
314             return;
315         try {
316             getClusterTable().refresh(csid, this, clusterUID);
317         } catch (Throwable t) {
318             Logger.defaultLogError("Refesh failed.", t);
319         }
320     }
321
322     static final class TaskHelper {
323         private final String name;
324         private Object result;
325         final Semaphore sema = new Semaphore(0);
326         private Throwable throwable = null;
327         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
328             @Override
329             public void run(DatabaseException e) {
330                 synchronized (TaskHelper.this) {
331                     throwable = e;
332                 }
333             }
334         };
335         final Procedure<Object> proc = new Procedure<Object>() {
336             @Override
337             public void execute(Object result) {
338                 callback.run(null);
339             }
340             @Override
341             public void exception(Throwable t) {
342                 if (t instanceof DatabaseException)
343                     callback.run((DatabaseException)t);
344                 else
345                     callback.run(new DatabaseException("" + name + "operation failed.", t));
346             }
347         };
348         final WriteTraits writeTraits = new WriteTraits() {};
349         TaskHelper(String name) {
350             this.name = name;
351         }
352         <T> T getResult() {
353             return (T)result;
354         }
355         void setResult(Object result) {
356             this.result = result;
357         }
358 //        Throwable throwableGet() {
359 //            return throwable;
360 //        }
361         synchronized void throwableSet(Throwable t) {
362             throwable = t;
363         }
364 //        void throwableSet(String t) {
365 //            throwable = new InternalException("" + name + " operation failed. " + t);
366 //        }
367         synchronized void throwableCheck()
368         throws DatabaseException {
369             if (null != throwable)
370                 if (throwable instanceof DatabaseException)
371                     throw (DatabaseException)throwable;
372                 else
373                     throw new DatabaseException("Undo operation failed.", throwable);
374         }
375         void throw_(String message)
376         throws DatabaseException {
377             throw new DatabaseException("" + name + " operation failed. " + message);
378         }
379     }
380
381
382
383     private ListenerBase getListenerBase(Object procedure) {
384         if (procedure instanceof ListenerBase)
385             return (ListenerBase) procedure;
386         else
387             return null;
388     }
389
390     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391         scheduleRequest(request, callback, notify, null);
392     }
393
394     /* (non-Javadoc)
395      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396      */
397     @Override
398     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399
400         assert (request != null);
401
402         if(Development.DEVELOPMENT) {
403             try {
404             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405                 System.err.println("schedule write '" + request + "'");
406             } catch (Throwable t) {
407                 Logger.defaultLogError(t);
408             }
409         }
410
411         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412
413         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414
415             @Override
416             public void run(int thread) {
417
418                 if(Development.DEVELOPMENT) {
419                     try {
420                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421                         System.err.println("perform write '" + request + "'");
422                     } catch (Throwable t) {
423                         Logger.defaultLogError(t);
424                     }
425                 }
426
427                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428
429                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
430
431                 try {
432
433                     flushCounter = 0;
434                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
435
436                     VirtualGraph vg = getProvider(request.getProvider());
437
438                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440
441                         @Override
442                         public void execute(Object result) {
443                             if(callback != null) callback.run(null);
444                         }
445
446                         @Override
447                         public void exception(Throwable t) {
448                             if(callback != null) callback.run((DatabaseException)t);
449                         }
450
451                     });
452
453                     assert (null != writer);
454 //                    writer.state.barrier.inc();
455
456                     try {
457                         request.perform(writer);
458                         assert (null != writer);
459                     } catch (Throwable t) {
460                         if (!(t instanceof CancelTransactionException))
461                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462                         writeState.except(t);
463                     } finally {
464 //                        writer.state.barrier.dec();
465 //                        writer.waitAsync(request);
466                     }
467
468
469                     assert(!queryProvider2.dirty);
470
471                 } catch (Throwable e) {
472
473                     // Log it first, just to be safe that the error is always logged.
474                     if (!(e instanceof CancelTransactionException))
475                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
476
477 //                    writeState.getGraph().state.barrier.dec();
478 //                    writeState.getGraph().waitAsync(request);
479
480                     writeState.except(e);
481
482 //                    try {
483 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 //                        // All we can do here is to log those, can't really pass them anywhere.
485 //                        if (callback != null) {
486 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 //                            else callback.run(new DatabaseException(e));
488 //                        }
489 //                    } catch (Throwable e2) {
490 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
491 //                    }
492 //                    boolean empty = clusterStream.reallyFlush();
493 //                    int callerThread = -1;
494 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
496 //                    try {
497 //                        // Send all local changes to server so it can calculate correct reverse change set.
498 //                        // This will call clusterStream.accept().
499 //                        state.cancelCommit(context, clusterStream);
500 //                        if (!empty) {
501 //                            if (!context.isOk()) // this is a blocking operation
502 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
504 //                        }
505 //                        state.cancelCommit2(context, clusterStream);
506 //                    } catch (DatabaseException e3) {
507 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
508 //                    } finally {
509 //                        clusterStream.setOff(false);
510 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
511 //                    }
512
513                 } finally {
514
515                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
516
517                 }
518
519                 task.finish();
520
521                 if(Development.DEVELOPMENT) {
522                     try {
523                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524                         System.err.println("finish write '" + request + "'");
525                     } catch (Throwable t) {
526                         Logger.defaultLogError(t);
527                     }
528                 }
529
530             }
531
532         }, combine);
533
534     }
535
536     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537         scheduleRequest(request, procedure, notify, null);
538     }
539
540     /* (non-Javadoc)
541      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542      */
543     @Override
544     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
545
546         assert (request != null);
547
548         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
549
550         requestManager.scheduleWrite(new SessionTask(request, thread) {
551
552             @Override
553             public void run(int thread) {
554
555                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
556
557                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558
559                 flushCounter = 0;
560                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
561
562                 VirtualGraph vg = getProvider(request.getProvider());
563                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
564
565                 try {
566                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567                     writeState = writeStateT;
568
569                     assert (null != writer);
570 //                    writer.state.barrier.inc();
571                     writeStateT.setResult(request.perform(writer));
572                     assert (null != writer);
573
574 //                    writer.state.barrier.dec();
575 //                    writer.waitAsync(null);
576
577                 } catch (Throwable e) {
578
579 //                    writer.state.barrier.dec();
580 //                    writer.waitAsync(null);
581
582                     writeState.except(e);
583
584 //                  state.stopWriteTransaction(clusterStream);
585 //
586 //              } catch (Throwable e) {
587 //                  // Log it first, just to be safe that the error is always logged.
588 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
589 //
590 //                  try {
591 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 //                      // All we can do here is to log those, can't really pass them anywhere.
593 //                      if (procedure != null) {
594 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 //                          else procedure.exception(new DatabaseException(e));
596 //                      }
597 //                  } catch (Throwable e2) {
598 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
599 //                  }
600 //
601 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
602 //
603 //                  state.stopWriteTransaction(clusterStream);
604
605                 } finally {
606                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
607                 }
608
609 //              if(notify != null) notify.release();
610
611                 task.finish();
612
613             }
614
615         }, combine);
616
617     }
618
619     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620         scheduleRequest(request, callback, notify, null);
621     }
622
623     /* (non-Javadoc)
624      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
625      */
626     @Override
627     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
628
629         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
630
631         assert (request != null);
632
633         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
634
635         requestManager.scheduleWrite(new SessionTask(request, thread) {
636
637             @Override
638             public void run(int thread) {
639                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
640
641                 Procedure<Object> stateProcedure = new Procedure<Object>() {
642                     @Override
643                     public void execute(Object result) {
644                         if (callback != null)
645                             callback.run(null);
646                     }
647                     @Override
648                     public void exception(Throwable t) {
649                         if (callback != null) {
650                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651                             else callback.run(new DatabaseException(t));
652                         } else
653                             Logger.defaultLogError("Unhandled exception", t);
654                     }
655                 };
656
657                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659                 DelayedWriteGraph dwg = null;
660 //                newGraph.state.barrier.inc();
661
662                 try {
663                     dwg = new DelayedWriteGraph(newGraph);
664                     request.perform(dwg);
665                 } catch (Throwable e) {
666                     delayedWriteState.except(e);
667                     total.finish();
668                     dwg.close();
669                     return;
670                 } finally {
671 //                    newGraph.state.barrier.dec();
672 //                    newGraph.waitAsync(request);
673                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
674                 }
675
676                 delayedWriteState = null;
677
678                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
679                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
680
681                 flushCounter = 0;
682                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
683
684                 acquireWriteOnly();
685
686                 VirtualGraph vg = getProvider(request.getProvider());
687                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
688
689                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
690
691                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
692
693                 assert (null != writer);
694 //                writer.state.barrier.inc();
695
696                 try {
697                     // Cannot cancel
698                     dwg.commit(writer, request);
699
700                         if(defaultClusterSet != null) {
701                                 XSupport xs = getService(XSupport.class);
702                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
703                                 ClusteringSupport cs = getService(ClusteringSupport.class);
704                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
705                         }
706
707                     // This makes clusters available from server
708                     clusterStream.reallyFlush();
709
710                     releaseWriteOnly(writer);
711                     handleUpdatesAndMetadata(writer);
712
713                 } catch (ServiceException e) {
714 //                    writer.state.barrier.dec();
715 //                    writer.waitAsync(null);
716
717                     // These shall be requested from server
718                     clusterTable.removeWriteOnlyClusters();
719                     // This makes clusters available from server
720                     clusterStream.reallyFlush();
721
722                     releaseWriteOnly(writer);
723                     writeState.except(e);
724                 } finally {
725                     // Debugging & Profiling
726                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
727                     task2.finish();
728                     total.finish();
729                 }
730             }
731
732         }, combine);
733
734     }
735
736     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
737         scheduleRequest(request, procedure, notify, null);
738     }
739
740     /* (non-Javadoc)
741      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
742      */
743     @Override
744     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
745         throw new Error("Not implemented");
746     }
747
748     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
749         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
750         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
751             createdClusters.add(cluster.clusterId);
752         }
753         return cluster;
754     }
755
756     class WriteOnlySupport implements WriteSupport {
757
758         ClusterStream stream;
759         ClusterImpl currentCluster;
760
761         public WriteOnlySupport() {
762             this.stream = clusterStream;
763         }
764
765         @Override
766         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
767             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
768         }
769
770         @Override
771         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
772
773             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
774
775             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
776                 if(s != queryProvider2.getRootLibrary())
777                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
778
779             try {
780                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
781             } catch (DatabaseException e) {
782                 Logger.defaultLogError(e);
783                 //throw e;
784             }
785
786             clientChanges.invalidate(s);
787
788             if (cluster.isWriteOnly())
789                 return;
790             queryProvider2.updateStatements(s, p);
791
792         }
793
794         @Override
795         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
796             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
797         }
798
799         @Override
800         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
801
802             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
803             try {
804                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
805             } catch (DatabaseException e) {
806                 Logger.defaultLogError(e);
807             }
808
809             clientChanges.invalidate(rid);
810
811             if (cluster.isWriteOnly())
812                 return;
813             queryProvider2.updateValue(rid);
814
815         }
816
817         @Override
818         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
819
820             if(amount < 65536) {
821                 claimValue(provider,resource, reader.readBytes(null, amount));
822                 return;
823             }
824
825             byte[] bytes = new byte[65536];
826
827             int rid = ((ResourceImpl)resource).id;
828             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
829             try {
830                 int left = amount;
831                 while(left > 0) {
832                     int block = Math.min(left, 65536);
833                     reader.readBytes(bytes, block);
834                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
835                     left -= block;
836                 }
837             } catch (DatabaseException e) {
838                 Logger.defaultLogError(e);
839             }
840
841             clientChanges.invalidate(rid);
842
843             if (cluster.isWriteOnly())
844                 return;
845             queryProvider2.updateValue(rid);
846
847         }
848
849         private void maintainCluster(ClusterImpl before, ClusterI after_) {
850             if(after_ != null && after_ != before) {
851                 ClusterImpl after = (ClusterImpl)after_;
852                 if(currentCluster == before) {
853                     currentCluster = after;
854                 }
855                 clusterTable.replaceCluster(after);
856             }
857         }
858
859         public int createResourceKey(int foreignCounter) throws DatabaseException {
860             if(currentCluster == null) {
861                 currentCluster = getNewResourceCluster();
862             }
863             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
864                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
865                 newCluster.foreignLookup = new byte[foreignCounter];
866                 currentCluster = newCluster;
867                 if (DEBUG)
868                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
869             }
870             return currentCluster.createResource(clusterTranslator);
871         }
872
873         @Override
874         public Resource createResource(VirtualGraph provider) throws DatabaseException {
875             if(currentCluster == null) {
876                 if (null != defaultClusterSet) {
877                         ResourceImpl result = getNewResource(defaultClusterSet);
878                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
879                     return result;
880                 } else {
881                     currentCluster = getNewResourceCluster();
882                 }
883             }
884             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
885                 if (null != defaultClusterSet) {
886                         ResourceImpl result = getNewResource(defaultClusterSet);
887                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
888                     return result;
889                 } else {
890                     currentCluster = getNewResourceCluster();
891                 }
892             }
893             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
894         }
895
896         @Override
897         public Resource createResource(VirtualGraph provider, long clusterId)
898         throws DatabaseException {
899             return getNewResource(clusterId);
900         }
901
902         @Override
903         public Resource createResource(VirtualGraph provider, Resource clusterSet)
904         throws DatabaseException {
905             return getNewResource(clusterSet);
906         }
907
908         @Override
909         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
910                 throws DatabaseException {
911             getNewClusterSet(clusterSet);
912         }
913
914         @Override
915         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
916         throws ServiceException {
917             return containsClusterSet(clusterSet);
918         }
919
920         public void selectCluster(long cluster) {
921                 currentCluster = clusterTable.getClusterByClusterId(cluster);
922                 long setResourceId = clusterSetsSupport.getSet(cluster);
923                 clusterSetsSupport.put(setResourceId, cluster);
924         }
925
926         @Override
927         public Resource setDefaultClusterSet(Resource clusterSet)
928         throws ServiceException {
929                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
930                 if(clusterSet != null) {
931                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
932                         currentCluster = clusterTable.getClusterByClusterId(id);
933                         return result;
934                 } else {
935                         currentCluster = null;
936                         return null;
937                 }
938         }
939
940         @Override
941         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
942
943                  provider = getProvider(provider);
944              if (null == provider) {
945                  int key = ((ResourceImpl)resource).id;
946
947
948
949                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
950 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
951 //                      if(key != queryProvider2.getRootLibrary())
952 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
953
954 //                 try {
955 //                     cluster.removeValue(key, clusterTranslator);
956 //                 } catch (DatabaseException e) {
957 //                     Logger.defaultLogError(e);
958 //                     return;
959 //                 }
960
961                  clusterTable.writeOnlyInvalidate(cluster);
962
963                  try {
964                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
965                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
966                          clusterTranslator.removeValue(cluster);
967                  } catch (DatabaseException e) {
968                          Logger.defaultLogError(e);
969                  }
970
971                  queryProvider2.invalidateResource(key);
972                  clientChanges.invalidate(key);
973
974              } else {
975                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
976                  queryProvider2.updateValue(querySupport.getId(resource));
977                  clientChanges.claimValue(resource);
978              }
979
980
981         }
982
983         @Override
984         public void flush(boolean intermediate) {
985             throw new UnsupportedOperationException();
986         }
987
988         @Override
989         public void flushCluster() {
990             clusterTable.flushCluster(graphSession);
991             if(defaultClusterSet != null) {
992                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
993             }
994             currentCluster = null;
995         }
996
997         @Override
998         public void flushCluster(Resource r) {
999             throw new UnsupportedOperationException("flushCluster resource " + r);
1000         }
1001
1002         @Override
1003         public void gc() {
1004         }
1005
1006         @Override
1007         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1008                 Resource object) {
1009
1010                 int s = ((ResourceImpl)subject).id;
1011                 int p = ((ResourceImpl)predicate).id;
1012                 int o = ((ResourceImpl)object).id;
1013
1014                 provider = getProvider(provider);
1015                 if (null == provider) {
1016
1017                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1018                         clusterTable.writeOnlyInvalidate(cluster);
1019
1020                         try {
1021
1022                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1023                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1024                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1025
1026                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1027                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1028
1029                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1030                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1031                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032                                 clusterTranslator.removeStatement(cluster);
1033
1034                                 queryProvider2.invalidateResource(s);
1035                                 clientChanges.invalidate(s);
1036
1037                         } catch (DatabaseException e) {
1038
1039                                 Logger.defaultLogError(e);
1040
1041                         }
1042
1043                         return true;
1044
1045                 } else {
1046                         
1047                         ((VirtualGraphImpl)provider).deny(s, p, o);
1048                         queryProvider2.invalidateResource(s);
1049                         clientChanges.invalidate(s);
1050
1051                         return true;
1052
1053                 }
1054
1055         }
1056
1057         @Override
1058         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1059             throw new UnsupportedOperationException();
1060         }
1061
1062         @Override
1063         public boolean writeOnly() {
1064             return true;
1065         }
1066
1067         @Override
1068         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1069             writeSupport.performWriteRequest(graph, request);
1070         }
1071
1072         @Override
1073         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1074             throw new UnsupportedOperationException();
1075         }
1076
1077         @Override
1078         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1079             throw new UnsupportedOperationException();
1080         }
1081
1082         @Override
1083         public <T> void addMetadata(Metadata data) throws ServiceException {
1084             writeSupport.addMetadata(data);
1085         }
1086
1087         @Override
1088         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1089             return writeSupport.getMetadata(clazz);
1090         }
1091
1092         @Override
1093         public TreeMap<String, byte[]> getMetadata() {
1094             return writeSupport.getMetadata();
1095         }
1096
1097         @Override
1098         public void commitDone(WriteTraits writeTraits, long csid) {
1099             writeSupport.commitDone(writeTraits, csid);
1100         }
1101
1102         @Override
1103         public void clearUndoList(WriteTraits writeTraits) {
1104             writeSupport.clearUndoList(writeTraits);
1105         }
1106         @Override
1107         public int clearMetadata() {
1108             return writeSupport.clearMetadata();
1109         }
1110
1111                 @Override
1112                 public void startUndo() {
1113                         writeSupport.startUndo();
1114                 }
1115     }
1116
1117     class VirtualWriteOnlySupport implements WriteSupport {
1118
1119 //        @Override
1120 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1121 //                Resource object) {
1122 //            throw new UnsupportedOperationException();
1123 //        }
1124
1125         @Override
1126         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1127
1128             TransientGraph impl = (TransientGraph)provider;
1129             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1130             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1131             clientChanges.claim(subject, predicate, object);
1132
1133         }
1134
1135         @Override
1136         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1137
1138             TransientGraph impl = (TransientGraph)provider;
1139             impl.claim(subject, predicate, object);
1140             getQueryProvider2().updateStatements(subject, predicate);
1141             clientChanges.claim(subject, predicate, object);
1142
1143         }
1144
1145         @Override
1146         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1147             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1148         }
1149
1150         @Override
1151         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1152             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1153             getQueryProvider2().updateValue(resource);
1154             clientChanges.claimValue(resource);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1159             byte[] value = reader.readBytes(null, amount);
1160             claimValue(provider, resource, value);
1161         }
1162
1163         @Override
1164         public Resource createResource(VirtualGraph provider) {
1165             TransientGraph impl = (TransientGraph)provider;
1166             return impl.getResource(impl.newResource(false));
1167         }
1168
1169         @Override
1170         public Resource createResource(VirtualGraph provider, long clusterId) {
1171             throw new UnsupportedOperationException();
1172         }
1173
1174         @Override
1175         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1176         throws DatabaseException {
1177             throw new UnsupportedOperationException();
1178         }
1179
1180         @Override
1181         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1182         throws DatabaseException {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1188         throws ServiceException {
1189             throw new UnsupportedOperationException();
1190         }
1191
1192         @Override
1193         public Resource setDefaultClusterSet(Resource clusterSet)
1194         throws ServiceException {
1195                 return null;
1196         }
1197
1198         @Override
1199         public void denyValue(VirtualGraph provider, Resource resource) {
1200             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1201             getQueryProvider2().updateValue(querySupport.getId(resource));
1202             // NOTE: this only keeps track of value changes by-resource.
1203             clientChanges.claimValue(resource);
1204         }
1205
1206         @Override
1207         public void flush(boolean intermediate) {
1208             throw new UnsupportedOperationException();
1209         }
1210
1211         @Override
1212         public void flushCluster() {
1213             throw new UnsupportedOperationException();
1214         }
1215
1216         @Override
1217         public void flushCluster(Resource r) {
1218             throw new UnsupportedOperationException("Resource " + r);
1219         }
1220
1221         @Override
1222         public void gc() {
1223         }
1224
1225         @Override
1226         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1227             TransientGraph impl = (TransientGraph) provider;
1228             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1229             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1230             clientChanges.deny(subject, predicate, object);
1231             return true;
1232         }
1233
1234         @Override
1235         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1236             throw new UnsupportedOperationException();
1237         }
1238
1239         @Override
1240         public boolean writeOnly() {
1241             return true;
1242         }
1243
1244         @Override
1245         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1246             throw new UnsupportedOperationException();
1247         }
1248
1249         @Override
1250         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1251             throw new UnsupportedOperationException();
1252         }
1253
1254         @Override
1255         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1256             throw new UnsupportedOperationException();
1257         }
1258
1259         @Override
1260         public <T> void addMetadata(Metadata data) throws ServiceException {
1261             throw new UnsupportedOperationException();
1262         }
1263
1264         @Override
1265         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1266             throw new UnsupportedOperationException();
1267         }
1268
1269         @Override
1270         public TreeMap<String, byte[]> getMetadata() {
1271             throw new UnsupportedOperationException();
1272         }
1273
1274         @Override
1275         public void commitDone(WriteTraits writeTraits, long csid) {
1276         }
1277
1278         @Override
1279         public void clearUndoList(WriteTraits writeTraits) {
1280         }
1281
1282         @Override
1283         public int clearMetadata() {
1284             return 0;
1285         }
1286
1287                 @Override
1288                 public void startUndo() {
1289                 }
1290     }
1291
1292     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1293
1294         try {
1295
1296             int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1297
1298             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1299
1300             flushCounter = 0;
1301             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1302
1303             acquireWriteOnly();
1304
1305             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1306
1307             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1308
1309             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1310             writeState = writeStateT;
1311
1312             assert (null != writer);
1313 //            writer.state.barrier.inc();
1314             long start = System.nanoTime();
1315             T result = request.perform(writer);
1316             long duration = System.nanoTime() - start;
1317             if (DEBUG) {
1318                 System.err.println("################");
1319                 System.err.println("WriteOnly duration " + 1e-9*duration);
1320             }
1321             writeStateT.setResult(result);
1322
1323             // This makes clusters available from server
1324             clusterStream.reallyFlush();
1325
1326             // This will trigger query updates
1327             releaseWriteOnly(writer);
1328             assert (null != writer);
1329
1330             handleUpdatesAndMetadata(writer);
1331
1332         } catch (CancelTransactionException e) {
1333
1334             releaseWriteOnly(writeState.getGraph());
1335
1336             clusterTable.removeWriteOnlyClusters();
1337             state.stopWriteTransaction(clusterStream);
1338
1339         } catch (Throwable e) {
1340
1341             e.printStackTrace();
1342
1343             releaseWriteOnly(writeState.getGraph());
1344
1345             clusterTable.removeWriteOnlyClusters();
1346
1347             if (callback != null)
1348                 callback.exception(new DatabaseException(e));
1349
1350             state.stopWriteTransaction(clusterStream);
1351             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1352
1353         } finally  {
1354
1355             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1356
1357         }
1358
1359     }
1360
1361     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1362         scheduleRequest(request, callback, notify, null);
1363     }
1364
1365     @Override
1366     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1367
1368         assertAlive();
1369
1370         assert (request != null);
1371
1372         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1373
1374         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1375
1376             @Override
1377             public void run(int thread) {
1378
1379                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1380
1381                     try {
1382
1383                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1384
1385                         flushCounter = 0;
1386                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1387
1388                         acquireWriteOnly();
1389
1390                         VirtualGraph vg = getProvider(request.getProvider());
1391                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1392
1393                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1394
1395                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1396
1397                             @Override
1398                             public void execute(Object result) {
1399                                 if(callback != null) callback.run(null);
1400                             }
1401
1402                             @Override
1403                             public void exception(Throwable t) {
1404                                 if(callback != null) callback.run((DatabaseException)t);
1405                             }
1406
1407                         });
1408
1409                         assert (null != writer);
1410 //                        writer.state.barrier.inc();
1411
1412                         try {
1413
1414                             request.perform(writer);
1415
1416                         } catch (Throwable e) {
1417
1418 //                            writer.state.barrier.dec();
1419 //                            writer.waitAsync(null);
1420
1421                             releaseWriteOnly(writer);
1422
1423                             clusterTable.removeWriteOnlyClusters();
1424
1425                             if(!(e instanceof CancelTransactionException)) {
1426                                 if (callback != null)
1427                                     callback.run(new DatabaseException(e));
1428                             }
1429
1430                             writeState.except(e);
1431
1432                             return;
1433
1434                         }
1435
1436                         // This makes clusters available from server
1437                         boolean empty = clusterStream.reallyFlush();
1438                         // This was needed to make WO requests call metadata listeners.
1439                         // NOTE: the calling event does not contain clientChanges information.
1440                         if (!empty && clientChanges.isEmpty())
1441                             clientChanges.setNotEmpty(true);
1442                         releaseWriteOnly(writer);
1443                         assert (null != writer);
1444                     } finally  {
1445
1446                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1447
1448                     }
1449
1450
1451                 task.finish();
1452
1453             }
1454
1455         }, combine);
1456
1457     }
1458
1459     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1460         scheduleRequest(request, callback, notify, null);
1461     }
1462
1463     /* (non-Javadoc)
1464      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1465      */
1466     @Override
1467     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1468
1469         assert (request != null);
1470
1471         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1472
1473         requestManager.scheduleWrite(new SessionTask(request, thread) {
1474
1475             @Override
1476             public void run(int thread) {
1477
1478                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1479
1480                 performWriteOnly(request, notify, callback);
1481
1482                 task.finish();
1483
1484             }
1485
1486         }, combine);
1487
1488     }
1489
1490     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1491
1492         assert (request != null);
1493         assert (procedure != null);
1494
1495         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1496
1497         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1498
1499             @Override
1500             public void run(int thread) {
1501
1502                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1503
1504                 ListenerBase listener = getListenerBase(procedure);
1505
1506                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1507
1508                 try {
1509
1510                     if (listener != null) {
1511
1512                         try {
1513                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1514
1515                                         @Override
1516                                         public void exception(AsyncReadGraph graph, Throwable t) {
1517                                                 procedure.exception(graph, t);
1518                                                 if(throwable != null) {
1519                                                         throwable.set(t);
1520                                                 } else {
1521                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1522                                                 }
1523                                         }
1524
1525                                         @Override
1526                                         public void execute(AsyncReadGraph graph, T t) {
1527                                                 if(result != null) result.set(t);
1528                                                 procedure.execute(graph, t);
1529                                         }
1530
1531                                 }, listener);
1532                         } catch (Throwable t) {
1533                             // This is handled by the AsyncProcedure
1534                                 //Logger.defaultLogError("Internal error", t);
1535                         }
1536
1537                     } else {
1538
1539                         try {
1540
1541 //                            newGraph.state.barrier.inc();
1542
1543                             T t = request.perform(newGraph);
1544
1545                             try {
1546
1547                                 if(result != null) result.set(t);
1548                                 procedure.execute(newGraph, t);
1549
1550                             } catch (Throwable th) {
1551
1552                                 if(throwable != null) {
1553                                     throwable.set(th);
1554                                 } else {
1555                                     Logger.defaultLogError("Unhandled exception", th);
1556                                 }
1557
1558                             }
1559
1560                         } catch (Throwable t) {
1561
1562                             if (DEBUG)
1563                                 t.printStackTrace();
1564
1565                             if(throwable != null) {
1566                                 throwable.set(t);
1567                             } else {
1568                                 Logger.defaultLogError("Unhandled exception", t);
1569                             }
1570
1571                             try {
1572
1573                                 procedure.exception(newGraph, t);
1574
1575                             } catch (Throwable t2) {
1576
1577                                 if(throwable != null) {
1578                                     throwable.set(t2);
1579                                 } else {
1580                                     Logger.defaultLogError("Unhandled exception", t2);
1581                                 }
1582
1583                             }
1584
1585                         }
1586
1587 //                        newGraph.state.barrier.dec();
1588 //                        newGraph.waitAsync(request);
1589
1590                     }
1591
1592                 } finally {
1593
1594                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1595
1596                 }
1597
1598             }
1599
1600         });
1601
1602     }
1603
1604     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1605
1606         assert (request != null);
1607         assert (procedure != null);
1608
1609         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1610
1611         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1612
1613             @Override
1614             public void run(int thread) {
1615
1616                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1617
1618                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1619
1620                 try {
1621
1622                     if (listener != null) {
1623
1624                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1625
1626 //                        newGraph.waitAsync(request);
1627
1628                     } else {
1629
1630                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1631                                 procedure, "request");
1632
1633                         try {
1634
1635 //                            newGraph.state.barrier.inc();
1636
1637                             request.perform(newGraph, wrapper);
1638
1639 //                            newGraph.waitAsync(request);
1640
1641                         } catch (Throwable t) {
1642
1643                             wrapper.exception(newGraph, t);
1644 //                            newGraph.waitAsync(request);
1645
1646
1647                         }
1648
1649                     }
1650
1651                 } finally {
1652
1653                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1654
1655                 }
1656
1657             }
1658
1659         });
1660
1661     }
1662
1663     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1664
1665         assert (request != null);
1666         assert (procedure != null);
1667
1668         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1669
1670         int sync = notify != null ? thread : -1;
1671
1672         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1673
1674             @Override
1675             public void run(int thread) {
1676
1677                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1678
1679                 ListenerBase listener = getListenerBase(procedure);
1680
1681                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1682
1683                 try {
1684
1685                     if (listener != null) {
1686
1687                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1688
1689 //                        newGraph.waitAsync(request);
1690
1691                     } else {
1692
1693                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1694
1695                         try {
1696
1697                             request.perform(newGraph, wrapper);
1698
1699                         } catch (Throwable t) {
1700
1701                             t.printStackTrace();
1702
1703                         }
1704
1705                     }
1706
1707                 } finally {
1708
1709                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1710
1711                 }
1712
1713             }
1714
1715         });
1716
1717     }
1718
1719     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1720
1721         assert (request != null);
1722         assert (procedure != null);
1723
1724         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1725
1726         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1727
1728             @Override
1729             public void run(int thread) {
1730
1731                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1732
1733                 ListenerBase listener = getListenerBase(procedure);
1734
1735                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1736
1737                 try {
1738
1739                     if (listener != null) {
1740
1741                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1742
1743                             @Override
1744                             public void exception(Throwable t) {
1745                                 procedure.exception(t);
1746                                 if(throwable != null) {
1747                                     throwable.set(t);
1748                                 }
1749                             }
1750
1751                             @Override
1752                             public void execute(T t) {
1753                                 if(result != null) result.set(t);
1754                                 procedure.execute(t);
1755                             }
1756
1757                         }, listener);
1758
1759 //                        newGraph.waitAsync(request);
1760
1761                     } else {
1762
1763 //                        newGraph.state.barrier.inc();
1764
1765                         request.register(newGraph, new Listener<T>() {
1766
1767                             @Override
1768                             public void exception(Throwable t) {
1769                                 if(throwable != null) throwable.set(t);
1770                                 procedure.exception(t);
1771 //                                newGraph.state.barrier.dec();
1772                             }
1773
1774                             @Override
1775                             public void execute(T t) {
1776                                 if(result != null) result.set(t);
1777                                 procedure.execute(t);
1778 //                                newGraph.state.barrier.dec();
1779                             }
1780
1781                             @Override
1782                             public boolean isDisposed() {
1783                                 return true;
1784                             }
1785
1786                         });
1787
1788 //                        newGraph.waitAsync(request);
1789
1790                     }
1791
1792                 } finally {
1793
1794                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1795
1796                 }
1797
1798             }
1799
1800         });
1801
1802     }
1803
1804
1805     @Override
1806     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1807
1808         scheduleRequest(request, procedure, null, null, null);
1809
1810     }
1811
1812     @Override
1813     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1814         assertNotSession();
1815         Semaphore notify = new Semaphore(0);
1816         DataContainer<Throwable> container = new DataContainer<Throwable>();
1817         DataContainer<T> result = new DataContainer<T>();
1818         scheduleRequest(request, procedure, notify, container, result);
1819         acquire(notify, request);
1820         Throwable throwable = container.get();
1821         if(throwable != null) {
1822             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1823             else throw new DatabaseException("Unexpected exception", throwable);
1824         }
1825         return result.get();
1826     }
1827
1828     @Override
1829     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1830         assertNotSession();
1831         Semaphore notify = new Semaphore(0);
1832         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1833         final DataContainer<T> resultContainer = new DataContainer<T>();
1834         scheduleRequest(request, new AsyncProcedure<T>() {
1835             @Override
1836             public void exception(AsyncReadGraph graph, Throwable throwable) {
1837                 exceptionContainer.set(throwable);
1838                 procedure.exception(graph, throwable);
1839             }
1840             @Override
1841             public void execute(AsyncReadGraph graph, T result) {
1842                 resultContainer.set(result);
1843                 procedure.execute(graph, result);
1844             }
1845         }, getListenerBase(procedure), notify);
1846         acquire(notify, request, procedure);
1847         Throwable throwable = exceptionContainer.get();
1848         if (throwable != null) {
1849             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1850             else throw new DatabaseException("Unexpected exception", throwable);
1851         }
1852         return resultContainer.get();
1853     }
1854
1855
1856     @Override
1857     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1858         assertNotSession();
1859         Semaphore notify = new Semaphore(0);
1860         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1861         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1862             @Override
1863             public void exception(AsyncReadGraph graph, Throwable throwable) {
1864                 exceptionContainer.set(throwable);
1865                 procedure.exception(graph, throwable);
1866             }
1867             @Override
1868             public void execute(AsyncReadGraph graph, T result) {
1869                 procedure.execute(graph, result);
1870             }
1871             @Override
1872             public void finished(AsyncReadGraph graph) {
1873                 procedure.finished(graph);
1874             }
1875         }, notify);
1876         acquire(notify, request, procedure);
1877         Throwable throwable = exceptionContainer.get();
1878         if (throwable != null) {
1879             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1880             else throw new DatabaseException("Unexpected exception", throwable);
1881         }
1882         // TODO: implement return value
1883         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1884         return null;
1885     }
1886
1887     @Override
1888     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1889         return syncRequest(request, new ProcedureAdapter<T>());
1890
1891
1892 //        assert(request != null);
1893 //
1894 //        final DataContainer<T> result = new DataContainer<T>();
1895 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1896 //
1897 //        syncRequest(request, new Procedure<T>() {
1898 //
1899 //            @Override
1900 //            public void execute(T t) {
1901 //                result.set(t);
1902 //            }
1903 //
1904 //            @Override
1905 //            public void exception(Throwable t) {
1906 //                exception.set(t);
1907 //            }
1908 //
1909 //        });
1910 //
1911 //        Throwable t = exception.get();
1912 //        if(t != null) {
1913 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1914 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1915 //        }
1916 //
1917 //        return result.get();
1918
1919     }
1920
1921     @Override
1922     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1923         return syncRequest(request, (Procedure<T>)procedure);
1924     }
1925
1926
1927 //    @Override
1928 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1929 //        assertNotSession();
1930 //        Semaphore notify = new Semaphore(0);
1931 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1932 //        DataContainer<T> result = new DataContainer<T>();
1933 //        scheduleRequest(request, procedure, notify, container, result);
1934 //        acquire(notify, request);
1935 //        Throwable throwable = container.get();
1936 //        if(throwable != null) {
1937 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1938 //            else throw new DatabaseException("Unexpected exception", throwable);
1939 //        }
1940 //        return result.get();
1941 //    }
1942
1943
1944     @Override
1945     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1946         assertNotSession();
1947         Semaphore notify = new Semaphore(0);
1948         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1949         final DataContainer<T> result = new DataContainer<T>();
1950         scheduleRequest(request, procedure, notify, container, result);
1951         acquire(notify, request);
1952         Throwable throwable = container.get();
1953         if (throwable != null) {
1954             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1955             else throw new DatabaseException("Unexpected exception", throwable);
1956         }
1957         return result.get();
1958     }
1959
1960     @Override
1961     public void syncRequest(Write request) throws DatabaseException  {
1962         assertNotSession();
1963         assertAlive();
1964         Semaphore notify = new Semaphore(0);
1965         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1966         scheduleRequest(request, new Callback<DatabaseException>() {
1967
1968             @Override
1969             public void run(DatabaseException e) {
1970               exception.set(e);
1971             }
1972
1973         }, notify);
1974         acquire(notify, request);
1975         if(exception.get() != null) throw exception.get();
1976     }
1977
1978     @Override
1979     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1980         assertNotSession();
1981         Semaphore notify = new Semaphore(0);
1982         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1983         final DataContainer<T> result = new DataContainer<T>();
1984         scheduleRequest(request, new Procedure<T>() {
1985
1986             @Override
1987             public void exception(Throwable t) {
1988               exception.set(t);
1989             }
1990
1991             @Override
1992             public void execute(T t) {
1993               result.set(t);
1994             }
1995
1996         }, notify);
1997         acquire(notify, request);
1998         if(exception.get() != null) {
1999             Throwable t = exception.get();
2000             if(t instanceof DatabaseException) throw (DatabaseException)t;
2001             else throw new DatabaseException(t);
2002         }
2003         return result.get();
2004     }
2005
2006     @Override
2007     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2008         assertNotSession();
2009         Semaphore notify = new Semaphore(0);
2010         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2011         final DataContainer<T> result = new DataContainer<T>();
2012         scheduleRequest(request, new Procedure<T>() {
2013
2014             @Override
2015             public void exception(Throwable t) {
2016               exception.set(t);
2017             }
2018
2019             @Override
2020             public void execute(T t) {
2021               result.set(t);
2022             }
2023
2024         }, notify);
2025         acquire(notify, request);
2026         if(exception.get() != null) {
2027             Throwable t = exception.get();
2028             if(t instanceof DatabaseException) throw (DatabaseException)t;
2029             else throw new DatabaseException(t);
2030         }
2031         return result.get();
2032     }
2033
2034     @Override
2035     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2036         assertNotSession();
2037         Semaphore notify = new Semaphore(0);
2038         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2039         final DataContainer<T> result = new DataContainer<T>();
2040         scheduleRequest(request, new Procedure<T>() {
2041
2042             @Override
2043             public void exception(Throwable t) {
2044               exception.set(t);
2045             }
2046
2047             @Override
2048             public void execute(T t) {
2049               result.set(t);
2050             }
2051
2052         }, notify);
2053         acquire(notify, request);
2054         if(exception.get() != null) {
2055             Throwable t = exception.get();
2056             if(t instanceof DatabaseException) throw (DatabaseException)t;
2057             else throw new DatabaseException(t);
2058         }
2059         return result.get();
2060     }
2061
2062     @Override
2063     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2064         assertNotSession();
2065         Semaphore notify = new Semaphore(0);
2066         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2067         scheduleRequest(request, new Callback<DatabaseException>() {
2068             @Override
2069             public void run(DatabaseException e) {
2070                 exception.set(e);
2071             }
2072         }, notify);
2073         acquire(notify, request);
2074         if(exception.get() != null) throw exception.get();
2075     }
2076
2077     @Override
2078     public void syncRequest(WriteOnly request) throws DatabaseException  {
2079         assertNotSession();
2080         assertAlive();
2081         Semaphore notify = new Semaphore(0);
2082         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083         scheduleRequest(request, new Callback<DatabaseException>() {
2084             @Override
2085             public void run(DatabaseException e) {
2086                 exception.set(e);
2087             }
2088         }, notify);
2089         acquire(notify, request);
2090         if(exception.get() != null) throw exception.get();
2091     }
2092
2093     /*
2094      *
2095      * GraphRequestProcessor interface
2096      */
2097
2098     @Override
2099     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2100
2101         scheduleRequest(request, procedure, null, null);
2102
2103     }
2104
2105     @Override
2106     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2107
2108         scheduleRequest(request, procedure, null);
2109
2110     }
2111
2112     @Override
2113     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2114
2115         scheduleRequest(request, procedure, null, null, null);
2116
2117     }
2118
2119     @Override
2120     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2121
2122         scheduleRequest(request, callback, null);
2123
2124     }
2125
2126     @Override
2127     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2128
2129         scheduleRequest(request, procedure, null);
2130
2131     }
2132
2133     @Override
2134     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2135
2136         scheduleRequest(request, procedure, null);
2137
2138     }
2139
2140     @Override
2141     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2142
2143         scheduleRequest(request, procedure, null);
2144
2145     }
2146
2147     @Override
2148     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2149
2150         scheduleRequest(request, callback, null);
2151
2152     }
2153
2154     @Override
2155     public void asyncRequest(final Write r) {
2156         asyncRequest(r, null);
2157     }
2158
2159     @Override
2160     public void asyncRequest(final DelayedWrite r) {
2161         asyncRequest(r, null);
2162     }
2163
2164     @Override
2165     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2166
2167         scheduleRequest(request, callback, null);
2168
2169     }
2170
2171     @Override
2172     public void asyncRequest(final WriteOnly request) {
2173
2174         asyncRequest(request, null);
2175
2176     }
2177
2178     @Override
2179     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2180         r.request(this, procedure);
2181     }
2182
2183     @Override
2184     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2185         r.request(this, procedure);
2186     }
2187
2188     @Override
2189     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2190         r.request(this, procedure);
2191     }
2192
2193     @Override
2194     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2195         r.request(this, procedure);
2196     }
2197
2198     @Override
2199     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2200         r.request(this, procedure);
2201     }
2202
2203     @Override
2204     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2205         r.request(this, procedure);
2206     }
2207
2208     @Override
2209     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2210         return r.request(this);
2211     }
2212
2213     @Override
2214     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2215         return r.request(this);
2216     }
2217
2218     @Override
2219     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2220         r.request(this, procedure);
2221     }
2222
2223     @Override
2224     public <T> void async(WriteInterface<T> r) {
2225         r.request(this, new ProcedureAdapter<T>());
2226     }
2227
2228     //@Override
2229     public void incAsync() {
2230         state.incAsync();
2231     }
2232
2233     //@Override
2234     public void decAsync() {
2235         state.decAsync();
2236     }
2237
2238     public long getCluster(ResourceImpl resource) {
2239         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2240         return cluster.getClusterId();
2241     }
2242
2243     public long getCluster(int id) {
2244         if (clusterTable == null)
2245             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2246         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2247     }
2248
2249     public ResourceImpl getResource(int id) {
2250         return new ResourceImpl(resourceSupport, id);
2251     }
2252
2253     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2254         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2255         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2256         int key = proxy.getClusterKey();
2257         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2258         return new ResourceImpl(resourceSupport, resourceKey);
2259     }
2260
2261     public ResourceImpl getResource2(int id) {
2262         assert (id != 0);
2263         return new ResourceImpl(resourceSupport, id);
2264     }
2265
2266     final public int getId(ResourceImpl impl) {
2267         return impl.id;
2268     }
2269
2270     public static final Charset UTF8 = Charset.forName("utf-8");
2271
2272
2273
2274
2275     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2276         for(TransientGraph g : support.providers) {
2277             if(g.isPending(subject)) return false;
2278         }
2279         return true;
2280     }
2281
2282     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2283         for(TransientGraph g : support.providers) {
2284             if(g.isPending(subject, predicate)) return false;
2285         }
2286         return true;
2287     }
2288
2289     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2290
2291         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2292
2293             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2294
2295             @Override
2296             public void run(ReadGraphImpl graph) {
2297                 if(ready.decrementAndGet() == 0) {
2298                     runnable.run(graph);
2299                 }
2300             }
2301
2302         };
2303
2304         for(TransientGraph g : support.providers) {
2305             if(g.isPending(subject)) {
2306                 try {
2307                     g.load(graph, subject, composite);
2308                 } catch (DatabaseException e) {
2309                     e.printStackTrace();
2310                 }
2311             } else {
2312                 composite.run(graph);
2313             }
2314         }
2315
2316         composite.run(graph);
2317
2318     }
2319
2320     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2321
2322         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2323
2324             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2325
2326             @Override
2327             public void run(ReadGraphImpl graph) {
2328                 if(ready.decrementAndGet() == 0) {
2329                     runnable.run(graph);
2330                 }
2331             }
2332
2333         };
2334
2335         for(TransientGraph g : support.providers) {
2336             if(g.isPending(subject, predicate)) {
2337                 try {
2338                     g.load(graph, subject, predicate, composite);
2339                 } catch (DatabaseException e) {
2340                     e.printStackTrace();
2341                 }
2342             } else {
2343                 composite.run(graph);
2344             }
2345         }
2346
2347         composite.run(graph);
2348
2349     }
2350
2351 //    void dumpHeap() {
2352 //
2353 //        try {
2354 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2355 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2356 //                    "d:/heap" + flushCounter + ".txt", true);
2357 //        } catch (IOException e) {
2358 //            e.printStackTrace();
2359 //        }
2360 //
2361 //    }
2362
2363     void fireReactionsToSynchronize(ChangeSet cs) {
2364
2365         // Do not fire empty events
2366         if (cs.isEmpty())
2367             return;
2368
2369         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2370 //        g.state.barrier.inc();
2371
2372         try {
2373
2374             if (!cs.isEmpty()) {
2375                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2376                 for (ChangeListener l : changeListeners2) {
2377                     try {
2378                         l.graphChanged(e2);
2379                     } catch (Exception ex) {
2380                         ex.printStackTrace();
2381                     }
2382                 }
2383             }
2384
2385         } finally {
2386
2387 //            g.state.barrier.dec();
2388 //            g.waitAsync(null);
2389
2390         }
2391
2392     }
2393
2394     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2395         try {
2396
2397             // Do not fire empty events
2398             if (cs2.isEmpty())
2399                 return;
2400
2401 //            graph.restart();
2402 //            graph.state.barrier.inc();
2403
2404             try {
2405
2406                 if (!cs2.isEmpty()) {
2407
2408                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2409                     for (ChangeListener l : changeListeners2) {
2410                         try {
2411                             l.graphChanged(e2);
2412 //                            System.out.println("changelistener " + l);
2413                         } catch (Exception ex) {
2414                             ex.printStackTrace();
2415                         }
2416                     }
2417
2418                 }
2419
2420             } finally {
2421
2422 //                graph.state.barrier.dec();
2423 //                graph.waitAsync(null);
2424
2425             }
2426         } catch (Throwable t) {
2427             t.printStackTrace();
2428
2429         }
2430     }
2431     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2432
2433         try {
2434
2435             // Do not fire empty events
2436             if (cs2.isEmpty())
2437                 return;
2438
2439             // Do not fire on virtual requests
2440             if(graph.getProvider() != null)
2441                 return;
2442
2443             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2444
2445             try {
2446
2447                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2448                 for (ChangeListener l : metadataListeners) {
2449                     try {
2450                         l.graphChanged(e2);
2451                     } catch (Throwable ex) {
2452                         ex.printStackTrace();
2453                     }
2454                 }
2455
2456             } finally {
2457
2458             }
2459
2460         } catch (Throwable t) {
2461             t.printStackTrace();
2462         }
2463
2464     }
2465
2466
2467     /*
2468      * (non-Javadoc)
2469      *
2470      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2471      */
2472     @Override
2473     public <T> T getService(Class<T> api) {
2474         T t = peekService(api);
2475         if (t == null) {
2476             if (state.isClosed())
2477                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2478             throw new ServiceNotFoundException(this, api);
2479         }
2480         return t;
2481     }
2482
2483     /**
2484      * @return
2485      */
2486     protected abstract ServerInformation getCachedServerInformation();
2487
2488         private Class<?> serviceKey1 = null;
2489         private Class<?> serviceKey2 = null;
2490         private Object service1 = null;
2491         private Object service2 = null;
2492
2493     /*
2494      * (non-Javadoc)
2495      *
2496      * @see
2497      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2498      */
2499     @SuppressWarnings("unchecked")
2500     @Override
2501     public synchronized <T> T peekService(Class<T> api) {
2502
2503                 if(serviceKey1 == api) {
2504                         return (T)service1;
2505                 } else if (serviceKey2 == api) {
2506                         // Promote this key
2507                         Object result = service2;
2508                         service2 = service1;
2509                         serviceKey2 = serviceKey1;
2510                         service1 = result;
2511                         serviceKey1 = api;
2512                         return (T)result;
2513                 }
2514
2515         if (Layer0.class == api)
2516                 return (T) L0;
2517         if (ServerInformation.class == api)
2518             return (T) getCachedServerInformation();
2519         else if (WriteGraphImpl.class == api)
2520             return (T) writeState.getGraph();
2521         else if (ClusterBuilder.class == api)
2522             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2523         else if (ClusterBuilderFactory.class == api)
2524             return (T)new ClusterBuilderFactoryImpl(this);
2525
2526                 service2 = service1;
2527                 serviceKey2 = serviceKey1;
2528
2529         service1 = serviceLocator.peekService(api);
2530         serviceKey1 = api;
2531
2532         return (T)service1;
2533
2534     }
2535
2536     /*
2537      * (non-Javadoc)
2538      *
2539      * @see
2540      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2541      */
2542     @Override
2543     public boolean hasService(Class<?> api) {
2544         return serviceLocator.hasService(api);
2545     }
2546
2547     /**
2548      * @param api the api that must be implemented by the specified service
2549      * @param service the service implementation
2550      */
2551     @Override
2552     public <T> void registerService(Class<T> api, T service) {
2553         if(Layer0.class == api) {
2554                 L0 = (Layer0)service;
2555                 return;
2556         }
2557         serviceLocator.registerService(api, service);
2558         if (TransactionPolicySupport.class == api) {
2559             transactionPolicy = (TransactionPolicySupport)service;
2560             state.resetTransactionPolicy();
2561         }
2562         if (api == serviceKey1)
2563                 service1 = service;
2564         else if (api == serviceKey2)
2565                 service2 = service;
2566     }
2567
2568     // ----------------
2569     // SessionMonitor
2570     // ----------------
2571
2572     void fireSessionVariableChange(String variable) {
2573         for (MonitorHandler h : monitorHandlers) {
2574             MonitorContext ctx = monitorContexts.getLeft(h);
2575             assert ctx != null;
2576             // SafeRunner functionality repeated here to avoid dependency.
2577             try {
2578                 h.valuesChanged(ctx);
2579             } catch (Exception e) {
2580                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2581             } catch (LinkageError e) {
2582                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2583             }
2584         }
2585     }
2586
2587
2588     class ResourceSerializerImpl implements ResourceSerializer {
2589
2590         public long createRandomAccessId(int id) throws DatabaseException {
2591             if(id < 0)
2592                 return id;
2593             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2594             long cluster = getCluster(id);
2595             if (0 == cluster)
2596                 return 0; // Better to return 0 then invalid id.
2597             long result = ClusterTraitsBase.createResourceId(cluster, index);
2598             return result;
2599         }
2600
2601         @Override
2602         public long getRandomAccessId(Resource resource) throws DatabaseException {
2603
2604             ResourceImpl resourceImpl = (ResourceImpl) resource;
2605             return createRandomAccessId(resourceImpl.id);
2606
2607         }
2608
2609         @Override
2610         public int getTransientId(Resource resource) throws DatabaseException {
2611
2612             ResourceImpl resourceImpl = (ResourceImpl) resource;
2613             return resourceImpl.id;
2614
2615         }
2616
2617         @Override
2618         public String createRandomAccessId(Resource resource)
2619         throws InvalidResourceReferenceException {
2620
2621             if(resource == null) throw new IllegalArgumentException();
2622
2623             ResourceImpl resourceImpl = (ResourceImpl) resource;
2624
2625             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2626
2627             int r;
2628             try {
2629                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2630             } catch (DatabaseException e1) {
2631                 throw new InvalidResourceReferenceException(e1);
2632             }
2633             try {
2634                 // Serialize as '<resource index>_<cluster id>'
2635                 return "" + r + "_" + getCluster(resourceImpl);
2636             } catch (Throwable e) {
2637                 e.printStackTrace();
2638                 throw new InvalidResourceReferenceException(e);
2639             } finally {
2640             }
2641         }
2642
2643         public int getTransientId(long serialized) throws DatabaseException {
2644             if (serialized <= 0)
2645                 return (int)serialized;
2646             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2647             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2648             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2649             if (cluster == null)
2650                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2651             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2652             return key;
2653         }
2654
2655         @Override
2656         public Resource getResource(long randomAccessId) throws DatabaseException {
2657             return getResourceByKey(getTransientId(randomAccessId));
2658         }
2659
2660         @Override
2661         public Resource getResource(int transientId) throws DatabaseException {
2662             return getResourceByKey(transientId);
2663         }