c286f02c7d4434e69fe31fc0978ef057e16764d8
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
161
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
164
165
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
167
168     protected static final boolean DEBUG        = false;
169
170     private static final boolean DIAGNOSTICS       = false;
171
172     private TransactionPolicySupport transactionPolicy;
173
174     final private ClusterControl clusterControl;
175
176     final protected BuiltinSupportImpl builtinSupport;
177     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178     final protected ClusterSetsSupport clusterSetsSupport;
179     final protected LifecycleSupportImpl lifecycleSupport;
180
181     protected QuerySupportImpl querySupport;
182     protected ResourceSupportImpl resourceSupport;
183     protected WriteSupport writeSupport;
184     public ClusterTranslator clusterTranslator;
185
186     boolean dirtyPrimitives = false;
187
188     public static final int SERVICE_MODE_CREATE = 2;
189     public static final int SERVICE_MODE_ALLOW = 1;
190     public int serviceMode = 0;
191     public boolean createdImmutableClusters = false;
192     public TLongHashSet createdClusters = new TLongHashSet();
193
194     private Layer0 L0;
195
196     /**
197      * The service locator maintained by the workbench. These services are
198      * initialized during workbench during the <code>init</code> method.
199      */
200     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
201
202     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
203
204     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
205
206     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
209
210     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
211
212     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
213
214     final protected State                                        state              = new State();
215
216     protected GraphSession                                       graphSession       = null;
217
218     protected SessionManager                                     sessionManagerImpl = null;
219
220     protected UserAuthenticationAgent                            authAgent          = null;
221
222     protected UserAuthenticator                                  authenticator      = null;
223
224     protected Resource                                           user               = null;
225
226     protected ClusterStream                                      clusterStream      = null;
227
228     protected SessionRequestManager                         requestManager = null;
229
230     public ClusterTable                                       clusterTable       = null;
231
232     public QueryProcessor                                     queryProvider2 = null;
233
234     ClientChangesImpl                                  clientChanges      = null;
235
236     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
237
238 //    protected long                                               newClusterId       = Constants.NullClusterId;
239
240     protected int                                                flushCounter       = 0;
241
242     protected boolean                                            writeOnly          = false;
243
244     WriteState<?>                                                writeState = null;
245     WriteStateBase<?>                                            delayedWriteState = null;
246     protected Resource defaultClusterSet = null; // If not null then used for newResource().
247
248     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
249
250 //        if (authAgent == null)
251 //            throw new IllegalArgumentException("null authentication agent");
252
253         File t = StaticSessionProperties.virtualGraphStoragePath;
254         if (null == t)
255             t = new File(".");
256         this.clusterTable = new ClusterTable(this, t);
257         this.builtinSupport = new BuiltinSupportImpl(this);
258         this.sessionManagerImpl = sessionManagerImpl;
259         this.user = null;
260 //        this.authAgent = authAgent;
261
262         serviceLocator.registerService(Session.class, this);
263         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284         ServiceActivityUpdaterForWriteTransactions.register(this);
285
286         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289         this.lifecycleSupport = new LifecycleSupportImpl(this);
290         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291         this.transactionPolicy = new TransactionPolicyRelease();
292         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293         this.clusterControl = new ClusterControlImpl(this);
294         serviceLocator.registerService(ClusterControl.class, clusterControl);
295         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296         this.clusterSetsSupport.setReadDirectory(t.toPath());
297         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
299     }
300
301     /*
302      *
303      * Session interface
304      */
305
306
307     @Override
308     public Resource getRootLibrary() {
309         return queryProvider2.getRootLibraryResource();
310     }
311
312     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313         if (!graphSession.dbSession.refreshEnabled())
314             return;
315         try {
316             getClusterTable().refresh(csid, this, clusterUID);
317         } catch (Throwable t) {
318             Logger.defaultLogError("Refesh failed.", t);
319         }
320     }
321
322     static final class TaskHelper {
323         private final String name;
324         private Object result;
325         final Semaphore sema = new Semaphore(0);
326         private Throwable throwable = null;
327         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
328             @Override
329             public void run(DatabaseException e) {
330                 synchronized (TaskHelper.this) {
331                     throwable = e;
332                 }
333             }
334         };
335         final Procedure<Object> proc = new Procedure<Object>() {
336             @Override
337             public void execute(Object result) {
338                 callback.run(null);
339             }
340             @Override
341             public void exception(Throwable t) {
342                 if (t instanceof DatabaseException)
343                     callback.run((DatabaseException)t);
344                 else
345                     callback.run(new DatabaseException("" + name + "operation failed.", t));
346             }
347         };
348         final WriteTraits writeTraits = new WriteTraits() {};
349         TaskHelper(String name) {
350             this.name = name;
351         }
352         <T> T getResult() {
353             return (T)result;
354         }
355         void setResult(Object result) {
356             this.result = result;
357         }
358 //        Throwable throwableGet() {
359 //            return throwable;
360 //        }
361         synchronized void throwableSet(Throwable t) {
362             throwable = t;
363         }
364 //        void throwableSet(String t) {
365 //            throwable = new InternalException("" + name + " operation failed. " + t);
366 //        }
367         synchronized void throwableCheck()
368         throws DatabaseException {
369             if (null != throwable)
370                 if (throwable instanceof DatabaseException)
371                     throw (DatabaseException)throwable;
372                 else
373                     throw new DatabaseException("Undo operation failed.", throwable);
374         }
375         void throw_(String message)
376         throws DatabaseException {
377             throw new DatabaseException("" + name + " operation failed. " + message);
378         }
379     }
380
381
382
383     private ListenerBase getListenerBase(Object procedure) {
384         if (procedure instanceof ListenerBase)
385             return (ListenerBase) procedure;
386         else
387             return null;
388     }
389
390     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391         scheduleRequest(request, callback, notify, null);
392     }
393
394     /* (non-Javadoc)
395      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396      */
397     @Override
398     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399
400         assert (request != null);
401
402         if(Development.DEVELOPMENT) {
403             try {
404             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405                 System.err.println("schedule write '" + request + "'");
406             } catch (Throwable t) {
407                 Logger.defaultLogError(t);
408             }
409         }
410
411         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412
413         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414
415             @Override
416             public void run(int thread) {
417
418                 if(Development.DEVELOPMENT) {
419                     try {
420                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421                         System.err.println("perform write '" + request + "'");
422                     } catch (Throwable t) {
423                         Logger.defaultLogError(t);
424                     }
425                 }
426
427                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428
429                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
430
431                 try {
432
433                     flushCounter = 0;
434                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
435
436                     VirtualGraph vg = getProvider(request.getProvider());
437
438                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440
441                         @Override
442                         public void execute(Object result) {
443                             if(callback != null) callback.run(null);
444                         }
445
446                         @Override
447                         public void exception(Throwable t) {
448                             if(callback != null) callback.run((DatabaseException)t);
449                         }
450
451                     });
452
453                     assert (null != writer);
454 //                    writer.state.barrier.inc();
455
456                     try {
457                         request.perform(writer);
458                         assert (null != writer);
459                     } catch (Throwable t) {
460                         if (!(t instanceof CancelTransactionException))
461                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462                         writeState.except(t);
463                     } finally {
464 //                        writer.state.barrier.dec();
465 //                        writer.waitAsync(request);
466                     }
467
468
469                     assert(!queryProvider2.dirty);
470
471                 } catch (Throwable e) {
472
473                     // Log it first, just to be safe that the error is always logged.
474                     if (!(e instanceof CancelTransactionException))
475                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
476
477 //                    writeState.getGraph().state.barrier.dec();
478 //                    writeState.getGraph().waitAsync(request);
479
480                     writeState.except(e);
481
482 //                    try {
483 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 //                        // All we can do here is to log those, can't really pass them anywhere.
485 //                        if (callback != null) {
486 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 //                            else callback.run(new DatabaseException(e));
488 //                        }
489 //                    } catch (Throwable e2) {
490 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
491 //                    }
492 //                    boolean empty = clusterStream.reallyFlush();
493 //                    int callerThread = -1;
494 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
496 //                    try {
497 //                        // Send all local changes to server so it can calculate correct reverse change set.
498 //                        // This will call clusterStream.accept().
499 //                        state.cancelCommit(context, clusterStream);
500 //                        if (!empty) {
501 //                            if (!context.isOk()) // this is a blocking operation
502 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
504 //                        }
505 //                        state.cancelCommit2(context, clusterStream);
506 //                    } catch (DatabaseException e3) {
507 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
508 //                    } finally {
509 //                        clusterStream.setOff(false);
510 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
511 //                    }
512
513                 } finally {
514
515                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
516
517                 }
518
519                 task.finish();
520
521                 if(Development.DEVELOPMENT) {
522                     try {
523                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524                         System.err.println("finish write '" + request + "'");
525                     } catch (Throwable t) {
526                         Logger.defaultLogError(t);
527                     }
528                 }
529
530             }
531
532         }, combine);
533
534     }
535
536     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537         scheduleRequest(request, procedure, notify, null);
538     }
539
540     /* (non-Javadoc)
541      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542      */
543     @Override
544     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
545
546         assert (request != null);
547
548         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
549
550         requestManager.scheduleWrite(new SessionTask(request, thread) {
551
552             @Override
553             public void run(int thread) {
554
555                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
556
557                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558
559                 flushCounter = 0;
560                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
561
562                 VirtualGraph vg = getProvider(request.getProvider());
563                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
564
565                 try {
566                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567                     writeState = writeStateT;
568
569                     assert (null != writer);
570 //                    writer.state.barrier.inc();
571                     writeStateT.setResult(request.perform(writer));
572                     assert (null != writer);
573
574 //                    writer.state.barrier.dec();
575 //                    writer.waitAsync(null);
576
577                 } catch (Throwable e) {
578
579 //                    writer.state.barrier.dec();
580 //                    writer.waitAsync(null);
581
582                     writeState.except(e);
583
584 //                  state.stopWriteTransaction(clusterStream);
585 //
586 //              } catch (Throwable e) {
587 //                  // Log it first, just to be safe that the error is always logged.
588 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
589 //
590 //                  try {
591 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 //                      // All we can do here is to log those, can't really pass them anywhere.
593 //                      if (procedure != null) {
594 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 //                          else procedure.exception(new DatabaseException(e));
596 //                      }
597 //                  } catch (Throwable e2) {
598 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
599 //                  }
600 //
601 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
602 //
603 //                  state.stopWriteTransaction(clusterStream);
604
605                 } finally {
606                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
607                 }
608
609 //              if(notify != null) notify.release();
610
611                 task.finish();
612
613             }
614
615         }, combine);
616
617     }
618
619     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620         scheduleRequest(request, callback, notify, null);
621     }
622
623     /* (non-Javadoc)
624      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
625      */
626     @Override
627     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
628
629         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
630
631         assert (request != null);
632
633         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
634
635         requestManager.scheduleWrite(new SessionTask(request, thread) {
636
637             @Override
638             public void run(int thread) {
639                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
640
641                 Procedure<Object> stateProcedure = new Procedure<Object>() {
642                     @Override
643                     public void execute(Object result) {
644                         if (callback != null)
645                             callback.run(null);
646                     }
647                     @Override
648                     public void exception(Throwable t) {
649                         if (callback != null) {
650                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651                             else callback.run(new DatabaseException(t));
652                         } else
653                             Logger.defaultLogError("Unhandled exception", t);
654                     }
655                 };
656
657                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659                 DelayedWriteGraph dwg = null;
660 //                newGraph.state.barrier.inc();
661
662                 try {
663                     dwg = new DelayedWriteGraph(newGraph);
664                     request.perform(dwg);
665                 } catch (Throwable e) {
666                     delayedWriteState.except(e);
667                     total.finish();
668                     dwg.close();
669                     return;
670                 } finally {
671 //                    newGraph.state.barrier.dec();
672 //                    newGraph.waitAsync(request);
673                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
674                 }
675
676                 delayedWriteState = null;
677
678                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
679                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
680
681                 flushCounter = 0;
682                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
683
684                 acquireWriteOnly();
685
686                 VirtualGraph vg = getProvider(request.getProvider());
687                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
688
689                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
690
691                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
692
693                 assert (null != writer);
694 //                writer.state.barrier.inc();
695
696                 try {
697                     // Cannot cancel
698                     dwg.commit(writer, request);
699
700                         if(defaultClusterSet != null) {
701                                 XSupport xs = getService(XSupport.class);
702                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
703                                 ClusteringSupport cs = getService(ClusteringSupport.class);
704                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
705                         }
706
707                     // This makes clusters available from server
708                     clusterStream.reallyFlush();
709
710                     releaseWriteOnly(writer);
711                     handleUpdatesAndMetadata(writer);
712
713                 } catch (ServiceException e) {
714 //                    writer.state.barrier.dec();
715 //                    writer.waitAsync(null);
716
717                     // These shall be requested from server
718                     clusterTable.removeWriteOnlyClusters();
719                     // This makes clusters available from server
720                     clusterStream.reallyFlush();
721
722                     releaseWriteOnly(writer);
723                     writeState.except(e);
724                 } finally {
725                     // Debugging & Profiling
726                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
727                     task2.finish();
728                     total.finish();
729                 }
730             }
731
732         }, combine);
733
734     }
735
736     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
737         scheduleRequest(request, procedure, notify, null);
738     }
739
740     /* (non-Javadoc)
741      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
742      */
743     @Override
744     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
745         throw new Error("Not implemented");
746     }
747
748     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
749         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
750         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
751             createdClusters.add(cluster.clusterId);
752         }
753         return cluster;
754     }
755
756     class WriteOnlySupport implements WriteSupport {
757
758         ClusterStream stream;
759         ClusterImpl currentCluster;
760
761         public WriteOnlySupport() {
762             this.stream = clusterStream;
763         }
764
765         @Override
766         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
767             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
768         }
769
770         @Override
771         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
772
773             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
774
775             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
776                 if(s != queryProvider2.getRootLibrary())
777                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
778
779             try {
780                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
781             } catch (DatabaseException e) {
782                 Logger.defaultLogError(e);
783                 //throw e;
784             }
785
786             clientChanges.invalidate(s);
787
788             if (cluster.isWriteOnly())
789                 return;
790             queryProvider2.updateStatements(s, p);
791
792         }
793
794         @Override
795         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
796             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
797         }
798
799         @Override
800         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
801
802             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
803             try {
804                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
805             } catch (DatabaseException e) {
806                 Logger.defaultLogError(e);
807             }
808
809             clientChanges.invalidate(rid);
810
811             if (cluster.isWriteOnly())
812                 return;
813             queryProvider2.updateValue(rid);
814
815         }
816
817         @Override
818         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
819
820             if(amount < 65536) {
821                 claimValue(provider,resource, reader.readBytes(null, amount));
822                 return;
823             }
824
825             byte[] bytes = new byte[65536];
826
827             int rid = ((ResourceImpl)resource).id;
828             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
829             try {
830                 int left = amount;
831                 while(left > 0) {
832                     int block = Math.min(left, 65536);
833                     reader.readBytes(bytes, block);
834                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
835                     left -= block;
836                 }
837             } catch (DatabaseException e) {
838                 Logger.defaultLogError(e);
839             }
840
841             clientChanges.invalidate(rid);
842
843             if (cluster.isWriteOnly())
844                 return;
845             queryProvider2.updateValue(rid);
846
847         }
848
849         private void maintainCluster(ClusterImpl before, ClusterI after_) {
850             if(after_ != null && after_ != before) {
851                 ClusterImpl after = (ClusterImpl)after_;
852                 if(currentCluster == before) currentCluster = after;
853                 clusterTable.replaceCluster(after);
854             }
855         }
856
857         public int createResourceKey(int foreignCounter) throws DatabaseException {
858             if(currentCluster == null)
859                 currentCluster = getNewResourceCluster();
860             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
861                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
862                 newCluster.foreignLookup = new byte[foreignCounter];
863                 currentCluster = newCluster;
864                 if (DEBUG)
865                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
866             }
867             return currentCluster.createResource(clusterTranslator);
868         }
869
870         @Override
871         public Resource createResource(VirtualGraph provider) throws DatabaseException {
872             if(currentCluster == null) {
873                 if (null != defaultClusterSet) {
874                         ResourceImpl result = getNewResource(defaultClusterSet);
875                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
876                     return result;
877                 } else {
878                     currentCluster = getNewResourceCluster();
879                 }
880             }
881             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
882                 if (null != defaultClusterSet) {
883                         ResourceImpl result = getNewResource(defaultClusterSet);
884                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
885                     return result;
886                 } else {
887                     currentCluster = getNewResourceCluster();
888                 }
889             }
890             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
891         }
892
893         @Override
894         public Resource createResource(VirtualGraph provider, long clusterId)
895         throws DatabaseException {
896             return getNewResource(clusterId);
897         }
898
899         @Override
900         public Resource createResource(VirtualGraph provider, Resource clusterSet)
901         throws DatabaseException {
902             return getNewResource(clusterSet);
903         }
904
905         @Override
906         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
907                 throws DatabaseException {
908             getNewClusterSet(clusterSet);
909         }
910
911         @Override
912         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
913         throws ServiceException {
914             return containsClusterSet(clusterSet);
915         }
916
917         public void selectCluster(long cluster) {
918                 currentCluster = clusterTable.getClusterByClusterId(cluster);
919                 long setResourceId = clusterSetsSupport.getSet(cluster);
920                 clusterSetsSupport.put(setResourceId, cluster);
921         }
922
923         @Override
924         public Resource setDefaultClusterSet(Resource clusterSet)
925         throws ServiceException {
926                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
927                 if(clusterSet != null) {
928                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
929                         currentCluster = clusterTable.getClusterByClusterId(id);
930                         return result;
931                 } else {
932                         currentCluster = null;
933                         return null;
934                 }
935         }
936
937         @Override
938         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
939
940                  provider = getProvider(provider);
941              if (null == provider) {
942                  int key = ((ResourceImpl)resource).id;
943
944
945
946                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
947 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
948 //                      if(key != queryProvider2.getRootLibrary())
949 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
950
951 //                 try {
952 //                     cluster.removeValue(key, clusterTranslator);
953 //                 } catch (DatabaseException e) {
954 //                     Logger.defaultLogError(e);
955 //                     return;
956 //                 }
957
958                  clusterTable.writeOnlyInvalidate(cluster);
959
960                  try {
961                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
962                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
963                          clusterTranslator.removeValue(cluster);
964                  } catch (DatabaseException e) {
965                          Logger.defaultLogError(e);
966                  }
967
968                  queryProvider2.invalidateResource(key);
969                  clientChanges.invalidate(key);
970
971              } else {
972                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
973                  queryProvider2.updateValue(querySupport.getId(resource));
974                  clientChanges.claimValue(resource);
975              }
976
977
978         }
979
980         @Override
981         public void flush(boolean intermediate) {
982             throw new UnsupportedOperationException();
983         }
984
985         @Override
986         public void flushCluster() {
987             clusterTable.flushCluster(graphSession);
988             if(defaultClusterSet != null) {
989                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
990             }
991             currentCluster = null;
992         }
993
994         @Override
995         public void flushCluster(Resource r) {
996             throw new UnsupportedOperationException("flushCluster resource " + r);
997         }
998
999         @Override
1000         public void gc() {
1001         }
1002
1003         @Override
1004         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1005                 Resource object) {
1006
1007                 int s = ((ResourceImpl)subject).id;
1008                 int p = ((ResourceImpl)predicate).id;
1009                 int o = ((ResourceImpl)object).id;
1010
1011                 provider = getProvider(provider);
1012                 if (null == provider) {
1013
1014                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1015                         clusterTable.writeOnlyInvalidate(cluster);
1016
1017                         try {
1018
1019                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1020                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1021                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1022
1023                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1024                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1025
1026                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1027                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1028                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1029                                 clusterTranslator.removeStatement(cluster);
1030
1031                                 queryProvider2.invalidateResource(s);
1032                                 clientChanges.invalidate(s);
1033
1034                         } catch (DatabaseException e) {
1035
1036                                 Logger.defaultLogError(e);
1037
1038                         }
1039
1040                         return true;
1041
1042                 } else {
1043                         
1044                         ((VirtualGraphImpl)provider).deny(s, p, o);
1045                         queryProvider2.invalidateResource(s);
1046                         clientChanges.invalidate(s);
1047
1048                         return true;
1049
1050                 }
1051
1052         }
1053
1054         @Override
1055         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1056             throw new UnsupportedOperationException();
1057         }
1058
1059         @Override
1060         public boolean writeOnly() {
1061             return true;
1062         }
1063
1064         @Override
1065         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1066             writeSupport.performWriteRequest(graph, request);
1067         }
1068
1069         @Override
1070         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1071             throw new UnsupportedOperationException();
1072         }
1073
1074         @Override
1075         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1076             throw new UnsupportedOperationException();
1077         }
1078
1079         @Override
1080         public <T> void addMetadata(Metadata data) throws ServiceException {
1081             writeSupport.addMetadata(data);
1082         }
1083
1084         @Override
1085         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1086             return writeSupport.getMetadata(clazz);
1087         }
1088
1089         @Override
1090         public TreeMap<String, byte[]> getMetadata() {
1091             return writeSupport.getMetadata();
1092         }
1093
1094         @Override
1095         public void commitDone(WriteTraits writeTraits, long csid) {
1096             writeSupport.commitDone(writeTraits, csid);
1097         }
1098
1099         @Override
1100         public void clearUndoList(WriteTraits writeTraits) {
1101             writeSupport.clearUndoList(writeTraits);
1102         }
1103         @Override
1104         public int clearMetadata() {
1105             return writeSupport.clearMetadata();
1106         }
1107
1108                 @Override
1109                 public void startUndo() {
1110                         writeSupport.startUndo();
1111                 }
1112     }
1113
1114     class VirtualWriteOnlySupport implements WriteSupport {
1115
1116 //        @Override
1117 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1118 //                Resource object) {
1119 //            throw new UnsupportedOperationException();
1120 //        }
1121
1122         @Override
1123         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1124
1125             TransientGraph impl = (TransientGraph)provider;
1126             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1127             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1128             clientChanges.claim(subject, predicate, object);
1129
1130         }
1131
1132         @Override
1133         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1134
1135             TransientGraph impl = (TransientGraph)provider;
1136             impl.claim(subject, predicate, object);
1137             getQueryProvider2().updateStatements(subject, predicate);
1138             clientChanges.claim(subject, predicate, object);
1139
1140         }
1141
1142         @Override
1143         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1144             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1145         }
1146
1147         @Override
1148         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1149             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1150             getQueryProvider2().updateValue(resource);
1151             clientChanges.claimValue(resource);
1152         }
1153
1154         @Override
1155         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1156             byte[] value = reader.readBytes(null, amount);
1157             claimValue(provider, resource, value);
1158         }
1159
1160         @Override
1161         public Resource createResource(VirtualGraph provider) {
1162             TransientGraph impl = (TransientGraph)provider;
1163             return impl.getResource(impl.newResource(false));
1164         }
1165
1166         @Override
1167         public Resource createResource(VirtualGraph provider, long clusterId) {
1168             throw new UnsupportedOperationException();
1169         }
1170
1171         @Override
1172         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1173         throws DatabaseException {
1174             throw new UnsupportedOperationException();
1175         }
1176
1177         @Override
1178         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1179         throws DatabaseException {
1180             throw new UnsupportedOperationException();
1181         }
1182
1183         @Override
1184         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1185         throws ServiceException {
1186             throw new UnsupportedOperationException();
1187         }
1188
1189         @Override
1190         public Resource setDefaultClusterSet(Resource clusterSet)
1191         throws ServiceException {
1192                 return null;
1193         }
1194
1195         @Override
1196         public void denyValue(VirtualGraph provider, Resource resource) {
1197             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1198             getQueryProvider2().updateValue(querySupport.getId(resource));
1199             // NOTE: this only keeps track of value changes by-resource.
1200             clientChanges.claimValue(resource);
1201         }
1202
1203         @Override
1204         public void flush(boolean intermediate) {
1205             throw new UnsupportedOperationException();
1206         }
1207
1208         @Override
1209         public void flushCluster() {
1210             throw new UnsupportedOperationException();
1211         }
1212
1213         @Override
1214         public void flushCluster(Resource r) {
1215             throw new UnsupportedOperationException("Resource " + r);
1216         }
1217
1218         @Override
1219         public void gc() {
1220         }
1221
1222         @Override
1223         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1224             TransientGraph impl = (TransientGraph) provider;
1225             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1226             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1227             clientChanges.deny(subject, predicate, object);
1228             return true;
1229         }
1230
1231         @Override
1232         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1233             throw new UnsupportedOperationException();
1234         }
1235
1236         @Override
1237         public boolean writeOnly() {
1238             return true;
1239         }
1240
1241         @Override
1242         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1243             throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1248             throw new UnsupportedOperationException();
1249         }
1250
1251         @Override
1252         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public <T> void addMetadata(Metadata data) throws ServiceException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public TreeMap<String, byte[]> getMetadata() {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public void commitDone(WriteTraits writeTraits, long csid) {
1273         }
1274
1275         @Override
1276         public void clearUndoList(WriteTraits writeTraits) {
1277         }
1278
1279         @Override
1280         public int clearMetadata() {
1281             return 0;
1282         }
1283
1284                 @Override
1285                 public void startUndo() {
1286                 }
1287     }
1288
1289     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1290
1291         try {
1292
1293             int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1294
1295             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1296
1297             flushCounter = 0;
1298             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1299
1300             acquireWriteOnly();
1301
1302             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1303
1304             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1305
1306             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1307             writeState = writeStateT;
1308
1309             assert (null != writer);
1310 //            writer.state.barrier.inc();
1311             long start = System.nanoTime();
1312             T result = request.perform(writer);
1313             long duration = System.nanoTime() - start;
1314             if (DEBUG) {
1315                 System.err.println("################");
1316                 System.err.println("WriteOnly duration " + 1e-9*duration);
1317             }
1318             writeStateT.setResult(result);
1319
1320             // This makes clusters available from server
1321             clusterStream.reallyFlush();
1322
1323             // This will trigger query updates
1324             releaseWriteOnly(writer);
1325             assert (null != writer);
1326
1327             handleUpdatesAndMetadata(writer);
1328
1329         } catch (CancelTransactionException e) {
1330
1331             releaseWriteOnly(writeState.getGraph());
1332
1333             clusterTable.removeWriteOnlyClusters();
1334             state.stopWriteTransaction(clusterStream);
1335
1336         } catch (Throwable e) {
1337
1338             e.printStackTrace();
1339
1340             releaseWriteOnly(writeState.getGraph());
1341
1342             clusterTable.removeWriteOnlyClusters();
1343
1344             if (callback != null)
1345                 callback.exception(new DatabaseException(e));
1346
1347             state.stopWriteTransaction(clusterStream);
1348             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1349
1350         } finally  {
1351
1352             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1353
1354         }
1355
1356     }
1357
1358     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1359         scheduleRequest(request, callback, notify, null);
1360     }
1361
1362     @Override
1363     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1364
1365         assertAlive();
1366
1367         assert (request != null);
1368
1369         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1370
1371         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1372
1373             @Override
1374             public void run(int thread) {
1375
1376                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1377
1378                     try {
1379
1380                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1381
1382                         flushCounter = 0;
1383                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1384
1385                         acquireWriteOnly();
1386
1387                         VirtualGraph vg = getProvider(request.getProvider());
1388                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1389
1390                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1391
1392                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1393
1394                             @Override
1395                             public void execute(Object result) {
1396                                 if(callback != null) callback.run(null);
1397                             }
1398
1399                             @Override
1400                             public void exception(Throwable t) {
1401                                 if(callback != null) callback.run((DatabaseException)t);
1402                             }
1403
1404                         });
1405
1406                         assert (null != writer);
1407 //                        writer.state.barrier.inc();
1408
1409                         try {
1410
1411                             request.perform(writer);
1412
1413                         } catch (Throwable e) {
1414
1415 //                            writer.state.barrier.dec();
1416 //                            writer.waitAsync(null);
1417
1418                             releaseWriteOnly(writer);
1419
1420                             clusterTable.removeWriteOnlyClusters();
1421
1422                             if(!(e instanceof CancelTransactionException)) {
1423                                 if (callback != null)
1424                                     callback.run(new DatabaseException(e));
1425                             }
1426
1427                             writeState.except(e);
1428
1429                             return;
1430
1431                         }
1432
1433                         // This makes clusters available from server
1434                         boolean empty = clusterStream.reallyFlush();
1435                         // This was needed to make WO requests call metadata listeners.
1436                         // NOTE: the calling event does not contain clientChanges information.
1437                         if (!empty && clientChanges.isEmpty())
1438                             clientChanges.setNotEmpty(true);
1439                         releaseWriteOnly(writer);
1440                         assert (null != writer);
1441                     } finally  {
1442
1443                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1444
1445                     }
1446
1447
1448                 task.finish();
1449
1450             }
1451
1452         }, combine);
1453
1454     }
1455
1456     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1457         scheduleRequest(request, callback, notify, null);
1458     }
1459
1460     /* (non-Javadoc)
1461      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1462      */
1463     @Override
1464     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1465
1466         assert (request != null);
1467
1468         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1469
1470         requestManager.scheduleWrite(new SessionTask(request, thread) {
1471
1472             @Override
1473             public void run(int thread) {
1474
1475                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1476
1477                 performWriteOnly(request, notify, callback);
1478
1479                 task.finish();
1480
1481             }
1482
1483         }, combine);
1484
1485     }
1486
1487     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1488
1489         assert (request != null);
1490         assert (procedure != null);
1491
1492         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1493
1494         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1495
1496             @Override
1497             public void run(int thread) {
1498
1499                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1500
1501                 ListenerBase listener = getListenerBase(procedure);
1502
1503                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1504
1505                 try {
1506
1507                     if (listener != null) {
1508
1509                         try {
1510                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1511
1512                                         @Override
1513                                         public void exception(AsyncReadGraph graph, Throwable t) {
1514                                                 procedure.exception(graph, t);
1515                                                 if(throwable != null) {
1516                                                         throwable.set(t);
1517                                                 } else {
1518                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1519                                                 }
1520                                         }
1521
1522                                         @Override
1523                                         public void execute(AsyncReadGraph graph, T t) {
1524                                                 if(result != null) result.set(t);
1525                                                 procedure.execute(graph, t);
1526                                         }
1527
1528                                 }, listener);
1529                         } catch (Throwable t) {
1530                             // This is handled by the AsyncProcedure
1531                                 //Logger.defaultLogError("Internal error", t);
1532                         }
1533
1534                     } else {
1535
1536                         try {
1537
1538 //                            newGraph.state.barrier.inc();
1539
1540                             T t = request.perform(newGraph);
1541
1542                             try {
1543
1544                                 if(result != null) result.set(t);
1545                                 procedure.execute(newGraph, t);
1546
1547                             } catch (Throwable th) {
1548
1549                                 if(throwable != null) {
1550                                     throwable.set(th);
1551                                 } else {
1552                                     Logger.defaultLogError("Unhandled exception", th);
1553                                 }
1554
1555                             }
1556
1557                         } catch (Throwable t) {
1558
1559                             if (DEBUG)
1560                                 t.printStackTrace();
1561
1562                             if(throwable != null) {
1563                                 throwable.set(t);
1564                             } else {
1565                                 Logger.defaultLogError("Unhandled exception", t);
1566                             }
1567
1568                             try {
1569
1570                                 procedure.exception(newGraph, t);
1571
1572                             } catch (Throwable t2) {
1573
1574                                 if(throwable != null) {
1575                                     throwable.set(t2);
1576                                 } else {
1577                                     Logger.defaultLogError("Unhandled exception", t2);
1578                                 }
1579
1580                             }
1581
1582                         }
1583
1584 //                        newGraph.state.barrier.dec();
1585 //                        newGraph.waitAsync(request);
1586
1587                     }
1588
1589                 } finally {
1590
1591                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1592
1593                 }
1594
1595             }
1596
1597         });
1598
1599     }
1600
1601     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1602
1603         assert (request != null);
1604         assert (procedure != null);
1605
1606         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1607
1608         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1609
1610             @Override
1611             public void run(int thread) {
1612
1613                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1614
1615                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1616
1617                 try {
1618
1619                     if (listener != null) {
1620
1621                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1622
1623 //                        newGraph.waitAsync(request);
1624
1625                     } else {
1626
1627                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1628                                 procedure, "request");
1629
1630                         try {
1631
1632 //                            newGraph.state.barrier.inc();
1633
1634                             request.perform(newGraph, wrapper);
1635
1636 //                            newGraph.waitAsync(request);
1637
1638                         } catch (Throwable t) {
1639
1640                             wrapper.exception(newGraph, t);
1641 //                            newGraph.waitAsync(request);
1642
1643
1644                         }
1645
1646                     }
1647
1648                 } finally {
1649
1650                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1651
1652                 }
1653
1654             }
1655
1656         });
1657
1658     }
1659
1660     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1661
1662         assert (request != null);
1663         assert (procedure != null);
1664
1665         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1666
1667         int sync = notify != null ? thread : -1;
1668
1669         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1670
1671             @Override
1672             public void run(int thread) {
1673
1674                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1675
1676                 ListenerBase listener = getListenerBase(procedure);
1677
1678                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1679
1680                 try {
1681
1682                     if (listener != null) {
1683
1684                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1685
1686 //                        newGraph.waitAsync(request);
1687
1688                     } else {
1689
1690                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1691
1692                         try {
1693
1694                             request.perform(newGraph, wrapper);
1695
1696                         } catch (Throwable t) {
1697
1698                             t.printStackTrace();
1699
1700                         }
1701
1702                     }
1703
1704                 } finally {
1705
1706                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1707
1708                 }
1709
1710             }
1711
1712         });
1713
1714     }
1715
1716     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1717
1718         assert (request != null);
1719         assert (procedure != null);
1720
1721         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1722
1723         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1724
1725             @Override
1726             public void run(int thread) {
1727
1728                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1729
1730                 ListenerBase listener = getListenerBase(procedure);
1731
1732                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1733
1734                 try {
1735
1736                     if (listener != null) {
1737
1738                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1739
1740                             @Override
1741                             public void exception(Throwable t) {
1742                                 procedure.exception(t);
1743                                 if(throwable != null) {
1744                                     throwable.set(t);
1745                                 }
1746                             }
1747
1748                             @Override
1749                             public void execute(T t) {
1750                                 if(result != null) result.set(t);
1751                                 procedure.execute(t);
1752                             }
1753
1754                         }, listener);
1755
1756 //                        newGraph.waitAsync(request);
1757
1758                     } else {
1759
1760 //                        newGraph.state.barrier.inc();
1761
1762                         request.register(newGraph, new Listener<T>() {
1763
1764                             @Override
1765                             public void exception(Throwable t) {
1766                                 if(throwable != null) throwable.set(t);
1767                                 procedure.exception(t);
1768 //                                newGraph.state.barrier.dec();
1769                             }
1770
1771                             @Override
1772                             public void execute(T t) {
1773                                 if(result != null) result.set(t);
1774                                 procedure.execute(t);
1775 //                                newGraph.state.barrier.dec();
1776                             }
1777
1778                             @Override
1779                             public boolean isDisposed() {
1780                                 return true;
1781                             }
1782
1783                         });
1784
1785 //                        newGraph.waitAsync(request);
1786
1787                     }
1788
1789                 } finally {
1790
1791                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1792
1793                 }
1794
1795             }
1796
1797         });
1798
1799     }
1800
1801
1802     @Override
1803     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1804
1805         scheduleRequest(request, procedure, null, null, null);
1806
1807     }
1808
1809     @Override
1810     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1811         assertNotSession();
1812         Semaphore notify = new Semaphore(0);
1813         DataContainer<Throwable> container = new DataContainer<Throwable>();
1814         DataContainer<T> result = new DataContainer<T>();
1815         scheduleRequest(request, procedure, notify, container, result);
1816         acquire(notify, request);
1817         Throwable throwable = container.get();
1818         if(throwable != null) {
1819             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1820             else throw new DatabaseException("Unexpected exception", throwable);
1821         }
1822         return result.get();
1823     }
1824
1825     @Override
1826     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1827         assertNotSession();
1828         Semaphore notify = new Semaphore(0);
1829         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1830         final DataContainer<T> resultContainer = new DataContainer<T>();
1831         scheduleRequest(request, new AsyncProcedure<T>() {
1832             @Override
1833             public void exception(AsyncReadGraph graph, Throwable throwable) {
1834                 exceptionContainer.set(throwable);
1835                 procedure.exception(graph, throwable);
1836             }
1837             @Override
1838             public void execute(AsyncReadGraph graph, T result) {
1839                 resultContainer.set(result);
1840                 procedure.execute(graph, result);
1841             }
1842         }, getListenerBase(procedure), notify);
1843         acquire(notify, request, procedure);
1844         Throwable throwable = exceptionContainer.get();
1845         if (throwable != null) {
1846             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1847             else throw new DatabaseException("Unexpected exception", throwable);
1848         }
1849         return resultContainer.get();
1850     }
1851
1852
1853     @Override
1854     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1855         assertNotSession();
1856         Semaphore notify = new Semaphore(0);
1857         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1858         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1859             @Override
1860             public void exception(AsyncReadGraph graph, Throwable throwable) {
1861                 exceptionContainer.set(throwable);
1862                 procedure.exception(graph, throwable);
1863             }
1864             @Override
1865             public void execute(AsyncReadGraph graph, T result) {
1866                 procedure.execute(graph, result);
1867             }
1868             @Override
1869             public void finished(AsyncReadGraph graph) {
1870                 procedure.finished(graph);
1871             }
1872         }, notify);
1873         acquire(notify, request, procedure);
1874         Throwable throwable = exceptionContainer.get();
1875         if (throwable != null) {
1876             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877             else throw new DatabaseException("Unexpected exception", throwable);
1878         }
1879         // TODO: implement return value
1880         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1881         return null;
1882     }
1883
1884     @Override
1885     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1886         return syncRequest(request, new ProcedureAdapter<T>());
1887
1888
1889 //        assert(request != null);
1890 //
1891 //        final DataContainer<T> result = new DataContainer<T>();
1892 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1893 //
1894 //        syncRequest(request, new Procedure<T>() {
1895 //
1896 //            @Override
1897 //            public void execute(T t) {
1898 //                result.set(t);
1899 //            }
1900 //
1901 //            @Override
1902 //            public void exception(Throwable t) {
1903 //                exception.set(t);
1904 //            }
1905 //
1906 //        });
1907 //
1908 //        Throwable t = exception.get();
1909 //        if(t != null) {
1910 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1911 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1912 //        }
1913 //
1914 //        return result.get();
1915
1916     }
1917
1918     @Override
1919     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1920         return syncRequest(request, (Procedure<T>)procedure);
1921     }
1922
1923
1924 //    @Override
1925 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1926 //        assertNotSession();
1927 //        Semaphore notify = new Semaphore(0);
1928 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1929 //        DataContainer<T> result = new DataContainer<T>();
1930 //        scheduleRequest(request, procedure, notify, container, result);
1931 //        acquire(notify, request);
1932 //        Throwable throwable = container.get();
1933 //        if(throwable != null) {
1934 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1935 //            else throw new DatabaseException("Unexpected exception", throwable);
1936 //        }
1937 //        return result.get();
1938 //    }
1939
1940
1941     @Override
1942     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1943         assertNotSession();
1944         Semaphore notify = new Semaphore(0);
1945         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1946         final DataContainer<T> result = new DataContainer<T>();
1947         scheduleRequest(request, procedure, notify, container, result);
1948         acquire(notify, request);
1949         Throwable throwable = container.get();
1950         if (throwable != null) {
1951             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1952             else throw new DatabaseException("Unexpected exception", throwable);
1953         }
1954         return result.get();
1955     }
1956
1957     @Override
1958     public void syncRequest(Write request) throws DatabaseException  {
1959         assertNotSession();
1960         assertAlive();
1961         Semaphore notify = new Semaphore(0);
1962         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1963         scheduleRequest(request, new Callback<DatabaseException>() {
1964
1965             @Override
1966             public void run(DatabaseException e) {
1967               exception.set(e);
1968             }
1969
1970         }, notify);
1971         acquire(notify, request);
1972         if(exception.get() != null) throw exception.get();
1973     }
1974
1975     @Override
1976     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1977         assertNotSession();
1978         Semaphore notify = new Semaphore(0);
1979         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1980         final DataContainer<T> result = new DataContainer<T>();
1981         scheduleRequest(request, new Procedure<T>() {
1982
1983             @Override
1984             public void exception(Throwable t) {
1985               exception.set(t);
1986             }
1987
1988             @Override
1989             public void execute(T t) {
1990               result.set(t);
1991             }
1992
1993         }, notify);
1994         acquire(notify, request);
1995         if(exception.get() != null) {
1996             Throwable t = exception.get();
1997             if(t instanceof DatabaseException) throw (DatabaseException)t;
1998             else throw new DatabaseException(t);
1999         }
2000         return result.get();
2001     }
2002
2003     @Override
2004     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2005         assertNotSession();
2006         Semaphore notify = new Semaphore(0);
2007         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2008         final DataContainer<T> result = new DataContainer<T>();
2009         scheduleRequest(request, new Procedure<T>() {
2010
2011             @Override
2012             public void exception(Throwable t) {
2013               exception.set(t);
2014             }
2015
2016             @Override
2017             public void execute(T t) {
2018               result.set(t);
2019             }
2020
2021         }, notify);
2022         acquire(notify, request);
2023         if(exception.get() != null) {
2024             Throwable t = exception.get();
2025             if(t instanceof DatabaseException) throw (DatabaseException)t;
2026             else throw new DatabaseException(t);
2027         }
2028         return result.get();
2029     }
2030
2031     @Override
2032     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2033         assertNotSession();
2034         Semaphore notify = new Semaphore(0);
2035         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2036         final DataContainer<T> result = new DataContainer<T>();
2037         scheduleRequest(request, new Procedure<T>() {
2038
2039             @Override
2040             public void exception(Throwable t) {
2041               exception.set(t);
2042             }
2043
2044             @Override
2045             public void execute(T t) {
2046               result.set(t);
2047             }
2048
2049         }, notify);
2050         acquire(notify, request);
2051         if(exception.get() != null) {
2052             Throwable t = exception.get();
2053             if(t instanceof DatabaseException) throw (DatabaseException)t;
2054             else throw new DatabaseException(t);
2055         }
2056         return result.get();
2057     }
2058
2059     @Override
2060     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2061         assertNotSession();
2062         Semaphore notify = new Semaphore(0);
2063         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2064         scheduleRequest(request, new Callback<DatabaseException>() {
2065             @Override
2066             public void run(DatabaseException e) {
2067                 exception.set(e);
2068             }
2069         }, notify);
2070         acquire(notify, request);
2071         if(exception.get() != null) throw exception.get();
2072     }
2073
2074     @Override
2075     public void syncRequest(WriteOnly request) throws DatabaseException  {
2076         assertNotSession();
2077         assertAlive();
2078         Semaphore notify = new Semaphore(0);
2079         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2080         scheduleRequest(request, new Callback<DatabaseException>() {
2081             @Override
2082             public void run(DatabaseException e) {
2083                 exception.set(e);
2084             }
2085         }, notify);
2086         acquire(notify, request);
2087         if(exception.get() != null) throw exception.get();
2088     }
2089
2090     /*
2091      *
2092      * GraphRequestProcessor interface
2093      */
2094
2095     @Override
2096     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2097
2098         scheduleRequest(request, procedure, null, null);
2099
2100     }
2101
2102     @Override
2103     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2104
2105         scheduleRequest(request, procedure, null);
2106
2107     }
2108
2109     @Override
2110     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2111
2112         scheduleRequest(request, procedure, null, null, null);
2113
2114     }
2115
2116     @Override
2117     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2118
2119         scheduleRequest(request, callback, null);
2120
2121     }
2122
2123     @Override
2124     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2125
2126         scheduleRequest(request, procedure, null);
2127
2128     }
2129
2130     @Override
2131     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2132
2133         scheduleRequest(request, procedure, null);
2134
2135     }
2136
2137     @Override
2138     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2139
2140         scheduleRequest(request, procedure, null);
2141
2142     }
2143
2144     @Override
2145     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2146
2147         scheduleRequest(request, callback, null);
2148
2149     }
2150
2151     @Override
2152     public void asyncRequest(final Write r) {
2153         asyncRequest(r, null);
2154     }
2155
2156     @Override
2157     public void asyncRequest(final DelayedWrite r) {
2158         asyncRequest(r, null);
2159     }
2160
2161     @Override
2162     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2163
2164         scheduleRequest(request, callback, null);
2165
2166     }
2167
2168     @Override
2169     public void asyncRequest(final WriteOnly request) {
2170
2171         asyncRequest(request, null);
2172
2173     }
2174
2175     @Override
2176     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2177         r.request(this, procedure);
2178     }
2179
2180     @Override
2181     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2182         r.request(this, procedure);
2183     }
2184
2185     @Override
2186     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2187         r.request(this, procedure);
2188     }
2189
2190     @Override
2191     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2192         r.request(this, procedure);
2193     }
2194
2195     @Override
2196     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2197         r.request(this, procedure);
2198     }
2199
2200     @Override
2201     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2202         r.request(this, procedure);
2203     }
2204
2205     @Override
2206     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2207         return r.request(this);
2208     }
2209
2210     @Override
2211     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2212         return r.request(this);
2213     }
2214
2215     @Override
2216     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2217         r.request(this, procedure);
2218     }
2219
2220     @Override
2221     public <T> void async(WriteInterface<T> r) {
2222         r.request(this, new ProcedureAdapter<T>());
2223     }
2224
2225     //@Override
2226     public void incAsync() {
2227         state.incAsync();
2228     }
2229
2230     //@Override
2231     public void decAsync() {
2232         state.decAsync();
2233     }
2234
2235     public long getCluster(ResourceImpl resource) {
2236         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2237         return cluster.getClusterId();
2238     }
2239
2240     public long getCluster(int id) {
2241         if (clusterTable == null)
2242             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2243         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2244     }
2245
2246     public ResourceImpl getResource(int id) {
2247         return new ResourceImpl(resourceSupport, id);
2248     }
2249
2250     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2251         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2252         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2253         int key = proxy.getClusterKey();
2254         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2255         return new ResourceImpl(resourceSupport, resourceKey);
2256     }
2257
2258     public ResourceImpl getResource2(int id) {
2259         assert (id != 0);
2260         return new ResourceImpl(resourceSupport, id);
2261     }
2262
2263     final public int getId(ResourceImpl impl) {
2264         return impl.id;
2265     }
2266
2267     public static final Charset UTF8 = Charset.forName("utf-8");
2268
2269
2270
2271
2272     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2273         for(TransientGraph g : support.providers) {
2274             if(g.isPending(subject)) return false;
2275         }
2276         return true;
2277     }
2278
2279     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2280         for(TransientGraph g : support.providers) {
2281             if(g.isPending(subject, predicate)) return false;
2282         }
2283         return true;
2284     }
2285
2286     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2287
2288         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2289
2290             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2291
2292             @Override
2293             public void run(ReadGraphImpl graph) {
2294                 if(ready.decrementAndGet() == 0) {
2295                     runnable.run(graph);
2296                 }
2297             }
2298
2299         };
2300
2301         for(TransientGraph g : support.providers) {
2302             if(g.isPending(subject)) {
2303                 try {
2304                     g.load(graph, subject, composite);
2305                 } catch (DatabaseException e) {
2306                     e.printStackTrace();
2307                 }
2308             } else {
2309                 composite.run(graph);
2310             }
2311         }
2312
2313         composite.run(graph);
2314
2315     }
2316
2317     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2318
2319         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2320
2321             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2322
2323             @Override
2324             public void run(ReadGraphImpl graph) {
2325                 if(ready.decrementAndGet() == 0) {
2326                     runnable.run(graph);
2327                 }
2328             }
2329
2330         };
2331
2332         for(TransientGraph g : support.providers) {
2333             if(g.isPending(subject, predicate)) {
2334                 try {
2335                     g.load(graph, subject, predicate, composite);
2336                 } catch (DatabaseException e) {
2337                     e.printStackTrace();
2338                 }
2339             } else {
2340                 composite.run(graph);
2341             }
2342         }
2343
2344         composite.run(graph);
2345
2346     }
2347
2348 //    void dumpHeap() {
2349 //
2350 //        try {
2351 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2352 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2353 //                    "d:/heap" + flushCounter + ".txt", true);
2354 //        } catch (IOException e) {
2355 //            e.printStackTrace();
2356 //        }
2357 //
2358 //    }
2359
2360     void fireReactionsToSynchronize(ChangeSet cs) {
2361
2362         // Do not fire empty events
2363         if (cs.isEmpty())
2364             return;
2365
2366         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2367 //        g.state.barrier.inc();
2368
2369         try {
2370
2371             if (!cs.isEmpty()) {
2372                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2373                 for (ChangeListener l : changeListeners2) {
2374                     try {
2375                         l.graphChanged(e2);
2376                     } catch (Exception ex) {
2377                         ex.printStackTrace();
2378                     }
2379                 }
2380             }
2381
2382         } finally {
2383
2384 //            g.state.barrier.dec();
2385 //            g.waitAsync(null);
2386
2387         }
2388
2389     }
2390
2391     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2392         try {
2393
2394             // Do not fire empty events
2395             if (cs2.isEmpty())
2396                 return;
2397
2398 //            graph.restart();
2399 //            graph.state.barrier.inc();
2400
2401             try {
2402
2403                 if (!cs2.isEmpty()) {
2404
2405                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2406                     for (ChangeListener l : changeListeners2) {
2407                         try {
2408                             l.graphChanged(e2);
2409 //                            System.out.println("changelistener " + l);
2410                         } catch (Exception ex) {
2411                             ex.printStackTrace();
2412                         }
2413                     }
2414
2415                 }
2416
2417             } finally {
2418
2419 //                graph.state.barrier.dec();
2420 //                graph.waitAsync(null);
2421
2422             }
2423         } catch (Throwable t) {
2424             t.printStackTrace();
2425
2426         }
2427     }
2428     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2429
2430         try {
2431
2432             // Do not fire empty events
2433             if (cs2.isEmpty())
2434                 return;
2435
2436             // Do not fire on virtual requests
2437             if(graph.getProvider() != null)
2438                 return;
2439
2440             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2441
2442             try {
2443
2444                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2445                 for (ChangeListener l : metadataListeners) {
2446                     try {
2447                         l.graphChanged(e2);
2448                     } catch (Throwable ex) {
2449                         ex.printStackTrace();
2450                     }
2451                 }
2452
2453             } finally {
2454
2455             }
2456
2457         } catch (Throwable t) {
2458             t.printStackTrace();
2459         }
2460
2461     }
2462
2463
2464     /*
2465      * (non-Javadoc)
2466      *
2467      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2468      */
2469     @Override
2470     public <T> T getService(Class<T> api) {
2471         T t = peekService(api);
2472         if (t == null) {
2473             if (state.isClosed())
2474                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2475             throw new ServiceNotFoundException(this, api);
2476         }
2477         return t;
2478     }
2479
    /**
     * Supplies the {@link ServerInformation} object that
     * {@link #peekService(Class)} hands out when
     * {@code ServerInformation.class} is requested.
     *
     * @return the server information provided by the concrete subclass
     *         (NOTE(review): presumably a cached copy, judging by the name; confirm)
     */
    protected abstract ServerInformation getCachedServerInformation();
