9a86dc1fa2ae813b0b3a137b51a9063546af27af
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.ExternalValueSupport;
38 import org.simantics.db.Metadata;
39 import org.simantics.db.MonitorContext;
40 import org.simantics.db.MonitorHandler;
41 import org.simantics.db.Resource;
42 import org.simantics.db.ResourceSerializer;
43 import org.simantics.db.Session;
44 import org.simantics.db.SessionManager;
45 import org.simantics.db.SessionVariables;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.WriteGraph;
48 import org.simantics.db.authentication.UserAuthenticationAgent;
49 import org.simantics.db.authentication.UserAuthenticator;
50 import org.simantics.db.common.Indexing;
51 import org.simantics.db.common.TransactionPolicyRelease;
52 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
53 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
62 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
63 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
64 import org.simantics.db.common.utils.Logger;
65 import org.simantics.db.event.ChangeEvent;
66 import org.simantics.db.event.ChangeListener;
67 import org.simantics.db.event.SessionEventListener;
68 import org.simantics.db.exception.CancelTransactionException;
69 import org.simantics.db.exception.ClusterSetExistException;
70 import org.simantics.db.exception.DatabaseException;
71 import org.simantics.db.exception.ImmutableException;
72 import org.simantics.db.exception.InvalidResourceReferenceException;
73 import org.simantics.db.exception.ResourceNotFoundException;
74 import org.simantics.db.exception.RuntimeDatabaseException;
75 import org.simantics.db.exception.ServiceException;
76 import org.simantics.db.exception.ServiceNotFoundException;
77 import org.simantics.db.impl.ClusterBase;
78 import org.simantics.db.impl.ClusterI;
79 import org.simantics.db.impl.ClusterTraitsBase;
80 import org.simantics.db.impl.ClusterTranslator;
81 import org.simantics.db.impl.ResourceImpl;
82 import org.simantics.db.impl.TransientGraph;
83 import org.simantics.db.impl.VirtualGraphImpl;
84 import org.simantics.db.impl.graph.DelayedWriteGraph;
85 import org.simantics.db.impl.graph.ReadGraphImpl;
86 import org.simantics.db.impl.graph.WriteGraphImpl;
87 import org.simantics.db.impl.graph.WriteSupport;
88 import org.simantics.db.impl.internal.RandomAccessValueSupport;
89 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
90 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
91 import org.simantics.db.impl.query.QueryProcessor;
92 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
93 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
94 import org.simantics.db.impl.service.QueryDebug;
95 import org.simantics.db.impl.support.VirtualGraphServerSupport;
96 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
97 import org.simantics.db.procedure.AsyncListener;
98 import org.simantics.db.procedure.AsyncMultiListener;
99 import org.simantics.db.procedure.AsyncMultiProcedure;
100 import org.simantics.db.procedure.AsyncProcedure;
101 import org.simantics.db.procedure.Listener;
102 import org.simantics.db.procedure.ListenerBase;
103 import org.simantics.db.procedure.MultiListener;
104 import org.simantics.db.procedure.MultiProcedure;
105 import org.simantics.db.procedure.Procedure;
106 import org.simantics.db.procedure.SyncListener;
107 import org.simantics.db.procedure.SyncMultiListener;
108 import org.simantics.db.procedure.SyncMultiProcedure;
109 import org.simantics.db.procedure.SyncProcedure;
110 import org.simantics.db.procore.cluster.ClusterImpl;
111 import org.simantics.db.procore.cluster.ClusterTraits;
112 import org.simantics.db.procore.protocol.Constants;
113 import org.simantics.db.procore.protocol.DebugPolicy;
114 import org.simantics.db.request.AsyncMultiRead;
115 import org.simantics.db.request.AsyncRead;
116 import org.simantics.db.request.DelayedWrite;
117 import org.simantics.db.request.DelayedWriteResult;
118 import org.simantics.db.request.ExternalRead;
119 import org.simantics.db.request.MultiRead;
120 import org.simantics.db.request.Read;
121 import org.simantics.db.request.ReadInterface;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
161
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
164
165
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
167
168     protected static final boolean DEBUG        = false;
169
170     private static final boolean DIAGNOSTICS       = false;
171
172     private TransactionPolicySupport transactionPolicy;
173
174     final private ClusterControl clusterControl;
175
176     final protected BuiltinSupportImpl builtinSupport;
177     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178     final protected ClusterSetsSupport clusterSetsSupport;
179     final protected LifecycleSupportImpl lifecycleSupport;
180
181     protected QuerySupportImpl querySupport;
182     protected ResourceSupportImpl resourceSupport;
183     protected WriteSupport writeSupport;
184     public ClusterTranslator clusterTranslator;
185
186     boolean dirtyPrimitives = false;
187
188     public static final int SERVICE_MODE_CREATE = 2;
189     public static final int SERVICE_MODE_ALLOW = 1;
190     public int serviceMode = 0;
191     public boolean createdImmutableClusters = false;
192     public TLongHashSet createdClusters = new TLongHashSet();
193
194     private Layer0 L0;
195
196     /**
197      * The service locator maintained by the workbench. These services are
198      * initialized during workbench during the <code>init</code> method.
199      */
200     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
201
202     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
203
204     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
205
206     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
209
210     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
211
212     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
213
214     final protected State                                        state              = new State();
215
216     protected GraphSession                                       graphSession       = null;
217
218     protected SessionManager                                     sessionManagerImpl = null;
219
220     protected UserAuthenticationAgent                            authAgent          = null;
221
222     protected UserAuthenticator                                  authenticator      = null;
223
224     protected Resource                                           user               = null;
225
226     protected ClusterStream                                      clusterStream      = null;
227
228     protected SessionRequestManager                         requestManager = null;
229
230     public ClusterTable                                       clusterTable       = null;
231
232     public QueryProcessor                                     queryProvider2 = null;
233
234     ClientChangesImpl                                  clientChanges      = null;
235
236     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
237
238 //    protected long                                               newClusterId       = Constants.NullClusterId;
239
240     protected int                                                flushCounter       = 0;
241
242     protected boolean                                            writeOnly          = false;
243
244     WriteState<?>                                                writeState = null;
245     WriteStateBase<?>                                            delayedWriteState = null;
246     protected Resource defaultClusterSet = null; // If not null then used for newResource().
247
248     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
249
250 //        if (authAgent == null)
251 //            throw new IllegalArgumentException("null authentication agent");
252
253         File t = StaticSessionProperties.virtualGraphStoragePath;
254         if (null == t)
255             t = new File(".");
256         this.clusterTable = new ClusterTable(this, t);
257         this.builtinSupport = new BuiltinSupportImpl(this);
258         this.sessionManagerImpl = sessionManagerImpl;
259         this.user = null;
260 //        this.authAgent = authAgent;
261
262         serviceLocator.registerService(Session.class, this);
263         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284         ServiceActivityUpdaterForWriteTransactions.register(this);
285
286         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289         this.lifecycleSupport = new LifecycleSupportImpl(this);
290         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291         this.transactionPolicy = new TransactionPolicyRelease();
292         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293         this.clusterControl = new ClusterControlImpl(this);
294         serviceLocator.registerService(ClusterControl.class, clusterControl);
295         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296         this.clusterSetsSupport.setReadDirectory(t.toPath());
297         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
299     }
300
301     /*
302      *
303      * Session interface
304      */
305
306
307     @Override
308     public Resource getRootLibrary() {
309         return queryProvider2.getRootLibraryResource();
310     }
311
312     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313         if (!graphSession.dbSession.refreshEnabled())
314             return;
315         try {
316             getClusterTable().refresh(csid, this, clusterUID);
317         } catch (Throwable t) {
318             Logger.defaultLogError("Refesh failed.", t);
319         }
320     }
321
322     static final class TaskHelper {
323         private final String name;
324         private Object result;
325         final Semaphore sema = new Semaphore(0);
326         private Throwable throwable = null;
327         final Consumer<DatabaseException> callback = e -> {
328             synchronized (TaskHelper.this) {
329                 throwable = e;
330             }
331         };
332         final Procedure<Object> proc = new Procedure<Object>() {
333             @Override
334             public void execute(Object result) {
335                 callback.accept(null);
336             }
337             @Override
338             public void exception(Throwable t) {
339                 if (t instanceof DatabaseException)
340                     callback.accept((DatabaseException)t);
341                 else
342                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
343             }
344         };
345         final WriteTraits writeTraits = new WriteTraits() {};
346         TaskHelper(String name) {
347             this.name = name;
348         }
349         @SuppressWarnings("unchecked")
350         <T> T getResult() {
351             return (T)result;
352         }
353         void setResult(Object result) {
354             this.result = result;
355         }
356 //        Throwable throwableGet() {
357 //            return throwable;
358 //        }
359         synchronized void throwableSet(Throwable t) {
360             throwable = t;
361         }
362 //        void throwableSet(String t) {
363 //            throwable = new InternalException("" + name + " operation failed. " + t);
364 //        }
365         synchronized void throwableCheck()
366         throws DatabaseException {
367             if (null != throwable)
368                 if (throwable instanceof DatabaseException)
369                     throw (DatabaseException)throwable;
370                 else
371                     throw new DatabaseException("Undo operation failed.", throwable);
372         }
373         void throw_(String message)
374         throws DatabaseException {
375             throw new DatabaseException("" + name + " operation failed. " + message);
376         }
377     }
378
379
380
381     private ListenerBase getListenerBase(Object procedure) {
382         if (procedure instanceof ListenerBase)
383             return (ListenerBase) procedure;
384         else
385             return null;
386     }
387
388     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
389         scheduleRequest(request, callback, notify, null);
390     }
391
392     /* (non-Javadoc)
393      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
394      */
395     @Override
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
397
398         assert (request != null);
399
400         if(Development.DEVELOPMENT) {
401             try {
402             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
403                 System.err.println("schedule write '" + request + "'");
404             } catch (Throwable t) {
405                 Logger.defaultLogError(t);
406             }
407         }
408
409         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
410
411         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
412
413             @Override
414             public void run(int thread) {
415
416                 if(Development.DEVELOPMENT) {
417                     try {
418                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
419                         System.err.println("perform write '" + request + "'");
420                     } catch (Throwable t) {
421                         Logger.defaultLogError(t);
422                     }
423                 }
424
425                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
426
427                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
428
429                 try {
430
431                     flushCounter = 0;
432                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
433
434                     VirtualGraph vg = getProvider(request.getProvider());
435
436                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
437                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
438
439                         @Override
440                         public void execute(Object result) {
441                             if(callback != null) callback.accept(null);
442                         }
443
444                         @Override
445                         public void exception(Throwable t) {
446                             if(callback != null) callback.accept((DatabaseException)t);
447                         }
448
449                     });
450
451                     assert (null != writer);
452 //                    writer.state.barrier.inc();
453
454                     try {
455                         request.perform(writer);
456                         assert (null != writer);
457                     } catch (Throwable t) {
458                         if (!(t instanceof CancelTransactionException))
459                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
460                         writeState.except(t);
461                     } finally {
462 //                        writer.state.barrier.dec();
463 //                        writer.waitAsync(request);
464                     }
465
466
467                     assert(!queryProvider2.dirty);
468
469                 } catch (Throwable e) {
470
471                     // Log it first, just to be safe that the error is always logged.
472                     if (!(e instanceof CancelTransactionException))
473                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
474
475 //                    writeState.getGraph().state.barrier.dec();
476 //                    writeState.getGraph().waitAsync(request);
477
478                     writeState.except(e);
479
480 //                    try {
481 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
482 //                        // All we can do here is to log those, can't really pass them anywhere.
483 //                        if (callback != null) {
484 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
485 //                            else callback.run(new DatabaseException(e));
486 //                        }
487 //                    } catch (Throwable e2) {
488 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
489 //                    }
490 //                    boolean empty = clusterStream.reallyFlush();
491 //                    int callerThread = -1;
492 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
493 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
494 //                    try {
495 //                        // Send all local changes to server so it can calculate correct reverse change set.
496 //                        // This will call clusterStream.accept().
497 //                        state.cancelCommit(context, clusterStream);
498 //                        if (!empty) {
499 //                            if (!context.isOk()) // this is a blocking operation
500 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
501 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
502 //                        }
503 //                        state.cancelCommit2(context, clusterStream);
504 //                    } catch (DatabaseException e3) {
505 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
506 //                    } finally {
507 //                        clusterStream.setOff(false);
508 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
509 //                    }
510
511                 } finally {
512
513                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
514
515                 }
516
517                 task.finish();
518
519                 if(Development.DEVELOPMENT) {
520                     try {
521                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
522                         System.err.println("finish write '" + request + "'");
523                     } catch (Throwable t) {
524                         Logger.defaultLogError(t);
525                     }
526                 }
527
528             }
529
530         }, combine);
531
532     }
533
534     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
535         scheduleRequest(request, procedure, notify, null);
536     }
537
538     /* (non-Javadoc)
539      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
540      */
541     @Override
542     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
543
544         assert (request != null);
545
546         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
547
548         requestManager.scheduleWrite(new SessionTask(request, thread) {
549
550             @Override
551             public void run(int thread) {
552
553                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
554
555                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
556
557                 flushCounter = 0;
558                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
559
560                 VirtualGraph vg = getProvider(request.getProvider());
561                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
562
563                 try {
564                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
565                     writeState = writeStateT;
566
567                     assert (null != writer);
568 //                    writer.state.barrier.inc();
569                     writeStateT.setResult(request.perform(writer));
570                     assert (null != writer);
571
572 //                    writer.state.barrier.dec();
573 //                    writer.waitAsync(null);
574
575                 } catch (Throwable e) {
576
577 //                    writer.state.barrier.dec();
578 //                    writer.waitAsync(null);
579
580                     writeState.except(e);
581
582 //                  state.stopWriteTransaction(clusterStream);
583 //
584 //              } catch (Throwable e) {
585 //                  // Log it first, just to be safe that the error is always logged.
586 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
587 //
588 //                  try {
589 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
590 //                      // All we can do here is to log those, can't really pass them anywhere.
591 //                      if (procedure != null) {
592 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
593 //                          else procedure.exception(new DatabaseException(e));
594 //                      }
595 //                  } catch (Throwable e2) {
596 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
597 //                  }
598 //
599 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
600 //
601 //                  state.stopWriteTransaction(clusterStream);
602
603                 } finally {
604                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
605                 }
606
607 //              if(notify != null) notify.release();
608
609                 task.finish();
610
611             }
612
613         }, combine);
614
615     }
616
617     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
618         scheduleRequest(request, callback, notify, null);
619     }
620
621     /* (non-Javadoc)
622      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
623      */
624     @Override
625     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
626
627         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
628
629         assert (request != null);
630
631         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
632
633         requestManager.scheduleWrite(new SessionTask(request, thread) {
634
635             @Override
636             public void run(int thread) {
637                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
638
639                 Procedure<Object> stateProcedure = new Procedure<Object>() {
640                     @Override
641                     public void execute(Object result) {
642                         if (callback != null)
643                             callback.accept(null);
644                     }
645                     @Override
646                     public void exception(Throwable t) {
647                         if (callback != null) {
648                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
649                             else callback.accept(new DatabaseException(t));
650                         } else
651                             Logger.defaultLogError("Unhandled exception", t);
652                     }
653                 };
654
655                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
656                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
657                 DelayedWriteGraph dwg = null;
658 //                newGraph.state.barrier.inc();
659
660                 try {
661                     dwg = new DelayedWriteGraph(newGraph);
662                     request.perform(dwg);
663                 } catch (Throwable e) {
664                     delayedWriteState.except(e);
665                     total.finish();
666                     dwg.close();
667                     return;
668                 } finally {
669 //                    newGraph.state.barrier.dec();
670 //                    newGraph.waitAsync(request);
671                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
672                 }
673
674                 delayedWriteState = null;
675
676                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
677                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
678
679                 flushCounter = 0;
680                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
681
682                 acquireWriteOnly();
683
684                 VirtualGraph vg = getProvider(request.getProvider());
685                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
686
687                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
688
689                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
690
691                 assert (null != writer);
692 //                writer.state.barrier.inc();
693
694                 try {
695                     // Cannot cancel
696                     dwg.commit(writer, request);
697
698                         if(defaultClusterSet != null) {
699                                 XSupport xs = getService(XSupport.class);
700                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
701                                 ClusteringSupport cs = getService(ClusteringSupport.class);
702                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
703                         }
704
705                     // This makes clusters available from server
706                     clusterStream.reallyFlush();
707
708                     releaseWriteOnly(writer);
709                     handleUpdatesAndMetadata(writer);
710
711                 } catch (ServiceException e) {
712 //                    writer.state.barrier.dec();
713 //                    writer.waitAsync(null);
714
715                     // These shall be requested from server
716                     clusterTable.removeWriteOnlyClusters();
717                     // This makes clusters available from server
718                     clusterStream.reallyFlush();
719
720                     releaseWriteOnly(writer);
721                     writeState.except(e);
722                 } finally {
723                     // Debugging & Profiling
724                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
725                     task2.finish();
726                     total.finish();
727                 }
728             }
729
730         }, combine);
731
732     }
733
734     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
735         scheduleRequest(request, procedure, notify, null);
736     }
737
738     /* (non-Javadoc)
739      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
740      */
741     @Override
742     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
743         throw new Error("Not implemented");
744     }
745
746     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
747         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
748         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
749             createdClusters.add(cluster.clusterId);
750         }
751         return cluster;
752     }
753
754     class WriteOnlySupport implements WriteSupport {
755
756         ClusterStream stream;
757         ClusterImpl currentCluster;
758
759         public WriteOnlySupport() {
760             this.stream = clusterStream;
761         }
762
763         @Override
764         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
765             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
766         }
767
768         @Override
769         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
770
771             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
772
773             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
774                 if(s != queryProvider2.getRootLibrary())
775                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
776
777             try {
778                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
779             } catch (DatabaseException e) {
780                 Logger.defaultLogError(e);
781                 //throw e;
782             }
783
784             clientChanges.invalidate(s);
785
786             if (cluster.isWriteOnly())
787                 return;
788             queryProvider2.updateStatements(s, p);
789
790         }
791
792         @Override
793         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
794             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
795         }
796
797         @Override
798         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
799
800             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
801             try {
802                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
803             } catch (DatabaseException e) {
804                 Logger.defaultLogError(e);
805             }
806
807             clientChanges.invalidate(rid);
808
809             if (cluster.isWriteOnly())
810                 return;
811             queryProvider2.updateValue(rid);
812
813         }
814
815         @Override
816         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
817
818             if(amount < 65536) {
819                 claimValue(provider,resource, reader.readBytes(null, amount));
820                 return;
821             }
822
823             byte[] bytes = new byte[65536];
824
825             int rid = ((ResourceImpl)resource).id;
826             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
827             try {
828                 int left = amount;
829                 while(left > 0) {
830                     int block = Math.min(left, 65536);
831                     reader.readBytes(bytes, block);
832                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
833                     left -= block;
834                 }
835             } catch (DatabaseException e) {
836                 Logger.defaultLogError(e);
837             }
838
839             clientChanges.invalidate(rid);
840
841             if (cluster.isWriteOnly())
842                 return;
843             queryProvider2.updateValue(rid);
844
845         }
846
847         private void maintainCluster(ClusterImpl before, ClusterI after_) {
848             if(after_ != null && after_ != before) {
849                 ClusterImpl after = (ClusterImpl)after_;
850                 if(currentCluster == before) {
851                     currentCluster = after;
852                 }
853                 clusterTable.replaceCluster(after);
854             }
855         }
856
857         public int createResourceKey(int foreignCounter) throws DatabaseException {
858             if(currentCluster == null) {
859                 currentCluster = getNewResourceCluster();
860             }
861             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
862                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
863                 newCluster.foreignLookup = new byte[foreignCounter];
864                 currentCluster = newCluster;
865                 if (DEBUG)
866                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
867             }
868             return currentCluster.createResource(clusterTranslator);
869         }
870
871         @Override
872         public Resource createResource(VirtualGraph provider) throws DatabaseException {
873             if(currentCluster == null) {
874                 if (null != defaultClusterSet) {
875                         ResourceImpl result = getNewResource(defaultClusterSet);
876                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
877                     return result;
878                 } else {
879                     currentCluster = getNewResourceCluster();
880                 }
881             }
882             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
883                 if (null != defaultClusterSet) {
884                         ResourceImpl result = getNewResource(defaultClusterSet);
885                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
886                     return result;
887                 } else {
888                     currentCluster = getNewResourceCluster();
889                 }
890             }
891             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
892         }
893
894         @Override
895         public Resource createResource(VirtualGraph provider, long clusterId)
896         throws DatabaseException {
897             return getNewResource(clusterId);
898         }
899
900         @Override
901         public Resource createResource(VirtualGraph provider, Resource clusterSet)
902         throws DatabaseException {
903             return getNewResource(clusterSet);
904         }
905
906         @Override
907         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
908                 throws DatabaseException {
909             getNewClusterSet(clusterSet);
910         }
911
912         @Override
913         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
914         throws ServiceException {
915             return containsClusterSet(clusterSet);
916         }
917
918         public void selectCluster(long cluster) {
919                 currentCluster = clusterTable.getClusterByClusterId(cluster);
920                 long setResourceId = clusterSetsSupport.getSet(cluster);
921                 clusterSetsSupport.put(setResourceId, cluster);
922         }
923
924         @Override
925         public Resource setDefaultClusterSet(Resource clusterSet)
926         throws ServiceException {
927                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
928                 if(clusterSet != null) {
929                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
930                         currentCluster = clusterTable.getClusterByClusterId(id);
931                         return result;
932                 } else {
933                         currentCluster = null;
934                         return null;
935                 }
936         }
937
938         @Override
939         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
940
941                  provider = getProvider(provider);
942              if (null == provider) {
943                  int key = ((ResourceImpl)resource).id;
944
945
946
947                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
948 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
949 //                      if(key != queryProvider2.getRootLibrary())
950 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
951
952 //                 try {
953 //                     cluster.removeValue(key, clusterTranslator);
954 //                 } catch (DatabaseException e) {
955 //                     Logger.defaultLogError(e);
956 //                     return;
957 //                 }
958
959                  clusterTable.writeOnlyInvalidate(cluster);
960
961                  try {
962                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
963                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
964                          clusterTranslator.removeValue(cluster);
965                  } catch (DatabaseException e) {
966                          Logger.defaultLogError(e);
967                  }
968
969                  queryProvider2.invalidateResource(key);
970                  clientChanges.invalidate(key);
971
972              } else {
973                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
974                  queryProvider2.updateValue(querySupport.getId(resource));
975                  clientChanges.claimValue(resource);
976              }
977
978
979         }
980
981         @Override
982         public void flush(boolean intermediate) {
983             throw new UnsupportedOperationException();
984         }
985
986         @Override
987         public void flushCluster() {
988             clusterTable.flushCluster(graphSession);
989             if(defaultClusterSet != null) {
990                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
991             }
992             currentCluster = null;
993         }
994
995         @Override
996         public void flushCluster(Resource r) {
997             throw new UnsupportedOperationException("flushCluster resource " + r);
998         }
999
1000         @Override
1001         public void gc() {
1002         }
1003
1004         @Override
1005         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1006                 Resource object) {
1007
1008                 int s = ((ResourceImpl)subject).id;
1009                 int p = ((ResourceImpl)predicate).id;
1010                 int o = ((ResourceImpl)object).id;
1011
1012                 provider = getProvider(provider);
1013                 if (null == provider) {
1014
1015                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1016                         clusterTable.writeOnlyInvalidate(cluster);
1017
1018                         try {
1019
1020                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1021                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1022                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1023
1024                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1025                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1026
1027                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1028                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1029                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1030                                 clusterTranslator.removeStatement(cluster);
1031
1032                                 queryProvider2.invalidateResource(s);
1033                                 clientChanges.invalidate(s);
1034
1035                         } catch (DatabaseException e) {
1036
1037                                 Logger.defaultLogError(e);
1038
1039                         }
1040
1041                         return true;
1042
1043                 } else {
1044                         
1045                         ((VirtualGraphImpl)provider).deny(s, p, o);
1046                         queryProvider2.invalidateResource(s);
1047                         clientChanges.invalidate(s);
1048
1049                         return true;
1050
1051                 }
1052
1053         }
1054
1055         @Override
1056         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1057             throw new UnsupportedOperationException();
1058         }
1059
1060         @Override
1061         public boolean writeOnly() {
1062             return true;
1063         }
1064
1065         @Override
1066         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1067             writeSupport.performWriteRequest(graph, request);
1068         }
1069
1070         @Override
1071         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1072             throw new UnsupportedOperationException();
1073         }
1074
1075         @Override
1076         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1077             throw new UnsupportedOperationException();
1078         }
1079
1080         @Override
1081         public <T> void addMetadata(Metadata data) throws ServiceException {
1082             writeSupport.addMetadata(data);
1083         }
1084
1085         @Override
1086         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1087             return writeSupport.getMetadata(clazz);
1088         }
1089
1090         @Override
1091         public TreeMap<String, byte[]> getMetadata() {
1092             return writeSupport.getMetadata();
1093         }
1094
1095         @Override
1096         public void commitDone(WriteTraits writeTraits, long csid) {
1097             writeSupport.commitDone(writeTraits, csid);
1098         }
1099
1100         @Override
1101         public void clearUndoList(WriteTraits writeTraits) {
1102             writeSupport.clearUndoList(writeTraits);
1103         }
1104         @Override
1105         public int clearMetadata() {
1106             return writeSupport.clearMetadata();
1107         }
1108
1109                 @Override
1110                 public void startUndo() {
1111                         writeSupport.startUndo();
1112                 }
1113     }
1114
1115     class VirtualWriteOnlySupport implements WriteSupport {
1116
1117 //        @Override
1118 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1119 //                Resource object) {
1120 //            throw new UnsupportedOperationException();
1121 //        }
1122
1123         @Override
1124         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1125
1126             TransientGraph impl = (TransientGraph)provider;
1127             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1128             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1129             clientChanges.claim(subject, predicate, object);
1130
1131         }
1132
1133         @Override
1134         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1135
1136             TransientGraph impl = (TransientGraph)provider;
1137             impl.claim(subject, predicate, object);
1138             getQueryProvider2().updateStatements(subject, predicate);
1139             clientChanges.claim(subject, predicate, object);
1140
1141         }
1142
1143         @Override
1144         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1145             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1146         }
1147
1148         @Override
1149         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1150             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1151             getQueryProvider2().updateValue(resource);
1152             clientChanges.claimValue(resource);
1153         }
1154
1155         @Override
1156         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1157             byte[] value = reader.readBytes(null, amount);
1158             claimValue(provider, resource, value);
1159         }
1160
1161         @Override
1162         public Resource createResource(VirtualGraph provider) {
1163             TransientGraph impl = (TransientGraph)provider;
1164             return impl.getResource(impl.newResource(false));
1165         }
1166
1167         @Override
1168         public Resource createResource(VirtualGraph provider, long clusterId) {
1169             throw new UnsupportedOperationException();
1170         }
1171
1172         @Override
1173         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1174         throws DatabaseException {
1175             throw new UnsupportedOperationException();
1176         }
1177
1178         @Override
1179         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1180         throws DatabaseException {
1181             throw new UnsupportedOperationException();
1182         }
1183
1184         @Override
1185         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1186         throws ServiceException {
1187             throw new UnsupportedOperationException();
1188         }
1189
1190         @Override
1191         public Resource setDefaultClusterSet(Resource clusterSet)
1192         throws ServiceException {
1193                 return null;
1194         }
1195
1196         @Override
1197         public void denyValue(VirtualGraph provider, Resource resource) {
1198             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1199             getQueryProvider2().updateValue(querySupport.getId(resource));
1200             // NOTE: this only keeps track of value changes by-resource.
1201             clientChanges.claimValue(resource);
1202         }
1203
1204         @Override
1205         public void flush(boolean intermediate) {
1206             throw new UnsupportedOperationException();
1207         }
1208
1209         @Override
1210         public void flushCluster() {
1211             throw new UnsupportedOperationException();
1212         }
1213
1214         @Override
1215         public void flushCluster(Resource r) {
1216             throw new UnsupportedOperationException("Resource " + r);
1217         }
1218
1219         @Override
1220         public void gc() {
1221         }
1222
1223         @Override
1224         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1225             TransientGraph impl = (TransientGraph) provider;
1226             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1227             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1228             clientChanges.deny(subject, predicate, object);
1229             return true;
1230         }
1231
1232         @Override
1233         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1234             throw new UnsupportedOperationException();
1235         }
1236
1237         @Override
1238         public boolean writeOnly() {
1239             return true;
1240         }
1241
1242         @Override
1243         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1244             throw new UnsupportedOperationException();
1245         }
1246
1247         @Override
1248         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1249             throw new UnsupportedOperationException();
1250         }
1251
1252         @Override
1253         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1254             throw new UnsupportedOperationException();
1255         }
1256
1257         @Override
1258         public <T> void addMetadata(Metadata data) throws ServiceException {
1259             throw new UnsupportedOperationException();
1260         }
1261
1262         @Override
1263         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1264             throw new UnsupportedOperationException();
1265         }
1266
1267         @Override
1268         public TreeMap<String, byte[]> getMetadata() {
1269             throw new UnsupportedOperationException();
1270         }
1271
1272         @Override
1273         public void commitDone(WriteTraits writeTraits, long csid) {
1274         }
1275
1276         @Override
1277         public void clearUndoList(WriteTraits writeTraits) {
1278         }
1279
1280         @Override
1281         public int clearMetadata() {
1282             return 0;
1283         }
1284
1285                 @Override
1286                 public void startUndo() {
1287                 }
1288     }
1289
1290     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1291
1292         try {
1293
1294             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1295
1296             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1297
1298             flushCounter = 0;
1299             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1300
1301             acquireWriteOnly();
1302
1303             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1304
1305             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1306
1307             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1308             writeState = writeStateT;
1309
1310             assert (null != writer);
1311 //            writer.state.barrier.inc();
1312             long start = System.nanoTime();
1313             T result = request.perform(writer);
1314             long duration = System.nanoTime() - start;
1315             if (DEBUG) {
1316                 System.err.println("################");
1317                 System.err.println("WriteOnly duration " + 1e-9*duration);
1318             }
1319             writeStateT.setResult(result);
1320
1321             // This makes clusters available from server
1322             clusterStream.reallyFlush();
1323
1324             // This will trigger query updates
1325             releaseWriteOnly(writer);
1326             assert (null != writer);
1327
1328             handleUpdatesAndMetadata(writer);
1329
1330         } catch (CancelTransactionException e) {
1331
1332             releaseWriteOnly(writeState.getGraph());
1333
1334             clusterTable.removeWriteOnlyClusters();
1335             state.stopWriteTransaction(clusterStream);
1336
1337         } catch (Throwable e) {
1338
1339             e.printStackTrace();
1340
1341             releaseWriteOnly(writeState.getGraph());
1342
1343             clusterTable.removeWriteOnlyClusters();
1344
1345             if (callback != null)
1346                 callback.exception(new DatabaseException(e));
1347
1348             state.stopWriteTransaction(clusterStream);
1349             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1350
1351         } finally  {
1352
1353             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1354
1355         }
1356
1357     }
1358
1359     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1360         scheduleRequest(request, callback, notify, null);
1361     }
1362
1363     @Override
1364     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1365
1366         assertAlive();
1367
1368         assert (request != null);
1369
1370         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1371
1372         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1373
1374             @Override
1375             public void run(int thread) {
1376
1377                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1378
1379                     try {
1380
1381                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1382
1383                         flushCounter = 0;
1384                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1385
1386                         acquireWriteOnly();
1387
1388                         VirtualGraph vg = getProvider(request.getProvider());
1389                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1390
1391                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1392
1393                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1394
1395                             @Override
1396                             public void execute(Object result) {
1397                                 if(callback != null) callback.accept(null);
1398                             }
1399
1400                             @Override
1401                             public void exception(Throwable t) {
1402                                 if(callback != null) callback.accept((DatabaseException)t);
1403                             }
1404
1405                         });
1406
1407                         assert (null != writer);
1408 //                        writer.state.barrier.inc();
1409
1410                         try {
1411
1412                             request.perform(writer);
1413
1414                         } catch (Throwable e) {
1415
1416 //                            writer.state.barrier.dec();
1417 //                            writer.waitAsync(null);
1418
1419                             releaseWriteOnly(writer);
1420
1421                             clusterTable.removeWriteOnlyClusters();
1422
1423                             if(!(e instanceof CancelTransactionException)) {
1424                                 if (callback != null)
1425                                     callback.accept(new DatabaseException(e));
1426                             }
1427
1428                             writeState.except(e);
1429
1430                             return;
1431
1432                         }
1433
1434                         // This makes clusters available from server
1435                         boolean empty = clusterStream.reallyFlush();
1436                         // This was needed to make WO requests call metadata listeners.
1437                         // NOTE: the calling event does not contain clientChanges information.
1438                         if (!empty && clientChanges.isEmpty())
1439                             clientChanges.setNotEmpty(true);
1440                         releaseWriteOnly(writer);
1441                         assert (null != writer);
1442                     } finally  {
1443
1444                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1445
1446                     }
1447
1448
1449                 task.finish();
1450
1451             }
1452
1453         }, combine);
1454
1455     }
1456
1457     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1458         scheduleRequest(request, callback, notify, null);
1459     }
1460
1461     /* (non-Javadoc)
1462      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1463      */
1464     @Override
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1466
1467         assert (request != null);
1468
1469         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1470
1471         requestManager.scheduleWrite(new SessionTask(request, thread) {
1472
1473             @Override
1474             public void run(int thread) {
1475
1476                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1477
1478                 performWriteOnly(request, notify, callback);
1479
1480                 task.finish();
1481
1482             }
1483
1484         }, combine);
1485
1486     }
1487
1488     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1489
1490         assert (request != null);
1491         assert (procedure != null);
1492
1493         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1494
1495         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1496
1497             @Override
1498             public void run(int thread) {
1499
1500                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1501
1502                 ListenerBase listener = getListenerBase(procedure);
1503
1504                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1505
1506                 try {
1507
1508                     if (listener != null) {
1509
1510                         try {
1511                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1512
1513                                         @Override
1514                                         public void exception(AsyncReadGraph graph, Throwable t) {
1515                                                 procedure.exception(graph, t);
1516                                                 if(throwable != null) {
1517                                                         throwable.set(t);
1518                                                 } else {
1519                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1520                                                 }
1521                                         }
1522
1523                                         @Override
1524                                         public void execute(AsyncReadGraph graph, T t) {
1525                                                 if(result != null) result.set(t);
1526                                                 procedure.execute(graph, t);
1527                                         }
1528
1529                                 }, listener);
1530                         } catch (Throwable t) {
1531                             // This is handled by the AsyncProcedure
1532                                 //Logger.defaultLogError("Internal error", t);
1533                         }
1534
1535                     } else {
1536
1537                         try {
1538
1539 //                            newGraph.state.barrier.inc();
1540
1541                             T t = request.perform(newGraph);
1542
1543                             try {
1544
1545                                 if(result != null) result.set(t);
1546                                 procedure.execute(newGraph, t);
1547
1548                             } catch (Throwable th) {
1549
1550                                 if(throwable != null) {
1551                                     throwable.set(th);
1552                                 } else {
1553                                     Logger.defaultLogError("Unhandled exception", th);
1554                                 }
1555
1556                             }
1557
1558                         } catch (Throwable t) {
1559
1560                             if (DEBUG)
1561                                 t.printStackTrace();
1562
1563                             if(throwable != null) {
1564                                 throwable.set(t);
1565                             } else {
1566                                 Logger.defaultLogError("Unhandled exception", t);
1567                             }
1568
1569                             try {
1570
1571                                 procedure.exception(newGraph, t);
1572
1573                             } catch (Throwable t2) {
1574
1575                                 if(throwable != null) {
1576                                     throwable.set(t2);
1577                                 } else {
1578                                     Logger.defaultLogError("Unhandled exception", t2);
1579                                 }
1580
1581                             }
1582
1583                         }
1584
1585 //                        newGraph.state.barrier.dec();
1586 //                        newGraph.waitAsync(request);
1587
1588                     }
1589
1590                 } finally {
1591
1592                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1593
1594                 }
1595
1596             }
1597
1598         });
1599
1600     }
1601
1602     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1603
1604         assert (request != null);
1605         assert (procedure != null);
1606
1607         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1608
1609         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1610
1611             @Override
1612             public void run(int thread) {
1613
1614                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1615
1616                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1617
1618                 try {
1619
1620                     if (listener != null) {
1621
1622                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1623
1624 //                        newGraph.waitAsync(request);
1625
1626                     } else {
1627
1628                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1629                                 procedure, "request");
1630
1631                         try {
1632
1633 //                            newGraph.state.barrier.inc();
1634
1635                             request.perform(newGraph, wrapper);
1636
1637 //                            newGraph.waitAsync(request);
1638
1639                         } catch (Throwable t) {
1640
1641                             wrapper.exception(newGraph, t);
1642 //                            newGraph.waitAsync(request);
1643
1644
1645                         }
1646
1647                     }
1648
1649                 } finally {
1650
1651                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1652
1653                 }
1654
1655             }
1656
1657         });
1658
1659     }
1660
1661     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1662
1663         assert (request != null);
1664         assert (procedure != null);
1665
1666         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1667
1668         int sync = notify != null ? thread : -1;
1669
1670         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1671
1672             @Override
1673             public void run(int thread) {
1674
1675                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1676
1677                 ListenerBase listener = getListenerBase(procedure);
1678
1679                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1680
1681                 try {
1682
1683                     if (listener != null) {
1684
1685                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1686
1687 //                        newGraph.waitAsync(request);
1688
1689                     } else {
1690
1691                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1692
1693                         try {
1694
1695                             request.perform(newGraph, wrapper);
1696
1697                         } catch (Throwable t) {
1698
1699                             t.printStackTrace();
1700
1701                         }
1702
1703                     }
1704
1705                 } finally {
1706
1707                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1708
1709                 }
1710
1711             }
1712
1713         });
1714
1715     }
1716
1717     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1718
1719         assert (request != null);
1720         assert (procedure != null);
1721
1722         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1723
1724         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1725
1726             @Override
1727             public void run(int thread) {
1728
1729                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1730
1731                 ListenerBase listener = getListenerBase(procedure);
1732
1733                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1734
1735                 try {
1736
1737                     if (listener != null) {
1738
1739                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1740
1741                             @Override
1742                             public void exception(Throwable t) {
1743                                 procedure.exception(t);
1744                                 if(throwable != null) {
1745                                     throwable.set(t);
1746                                 }
1747                             }
1748
1749                             @Override
1750                             public void execute(T t) {
1751                                 if(result != null) result.set(t);
1752                                 procedure.execute(t);
1753                             }
1754
1755                         }, listener);
1756
1757 //                        newGraph.waitAsync(request);
1758
1759                     } else {
1760
1761 //                        newGraph.state.barrier.inc();
1762
1763                         request.register(newGraph, new Listener<T>() {
1764
1765                             @Override
1766                             public void exception(Throwable t) {
1767                                 if(throwable != null) throwable.set(t);
1768                                 procedure.exception(t);
1769 //                                newGraph.state.barrier.dec();
1770                             }
1771
1772                             @Override
1773                             public void execute(T t) {
1774                                 if(result != null) result.set(t);
1775                                 procedure.execute(t);
1776 //                                newGraph.state.barrier.dec();
1777                             }
1778
1779                             @Override
1780                             public boolean isDisposed() {
1781                                 return true;
1782                             }
1783
1784                         });
1785
1786 //                        newGraph.waitAsync(request);
1787
1788                     }
1789
1790                 } finally {
1791
1792                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1793
1794                 }
1795
1796             }
1797
1798         });
1799
1800     }
1801
1802
1803     @Override
1804     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1805
1806         scheduleRequest(request, procedure, null, null, null);
1807
1808     }
1809
1810     @Override
1811     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1812         assertNotSession();
1813         Semaphore notify = new Semaphore(0);
1814         DataContainer<Throwable> container = new DataContainer<Throwable>();
1815         DataContainer<T> result = new DataContainer<T>();
1816         scheduleRequest(request, procedure, notify, container, result);
1817         acquire(notify, request);
1818         Throwable throwable = container.get();
1819         if(throwable != null) {
1820             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1821             else throw new DatabaseException("Unexpected exception", throwable);
1822         }
1823         return result.get();
1824     }
1825
1826     @Override
1827     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1828         assertNotSession();
1829         Semaphore notify = new Semaphore(0);
1830         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1831         final DataContainer<T> resultContainer = new DataContainer<T>();
1832         scheduleRequest(request, new AsyncProcedure<T>() {
1833             @Override
1834             public void exception(AsyncReadGraph graph, Throwable throwable) {
1835                 exceptionContainer.set(throwable);
1836                 procedure.exception(graph, throwable);
1837             }
1838             @Override
1839             public void execute(AsyncReadGraph graph, T result) {
1840                 resultContainer.set(result);
1841                 procedure.execute(graph, result);
1842             }
1843         }, getListenerBase(procedure), notify);
1844         acquire(notify, request, procedure);
1845         Throwable throwable = exceptionContainer.get();
1846         if (throwable != null) {
1847             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1848             else throw new DatabaseException("Unexpected exception", throwable);
1849         }
1850         return resultContainer.get();
1851     }
1852
1853
1854     @Override
1855     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1856         assertNotSession();
1857         Semaphore notify = new Semaphore(0);
1858         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1859         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1860             @Override
1861             public void exception(AsyncReadGraph graph, Throwable throwable) {
1862                 exceptionContainer.set(throwable);
1863                 procedure.exception(graph, throwable);
1864             }
1865             @Override
1866             public void execute(AsyncReadGraph graph, T result) {
1867                 procedure.execute(graph, result);
1868             }
1869             @Override
1870             public void finished(AsyncReadGraph graph) {
1871                 procedure.finished(graph);
1872             }
1873         }, notify);
1874         acquire(notify, request, procedure);
1875         Throwable throwable = exceptionContainer.get();
1876         if (throwable != null) {
1877             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1878             else throw new DatabaseException("Unexpected exception", throwable);
1879         }
1880         // TODO: implement return value
1881         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1882         return null;
1883     }
1884
1885     @Override
1886     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1887         return syncRequest(request, new ProcedureAdapter<T>());
1888
1889
1890 //        assert(request != null);
1891 //
1892 //        final DataContainer<T> result = new DataContainer<T>();
1893 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1894 //
1895 //        syncRequest(request, new Procedure<T>() {
1896 //
1897 //            @Override
1898 //            public void execute(T t) {
1899 //                result.set(t);
1900 //            }
1901 //
1902 //            @Override
1903 //            public void exception(Throwable t) {
1904 //                exception.set(t);
1905 //            }
1906 //
1907 //        });
1908 //
1909 //        Throwable t = exception.get();
1910 //        if(t != null) {
1911 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1912 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1913 //        }
1914 //
1915 //        return result.get();
1916
1917     }
1918
1919     @Override
1920     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1921         return syncRequest(request, (Procedure<T>)procedure);
1922     }
1923
1924
1925 //    @Override
1926 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1927 //        assertNotSession();
1928 //        Semaphore notify = new Semaphore(0);
1929 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1930 //        DataContainer<T> result = new DataContainer<T>();
1931 //        scheduleRequest(request, procedure, notify, container, result);
1932 //        acquire(notify, request);
1933 //        Throwable throwable = container.get();
1934 //        if(throwable != null) {
1935 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1936 //            else throw new DatabaseException("Unexpected exception", throwable);
1937 //        }
1938 //        return result.get();
1939 //    }
1940
1941
1942     @Override
1943     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1944         assertNotSession();
1945         Semaphore notify = new Semaphore(0);
1946         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1947         final DataContainer<T> result = new DataContainer<T>();
1948         scheduleRequest(request, procedure, notify, container, result);
1949         acquire(notify, request);
1950         Throwable throwable = container.get();
1951         if (throwable != null) {
1952             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1953             else throw new DatabaseException("Unexpected exception", throwable);
1954         }
1955         return result.get();
1956     }
1957
1958     @Override
1959     public void syncRequest(Write request) throws DatabaseException  {
1960         assertNotSession();
1961         assertAlive();
1962         Semaphore notify = new Semaphore(0);
1963         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1964         scheduleRequest(request, e -> exception.set(e), notify);
1965         acquire(notify, request);
1966         if(exception.get() != null) throw exception.get();
1967     }
1968
1969     @Override
1970     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1971         assertNotSession();
1972         Semaphore notify = new Semaphore(0);
1973         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1974         final DataContainer<T> result = new DataContainer<T>();
1975         scheduleRequest(request, new Procedure<T>() {
1976
1977             @Override
1978             public void exception(Throwable t) {
1979               exception.set(t);
1980             }
1981
1982             @Override
1983             public void execute(T t) {
1984               result.set(t);
1985             }
1986
1987         }, notify);
1988         acquire(notify, request);
1989         if(exception.get() != null) {
1990             Throwable t = exception.get();
1991             if(t instanceof DatabaseException) throw (DatabaseException)t;
1992             else throw new DatabaseException(t);
1993         }
1994         return result.get();
1995     }
1996
1997     @Override
1998     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
1999         assertNotSession();
2000         Semaphore notify = new Semaphore(0);
2001         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2002         final DataContainer<T> result = new DataContainer<T>();
2003         scheduleRequest(request, new Procedure<T>() {
2004
2005             @Override
2006             public void exception(Throwable t) {
2007               exception.set(t);
2008             }
2009
2010             @Override
2011             public void execute(T t) {
2012               result.set(t);
2013             }
2014
2015         }, notify);
2016         acquire(notify, request);
2017         if(exception.get() != null) {
2018             Throwable t = exception.get();
2019             if(t instanceof DatabaseException) throw (DatabaseException)t;
2020             else throw new DatabaseException(t);
2021         }
2022         return result.get();
2023     }
2024
2025     @Override
2026     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2027         assertNotSession();
2028         Semaphore notify = new Semaphore(0);
2029         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2030         final DataContainer<T> result = new DataContainer<T>();
2031         scheduleRequest(request, new Procedure<T>() {
2032
2033             @Override
2034             public void exception(Throwable t) {
2035               exception.set(t);
2036             }
2037
2038             @Override
2039             public void execute(T t) {
2040               result.set(t);
2041             }
2042
2043         }, notify);
2044         acquire(notify, request);
2045         if(exception.get() != null) {
2046             Throwable t = exception.get();
2047             if(t instanceof DatabaseException) throw (DatabaseException)t;
2048             else throw new DatabaseException(t);
2049         }
2050         return result.get();
2051     }
2052
2053     @Override
2054     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2055         assertNotSession();
2056         Semaphore notify = new Semaphore(0);
2057         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2058         scheduleRequest(request, e -> exception.set(e), notify);
2059         acquire(notify, request);
2060         if(exception.get() != null) throw exception.get();
2061     }
2062
2063     @Override
2064     public void syncRequest(WriteOnly request) throws DatabaseException  {
2065         assertNotSession();
2066         assertAlive();
2067         Semaphore notify = new Semaphore(0);
2068         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2069         scheduleRequest(request, e -> exception.set(e), notify);
2070         acquire(notify, request);
2071         if(exception.get() != null) throw exception.get();
2072     }
2073
2074     /*
2075      *
2076      * GraphRequestProcessor interface
2077      */
2078
2079     @Override
2080     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2081
2082         scheduleRequest(request, procedure, null, null);
2083
2084     }
2085
2086     @Override
2087     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2088
2089         scheduleRequest(request, procedure, null);
2090
2091     }
2092
2093     @Override
2094     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2095
2096         scheduleRequest(request, procedure, null, null, null);
2097
2098     }
2099
2100     @Override
2101     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2102
2103         scheduleRequest(request, callback, null);
2104
2105     }
2106
2107     @Override
2108     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2109
2110         scheduleRequest(request, procedure, null);
2111
2112     }
2113
2114     @Override
2115     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2116
2117         scheduleRequest(request, procedure, null);
2118
2119     }
2120
2121     @Override
2122     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2123
2124         scheduleRequest(request, procedure, null);
2125
2126     }
2127
2128     @Override
2129     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2130
2131         scheduleRequest(request, callback, null);
2132
2133     }
2134
2135     @Override
2136     public void asyncRequest(final Write r) {
2137         asyncRequest(r, null);
2138     }
2139
2140     @Override
2141     public void asyncRequest(final DelayedWrite r) {
2142         asyncRequest(r, null);
2143     }
2144
2145     @Override
2146     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2147
2148         scheduleRequest(request, callback, null);
2149
2150     }
2151
2152     @Override
2153     public void asyncRequest(final WriteOnly request) {
2154
2155         asyncRequest(request, null);
2156
2157     }
2158
2159     @Override
2160     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2161         r.request(this, procedure);
2162     }
2163
2164     @Override
2165     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2166         r.request(this, procedure);
2167     }
2168
2169     @Override
2170     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2171         r.request(this, procedure);
2172     }
2173
2174     @Override
2175     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2176         r.request(this, procedure);
2177     }
2178
2179     @Override
2180     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2181         r.request(this, procedure);
2182     }
2183
2184     @Override
2185     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2186         r.request(this, procedure);
2187     }
2188
2189     @Override
2190     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2191         return r.request(this);
2192     }
2193
2194     @Override
2195     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2196         return r.request(this);
2197     }
2198
2199     @Override
2200     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2201         r.request(this, procedure);
2202     }
2203
2204     @Override
2205     public <T> void async(WriteInterface<T> r) {
2206         r.request(this, new ProcedureAdapter<T>());
2207     }
2208
2209     //@Override
2210     public void incAsync() {
2211         state.incAsync();
2212     }
2213
2214     //@Override
2215     public void decAsync() {
2216         state.decAsync();
2217     }
2218
2219     public long getCluster(ResourceImpl resource) {
2220         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2221         return cluster.getClusterId();
2222     }
2223
2224     public long getCluster(int id) {
2225         if (clusterTable == null)
2226             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2227         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2228     }
2229
2230     public ResourceImpl getResource(int id) {
2231         return new ResourceImpl(resourceSupport, id);
2232     }
2233
2234     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2235         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2236         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2237         int key = proxy.getClusterKey();
2238         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2239         return new ResourceImpl(resourceSupport, resourceKey);
2240     }
2241
2242     public ResourceImpl getResource2(int id) {
2243         assert (id != 0);
2244         return new ResourceImpl(resourceSupport, id);
2245     }
2246
2247     final public int getId(ResourceImpl impl) {
2248         return impl.id;
2249     }
2250
2251     public static final Charset UTF8 = Charset.forName("utf-8");
2252
2253
2254
2255
2256     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2257         for(TransientGraph g : support.providers) {
2258             if(g.isPending(subject)) return false;
2259         }
2260         return true;
2261     }
2262
2263     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2264         for(TransientGraph g : support.providers) {
2265             if(g.isPending(subject, predicate)) return false;
2266         }
2267         return true;
2268     }
2269
2270     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2271
2272         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2273
2274             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2275
2276             @Override
2277             public void accept(ReadGraphImpl graph) {
2278                 if(ready.decrementAndGet() == 0) {
2279                     runnable.accept(graph);
2280                 }
2281             }
2282
2283         };
2284
2285         for(TransientGraph g : support.providers) {
2286             if(g.isPending(subject)) {
2287                 try {
2288                     g.load(graph, subject, composite);
2289                 } catch (DatabaseException e) {
2290                     e.printStackTrace();
2291                 }
2292             } else {
2293                 composite.accept(graph);
2294             }
2295         }
2296
2297         composite.accept(graph);
2298
2299     }
2300
2301     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2302
2303         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2304
2305             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2306
2307             @Override
2308             public void accept(ReadGraphImpl graph) {
2309                 if(ready.decrementAndGet() == 0) {
2310                     runnable.accept(graph);
2311                 }
2312             }
2313
2314         };
2315
2316         for(TransientGraph g : support.providers) {
2317             if(g.isPending(subject, predicate)) {
2318                 try {
2319                     g.load(graph, subject, predicate, composite);
2320                 } catch (DatabaseException e) {
2321                     e.printStackTrace();
2322                 }
2323             } else {
2324                 composite.accept(graph);
2325             }
2326         }
2327
2328         composite.accept(graph);
2329
2330     }
2331
2332 //    void dumpHeap() {
2333 //
2334 //        try {
2335 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2336 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2337 //                    "d:/heap" + flushCounter + ".txt", true);
2338 //        } catch (IOException e) {
2339 //            e.printStackTrace();
2340 //        }
2341 //
2342 //    }
2343
2344     void fireReactionsToSynchronize(ChangeSet cs) {
2345
2346         // Do not fire empty events
2347         if (cs.isEmpty())
2348             return;
2349
2350         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2351 //        g.state.barrier.inc();
2352
2353         try {
2354
2355             if (!cs.isEmpty()) {
2356                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2357                 for (ChangeListener l : changeListeners2) {
2358                     try {
2359                         l.graphChanged(e2);
2360                     } catch (Exception ex) {
2361                         ex.printStackTrace();
2362                     }
2363                 }
2364             }
2365
2366         } finally {
2367
2368 //            g.state.barrier.dec();
2369 //            g.waitAsync(null);
2370
2371         }
2372
2373     }
2374
2375     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2376         try {
2377
2378             // Do not fire empty events
2379             if (cs2.isEmpty())
2380                 return;
2381
2382 //            graph.restart();
2383 //            graph.state.barrier.inc();
2384
2385             try {
2386
2387                 if (!cs2.isEmpty()) {
2388
2389                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2390                     for (ChangeListener l : changeListeners2) {
2391                         try {
2392                             l.graphChanged(e2);
2393 //                            System.out.println("changelistener " + l);
2394                         } catch (Exception ex) {
2395                             ex.printStackTrace();
2396                         }
2397                     }
2398
2399                 }
2400
2401             } finally {
2402
2403 //                graph.state.barrier.dec();
2404 //                graph.waitAsync(null);
2405
2406             }
2407         } catch (Throwable t) {
2408             t.printStackTrace();
2409
2410         }
2411     }
2412     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2413
2414         try {
2415
2416             // Do not fire empty events
2417             if (cs2.isEmpty())
2418                 return;
2419
2420             // Do not fire on virtual requests
2421             if(graph.getProvider() != null)
2422                 return;
2423
2424             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2425
2426             try {
2427
2428                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2429                 for (ChangeListener l : metadataListeners) {
2430                     try {
2431                         l.graphChanged(e2);
2432                     } catch (Throwable ex) {
2433                         ex.printStackTrace();
2434                     }
2435                 }
2436
2437             } finally {
2438
2439             }
2440
2441         } catch (Throwable t) {
2442             t.printStackTrace();
2443         }
2444
2445     }
2446
2447
2448     /*
2449      * (non-Javadoc)
2450      *
2451      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2452      */
2453     @Override
2454     public <T> T getService(Class<T> api) {
2455         T t = peekService(api);
2456         if (t == null) {
2457             if (state.isClosed())
2458                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2459             throw new ServiceNotFoundException(this, api);
2460         }
2461         return t;
2462     }
2463
2464     /**
2465      * @return
2466      */
2467     protected abstract ServerInformation getCachedServerInformation();
2468
2469         private Class<?> serviceKey1 = null;
2470         private Class<?> serviceKey2 = null;
2471         private Object service1 = null;
2472         private Object service2 = null;
2473
2474     /*
2475      * (non-Javadoc)
2476      *
2477      * @see
2478      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2479      */
2480     @SuppressWarnings("unchecked")
2481     @Override
2482     public synchronized <T> T peekService(Class<T> api) {
2483
2484                 if(serviceKey1 == api) {
2485                         return (T)service1;
2486                 } else if (serviceKey2 == api) {
2487                         // Promote this key
2488                         Object result = service2;
2489                         service2 = service1;
2490                         serviceKey2 = serviceKey1;
2491                         service1 = result;
2492                         serviceKey1 = api;
2493                         return (T)result;
2494                 }
2495
2496         if (Layer0.class == api)
2497                 return (T) L0;
2498         if (ServerInformation.class == api)
2499             return (T) getCachedServerInformation();
2500         else if (WriteGraphImpl.class == api)
2501             return (T) writeState.getGraph();
2502         else if (ClusterBuilder.class == api)
2503             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2504         else if (ClusterBuilderFactory.class == api)
2505             return (T)new ClusterBuilderFactoryImpl(this);
2506
2507                 service2 = service1;
2508                 serviceKey2 = serviceKey1;
2509
2510         service1 = serviceLocator.peekService(api);
2511         serviceKey1 = api;
2512
2513         return (T)service1;
2514
2515     }
2516
2517     /*
2518      * (non-Javadoc)
2519      *
2520      * @see
2521      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2522      */
2523     @Override
2524     public boolean hasService(Class<?> api) {
2525         return serviceLocator.hasService(api);
2526     }
2527
2528     /**
2529      * @param api the api that must be implemented by the specified service
2530      * @param service the service implementation
2531      */
2532     @Override
2533     public <T> void registerService(Class<T> api, T service) {
2534         if(Layer0.class == api) {
2535                 L0 = (Layer0)service;
2536                 return;
2537         }
2538         serviceLocator.registerService(api, service);
2539         if (TransactionPolicySupport.class == api) {
2540             transactionPolicy = (TransactionPolicySupport)service;
2541             state.resetTransactionPolicy();
2542         }
2543         if (api == serviceKey1)
2544                 service1 = service;
2545         else if (api == serviceKey2)
2546                 service2 = service;
2547     }
2548
2549     // ----------------
2550     // SessionMonitor
2551     // ----------------
2552
2553     void fireSessionVariableChange(String variable) {
2554         for (MonitorHandler h : monitorHandlers) {
2555             MonitorContext ctx = monitorContexts.getLeft(h);
2556             assert ctx != null;
2557             // SafeRunner functionality repeated here to avoid dependency.
2558             try {
2559                 h.valuesChanged(ctx);
2560             } catch (Exception e) {
2561                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2562             } catch (LinkageError e) {
2563                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2564             }
2565         }
2566     }
2567
2568
2569     class ResourceSerializerImpl implements ResourceSerializer {
2570
2571         public long createRandomAccessId(int id) throws DatabaseException {
2572             if(id < 0)
2573                 return id;
2574             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2575             long cluster = getCluster(id);
2576             if (0 == cluster)
2577                 return 0; // Better to return 0 then invalid id.
2578             long result = ClusterTraitsBase.createResourceId(cluster, index);
2579             return result;
2580         }
2581
2582         @Override
2583         public long getRandomAccessId(Resource resource) throws DatabaseException {
2584
2585             ResourceImpl resourceImpl = (ResourceImpl) resource;
2586             return createRandomAccessId(resourceImpl.id);
2587
2588         }
2589
2590         @Override
2591         public int getTransientId(Resource resource) throws DatabaseException {
2592
2593             ResourceImpl resourceImpl = (ResourceImpl) resource;
2594             return resourceImpl.id;
2595
2596         }
2597
2598         @Override
2599         public String createRandomAccessId(Resource resource)
2600         throws InvalidResourceReferenceException {
2601
2602             if(resource == null) throw new IllegalArgumentException();
2603
2604             ResourceImpl resourceImpl = (ResourceImpl) resource;
2605
2606             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2607
2608             int r;
2609             try {
2610                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2611             } catch (DatabaseException e1) {
2612                 throw new InvalidResourceReferenceException(e1);
2613             }
2614             try {
2615                 // Serialize as '<resource index>_<cluster id>'
2616                 return "" + r + "_" + getCluster(resourceImpl);
2617             } catch (Throwable e) {
2618                 e.printStackTrace();
2619                 throw new InvalidResourceReferenceException(e);
2620             } finally {
2621             }
2622         }
2623
2624         public int getTransientId(long serialized) throws DatabaseException {
2625             if (serialized <= 0)
2626                 return (int)serialized;
2627             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2628             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2629             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2630             if (cluster == null)
2631                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2632             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2633             return key;
2634         }
2635
2636         @Override
2637         public Resource getResource(long randomAccessId) throws DatabaseException {
2638             return getResourceByKey(getTransientId(randomAccessId));
2639         }
2640
2641         @Override
2642         public Resource getResource(int transientId) throws DatabaseException {
2643             return getResourceByKey(transientId);
2644         }
2645
2646         @Override
2647         public Resource getResource(String randomAccessId)
2648         throws InvalidResourceReferenceException {
2649             try {
2650                 int i = randomAccessId.indexOf('_');
2651                 if (i == -1)
2652                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2653                             + randomAccessId + "'");
2654                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2655                 if(r < 0) return getResourceByKey(r);
2656                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2657                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2658                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2659                 if (cluster.hasResource(key, clusterTranslator))
2660                     return getResourceByKey(key);
2661             } catch (InvalidResourceReferenceException e) {
2662                 throw e;
2663             } catch (NumberFormatException e) {
2664                 throw new InvalidResourceReferenceException(e);
2665             } catch (Throwable e) {
2666                 e.printStackTrace();
2667                 throw new InvalidResourceReferenceException(e);
2668             } finally {
2669             }
2670             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2671         }
2672
2673         @Override
2674         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2675             try {
2676                 return true;
2677             } catch (Throwable e) {
2678                 e.printStackTrace();
2679                 throw new InvalidResourceReferenceException(e);
2680             } finally {
2681             }
2682         }
2683     }
2684
2685     // Copied from old SessionImpl
2686
2687     void check() {
2688         if (state.isClosed())
2689             throw new Error("Session closed.");
2690     }
2691
2692
2693
2694     public ResourceImpl getNewResource(long clusterId) {
2695         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2696         int newId;
2697         try {
2698             newId = cluster.createResource(clusterTranslator);
2699         } catch (DatabaseException e) {
2700             Logger.defaultLogError(e);
2701             return null;
2702         }
2703         return new ResourceImpl(resourceSupport, newId);
2704     }
2705
2706     public ResourceImpl getNewResource(Resource clusterSet)
2707     throws DatabaseException {
2708         long resourceId = clusterSet.getResourceId();
2709         Long clusterId = clusterSetsSupport.get(resourceId);
2710         if (null == clusterId)
2711             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2712         if (Constants.NewClusterId == clusterId) {
2713             clusterId = getService(ClusteringSupport.class).createCluster();
2714             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2715                 createdClusters.add(clusterId);
2716             clusterSetsSupport.put(resourceId, clusterId);
2717             return getNewResource(clusterId);
2718         } else {
2719             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2720             ResourceImpl result;
2721             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2722                 clusterId = getService(ClusteringSupport.class).createCluster();
2723                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2724                     createdClusters.add(clusterId);
2725                 clusterSetsSupport.put(resourceId, clusterId);
2726                 return getNewResource(clusterId);
2727             } else {
2728                 result = getNewResource(clusterId);
2729                 int resultKey = querySupport.getId(result);
2730                 long resultCluster = querySupport.getClusterId(resultKey);
2731                 if (clusterId != resultCluster)
2732                     clusterSetsSupport.put(resourceId, resultCluster);
2733                 return result;
2734             }
2735         }
2736     }
2737
2738     public void getNewClusterSet(Resource clusterSet)
2739     throws DatabaseException {
2740         if (DEBUG)
2741             System.out.println("new cluster set=" + clusterSet);
2742         long resourceId = clusterSet.getResourceId();
2743         if (clusterSetsSupport.containsKey(resourceId))
2744             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2745         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2746     }
2747     public boolean containsClusterSet(Resource clusterSet)
2748     throws ServiceException {
2749         long resourceId = clusterSet.getResourceId();
2750         return clusterSetsSupport.containsKey(resourceId);
2751     }
2752     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2753         Resource r = defaultClusterSet;
2754         defaultClusterSet = clusterSet;
2755         return r;
2756     }
2757     void printDiagnostics() {
2758
2759         if (DIAGNOSTICS) {
2760
2761             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2762             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2763             for(ClusterI cluster : clusterTable.getClusters()) {
2764                 try {
2765                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2766                         residentClusters.set(residentClusters.get() + 1);
2767                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2768                     }
2769                 } catch (DatabaseException e) {
2770                     Logger.defaultLogError(e);
2771                 }
2772             }
2773
2774             System.out.println("--------------------------------");
2775             System.out.println("Cluster information:");
2776             System.out.println("-amount of resident clusters=" + residentClusters.get());
2777             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2778
2779             for(ClusterI cluster : clusterTable.getClusters()) {
2780                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2781                 try {
2782                     if (!cluster.isLoaded())
2783                         System.out.println("not loaded]");
2784                     else if (cluster.isEmpty())
2785                         System.out.println("is empty]");
2786                     else {
2787                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2788                         System.out.print(",references=" + cluster.getReferenceCount());
2789                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2790                     }
2791                 } catch (DatabaseException e) {
2792                     Logger.defaultLogError(e);
2793                     System.out.println("is corrupted]");
2794                 }
2795             }
2796
2797             queryProvider2.printDiagnostics();
2798             System.out.println("--------------------------------");
2799         }
2800
2801     }
2802
2803     public void fireStartReadTransaction() {
2804
2805         if (DIAGNOSTICS)
2806             System.out.println("StartReadTransaction");
2807
2808         for (SessionEventListener listener : eventListeners) {
2809             try {
2810                 listener.readTransactionStarted();
2811             } catch (Throwable t) {
2812                 t.printStackTrace();
2813             }
2814         }
2815
2816     }
2817
2818     public void fireFinishReadTransaction() {
2819
2820         if (DIAGNOSTICS)
2821             System.out.println("FinishReadTransaction");
2822
2823         for (SessionEventListener listener : eventListeners) {
2824             try {
2825                 listener.readTransactionFinished();
2826             } catch (Throwable t) {
2827                 t.printStackTrace();
2828             }
2829         }
2830
2831     }
2832
2833     public void fireStartWriteTransaction() {
2834
2835         if (DIAGNOSTICS)
2836             System.out.println("StartWriteTransaction");
2837
2838         for (SessionEventListener listener : eventListeners) {
2839             try {
2840                 listener.writeTransactionStarted();
2841             } catch (Throwable t) {
2842                 t.printStackTrace();
2843             }
2844         }
2845
2846     }
2847
2848     public void fireFinishWriteTransaction() {
2849
2850         if (DIAGNOSTICS)
2851             System.out.println("FinishWriteTransaction");
2852
2853         for (SessionEventListener listener : eventListeners) {
2854             try {
2855                 listener.writeTransactionFinished();
2856             } catch (Throwable t) {
2857                 t.printStackTrace();
2858             }
2859         }
2860
2861         Indexing.resetDependenciesIndexingDisabled();
2862
2863     }
2864
2865     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2866         throw new Error("Not supported at the moment.");
2867     }
2868
2869     State getState() {
2870         return state;
2871     }
2872
2873     ClusterTable getClusterTable() {
2874         return clusterTable;
2875     }
2876
2877     public GraphSession getGraphSession() {
2878         return graphSession;
2879     }
2880
2881     public QueryProcessor getQueryProvider2() {
2882         return queryProvider2;
2883     }
2884
2885     ClientChangesImpl getClientChanges() {
2886         return clientChanges;
2887     }
2888
2889     boolean getWriteOnly() {
2890         return writeOnly;
2891     }
2892
2893     static int counter = 0;
2894
2895     public void onClusterLoaded(long clusterId) {
2896
2897         clusterTable.updateSize();
2898
2899     }
2900
2901
2902
2903
2904     /*
2905      * Implementation of the interface RequestProcessor
2906      */
2907
2908     @Override
2909     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2910
2911         assertNotSession();
2912         assertAlive();
2913
2914         assert(request != null);
2915
2916         final DataContainer<T> result = new DataContainer<T>();
2917         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2918
2919         syncRequest(request, new AsyncProcedure<T>() {
2920
2921             @Override
2922             public void execute(AsyncReadGraph graph, T t) {
2923                 result.set(t);
2924             }
2925
2926             @Override
2927             public void exception(AsyncReadGraph graph, Throwable t) {
2928                 exception.set(t);
2929             }
2930
2931         });
2932
2933         Throwable t = exception.get();
2934         if(t != null) {
2935             if(t instanceof DatabaseException) throw (DatabaseException)t;
2936             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2937         }
2938
2939         return result.get();
2940
2941     }
2942
2943 //    @Override
2944 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2945 //        assertNotSession();
2946 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2947 //    }
2948
2949     @Override
2950     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2951         assertNotSession();
2952         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2953     }
2954
2955     @Override
2956     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2957         assertNotSession();
2958         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2959     }
2960
2961     @Override
2962     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2963         assertNotSession();
2964         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2965     }
2966
2967     @Override
2968     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2969         assertNotSession();
2970         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2971     }
2972
2973     @Override
2974     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2975
2976         assertNotSession();
2977
2978         assert(request != null);
2979
2980         final DataContainer<T> result = new DataContainer<T>();
2981         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2982
2983         syncRequest(request, new AsyncProcedure<T>() {
2984
2985             @Override
2986             public void execute(AsyncReadGraph graph, T t) {
2987                 result.set(t);
2988             }
2989
2990             @Override
2991             public void exception(AsyncReadGraph graph, Throwable t) {
2992                 exception.set(t);
2993             }
2994
2995         });
2996
2997         Throwable t = exception.get();
2998         if(t != null) {
2999             if(t instanceof DatabaseException) throw (DatabaseException)t;
3000             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3001         }
3002
3003         return result.get();
3004
3005     }
3006
3007     @Override
3008     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3009         assertNotSession();
3010         return syncRequest(request, (AsyncProcedure<T>)procedure);
3011     }
3012
3013     @Override
3014     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3015