]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Make Write-interfaces as @FunctionalInterface for lambdas
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
161
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
164
165
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
167
168     protected static final boolean DEBUG        = false;
169
170     private static final boolean DIAGNOSTICS       = false;
171
172     private TransactionPolicySupport transactionPolicy;
173
174     final private ClusterControl clusterControl;
175
176     final protected BuiltinSupportImpl builtinSupport;
177     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178     final protected ClusterSetsSupport clusterSetsSupport;
179     final protected LifecycleSupportImpl lifecycleSupport;
180
181     protected QuerySupportImpl querySupport;
182     protected ResourceSupportImpl resourceSupport;
183     protected WriteSupport writeSupport;
184     public ClusterTranslator clusterTranslator;
185
186     boolean dirtyPrimitives = false;
187
188     public static final int SERVICE_MODE_CREATE = 2;
189     public static final int SERVICE_MODE_ALLOW = 1;
190     public int serviceMode = 0;
191     public boolean createdImmutableClusters = false;
192     public TLongHashSet createdClusters = new TLongHashSet();
193
194     private Layer0 L0;
195
196     /**
197      * The service locator maintained by the workbench. These services are
198      * initialized during workbench during the <code>init</code> method.
199      */
200     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
201
202     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
203
204     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
205
206     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
209
210     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
211
212     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
213
214     final protected State                                        state              = new State();
215
216     protected GraphSession                                       graphSession       = null;
217
218     protected SessionManager                                     sessionManagerImpl = null;
219
220     protected UserAuthenticationAgent                            authAgent          = null;
221
222     protected UserAuthenticator                                  authenticator      = null;
223
224     protected Resource                                           user               = null;
225
226     protected ClusterStream                                      clusterStream      = null;
227
228     protected SessionRequestManager                         requestManager = null;
229
230     public ClusterTable                                       clusterTable       = null;
231
232     public QueryProcessor                                     queryProvider2 = null;
233
234     ClientChangesImpl                                  clientChanges      = null;
235
236     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
237
238 //    protected long                                               newClusterId       = Constants.NullClusterId;
239
240     protected int                                                flushCounter       = 0;
241
242     protected boolean                                            writeOnly          = false;
243
244     WriteState<?>                                                writeState = null;
245     WriteStateBase<?>                                            delayedWriteState = null;
246     protected Resource defaultClusterSet = null; // If not null then used for newResource().
247
248     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
249
250 //        if (authAgent == null)
251 //            throw new IllegalArgumentException("null authentication agent");
252
253         File t = StaticSessionProperties.virtualGraphStoragePath;
254         if (null == t)
255             t = new File(".");
256         this.clusterTable = new ClusterTable(this, t);
257         this.builtinSupport = new BuiltinSupportImpl(this);
258         this.sessionManagerImpl = sessionManagerImpl;
259         this.user = null;
260 //        this.authAgent = authAgent;
261
262         serviceLocator.registerService(Session.class, this);
263         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284         ServiceActivityUpdaterForWriteTransactions.register(this);
285
286         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289         this.lifecycleSupport = new LifecycleSupportImpl(this);
290         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291         this.transactionPolicy = new TransactionPolicyRelease();
292         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293         this.clusterControl = new ClusterControlImpl(this);
294         serviceLocator.registerService(ClusterControl.class, clusterControl);
295         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296         this.clusterSetsSupport.setReadDirectory(t.toPath());
297         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
299     }
300
301     /*
302      *
303      * Session interface
304      */
305
306
307     @Override
308     public Resource getRootLibrary() {
309         return queryProvider2.getRootLibraryResource();
310     }
311
312     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313         if (!graphSession.dbSession.refreshEnabled())
314             return;
315         try {
316             getClusterTable().refresh(csid, this, clusterUID);
317         } catch (Throwable t) {
318             Logger.defaultLogError("Refesh failed.", t);
319         }
320     }
321
322     static final class TaskHelper {
323         private final String name;
324         private Object result;
325         final Semaphore sema = new Semaphore(0);
326         private Throwable throwable = null;
327         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
328             @Override
329             public void run(DatabaseException e) {
330                 synchronized (TaskHelper.this) {
331                     throwable = e;
332                 }
333             }
334         };
335         final Procedure<Object> proc = new Procedure<Object>() {
336             @Override
337             public void execute(Object result) {
338                 callback.run(null);
339             }
340             @Override
341             public void exception(Throwable t) {
342                 if (t instanceof DatabaseException)
343                     callback.run((DatabaseException)t);
344                 else
345                     callback.run(new DatabaseException("" + name + "operation failed.", t));
346             }
347         };
348         final WriteTraits writeTraits = new WriteTraits() {};
349         TaskHelper(String name) {
350             this.name = name;
351         }
352         <T> T getResult() {
353             return (T)result;
354         }
355         void setResult(Object result) {
356             this.result = result;
357         }
358 //        Throwable throwableGet() {
359 //            return throwable;
360 //        }
361         synchronized void throwableSet(Throwable t) {
362             throwable = t;
363         }
364 //        void throwableSet(String t) {
365 //            throwable = new InternalException("" + name + " operation failed. " + t);
366 //        }
367         synchronized void throwableCheck()
368         throws DatabaseException {
369             if (null != throwable)
370                 if (throwable instanceof DatabaseException)
371                     throw (DatabaseException)throwable;
372                 else
373                     throw new DatabaseException("Undo operation failed.", throwable);
374         }
375         void throw_(String message)
376         throws DatabaseException {
377             throw new DatabaseException("" + name + " operation failed. " + message);
378         }
379     }
380
381
382
383     private ListenerBase getListenerBase(Object procedure) {
384         if (procedure instanceof ListenerBase)
385             return (ListenerBase) procedure;
386         else
387             return null;
388     }
389
390     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391         scheduleRequest(request, callback, notify, null);
392     }
393
394     /* (non-Javadoc)
395      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396      */
397     @Override
398     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399
400         assert (request != null);
401
402         if(Development.DEVELOPMENT) {
403             try {
404             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405                 System.err.println("schedule write '" + request + "'");
406             } catch (Throwable t) {
407                 Logger.defaultLogError(t);
408             }
409         }
410
411         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412
413         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414
415             @Override
416             public void run(int thread) {
417
418                 if(Development.DEVELOPMENT) {
419                     try {
420                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421                         System.err.println("perform write '" + request + "'");
422                     } catch (Throwable t) {
423                         Logger.defaultLogError(t);
424                     }
425                 }
426
427                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428
429                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
430
431                 try {
432
433                     flushCounter = 0;
434                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
435
436                     VirtualGraph vg = getProvider(request.getProvider());
437
438                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440
441                         @Override
442                         public void execute(Object result) {
443                             if(callback != null) callback.run(null);
444                         }
445
446                         @Override
447                         public void exception(Throwable t) {
448                             if(callback != null) callback.run((DatabaseException)t);
449                         }
450
451                     });
452
453                     assert (null != writer);
454 //                    writer.state.barrier.inc();
455
456                     try {
457                         request.perform(writer);
458                         assert (null != writer);
459                     } catch (Throwable t) {
460                         if (!(t instanceof CancelTransactionException))
461                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462                         writeState.except(t);
463                     } finally {
464 //                        writer.state.barrier.dec();
465 //                        writer.waitAsync(request);
466                     }
467
468
469                     assert(!queryProvider2.dirty);
470
471                 } catch (Throwable e) {
472
473                     // Log it first, just to be safe that the error is always logged.
474                     if (!(e instanceof CancelTransactionException))
475                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
476
477 //                    writeState.getGraph().state.barrier.dec();
478 //                    writeState.getGraph().waitAsync(request);
479
480                     writeState.except(e);
481
482 //                    try {
483 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 //                        // All we can do here is to log those, can't really pass them anywhere.
485 //                        if (callback != null) {
486 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 //                            else callback.run(new DatabaseException(e));
488 //                        }
489 //                    } catch (Throwable e2) {
490 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
491 //                    }
492 //                    boolean empty = clusterStream.reallyFlush();
493 //                    int callerThread = -1;
494 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
496 //                    try {
497 //                        // Send all local changes to server so it can calculate correct reverse change set.
498 //                        // This will call clusterStream.accept().
499 //                        state.cancelCommit(context, clusterStream);
500 //                        if (!empty) {
501 //                            if (!context.isOk()) // this is a blocking operation
502 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
504 //                        }
505 //                        state.cancelCommit2(context, clusterStream);
506 //                    } catch (DatabaseException e3) {
507 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
508 //                    } finally {
509 //                        clusterStream.setOff(false);
510 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
511 //                    }
512
513                 } finally {
514
515                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
516
517                 }
518
519                 task.finish();
520
521                 if(Development.DEVELOPMENT) {
522                     try {
523                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524                         System.err.println("finish write '" + request + "'");
525                     } catch (Throwable t) {
526                         Logger.defaultLogError(t);
527                     }
528                 }
529
530             }
531
532         }, combine);
533
534     }
535
536     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537         scheduleRequest(request, procedure, notify, null);
538     }
539
540     /* (non-Javadoc)
541      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542      */
543     @Override
544     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
545
546         assert (request != null);
547
548         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
549
550         requestManager.scheduleWrite(new SessionTask(request, thread) {
551
552             @Override
553             public void run(int thread) {
554
555                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
556
557                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558
559                 flushCounter = 0;
560                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
561
562                 VirtualGraph vg = getProvider(request.getProvider());
563                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
564
565                 try {
566                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567                     writeState = writeStateT;
568
569                     assert (null != writer);
570 //                    writer.state.barrier.inc();
571                     writeStateT.setResult(request.perform(writer));
572                     assert (null != writer);
573
574 //                    writer.state.barrier.dec();
575 //                    writer.waitAsync(null);
576
577                 } catch (Throwable e) {
578
579 //                    writer.state.barrier.dec();
580 //                    writer.waitAsync(null);
581
582                     writeState.except(e);
583
584 //                  state.stopWriteTransaction(clusterStream);
585 //
586 //              } catch (Throwable e) {
587 //                  // Log it first, just to be safe that the error is always logged.
588 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
589 //
590 //                  try {
591 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 //                      // All we can do here is to log those, can't really pass them anywhere.
593 //                      if (procedure != null) {
594 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 //                          else procedure.exception(new DatabaseException(e));
596 //                      }
597 //                  } catch (Throwable e2) {
598 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
599 //                  }
600 //
601 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
602 //
603 //                  state.stopWriteTransaction(clusterStream);
604
605                 } finally {
606                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
607                 }
608
609 //              if(notify != null) notify.release();
610
611                 task.finish();
612
613             }
614
615         }, combine);
616
617     }
618
619     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620         scheduleRequest(request, callback, notify, null);
621     }
622
623     /* (non-Javadoc)
624      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
625      */
626     @Override
627     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
628
629         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
630
631         assert (request != null);
632
633         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
634
635         requestManager.scheduleWrite(new SessionTask(request, thread) {
636
637             @Override
638             public void run(int thread) {
639                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
640
641                 Procedure<Object> stateProcedure = new Procedure<Object>() {
642                     @Override
643                     public void execute(Object result) {
644                         if (callback != null)
645                             callback.run(null);
646                     }
647                     @Override
648                     public void exception(Throwable t) {
649                         if (callback != null) {
650                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651                             else callback.run(new DatabaseException(t));
652                         } else
653                             Logger.defaultLogError("Unhandled exception", t);
654                     }
655                 };
656
657                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659                 DelayedWriteGraph dwg = null;
660 //                newGraph.state.barrier.inc();
661
662                 try {
663                     dwg = new DelayedWriteGraph(newGraph);
664                     request.perform(dwg);
665                 } catch (Throwable e) {
666                     delayedWriteState.except(e);
667                     total.finish();
668                     return;
669                 } finally {
670 //                    newGraph.state.barrier.dec();
671 //                    newGraph.waitAsync(request);
672                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
673                 }
674
675                 delayedWriteState = null;
676
677                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
678                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
679
680                 flushCounter = 0;
681                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
682
683                 acquireWriteOnly();
684
685                 VirtualGraph vg = getProvider(request.getProvider());
686                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
687
688                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
689
690                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
691
692                 assert (null != writer);
693 //                writer.state.barrier.inc();
694
695                 try {
696                     // Cannot cancel
697                     dwg.commit(writer, request);
698
699                         if(defaultClusterSet != null) {
700                                 XSupport xs = getService(XSupport.class);
701                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
702                                 ClusteringSupport cs = getService(ClusteringSupport.class);
703                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
704                         }
705
706                     // This makes clusters available from server
707                     clusterStream.reallyFlush();
708
709                     releaseWriteOnly(writer);
710                     handleUpdatesAndMetadata(writer);
711
712                 } catch (ServiceException e) {
713 //                    writer.state.barrier.dec();
714 //                    writer.waitAsync(null);
715
716                     // These shall be requested from server
717                     clusterTable.removeWriteOnlyClusters();
718                     // This makes clusters available from server
719                     clusterStream.reallyFlush();
720
721                     releaseWriteOnly(writer);
722                     writeState.except(e);
723                 } finally {
724                     // Debugging & Profiling
725                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
726                     task2.finish();
727                     total.finish();
728                 }
729             }
730
731         }, combine);
732
733     }
734
735     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
736         scheduleRequest(request, procedure, notify, null);
737     }
738
739     /* (non-Javadoc)
740      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
741      */
742     @Override
743     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
744         throw new Error("Not implemented");
745     }
746
747     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
748         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
749         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
750             createdClusters.add(cluster.clusterId);
751         }
752         return cluster;
753     }
754
755     class WriteOnlySupport implements WriteSupport {
756
757         ClusterStream stream;
758         ClusterImpl currentCluster;
759
760         public WriteOnlySupport() {
761             this.stream = clusterStream;
762         }
763
764         @Override
765         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
766             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
767         }
768
769         @Override
770         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
771
772             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
773
774             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
775                 if(s != queryProvider2.getRootLibrary())
776                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
777
778             try {
779                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
780             } catch (DatabaseException e) {
781                 Logger.defaultLogError(e);
782                 //throw e;
783             }
784
785             clientChanges.invalidate(s);
786
787             if (cluster.isWriteOnly())
788                 return;
789             queryProvider2.updateStatements(s, p);
790
791         }
792
793         @Override
794         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
795             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
796         }
797
798         @Override
799         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
800
801             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
802             try {
803                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
804             } catch (DatabaseException e) {
805                 Logger.defaultLogError(e);
806             }
807
808             clientChanges.invalidate(rid);
809
810             if (cluster.isWriteOnly())
811                 return;
812             queryProvider2.updateValue(rid);
813
814         }
815
816         @Override
817         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
818
819             if(amount < 65536) {
820                 claimValue(provider,resource, reader.readBytes(null, amount));
821                 return;
822             }
823
824             byte[] bytes = new byte[65536];
825
826             int rid = ((ResourceImpl)resource).id;
827             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
828             try {
829                 int left = amount;
830                 while(left > 0) {
831                     int block = Math.min(left, 65536);
832                     reader.readBytes(bytes, block);
833                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
834                     left -= block;
835                 }
836             } catch (DatabaseException e) {
837                 Logger.defaultLogError(e);
838             }
839
840             clientChanges.invalidate(rid);
841
842             if (cluster.isWriteOnly())
843                 return;
844             queryProvider2.updateValue(rid);
845
846         }
847
848         private void maintainCluster(ClusterImpl before, ClusterI after_) {
849             if(after_ != null && after_ != before) {
850                 ClusterImpl after = (ClusterImpl)after_;
851                 if(currentCluster == before) currentCluster = after;
852                 clusterTable.replaceCluster(after);
853             }
854         }
855
856         public int createResourceKey(int foreignCounter) throws DatabaseException {
857             if(currentCluster == null)
858                 currentCluster = getNewResourceCluster();
859             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
860                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
861                 newCluster.foreignLookup = new byte[foreignCounter];
862                 currentCluster = newCluster;
863                 if (DEBUG)
864                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
865             }
866             return currentCluster.createResource(clusterTranslator);
867         }
868
869         @Override
870         public Resource createResource(VirtualGraph provider) throws DatabaseException {
871             if(currentCluster == null) {
872                 if (null != defaultClusterSet) {
873                         ResourceImpl result = getNewResource(defaultClusterSet);
874                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
875                     return result;
876                 } else {
877                     currentCluster = getNewResourceCluster();
878                 }
879             }
880             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
881                 if (null != defaultClusterSet) {
882                         ResourceImpl result = getNewResource(defaultClusterSet);
883                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
884                     return result;
885                 } else {
886                     currentCluster = getNewResourceCluster();
887                 }
888             }
889             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
890         }
891
892         @Override
893         public Resource createResource(VirtualGraph provider, long clusterId)
894         throws DatabaseException {
895             return getNewResource(clusterId);
896         }
897
898         @Override
899         public Resource createResource(VirtualGraph provider, Resource clusterSet)
900         throws DatabaseException {
901             return getNewResource(clusterSet);
902         }
903
904         @Override
905         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
906                 throws DatabaseException {
907             getNewClusterSet(clusterSet);
908         }
909
910         @Override
911         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
912         throws ServiceException {
913             return containsClusterSet(clusterSet);
914         }
915
916         public void selectCluster(long cluster) {
917                 currentCluster = clusterTable.getClusterByClusterId(cluster);
918                 long setResourceId = clusterSetsSupport.getSet(cluster);
919                 clusterSetsSupport.put(setResourceId, cluster);
920         }
921
922         @Override
923         public Resource setDefaultClusterSet(Resource clusterSet)
924         throws ServiceException {
925                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
926                 if(clusterSet != null) {
927                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
928                         currentCluster = clusterTable.getClusterByClusterId(id);
929                         return result;
930                 } else {
931                         currentCluster = null;
932                         return null;
933                 }
934         }
935
936         @Override
937         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
938
939                  provider = getProvider(provider);
940              if (null == provider) {
941                  int key = ((ResourceImpl)resource).id;
942
943
944
945                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
946 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
947 //                      if(key != queryProvider2.getRootLibrary())
948 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
949
950 //                 try {
951 //                     cluster.removeValue(key, clusterTranslator);
952 //                 } catch (DatabaseException e) {
953 //                     Logger.defaultLogError(e);
954 //                     return;
955 //                 }
956
957                  clusterTable.writeOnlyInvalidate(cluster);
958
959                  try {
960                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
961                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
962                          clusterTranslator.removeValue(cluster);
963                  } catch (DatabaseException e) {
964                          Logger.defaultLogError(e);
965                  }
966
967                  queryProvider2.invalidateResource(key);
968                  clientChanges.invalidate(key);
969
970              } else {
971                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
972                  queryProvider2.updateValue(querySupport.getId(resource));
973                  clientChanges.claimValue(resource);
974              }
975
976
977         }
978
979         @Override
980         public void flush(boolean intermediate) {
981             throw new UnsupportedOperationException();
982         }
983
984         @Override
985         public void flushCluster() {
986             clusterTable.flushCluster(graphSession);
987             if(defaultClusterSet != null) {
988                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
989             }
990             currentCluster = null;
991         }
992
993         @Override
994         public void flushCluster(Resource r) {
995             throw new UnsupportedOperationException("flushCluster resource " + r);
996         }
997
998         @Override
999         public void gc() {
1000         }
1001
1002         @Override
1003         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1004                 Resource object) {
1005
1006                 int s = ((ResourceImpl)subject).id;
1007                 int p = ((ResourceImpl)predicate).id;
1008                 int o = ((ResourceImpl)object).id;
1009
1010                 provider = getProvider(provider);
1011                 if (null == provider) {
1012
1013                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1014                         clusterTable.writeOnlyInvalidate(cluster);
1015
1016                         try {
1017
1018                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1019                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1020                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1021
1022                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1023                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1024
1025                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1026                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1027                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1028                                 clusterTranslator.removeStatement(cluster);
1029
1030                                 queryProvider2.invalidateResource(s);
1031                                 clientChanges.invalidate(s);
1032
1033                         } catch (DatabaseException e) {
1034
1035                                 Logger.defaultLogError(e);
1036
1037                         }
1038
1039                         return true;
1040
1041                 } else {
1042                         
1043                         ((VirtualGraphImpl)provider).deny(s, p, o);
1044                         queryProvider2.invalidateResource(s);
1045                         clientChanges.invalidate(s);
1046
1047                         return true;
1048
1049                 }
1050
1051         }
1052
1053         @Override
1054         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1055             throw new UnsupportedOperationException();
1056         }
1057
1058         @Override
1059         public boolean writeOnly() {
1060             return true;
1061         }
1062
1063         @Override
1064         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1065             writeSupport.performWriteRequest(graph, request);
1066         }
1067
1068         @Override
1069         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1070             throw new UnsupportedOperationException();
1071         }
1072
1073         @Override
1074         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1075             throw new UnsupportedOperationException();
1076         }
1077
1078         @Override
1079         public <T> void addMetadata(Metadata data) throws ServiceException {
1080             writeSupport.addMetadata(data);
1081         }
1082
1083         @Override
1084         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1085             return writeSupport.getMetadata(clazz);
1086         }
1087
1088         @Override
1089         public TreeMap<String, byte[]> getMetadata() {
1090             return writeSupport.getMetadata();
1091         }
1092
1093         @Override
1094         public void commitDone(WriteTraits writeTraits, long csid) {
1095             writeSupport.commitDone(writeTraits, csid);
1096         }
1097
1098         @Override
1099         public void clearUndoList(WriteTraits writeTraits) {
1100             writeSupport.clearUndoList(writeTraits);
1101         }
1102         @Override
1103         public int clearMetadata() {
1104             return writeSupport.clearMetadata();
1105         }
1106
1107                 @Override
1108                 public void startUndo() {
1109                         writeSupport.startUndo();
1110                 }
1111     }
1112
1113     class VirtualWriteOnlySupport implements WriteSupport {
1114
1115 //        @Override
1116 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1117 //                Resource object) {
1118 //            throw new UnsupportedOperationException();
1119 //        }
1120
1121         @Override
1122         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1123
1124             TransientGraph impl = (TransientGraph)provider;
1125             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1126             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1127             clientChanges.claim(subject, predicate, object);
1128
1129         }
1130
1131         @Override
1132         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1133
1134             TransientGraph impl = (TransientGraph)provider;
1135             impl.claim(subject, predicate, object);
1136             getQueryProvider2().updateStatements(subject, predicate);
1137             clientChanges.claim(subject, predicate, object);
1138
1139         }
1140
1141         @Override
1142         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1143             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1144         }
1145
1146         @Override
1147         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1148             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1149             getQueryProvider2().updateValue(resource);
1150             clientChanges.claimValue(resource);
1151         }
1152
1153         @Override
1154         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1155             byte[] value = reader.readBytes(null, amount);
1156             claimValue(provider, resource, value);
1157         }
1158
1159         @Override
1160         public Resource createResource(VirtualGraph provider) {
1161             TransientGraph impl = (TransientGraph)provider;
1162             return impl.getResource(impl.newResource(false));
1163         }
1164
1165         @Override
1166         public Resource createResource(VirtualGraph provider, long clusterId) {
1167             throw new UnsupportedOperationException();
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1172         throws DatabaseException {
1173             throw new UnsupportedOperationException();
1174         }
1175
1176         @Override
1177         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1178         throws DatabaseException {
1179             throw new UnsupportedOperationException();
1180         }
1181
1182         @Override
1183         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1184         throws ServiceException {
1185             throw new UnsupportedOperationException();
1186         }
1187
1188         @Override
1189         public Resource setDefaultClusterSet(Resource clusterSet)
1190         throws ServiceException {
1191                 return null;
1192         }
1193
1194         @Override
1195         public void denyValue(VirtualGraph provider, Resource resource) {
1196             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1197             getQueryProvider2().updateValue(querySupport.getId(resource));
1198             // NOTE: this only keeps track of value changes by-resource.
1199             clientChanges.claimValue(resource);
1200         }
1201
1202         @Override
1203         public void flush(boolean intermediate) {
1204             throw new UnsupportedOperationException();
1205         }
1206
1207         @Override
1208         public void flushCluster() {
1209             throw new UnsupportedOperationException();
1210         }
1211
1212         @Override
1213         public void flushCluster(Resource r) {
1214             throw new UnsupportedOperationException("Resource " + r);
1215         }
1216
1217         @Override
1218         public void gc() {
1219         }
1220
1221         @Override
1222         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1223             TransientGraph impl = (TransientGraph) provider;
1224             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1225             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1226             clientChanges.deny(subject, predicate, object);
1227             return true;
1228         }
1229
1230         @Override
1231         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1232             throw new UnsupportedOperationException();
1233         }
1234
1235         @Override
1236         public boolean writeOnly() {
1237             return true;
1238         }
1239
1240         @Override
1241         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1242             throw new UnsupportedOperationException();
1243         }
1244
1245         @Override
1246         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1247             throw new UnsupportedOperationException();
1248         }
1249
1250         @Override
1251         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public <T> void addMetadata(Metadata data) throws ServiceException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public TreeMap<String, byte[]> getMetadata() {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public void commitDone(WriteTraits writeTraits, long csid) {
1272         }
1273
1274         @Override
1275         public void clearUndoList(WriteTraits writeTraits) {
1276         }
1277
1278         @Override
1279         public int clearMetadata() {
1280             return 0;
1281         }
1282
1283                 @Override
1284                 public void startUndo() {
1285                 }
1286     }
1287
1288     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1289
1290         try {
1291
1292             int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1293
1294             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1295
1296             flushCounter = 0;
1297             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1298
1299             acquireWriteOnly();
1300
1301             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1302
1303             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1304
1305             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1306             writeState = writeStateT;
1307
1308             assert (null != writer);
1309 //            writer.state.barrier.inc();
1310             long start = System.nanoTime();
1311             T result = request.perform(writer);
1312             long duration = System.nanoTime() - start;
1313             if (DEBUG) {
1314                 System.err.println("################");
1315                 System.err.println("WriteOnly duration " + 1e-9*duration);
1316             }
1317             writeStateT.setResult(result);
1318
1319             // This makes clusters available from server
1320             clusterStream.reallyFlush();
1321
1322             // This will trigger query updates
1323             releaseWriteOnly(writer);
1324             assert (null != writer);
1325
1326             handleUpdatesAndMetadata(writer);
1327
1328         } catch (CancelTransactionException e) {
1329
1330             releaseWriteOnly(writeState.getGraph());
1331
1332             clusterTable.removeWriteOnlyClusters();
1333             state.stopWriteTransaction(clusterStream);
1334
1335         } catch (Throwable e) {
1336
1337             e.printStackTrace();
1338
1339             releaseWriteOnly(writeState.getGraph());
1340
1341             clusterTable.removeWriteOnlyClusters();
1342
1343             if (callback != null)
1344                 callback.exception(new DatabaseException(e));
1345
1346             state.stopWriteTransaction(clusterStream);
1347             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1348
1349         } finally  {
1350
1351             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1352
1353         }
1354
1355     }
1356
1357     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1358         scheduleRequest(request, callback, notify, null);
1359     }
1360
1361     @Override
1362     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1363
1364         assertAlive();
1365
1366         assert (request != null);
1367
1368         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1369
1370         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1371
1372             @Override
1373             public void run(int thread) {
1374
1375                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1376
1377                     try {
1378
1379                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1380
1381                         flushCounter = 0;
1382                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1383
1384                         acquireWriteOnly();
1385
1386                         VirtualGraph vg = getProvider(request.getProvider());
1387                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1388
1389                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1390
1391                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1392
1393                             @Override
1394                             public void execute(Object result) {
1395                                 if(callback != null) callback.run(null);
1396                             }
1397
1398                             @Override
1399                             public void exception(Throwable t) {
1400                                 if(callback != null) callback.run((DatabaseException)t);
1401                             }
1402
1403                         });
1404
1405                         assert (null != writer);
1406 //                        writer.state.barrier.inc();
1407
1408                         try {
1409
1410                             request.perform(writer);
1411
1412                         } catch (Throwable e) {
1413
1414 //                            writer.state.barrier.dec();
1415 //                            writer.waitAsync(null);
1416
1417                             releaseWriteOnly(writer);
1418
1419                             clusterTable.removeWriteOnlyClusters();
1420
1421                             if(!(e instanceof CancelTransactionException)) {
1422                                 if (callback != null)
1423                                     callback.run(new DatabaseException(e));
1424                             }
1425
1426                             writeState.except(e);
1427
1428                             return;
1429
1430                         }
1431
1432                         // This makes clusters available from server
1433                         boolean empty = clusterStream.reallyFlush();
1434                         // This was needed to make WO requests call metadata listeners.
1435                         // NOTE: the calling event does not contain clientChanges information.
1436                         if (!empty && clientChanges.isEmpty())
1437                             clientChanges.setNotEmpty(true);
1438                         releaseWriteOnly(writer);
1439                         assert (null != writer);
1440                     } finally  {
1441
1442                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1443
1444                     }
1445
1446
1447                 task.finish();
1448
1449             }
1450
1451         }, combine);
1452
1453     }
1454
1455     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1456         scheduleRequest(request, callback, notify, null);
1457     }
1458
1459     /* (non-Javadoc)
1460      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1461      */
1462     @Override
1463     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1464
1465         assert (request != null);
1466
1467         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1468
1469         requestManager.scheduleWrite(new SessionTask(request, thread) {
1470
1471             @Override
1472             public void run(int thread) {
1473
1474                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1475
1476                 performWriteOnly(request, notify, callback);
1477
1478                 task.finish();
1479
1480             }
1481
1482         }, combine);
1483
1484     }
1485
1486     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1487
1488         assert (request != null);
1489         assert (procedure != null);
1490
1491         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1492
1493         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1494
1495             @Override
1496             public void run(int thread) {
1497
1498                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1499
1500                 ListenerBase listener = getListenerBase(procedure);
1501
1502                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1503
1504                 try {
1505
1506                     if (listener != null) {
1507
1508                         try {
1509                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1510
1511                                         @Override
1512                                         public void exception(AsyncReadGraph graph, Throwable t) {
1513                                                 procedure.exception(graph, t);
1514                                                 if(throwable != null) {
1515                                                         throwable.set(t);
1516                                                 } else {
1517                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1518                                                 }
1519                                         }
1520
1521                                         @Override
1522                                         public void execute(AsyncReadGraph graph, T t) {
1523                                                 if(result != null) result.set(t);
1524                                                 procedure.execute(graph, t);
1525                                         }
1526
1527                                 }, listener);
1528                         } catch (Throwable t) {
1529                             // This is handled by the AsyncProcedure
1530                                 //Logger.defaultLogError("Internal error", t);
1531                         }
1532
1533                     } else {
1534
1535                         try {
1536
1537 //                            newGraph.state.barrier.inc();
1538
1539                             T t = request.perform(newGraph);
1540
1541                             try {
1542
1543                                 if(result != null) result.set(t);
1544                                 procedure.execute(newGraph, t);
1545
1546                             } catch (Throwable th) {
1547
1548                                 if(throwable != null) {
1549                                     throwable.set(th);
1550                                 } else {
1551                                     Logger.defaultLogError("Unhandled exception", th);
1552                                 }
1553
1554                             }
1555
1556                         } catch (Throwable t) {
1557
1558                             if (DEBUG)
1559                                 t.printStackTrace();
1560
1561                             if(throwable != null) {
1562                                 throwable.set(t);
1563                             } else {
1564                                 Logger.defaultLogError("Unhandled exception", t);
1565                             }
1566
1567                             try {
1568
1569                                 procedure.exception(newGraph, t);
1570
1571                             } catch (Throwable t2) {
1572
1573                                 if(throwable != null) {
1574                                     throwable.set(t2);
1575                                 } else {
1576                                     Logger.defaultLogError("Unhandled exception", t2);
1577                                 }
1578
1579                             }
1580
1581                         }
1582
1583 //                        newGraph.state.barrier.dec();
1584 //                        newGraph.waitAsync(request);
1585
1586                     }
1587
1588                 } finally {
1589
1590                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1591
1592                 }
1593
1594             }
1595
1596         });
1597
1598     }
1599
1600     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1601
1602         assert (request != null);
1603         assert (procedure != null);
1604
1605         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1606
1607         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1608
1609             @Override
1610             public void run(int thread) {
1611
1612                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1613
1614                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1615
1616                 try {
1617
1618                     if (listener != null) {
1619
1620                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1621
1622 //                        newGraph.waitAsync(request);
1623
1624                     } else {
1625
1626                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1627                                 procedure, "request");
1628
1629                         try {
1630
1631 //                            newGraph.state.barrier.inc();
1632
1633                             request.perform(newGraph, wrapper);
1634
1635 //                            newGraph.waitAsync(request);
1636
1637                         } catch (Throwable t) {
1638
1639                             wrapper.exception(newGraph, t);
1640 //                            newGraph.waitAsync(request);
1641
1642
1643                         }
1644
1645                     }
1646
1647                 } finally {
1648
1649                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1650
1651                 }
1652
1653             }
1654
1655         });
1656
1657     }
1658
1659     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1660
1661         assert (request != null);
1662         assert (procedure != null);
1663
1664         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1665
1666         int sync = notify != null ? thread : -1;
1667
1668         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1669
1670             @Override
1671             public void run(int thread) {
1672
1673                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1674
1675                 ListenerBase listener = getListenerBase(procedure);
1676
1677                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1678
1679                 try {
1680
1681                     if (listener != null) {
1682
1683                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1684
1685 //                        newGraph.waitAsync(request);
1686
1687                     } else {
1688
1689                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1690
1691                         try {
1692
1693                             request.perform(newGraph, wrapper);
1694
1695                         } catch (Throwable t) {
1696
1697                             t.printStackTrace();
1698
1699                         }
1700
1701                     }
1702
1703                 } finally {
1704
1705                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1706
1707                 }
1708
1709             }
1710
1711         });
1712
1713     }
1714
1715     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1716
1717         assert (request != null);
1718         assert (procedure != null);
1719
1720         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1721
1722         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1723
1724             @Override
1725             public void run(int thread) {
1726
1727                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1728
1729                 ListenerBase listener = getListenerBase(procedure);
1730
1731                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1732
1733                 try {
1734
1735                     if (listener != null) {
1736
1737                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1738
1739                             @Override
1740                             public void exception(Throwable t) {
1741                                 procedure.exception(t);
1742                                 if(throwable != null) {
1743                                     throwable.set(t);
1744                                 }
1745                             }
1746
1747                             @Override
1748                             public void execute(T t) {
1749                                 if(result != null) result.set(t);
1750                                 procedure.execute(t);
1751                             }
1752
1753                         }, listener);
1754
1755 //                        newGraph.waitAsync(request);
1756
1757                     } else {
1758
1759 //                        newGraph.state.barrier.inc();
1760
1761                         request.register(newGraph, new Listener<T>() {
1762
1763                             @Override
1764                             public void exception(Throwable t) {
1765                                 if(throwable != null) throwable.set(t);
1766                                 procedure.exception(t);
1767 //                                newGraph.state.barrier.dec();
1768                             }
1769
1770                             @Override
1771                             public void execute(T t) {
1772                                 if(result != null) result.set(t);
1773                                 procedure.execute(t);
1774 //                                newGraph.state.barrier.dec();
1775                             }
1776
1777                             @Override
1778                             public boolean isDisposed() {
1779                                 return true;
1780                             }
1781
1782                         });
1783
1784 //                        newGraph.waitAsync(request);
1785
1786                     }
1787
1788                 } finally {
1789
1790                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1791
1792                 }
1793
1794             }
1795
1796         });
1797
1798     }
1799
1800
1801     @Override
1802     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1803
1804         scheduleRequest(request, procedure, null, null, null);
1805
1806     }
1807
1808     @Override
1809     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1810         assertNotSession();
1811         Semaphore notify = new Semaphore(0);
1812         DataContainer<Throwable> container = new DataContainer<Throwable>();
1813         DataContainer<T> result = new DataContainer<T>();
1814         scheduleRequest(request, procedure, notify, container, result);
1815         acquire(notify, request);
1816         Throwable throwable = container.get();
1817         if(throwable != null) {
1818             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1819             else throw new DatabaseException("Unexpected exception", throwable);
1820         }
1821         return result.get();
1822     }
1823
1824     @Override
1825     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1826         assertNotSession();
1827         Semaphore notify = new Semaphore(0);
1828         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1829         final DataContainer<T> resultContainer = new DataContainer<T>();
1830         scheduleRequest(request, new AsyncProcedure<T>() {
1831             @Override
1832             public void exception(AsyncReadGraph graph, Throwable throwable) {
1833                 exceptionContainer.set(throwable);
1834                 procedure.exception(graph, throwable);
1835             }
1836             @Override
1837             public void execute(AsyncReadGraph graph, T result) {
1838                 resultContainer.set(result);
1839                 procedure.execute(graph, result);
1840             }
1841         }, getListenerBase(procedure), notify);
1842         acquire(notify, request, procedure);
1843         Throwable throwable = exceptionContainer.get();
1844         if (throwable != null) {
1845             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1846             else throw new DatabaseException("Unexpected exception", throwable);
1847         }
1848         return resultContainer.get();
1849     }
1850
1851
1852     @Override
1853     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1854         assertNotSession();
1855         Semaphore notify = new Semaphore(0);
1856         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1857         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1858             @Override
1859             public void exception(AsyncReadGraph graph, Throwable throwable) {
1860                 exceptionContainer.set(throwable);
1861                 procedure.exception(graph, throwable);
1862             }
1863             @Override
1864             public void execute(AsyncReadGraph graph, T result) {
1865                 procedure.execute(graph, result);
1866             }
1867             @Override
1868             public void finished(AsyncReadGraph graph) {
1869                 procedure.finished(graph);
1870             }
1871         }, notify);
1872         acquire(notify, request, procedure);
1873         Throwable throwable = exceptionContainer.get();
1874         if (throwable != null) {
1875             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1876             else throw new DatabaseException("Unexpected exception", throwable);
1877         }
1878         // TODO: implement return value
1879         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1880         return null;
1881     }
1882
1883     @Override
1884     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1885         return syncRequest(request, new ProcedureAdapter<T>());
1886
1887
1888 //        assert(request != null);
1889 //
1890 //        final DataContainer<T> result = new DataContainer<T>();
1891 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1892 //
1893 //        syncRequest(request, new Procedure<T>() {
1894 //
1895 //            @Override
1896 //            public void execute(T t) {
1897 //                result.set(t);
1898 //            }
1899 //
1900 //            @Override
1901 //            public void exception(Throwable t) {
1902 //                exception.set(t);
1903 //            }
1904 //
1905 //        });
1906 //
1907 //        Throwable t = exception.get();
1908 //        if(t != null) {
1909 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1910 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1911 //        }
1912 //
1913 //        return result.get();
1914
1915     }
1916
1917     @Override
1918     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1919         return syncRequest(request, (Procedure<T>)procedure);
1920     }
1921
1922
1923 //    @Override
1924 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1925 //        assertNotSession();
1926 //        Semaphore notify = new Semaphore(0);
1927 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1928 //        DataContainer<T> result = new DataContainer<T>();
1929 //        scheduleRequest(request, procedure, notify, container, result);
1930 //        acquire(notify, request);
1931 //        Throwable throwable = container.get();
1932 //        if(throwable != null) {
1933 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1934 //            else throw new DatabaseException("Unexpected exception", throwable);
1935 //        }
1936 //        return result.get();
1937 //    }
1938
1939
1940     @Override
1941     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1942         assertNotSession();
1943         Semaphore notify = new Semaphore(0);
1944         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1945         final DataContainer<T> result = new DataContainer<T>();
1946         scheduleRequest(request, procedure, notify, container, result);
1947         acquire(notify, request);
1948         Throwable throwable = container.get();
1949         if (throwable != null) {
1950             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1951             else throw new DatabaseException("Unexpected exception", throwable);
1952         }
1953         return result.get();
1954     }
1955
1956     @Override
1957     public void syncRequest(Write request) throws DatabaseException  {
1958         assertNotSession();
1959         assertAlive();
1960         Semaphore notify = new Semaphore(0);
1961         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1962         scheduleRequest(request, new Callback<DatabaseException>() {
1963
1964             @Override
1965             public void run(DatabaseException e) {
1966               exception.set(e);
1967             }
1968
1969         }, notify);
1970         acquire(notify, request);
1971         if(exception.get() != null) throw exception.get();
1972     }
1973
1974     @Override
1975     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1976         assertNotSession();
1977         Semaphore notify = new Semaphore(0);
1978         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1979         final DataContainer<T> result = new DataContainer<T>();
1980         scheduleRequest(request, new Procedure<T>() {
1981
1982             @Override
1983             public void exception(Throwable t) {
1984               exception.set(t);
1985             }
1986
1987             @Override
1988             public void execute(T t) {
1989               result.set(t);
1990             }
1991
1992         }, notify);
1993         acquire(notify, request);
1994         if(exception.get() != null) {
1995             Throwable t = exception.get();
1996             if(t instanceof DatabaseException) throw (DatabaseException)t;
1997             else throw new DatabaseException(t);
1998         }
1999         return result.get();
2000     }
2001
2002     @Override
2003     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2004         assertNotSession();
2005         Semaphore notify = new Semaphore(0);
2006         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2007         final DataContainer<T> result = new DataContainer<T>();
2008         scheduleRequest(request, new Procedure<T>() {
2009
2010             @Override
2011             public void exception(Throwable t) {
2012               exception.set(t);
2013             }
2014
2015             @Override
2016             public void execute(T t) {
2017               result.set(t);
2018             }
2019
2020         }, notify);
2021         acquire(notify, request);
2022         if(exception.get() != null) {
2023             Throwable t = exception.get();
2024             if(t instanceof DatabaseException) throw (DatabaseException)t;
2025             else throw new DatabaseException(t);
2026         }
2027         return result.get();
2028     }
2029
2030     @Override
2031     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2032         assertNotSession();
2033         Semaphore notify = new Semaphore(0);
2034         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2035         final DataContainer<T> result = new DataContainer<T>();
2036         scheduleRequest(request, new Procedure<T>() {
2037
2038             @Override
2039             public void exception(Throwable t) {
2040               exception.set(t);
2041             }
2042
2043             @Override
2044             public void execute(T t) {
2045               result.set(t);
2046             }
2047
2048         }, notify);
2049         acquire(notify, request);
2050         if(exception.get() != null) {
2051             Throwable t = exception.get();
2052             if(t instanceof DatabaseException) throw (DatabaseException)t;
2053             else throw new DatabaseException(t);
2054         }
2055         return result.get();
2056     }
2057
2058     @Override
2059     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2060         assertNotSession();
2061         Semaphore notify = new Semaphore(0);
2062         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2063         scheduleRequest(request, new Callback<DatabaseException>() {
2064             @Override
2065             public void run(DatabaseException e) {
2066                 exception.set(e);
2067             }
2068         }, notify);
2069         acquire(notify, request);
2070         if(exception.get() != null) throw exception.get();
2071     }
2072
2073     @Override
2074     public void syncRequest(WriteOnly request) throws DatabaseException  {
2075         assertNotSession();
2076         assertAlive();
2077         Semaphore notify = new Semaphore(0);
2078         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2079         scheduleRequest(request, new Callback<DatabaseException>() {
2080             @Override
2081             public void run(DatabaseException e) {
2082                 exception.set(e);
2083             }
2084         }, notify);
2085         acquire(notify, request);
2086         if(exception.get() != null) throw exception.get();
2087     }
2088
2089     /*
2090      *
2091      * GraphRequestProcessor interface
2092      */
2093
2094     @Override
2095     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2096
2097         scheduleRequest(request, procedure, null, null);
2098
2099     }
2100
2101     @Override
2102     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2103
2104         scheduleRequest(request, procedure, null);
2105
2106     }
2107
2108     @Override
2109     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2110
2111         scheduleRequest(request, procedure, null, null, null);
2112
2113     }
2114
2115     @Override
2116     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2117
2118         scheduleRequest(request, callback, null);
2119
2120     }
2121
2122     @Override
2123     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2124
2125         scheduleRequest(request, procedure, null);
2126
2127     }
2128
2129     @Override
2130     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2131
2132         scheduleRequest(request, procedure, null);
2133
2134     }
2135
2136     @Override
2137     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2138
2139         scheduleRequest(request, procedure, null);
2140
2141     }
2142
2143     @Override
2144     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2145
2146         scheduleRequest(request, callback, null);
2147
2148     }
2149
2150     @Override
2151     public void asyncRequest(final Write r) {
2152         asyncRequest(r, null);
2153     }
2154
2155     @Override
2156     public void asyncRequest(final DelayedWrite r) {
2157         asyncRequest(r, null);
2158     }
2159
2160     @Override
2161     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2162
2163         scheduleRequest(request, callback, null);
2164
2165     }
2166
2167     @Override
2168     public void asyncRequest(final WriteOnly request) {
2169
2170         asyncRequest(request, null);
2171
2172     }
2173
2174     @Override
2175     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2176         r.request(this, procedure);
2177     }
2178
2179     @Override
2180     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2181         r.request(this, procedure);
2182     }
2183
2184     @Override
2185     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2186         r.request(this, procedure);
2187     }
2188
2189     @Override
2190     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2191         r.request(this, procedure);
2192     }
2193
2194     @Override
2195     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2196         r.request(this, procedure);
2197     }
2198
2199     @Override
2200     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2201         r.request(this, procedure);
2202     }
2203
2204     @Override
2205     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2206         return r.request(this);
2207     }
2208
2209     @Override
2210     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2211         return r.request(this);
2212     }
2213
2214     @Override
2215     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2216         r.request(this, procedure);
2217     }
2218
2219     @Override
2220     public <T> void async(WriteInterface<T> r) {
2221         r.request(this, new ProcedureAdapter<T>());
2222     }
2223
2224     //@Override
2225     public void incAsync() {
2226         state.incAsync();
2227     }
2228
2229     //@Override
2230     public void decAsync() {
2231         state.decAsync();
2232     }
2233
2234     public long getCluster(ResourceImpl resource) {
2235         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2236         return cluster.getClusterId();
2237     }
2238
2239     public long getCluster(int id) {
2240         if (clusterTable == null)
2241             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2242         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2243     }
2244
2245     public ResourceImpl getResource(int id) {
2246         return new ResourceImpl(resourceSupport, id);
2247     }
2248
2249     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2250         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2251         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2252         int key = proxy.getClusterKey();
2253         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2254         return new ResourceImpl(resourceSupport, resourceKey);
2255     }
2256
2257     public ResourceImpl getResource2(int id) {
2258         assert (id != 0);
2259         return new ResourceImpl(resourceSupport, id);
2260     }
2261
2262     final public int getId(ResourceImpl impl) {
2263         return impl.id;
2264     }
2265
2266     public static final Charset UTF8 = Charset.forName("utf-8");
2267
2268
2269
2270
2271     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2272         for(TransientGraph g : support.providers) {
2273             if(g.isPending(subject)) return false;
2274         }
2275         return true;
2276     }
2277
2278     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2279         for(TransientGraph g : support.providers) {
2280             if(g.isPending(subject, predicate)) return false;
2281         }
2282         return true;
2283     }
2284
2285     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2286
2287         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2288
2289             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2290
2291             @Override
2292             public void run(ReadGraphImpl graph) {
2293                 if(ready.decrementAndGet() == 0) {
2294                     runnable.run(graph);
2295                 }
2296             }
2297
2298         };
2299
2300         for(TransientGraph g : support.providers) {
2301             if(g.isPending(subject)) {
2302                 try {
2303                     g.load(graph, subject, composite);
2304                 } catch (DatabaseException e) {
2305                     e.printStackTrace();
2306                 }
2307             } else {
2308                 composite.run(graph);
2309             }
2310         }
2311
2312         composite.run(graph);
2313
2314     }
2315
2316     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2317
2318         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2319
2320             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2321
2322             @Override
2323             public void run(ReadGraphImpl graph) {
2324                 if(ready.decrementAndGet() == 0) {
2325                     runnable.run(graph);
2326                 }
2327             }
2328
2329         };
2330
2331         for(TransientGraph g : support.providers) {
2332             if(g.isPending(subject, predicate)) {
2333                 try {
2334                     g.load(graph, subject, predicate, composite);
2335                 } catch (DatabaseException e) {
2336                     e.printStackTrace();
2337                 }
2338             } else {
2339                 composite.run(graph);
2340             }
2341         }
2342
2343         composite.run(graph);
2344
2345     }
2346
2347 //    void dumpHeap() {
2348 //
2349 //        try {
2350 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2351 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2352 //                    "d:/heap" + flushCounter + ".txt", true);
2353 //        } catch (IOException e) {
2354 //            e.printStackTrace();
2355 //        }
2356 //
2357 //    }
2358
2359     void fireReactionsToSynchronize(ChangeSet cs) {
2360
2361         // Do not fire empty events
2362         if (cs.isEmpty())
2363             return;
2364
2365         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2366 //        g.state.barrier.inc();
2367
2368         try {
2369
2370             if (!cs.isEmpty()) {
2371                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2372                 for (ChangeListener l : changeListeners2) {
2373                     try {
2374                         l.graphChanged(e2);
2375                     } catch (Exception ex) {
2376                         ex.printStackTrace();
2377                     }
2378                 }
2379             }
2380
2381         } finally {
2382
2383 //            g.state.barrier.dec();
2384 //            g.waitAsync(null);
2385
2386         }
2387
2388     }
2389
2390     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2391         try {
2392
2393             // Do not fire empty events
2394             if (cs2.isEmpty())
2395                 return;
2396
2397 //            graph.restart();
2398 //            graph.state.barrier.inc();
2399
2400             try {
2401
2402                 if (!cs2.isEmpty()) {
2403
2404                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2405                     for (ChangeListener l : changeListeners2) {
2406                         try {
2407                             l.graphChanged(e2);
2408 //                            System.out.println("changelistener " + l);
2409                         } catch (Exception ex) {
2410                             ex.printStackTrace();
2411                         }
2412                     }
2413
2414                 }
2415
2416             } finally {
2417
2418 //                graph.state.barrier.dec();
2419 //                graph.waitAsync(null);
2420
2421             }
2422         } catch (Throwable t) {
2423             t.printStackTrace();
2424
2425         }
2426     }
2427     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2428
2429         try {
2430
2431             // Do not fire empty events
2432             if (cs2.isEmpty())
2433                 return;
2434
2435             // Do not fire on virtual requests
2436             if(graph.getProvider() != null)
2437                 return;
2438
2439             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2440
2441             try {
2442
2443                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2444                 for (ChangeListener l : metadataListeners) {
2445                     try {
2446                         l.graphChanged(e2);
2447                     } catch (Throwable ex) {
2448                         ex.printStackTrace();
2449                     }
2450                 }
2451
2452             } finally {
2453
2454             }
2455
2456         } catch (Throwable t) {
2457             t.printStackTrace();
2458         }
2459
2460     }
2461
2462
2463     /*
2464      * (non-Javadoc)
2465      *
2466      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2467      */
2468     @Override
2469     public <T> T getService(Class<T> api) {
2470         T t = peekService(api);
2471         if (t == null) {
2472             if (state.isClosed())
2473                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2474             throw new ServiceNotFoundException(this, api);
2475         }
2476         return t;
2477     }
2478
2479     /**
2480      * @return
2481      */
2482     protected abstract ServerInformation getCachedServerInformation();
2483
2484         private Class<?> serviceKey1 = null;
2485         private Class<?> serviceKey2 = null;
2486         private Object service1 = null;
2487         private Object service2 = null;
2488
2489     /*
2490      * (non-Javadoc)
2491      *
2492      * @see
2493      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2494      */
2495     @SuppressWarnings("unchecked")
2496     @Override
2497     public synchronized <T> T peekService(Class<T> api) {
2498
2499                 if(serviceKey1 == api) {
2500                         return (T)service1;
2501                 } else if (serviceKey2 == api) {
2502                         // Promote this key
2503                         Object result = service2;
2504                         service2 = service1;
2505                         serviceKey2 = serviceKey1;
2506                         service1 = result;
2507                         serviceKey1 = api;
2508                         return (T)result;
2509                 }
2510
2511         if (Layer0.class == api)
2512                 return (T) L0;
2513         if (ServerInformation.class == api)
2514             return (T) getCachedServerInformation();
2515         else if (WriteGraphImpl.class == api)
2516             return (T) writeState.getGraph();
2517         else if (ClusterBuilder.class == api)
2518             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2519         else if (ClusterBuilderFactory.class == api)
2520             return (T)new ClusterBuilderFactoryImpl(this);
2521
2522                 service2 = service1;
2523                 serviceKey2 = serviceKey1;
2524
2525         service1 = serviceLocator.peekService(api);
2526         serviceKey1 = api;
2527
2528         return (T)service1;
2529
2530     }
2531
2532     /*
2533      * (non-Javadoc)
2534      *
2535      * @see
2536      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2537      */
2538     @Override
2539     public boolean hasService(Class<?> api) {
2540         return serviceLocator.hasService(api);
2541     }
2542
2543     /**
2544      * @param api the api that must be implemented by the specified service
2545      * @param service the service implementation
2546      */
2547     @Override
2548     public <T> void registerService(Class<T> api, T service) {
2549         if(Layer0.class == api) {
2550                 L0 = (Layer0)service;
2551                 return;
2552         }
2553         serviceLocator.registerService(api, service);
2554         if (TransactionPolicySupport.class == api) {
2555             transactionPolicy = (TransactionPolicySupport)service;
2556             state.resetTransactionPolicy();
2557         }
2558         if (api == serviceKey1)
2559                 service1 = service;
2560         else if (api == serviceKey2)
2561                 service2 = service;
2562     }
2563
2564     // ----------------
2565     // SessionMonitor
2566     // ----------------
2567
2568     void fireSessionVariableChange(String variable) {
2569         for (MonitorHandler h : monitorHandlers) {
2570             MonitorContext ctx = monitorContexts.getLeft(h);
2571             assert ctx != null;
2572             // SafeRunner functionality repeated here to avoid dependency.
2573             try {
2574                 h.valuesChanged(ctx);
2575             } catch (Exception e) {
2576                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2577             } catch (LinkageError e) {
2578                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2579             }
2580         }
2581     }
2582
2583
2584     class ResourceSerializerImpl implements ResourceSerializer {
2585
2586         public long createRandomAccessId(int id) throws DatabaseException {
2587             if(id < 0)
2588                 return id;
2589             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2590             long cluster = getCluster(id);
2591             if (0 == cluster)
2592                 return 0; // Better to return 0 then invalid id.
2593             long result = ClusterTraitsBase.createResourceId(cluster, index);
2594             return result;
2595         }
2596
2597         @Override
2598         public long getRandomAccessId(Resource resource) throws DatabaseException {
2599
2600             ResourceImpl resourceImpl = (ResourceImpl) resource;
2601             return createRandomAccessId(resourceImpl.id);
2602
2603         }
2604
2605         @Override
2606         public int getTransientId(Resource resource) throws DatabaseException {
2607
2608             ResourceImpl resourceImpl = (ResourceImpl) resource;
2609             return resourceImpl.id;
2610
2611         }
2612
2613         @Override
2614         public String createRandomAccessId(Resource resource)
2615         throws InvalidResourceReferenceException {
2616
2617             if(resource == null) throw new IllegalArgumentException();
2618
2619             ResourceImpl resourceImpl = (ResourceImpl) resource;
2620
2621             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2622
2623             int r;
2624             try {
2625                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2626             } catch (DatabaseException e1) {
2627                 throw new InvalidResourceReferenceException(e1);
2628             }
2629             try {
2630                 // Serialize as '<resource index>_<cluster id>'
2631                 return "" + r + "_" + getCluster(resourceImpl);
2632             } catch (Throwable e) {
2633                 e.printStackTrace();
2634                 throw new InvalidResourceReferenceException(e);
2635             } finally {
2636             }
2637         }
2638
2639         public int getTransientId(long serialized) throws DatabaseException {
2640             if (serialized <= 0)
2641                 return (int)serialized;
2642             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2643             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2644             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2645             if (cluster == null)
2646                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2647             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2648             return key;
2649         }
2650
2651         @Override
2652         public Resource getResource(long randomAccessId) throws DatabaseException {
2653             return getResourceByKey(getTransientId(randomAccessId));
2654         }
2655
2656         @Override
2657         public Resource getResource(int transientId) throws DatabaseException {
2658             return getResourceByKey(transientId);
2659         }
2660
2661         @Override
2662         public Resource getResource(String randomAccessId)
2663         throws InvalidResourceReferenceException {
2664 &nb