2484
        // Two-entry cache used by peekService(Class): slot 1 holds the entry that
        // was looked up or promoted most recently, slot 2 the one before it.
        // registerService(Class, Object) refreshes a slot whose key matches.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2489
2490     /*
2491      * (non-Javadoc)
2492      *
2493      * @see
2494      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2495      */
2496     @SuppressWarnings("unchecked")
2497     @Override
2498     public synchronized <T> T peekService(Class<T> api) {
2499
2500                 if(serviceKey1 == api) {
2501                         return (T)service1;
2502                 } else if (serviceKey2 == api) {
2503                         // Promote this key
2504                         Object result = service2;
2505                         service2 = service1;
2506                         serviceKey2 = serviceKey1;
2507                         service1 = result;
2508                         serviceKey1 = api;
2509                         return (T)result;
2510                 }
2511
2512         if (Layer0.class == api)
2513                 return (T) L0;
2514         if (ServerInformation.class == api)
2515             return (T) getCachedServerInformation();
2516         else if (WriteGraphImpl.class == api)
2517             return (T) writeState.getGraph();
2518         else if (ClusterBuilder.class == api)
2519             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2520         else if (ClusterBuilderFactory.class == api)
2521             return (T)new ClusterBuilderFactoryImpl(this);
2522
2523                 service2 = service1;
2524                 serviceKey2 = serviceKey1;
2525
2526         service1 = serviceLocator.peekService(api);
2527         serviceKey1 = api;
2528
2529         return (T)service1;
2530
2531     }
2532
2533     /*
2534      * (non-Javadoc)
2535      *
2536      * @see
2537      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2538      */
2539     @Override
2540     public boolean hasService(Class<?> api) {
2541         return serviceLocator.hasService(api);
2542     }
2543
2544     /**
2545      * @param api the api that must be implemented by the specified service
2546      * @param service the service implementation
2547      */
2548     @Override
2549     public <T> void registerService(Class<T> api, T service) {
2550         if(Layer0.class == api) {
2551                 L0 = (Layer0)service;
2552                 return;
2553         }
2554         serviceLocator.registerService(api, service);
2555         if (TransactionPolicySupport.class == api) {
2556             transactionPolicy = (TransactionPolicySupport)service;
2557             state.resetTransactionPolicy();
2558         }
2559         if (api == serviceKey1)
2560                 service1 = service;
2561         else if (api == serviceKey2)
2562                 service2 = service;
2563     }
2564
2565     // ----------------
2566     // SessionMonitor
2567     // ----------------
2568
2569     void fireSessionVariableChange(String variable) {
2570         for (MonitorHandler h : monitorHandlers) {
2571             MonitorContext ctx = monitorContexts.getLeft(h);
2572             assert ctx != null;
2573             // SafeRunner functionality repeated here to avoid dependency.
2574             try {
2575                 h.valuesChanged(ctx);
2576             } catch (Exception e) {
2577                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2578             } catch (LinkageError e) {
2579                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2580             }
2581         }
2582     }
2583
2584
2585     class ResourceSerializerImpl implements ResourceSerializer {
2586
2587         public long createRandomAccessId(int id) throws DatabaseException {
2588             if(id < 0)
2589                 return id;
2590             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2591             long cluster = getCluster(id);
2592             if (0 == cluster)
2593                 return 0; // Better to return 0 then invalid id.
2594             long result = ClusterTraitsBase.createResourceId(cluster, index);
2595             return result;
2596         }
2597
2598         @Override
2599         public long getRandomAccessId(Resource resource) throws DatabaseException {
2600
2601             ResourceImpl resourceImpl = (ResourceImpl) resource;
2602             return createRandomAccessId(resourceImpl.id);
2603
2604         }
2605
2606         @Override
2607         public int getTransientId(Resource resource) throws DatabaseException {
2608
2609             ResourceImpl resourceImpl = (ResourceImpl) resource;
2610             return resourceImpl.id;
2611
2612         }
2613
2614         @Override
2615         public String createRandomAccessId(Resource resource)
2616         throws InvalidResourceReferenceException {
2617
2618             if(resource == null) throw new IllegalArgumentException();
2619
2620             ResourceImpl resourceImpl = (ResourceImpl) resource;
2621
2622             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2623
2624             int r;
2625             try {
2626                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2627             } catch (DatabaseException e1) {
2628                 throw new InvalidResourceReferenceException(e1);
2629             }
2630             try {
2631                 // Serialize as '<resource index>_<cluster id>'
2632                 return "" + r + "_" + getCluster(resourceImpl);
2633             } catch (Throwable e) {
2634                 e.printStackTrace();
2635                 throw new InvalidResourceReferenceException(e);
2636             } finally {
2637             }
2638         }
2639
2640         public int getTransientId(long serialized) throws DatabaseException {
2641             if (serialized <= 0)
2642                 return (int)serialized;
2643             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2644             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2645             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2646             if (cluster == null)
2647                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2648             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2649             return key;
2650         }
2651
2652         @Override
2653         public Resource getResource(long randomAccessId) throws DatabaseException {
2654             return getResourceByKey(getTransientId(randomAccessId));
2655         }
2656
2657         @Override
2658         public Resource getResource(int transientId) throws DatabaseException {
2659             return getResourceByKey(transientId);
2660         }
2661
2662         @Override
2663         public Resource getResource(String randomAccessId)
2664         throws InvalidResourceReferenceException {
2665             try {
2666                 int i = randomAccessId.indexOf('_');
2667                 if (i == -1)
2668                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2669                             + randomAccessId + "'");
2670                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2671                 if(r < 0) return getResourceByKey(r);
2672                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2673                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2674                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2675                 if (cluster.hasResource(key, clusterTranslator))
2676                     return getResourceByKey(key);
2677             } catch (InvalidResourceReferenceException e) {
2678                 throw e;
2679             } catch (NumberFormatException e) {
2680                 throw new InvalidResourceReferenceException(e);
2681             } catch (Throwable e) {
2682                 e.printStackTrace();
2683                 throw new InvalidResourceReferenceException(e);
2684             } finally {
2685             }
2686             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2687         }
2688
2689         @Override
2690         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2691             try {
2692                 return true;
2693             } catch (Throwable e) {
2694                 e.printStackTrace();
2695                 throw new InvalidResourceReferenceException(e);
2696             } finally {
2697             }
2698         }
2699     }
2700
2701     // Copied from old SessionImpl
2702
2703     void check() {
2704         if (state.isClosed())
2705             throw new Error("Session closed.");
2706     }
2707
2708
2709
2710     public ResourceImpl getNewResource(long clusterId) {
2711         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2712         int newId;
2713         try {
2714             newId = cluster.createResource(clusterTranslator);
2715         } catch (DatabaseException e) {
2716             Logger.defaultLogError(e);
2717             return null;
2718         }
2719         return new ResourceImpl(resourceSupport, newId);
2720     }
2721
2722     public ResourceImpl getNewResource(Resource clusterSet)
2723     throws DatabaseException {
2724         long resourceId = clusterSet.getResourceId();
2725         Long clusterId = clusterSetsSupport.get(resourceId);
2726         if (null == clusterId)
2727             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2728         if (Constants.NewClusterId == clusterId) {
2729             clusterId = getService(ClusteringSupport.class).createCluster();
2730             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2731                 createdClusters.add(clusterId);
2732             clusterSetsSupport.put(resourceId, clusterId);
2733             return getNewResource(clusterId);
2734         } else {
2735             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2736             ResourceImpl result;
2737             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2738                 clusterId = getService(ClusteringSupport.class).createCluster();
2739                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740                     createdClusters.add(clusterId);
2741                 clusterSetsSupport.put(resourceId, clusterId);
2742                 return getNewResource(clusterId);
2743             } else {
2744                 result = getNewResource(clusterId);
2745                 int resultKey = querySupport.getId(result);
2746                 long resultCluster = querySupport.getClusterId(resultKey);
2747                 if (clusterId != resultCluster)
2748                     clusterSetsSupport.put(resourceId, resultCluster);
2749                 return result;
2750             }
2751         }
2752     }
2753
2754     public void getNewClusterSet(Resource clusterSet)
2755     throws DatabaseException {
2756         if (DEBUG)
2757             System.out.println("new cluster set=" + clusterSet);
2758         long resourceId = clusterSet.getResourceId();
2759         if (clusterSetsSupport.containsKey(resourceId))
2760             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2761         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2762     }
2763     public boolean containsClusterSet(Resource clusterSet)
2764     throws ServiceException {
2765         long resourceId = clusterSet.getResourceId();
2766         return clusterSetsSupport.containsKey(resourceId);
2767     }
2768     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2769         Resource r = defaultClusterSet;
2770         defaultClusterSet = clusterSet;
2771         return r;
2772     }
2773     void printDiagnostics() {
2774
2775         if (DIAGNOSTICS) {
2776
2777             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2778             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2779             for(ClusterI cluster : clusterTable.getClusters()) {
2780                 try {
2781                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2782                         residentClusters.set(residentClusters.get() + 1);
2783                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2784                     }
2785                 } catch (DatabaseException e) {
2786                     Logger.defaultLogError(e);
2787                 }
2788             }
2789
2790             System.out.println("--------------------------------");
2791             System.out.println("Cluster information:");
2792             System.out.println("-amount of resident clusters=" + residentClusters.get());
2793             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2794
2795             for(ClusterI cluster : clusterTable.getClusters()) {
2796                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2797                 try {
2798                     if (!cluster.isLoaded())
2799                         System.out.println("not loaded]");
2800                     else if (cluster.isEmpty())
2801                         System.out.println("is empty]");
2802                     else {
2803                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2804                         System.out.print(",references=" + cluster.getReferenceCount());
2805                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2806                     }
2807                 } catch (DatabaseException e) {
2808                     Logger.defaultLogError(e);
2809                     System.out.println("is corrupted]");
2810                 }
2811             }
2812
2813             queryProvider2.printDiagnostics();
2814             System.out.println("--------------------------------");
2815         }
2816
2817     }
2818
2819     public void fireStartReadTransaction() {
2820
2821         if (DIAGNOSTICS)
2822             System.out.println("StartReadTransaction");
2823
2824         for (SessionEventListener listener : eventListeners) {
2825             try {
2826                 listener.readTransactionStarted();
2827             } catch (Throwable t) {
2828                 t.printStackTrace();
2829             }
2830         }
2831
2832     }
2833
2834     public void fireFinishReadTransaction() {
2835
2836         if (DIAGNOSTICS)
2837             System.out.println("FinishReadTransaction");
2838
2839         for (SessionEventListener listener : eventListeners) {
2840             try {
2841                 listener.readTransactionFinished();
2842             } catch (Throwable t) {
2843                 t.printStackTrace();
2844             }
2845         }
2846
2847     }
2848
2849     public void fireStartWriteTransaction() {
2850
2851         if (DIAGNOSTICS)
2852             System.out.println("StartWriteTransaction");
2853
2854         for (SessionEventListener listener : eventListeners) {
2855             try {
2856                 listener.writeTransactionStarted();
2857             } catch (Throwable t) {
2858                 t.printStackTrace();
2859             }
2860         }
2861
2862     }
2863
2864     public void fireFinishWriteTransaction() {
2865
2866         if (DIAGNOSTICS)
2867             System.out.println("FinishWriteTransaction");
2868
2869         for (SessionEventListener listener : eventListeners) {
2870             try {
2871                 listener.writeTransactionFinished();
2872             } catch (Throwable t) {
2873                 t.printStackTrace();
2874             }
2875         }
2876
2877         Indexing.resetDependenciesIndexingDisabled();
2878
2879     }
2880
2881     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2882         throw new Error("Not supported at the moment.");
2883     }
2884
2885     State getState() {
2886         return state;
2887     }
2888
2889     ClusterTable getClusterTable() {
2890         return clusterTable;
2891     }
2892
2893     public GraphSession getGraphSession() {
2894         return graphSession;
2895     }
2896
2897     public QueryProcessor getQueryProvider2() {
2898         return queryProvider2;
2899     }
2900
2901     ClientChangesImpl getClientChanges() {
2902         return clientChanges;
2903     }
2904
2905     boolean getWriteOnly() {
2906         return writeOnly;
2907     }
2908
    // NOTE(review): no reader or writer of this counter is visible in this
    // part of the file; confirm it is still used before relying on it.
    static int counter = 0;
2910
2911     public void onClusterLoaded(long clusterId) {
2912
2913         clusterTable.updateSize();
2914
2915     }
2916
2917
2918
2919
2920     /*
2921      * Implementation of the interface RequestProcessor
2922      */
2923
2924     @Override
2925     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2926
2927         assertNotSession();
2928         assertAlive();
2929
2930         assert(request != null);
2931
2932         final DataContainer<T> result = new DataContainer<T>();
2933         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2934
2935         syncRequest(request, new AsyncProcedure<T>() {
2936
2937             @Override
2938             public void execute(AsyncReadGraph graph, T t) {
2939                 result.set(t);
2940             }
2941
2942             @Override
2943             public void exception(AsyncReadGraph graph, Throwable t) {
2944                 exception.set(t);
2945             }
2946
2947         });
2948
2949         Throwable t = exception.get();
2950         if(t != null) {
2951             if(t instanceof DatabaseException) throw (DatabaseException)t;
2952             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2953         }
2954
2955         return result.get();
2956
2957     }
2958
2959 //    @Override
2960 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2961 //        assertNotSession();
2962 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2963 //    }
2964
2965     @Override
2966     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2967         assertNotSession();
2968         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2969     }
2970
2971     @Override
2972     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2973         assertNotSession();
2974         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2975     }
2976
2977     @Override
2978     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2979         assertNotSession();
2980         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2981     }
2982
2983     @Override
2984     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2985         assertNotSession();
2986         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2987     }
2988
2989     @Override
2990     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2991
2992         assertNotSession();
2993
2994         assert(request != null);
2995
2996         final DataContainer<T> result = new DataContainer<T>();
2997         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2998
2999         syncRequest(request, new AsyncProcedure<T>() {
3000
3001             @Override
3002             public void execute(AsyncReadGraph graph, T t) {
3003                 result.set(t);
3004             }
3005
3006             @Override
3007             public void exception(AsyncReadGraph graph, Throwable t) {
3008                 exception.set(t);
3009             }
3010
3011         });
3012
3013         Throwable t = exception.get();
3014         if(t != null) {