]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.UndoTraits;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.datastructures.Callback;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
162
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
165
166
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168
169     protected static final boolean DEBUG        = false;
170
171     private static final boolean DIAGNOSTICS       = false;
172
173     private TransactionPolicySupport transactionPolicy;
174
175     final private ClusterControl clusterControl;
176
177     final protected BuiltinSupportImpl builtinSupport;
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179     final protected ClusterSetsSupport clusterSetsSupport;
180     final protected LifecycleSupportImpl lifecycleSupport;
181
182     protected QuerySupportImpl querySupport;
183     protected ResourceSupportImpl resourceSupport;
184     protected WriteSupport writeSupport;
185     public ClusterTranslator clusterTranslator;
186
187     boolean dirtyPrimitives = false;
188
189     public static final int SERVICE_MODE_CREATE = 2;
190     public static final int SERVICE_MODE_ALLOW = 1;
191     public int serviceMode = 0;
192     public boolean createdImmutableClusters = false;
193     public TLongHashSet createdClusters = new TLongHashSet();
194
195     private Layer0 L0;
196
197     /**
198      * The service locator maintained by the workbench. These services are
199      * initialized during workbench during the <code>init</code> method.
200      */
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
202
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
204
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
206
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
210
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
212
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
214
215     final protected State                                        state              = new State();
216
217     protected GraphSession                                       graphSession       = null;
218
219     protected SessionManager                                     sessionManagerImpl = null;
220
221     protected UserAuthenticationAgent                            authAgent          = null;
222
223     protected UserAuthenticator                                  authenticator      = null;
224
225     protected Resource                                           user               = null;
226
227     protected ClusterStream                                      clusterStream      = null;
228
229     protected SessionRequestManager                         requestManager = null;
230
231     public ClusterTable                                       clusterTable       = null;
232
233     public QueryProcessor                                     queryProvider2 = null;
234
235     ClientChangesImpl                                  clientChanges      = null;
236
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
238
239 //    protected long                                               newClusterId       = Constants.NullClusterId;
240
241     protected int                                                flushCounter       = 0;
242
243     protected boolean                                            writeOnly          = false;
244
245     WriteState<?>                                                writeState = null;
246     WriteStateBase<?>                                            delayedWriteState = null;
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().
248
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250
251 //        if (authAgent == null)
252 //            throw new IllegalArgumentException("null authentication agent");
253
254         File t = StaticSessionProperties.virtualGraphStoragePath;
255         if (null == t)
256             t = new File(".");
257         this.clusterTable = new ClusterTable(this, t);
258         this.builtinSupport = new BuiltinSupportImpl(this);
259         this.sessionManagerImpl = sessionManagerImpl;
260         this.user = null;
261 //        this.authAgent = authAgent;
262
263         serviceLocator.registerService(Session.class, this);
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285         ServiceActivityUpdaterForWriteTransactions.register(this);
286
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290         this.lifecycleSupport = new LifecycleSupportImpl(this);
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292         this.transactionPolicy = new TransactionPolicyRelease();
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294         this.clusterControl = new ClusterControlImpl(this);
295         serviceLocator.registerService(ClusterControl.class, clusterControl);
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297         this.clusterSetsSupport.setReadDirectory(t.toPath());
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
300     }
301
302     /*
303      *
304      * Session interface
305      */
306
307
308     @Override
309     public Resource getRootLibrary() {
310         return queryProvider2.getRootLibraryResource();
311     }
312
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314         if (!graphSession.dbSession.refreshEnabled())
315             return;
316         try {
317             getClusterTable().refresh(csid, this, clusterUID);
318         } catch (Throwable t) {
319             Logger.defaultLogError("Refesh failed.", t);
320         }
321     }
322
323     static final class TaskHelper {
324         private final String name;
325         private Object result;
326         final Semaphore sema = new Semaphore(0);
327         private Throwable throwable = null;
328         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
329             @Override
330             public void run(DatabaseException e) {
331                 synchronized (TaskHelper.this) {
332                     throwable = e;
333                 }
334             }
335         };
336         final Procedure<Object> proc = new Procedure<Object>() {
337             @Override
338             public void execute(Object result) {
339                 callback.run(null);
340             }
341             @Override
342             public void exception(Throwable t) {
343                 if (t instanceof DatabaseException)
344                     callback.run((DatabaseException)t);
345                 else
346                     callback.run(new DatabaseException("" + name + "operation failed.", t));
347             }
348         };
349         final WriteTraits writeTraits = new WriteTraits() {
350             @Override
351             public UndoTraits getUndoTraits() {
352                 return null;
353             }
354             @Override
355             public VirtualGraph getProvider() {
356                 return null;
357             }
358         };
359         TaskHelper(String name) {
360             this.name = name;
361         }
362         <T> T getResult() {
363             return (T)result;
364         }
365         void setResult(Object result) {
366             this.result = result;
367         }
368 //        Throwable throwableGet() {
369 //            return throwable;
370 //        }
371         synchronized void throwableSet(Throwable t) {
372             throwable = t;
373         }
374 //        void throwableSet(String t) {
375 //            throwable = new InternalException("" + name + " operation failed. " + t);
376 //        }
377         synchronized void throwableCheck()
378         throws DatabaseException {
379             if (null != throwable)
380                 if (throwable instanceof DatabaseException)
381                     throw (DatabaseException)throwable;
382                 else
383                     throw new DatabaseException("Undo operation failed.", throwable);
384         }
385         void throw_(String message)
386         throws DatabaseException {
387             throw new DatabaseException("" + name + " operation failed. " + message);
388         }
389     }
390
391
392
393     private ListenerBase getListenerBase(Object procedure) {
394         if (procedure instanceof ListenerBase)
395             return (ListenerBase) procedure;
396         else
397             return null;
398     }
399
400     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
401         scheduleRequest(request, callback, notify, null);
402     }
403
404     /* (non-Javadoc)
405      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
406      */
407     @Override
408     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
409
410         assert (request != null);
411
412         if(Development.DEVELOPMENT) {
413             try {
414             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
415                 System.err.println("schedule write '" + request + "'");
416             } catch (Throwable t) {
417                 Logger.defaultLogError(t);
418             }
419         }
420
421         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
422
423         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
424
425             @Override
426             public void run(int thread) {
427
428                 if(Development.DEVELOPMENT) {
429                     try {
430                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
431                         System.err.println("perform write '" + request + "'");
432                     } catch (Throwable t) {
433                         Logger.defaultLogError(t);
434                     }
435                 }
436
437                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
438
439                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
440
441                 try {
442
443                     flushCounter = 0;
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
445
446                     VirtualGraph vg = getProvider(request.getProvider());
447
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450
451                         @Override
452                         public void execute(Object result) {
453                             if(callback != null) callback.run(null);
454                         }
455
456                         @Override
457                         public void exception(Throwable t) {
458                             if(callback != null) callback.run((DatabaseException)t);
459                         }
460
461                     });
462
463                     assert (null != writer);
464 //                    writer.state.barrier.inc();
465
466                     try {
467                         request.perform(writer);
468                         assert (null != writer);
469                     } catch (Throwable t) {
470                         if (!(t instanceof CancelTransactionException))
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472                         writeState.except(t);
473                     } finally {
474 //                        writer.state.barrier.dec();
475 //                        writer.waitAsync(request);
476                     }
477
478
479                     assert(!queryProvider2.dirty);
480
481                 } catch (Throwable e) {
482
483                     // Log it first, just to be safe that the error is always logged.
484                     if (!(e instanceof CancelTransactionException))
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
486
487 //                    writeState.getGraph().state.barrier.dec();
488 //                    writeState.getGraph().waitAsync(request);
489
490                     writeState.except(e);
491
492 //                    try {
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 //                        // All we can do here is to log those, can't really pass them anywhere.
495 //                        if (callback != null) {
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 //                            else callback.run(new DatabaseException(e));
498 //                        }
499 //                    } catch (Throwable e2) {
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
501 //                    }
502 //                    boolean empty = clusterStream.reallyFlush();
503 //                    int callerThread = -1;
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
506 //                    try {
507 //                        // Send all local changes to server so it can calculate correct reverse change set.
508 //                        // This will call clusterStream.accept().
509 //                        state.cancelCommit(context, clusterStream);
510 //                        if (!empty) {
511 //                            if (!context.isOk()) // this is a blocking operation
512 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
514 //                        }
515 //                        state.cancelCommit2(context, clusterStream);
516 //                    } catch (DatabaseException e3) {
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
518 //                    } finally {
519 //                        clusterStream.setOff(false);
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
521 //                    }
522
523                 } finally {
524
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
526
527                 }
528
529                 task.finish();
530
531                 if(Development.DEVELOPMENT) {
532                     try {
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534                         System.err.println("finish write '" + request + "'");
535                     } catch (Throwable t) {
536                         Logger.defaultLogError(t);
537                     }
538                 }
539
540             }
541
542         }, combine);
543
544     }
545
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547         scheduleRequest(request, procedure, notify, null);
548     }
549
550     /* (non-Javadoc)
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
552      */
553     @Override
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
555
556         assert (request != null);
557
558         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
559
560         requestManager.scheduleWrite(new SessionTask(request, thread) {
561
562             @Override
563             public void run(int thread) {
564
565                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
566
567                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
568
569                 flushCounter = 0;
570                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
571
572                 VirtualGraph vg = getProvider(request.getProvider());
573                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
574
575                 try {
576                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
577                     writeState = writeStateT;
578
579                     assert (null != writer);
580 //                    writer.state.barrier.inc();
581                     writeStateT.setResult(request.perform(writer));
582                     assert (null != writer);
583
584 //                    writer.state.barrier.dec();
585 //                    writer.waitAsync(null);
586
587                 } catch (Throwable e) {
588
589 //                    writer.state.barrier.dec();
590 //                    writer.waitAsync(null);
591
592                     writeState.except(e);
593
594 //                  state.stopWriteTransaction(clusterStream);
595 //
596 //              } catch (Throwable e) {
597 //                  // Log it first, just to be safe that the error is always logged.
598 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
599 //
600 //                  try {
601 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
602 //                      // All we can do here is to log those, can't really pass them anywhere.
603 //                      if (procedure != null) {
604 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
605 //                          else procedure.exception(new DatabaseException(e));
606 //                      }
607 //                  } catch (Throwable e2) {
608 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
609 //                  }
610 //
611 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
612 //
613 //                  state.stopWriteTransaction(clusterStream);
614
615                 } finally {
616                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
617                 }
618
619 //              if(notify != null) notify.release();
620
621                 task.finish();
622
623             }
624
625         }, combine);
626
627     }
628
629     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
630         scheduleRequest(request, callback, notify, null);
631     }
632
    /* (non-Javadoc)
     * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
     *
     * Queues a DelayedWrite as a SessionTask on requestManager. When the task runs it works in
     * two phases:
     *   1. request.perform() is executed against a DelayedWriteGraph built on a fresh
     *      ReadGraphImpl. Any Throwable is handed to delayedWriteState.except() and the task
     *      stops there.
     *   2. The DelayedWriteGraph is committed through a write-only WriteGraphImpl between
     *      acquireWriteOnly() and releaseWriteOnly().
     *
     * callback (may be null) is wrapped in a Procedure that is given to both state objects:
     * it calls callback.run(null) on execute, and callback.run(DatabaseException) on exception
     * (non-DatabaseException throwables are wrapped). When callback is null the exception is
     * logged instead. notify is handed to the state objects; combine is forwarded as-is to
     * requestManager.scheduleWrite.
     */
    @Override
    public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {

        // Profiling task spanning scheduling + execution; finished on every exit path of run().
        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");

        assert (request != null);

        int thread = request.hashCode() & queryProvider2.THREAD_MASK;

        requestManager.scheduleWrite(new SessionTask(request, thread) {

            @Override
            public void run(int thread) {
                fireSessionVariableChange(SessionVariables.QUEUED_READS);

                // Adapter from the Procedure interface used by the state objects to the caller's Callback.
                Procedure<Object> stateProcedure = new Procedure<Object>() {
                    @Override
                    public void execute(Object result) {
                        if (callback != null)
                            callback.run(null);
                    }
                    @Override
                    public void exception(Throwable t) {
                        if (callback != null) {
                            if (t instanceof DatabaseException) callback.run((DatabaseException) t);
                            else callback.run(new DatabaseException(t));
                        } else
                            Logger.defaultLogError("Unhandled exception", t);
                    }
                };

                // ---- Phase 1: run the request against a DelayedWriteGraph ----
                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
                delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
                DelayedWriteGraph dwg = null;
//                newGraph.state.barrier.inc();

                try {
                    dwg = new DelayedWriteGraph(newGraph);
                    request.perform(dwg);
                } catch (Throwable e) {
                    delayedWriteState.except(e);
                    total.finish();
                    // Early exit: the finally block below still runs; the commit phase is skipped.
                    // NOTE(review): delayedWriteState is not reset to null on this path - confirm intended.
                    return;
                } finally {
//                    newGraph.state.barrier.dec();
//                    newGraph.waitAsync(request);
                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
                }

                delayedWriteState = null;

                // ---- Phase 2: commit the collected writes in write-only mode ----
                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
                fireSessionVariableChange(SessionVariables.QUEUED_WRITES);

                flushCounter = 0;
                clientChanges = new ClientChangesImpl(SessionImplSocket.this);

                acquireWriteOnly();

                // A resolved virtual graph provider selects the virtual write support implementation.
                VirtualGraph vg = getProvider(request.getProvider());
                WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();

                WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);

                writeState = new WriteState<Object>(writer, request, notify, stateProcedure);

                assert (null != writer);
//                writer.state.barrier.inc();

                try {
                    // Cannot cancel
                    dwg.commit(writer, request);

                    // If a default cluster set is active, convert it from a delayed resource to a
                    // resource and register its cluster in clusterSetsSupport.
                        if(defaultClusterSet != null) {
                                XSupport xs = getService(XSupport.class);
                                defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
                                ClusteringSupport cs = getService(ClusteringSupport.class);
                                clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
                        }

                    // This makes clusters available from server
                    clusterStream.reallyFlush();

                    releaseWriteOnly(writer);
                    handleUpdatesAndMetadata(writer);

                } catch (ServiceException e) {
//                    writer.state.barrier.dec();
//                    writer.waitAsync(null);

                    // These shall be requested from server
                    clusterTable.removeWriteOnlyClusters();
                    // This makes clusters available from server
                    clusterStream.reallyFlush();

                    releaseWriteOnly(writer);
                    writeState.except(e);
                    // NOTE(review): only ServiceException is handled here; any other exception
                    // escapes without releaseWriteOnly() - confirm dwg.commit cannot throw others.
                } finally {
                    // Debugging & Profiling
                    fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                    task2.finish();
                    total.finish();
                }
            }

        }, combine);

    }
744
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746         scheduleRequest(request, procedure, notify, null);
747     }
748
749     /* (non-Javadoc)
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
751      */
752     @Override
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754         throw new Error("Not implemented");
755     }
756
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760             createdClusters.add(cluster.clusterId);
761         }
762         return cluster;
763     }
764
765     class WriteOnlySupport implements WriteSupport {
766
767         ClusterStream stream;
768         ClusterImpl currentCluster;
769
770         public WriteOnlySupport() {
771             this.stream = clusterStream;
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
777         }
778
779         @Override
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785                 if(s != queryProvider2.getRootLibrary())
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
787
788             try {
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790             } catch (DatabaseException e) {
791                 Logger.defaultLogError(e);
792                 //throw e;
793             }
794
795             clientChanges.invalidate(s);
796
797             if (cluster.isWriteOnly())
798                 return;
799             queryProvider2.updateStatements(s, p);
800
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
806         }
807
808         @Override
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812             try {
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         @Override
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
828
829             if(amount < 65536) {
830                 claimValue(provider,resource, reader.readBytes(null, amount));
831                 return;
832             }
833
834             byte[] bytes = new byte[65536];
835
836             int rid = ((ResourceImpl)resource).id;
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838             try {
839                 int left = amount;
840                 while(left > 0) {
841                     int block = Math.min(left, 65536);
842                     reader.readBytes(bytes, block);
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
844                     left -= block;
845                 }
846             } catch (DatabaseException e) {
847                 Logger.defaultLogError(e);
848             }
849
850             clientChanges.invalidate(rid);
851
852             if (cluster.isWriteOnly())
853                 return;
854             queryProvider2.updateValue(rid);
855
856         }
857
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {
859             if(after_ != null && after_ != before) {
860                 ClusterImpl after = (ClusterImpl)after_;
861                 if(currentCluster == before) currentCluster = after;
862                 clusterTable.replaceCluster(after);
863             }
864         }
865
866         public int createResourceKey(int foreignCounter) throws DatabaseException {
867             if(currentCluster == null)
868                 currentCluster = getNewResourceCluster();
869             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
870                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
871                 newCluster.foreignLookup = new byte[foreignCounter];
872                 currentCluster = newCluster;
873                 if (DEBUG)
874                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
875             }
876             return currentCluster.createResource(clusterTranslator);
877         }
878
879         @Override
880         public Resource createResource(VirtualGraph provider) throws DatabaseException {
881             if(currentCluster == null) {
882                 if (null != defaultClusterSet) {
883                         ResourceImpl result = getNewResource(defaultClusterSet);
884                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
885                     return result;
886                 } else {
887                     currentCluster = getNewResourceCluster();
888                 }
889             }
890             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
891                 if (null != defaultClusterSet) {
892                         ResourceImpl result = getNewResource(defaultClusterSet);
893                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
894                     return result;
895                 } else {
896                     currentCluster = getNewResourceCluster();
897                 }
898             }
899             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
900         }
901
902         @Override
903         public Resource createResource(VirtualGraph provider, long clusterId)
904         throws DatabaseException {
905             return getNewResource(clusterId);
906         }
907
908         @Override
909         public Resource createResource(VirtualGraph provider, Resource clusterSet)
910         throws DatabaseException {
911             return getNewResource(clusterSet);
912         }
913
914         @Override
915         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
916                 throws DatabaseException {
917             getNewClusterSet(clusterSet);
918         }
919
920         @Override
921         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
922         throws ServiceException {
923             return containsClusterSet(clusterSet);
924         }
925
926         public void selectCluster(long cluster) {
927                 currentCluster = clusterTable.getClusterByClusterId(cluster);
928                 long setResourceId = clusterSetsSupport.getSet(cluster);
929                 clusterSetsSupport.put(setResourceId, cluster);
930         }
931
932         @Override
933         public Resource setDefaultClusterSet(Resource clusterSet)
934         throws ServiceException {
935                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
936                 if(clusterSet != null) {
937                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
938                         currentCluster = clusterTable.getClusterByClusterId(id);
939                         return result;
940                 } else {
941                         currentCluster = null;
942                         return null;
943                 }
944         }
945
946         @Override
947         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
948
949                  provider = getProvider(provider);
950              if (null == provider) {
951                  int key = ((ResourceImpl)resource).id;
952
953
954
955                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
956 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
957 //                      if(key != queryProvider2.getRootLibrary())
958 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
959
960 //                 try {
961 //                     cluster.removeValue(key, clusterTranslator);
962 //                 } catch (DatabaseException e) {
963 //                     Logger.defaultLogError(e);
964 //                     return;
965 //                 }
966
967                  clusterTable.writeOnlyInvalidate(cluster);
968
969                  try {
970                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
971                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
972                          clusterTranslator.removeValue(cluster);
973                  } catch (DatabaseException e) {
974                          Logger.defaultLogError(e);
975                  }
976
977                  queryProvider2.invalidateResource(key);
978                  clientChanges.invalidate(key);
979
980              } else {
981                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
982                  queryProvider2.updateValue(querySupport.getId(resource));
983                  clientChanges.claimValue(resource);
984              }
985
986
987         }
988
989         @Override
990         public void flush(boolean intermediate) {
991             throw new UnsupportedOperationException();
992         }
993
994         @Override
995         public void flushCluster() {
996             clusterTable.flushCluster(graphSession);
997             if(defaultClusterSet != null) {
998                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
999             }
1000             currentCluster = null;
1001         }
1002
1003         @Override
1004         public void flushCluster(Resource r) {
1005             throw new UnsupportedOperationException("flushCluster resource " + r);
1006         }
1007
1008         @Override
1009         public void gc() {
1010         }
1011
1012         @Override
1013         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1014                 Resource object) {
1015
1016                 int s = ((ResourceImpl)subject).id;
1017                 int p = ((ResourceImpl)predicate).id;
1018                 int o = ((ResourceImpl)object).id;
1019
1020                 provider = getProvider(provider);
1021                 if (null == provider) {
1022
1023                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1024                         clusterTable.writeOnlyInvalidate(cluster);
1025
1026                         try {
1027
1028                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1029                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1030                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1031
1032                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1033                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1034
1035                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1036                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038                                 clusterTranslator.removeStatement(cluster);
1039
1040                                 queryProvider2.invalidateResource(s);
1041                                 clientChanges.invalidate(s);
1042
1043                         } catch (DatabaseException e) {
1044
1045                                 Logger.defaultLogError(e);
1046
1047                         }
1048
1049                         return true;
1050
1051                 } else {
1052                         
1053                         ((VirtualGraphImpl)provider).deny(s, p, o);
1054                         queryProvider2.invalidateResource(s);
1055                         clientChanges.invalidate(s);
1056
1057                         return true;
1058
1059                 }
1060
1061         }
1062
1063         @Override
1064         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1065             throw new UnsupportedOperationException();
1066         }
1067
1068         @Override
1069         public boolean writeOnly() {
1070             return true;
1071         }
1072
1073         @Override
1074         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1075             writeSupport.performWriteRequest(graph, request);
1076         }
1077
1078         @Override
1079         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1080             throw new UnsupportedOperationException();
1081         }
1082
1083         @Override
1084         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1085             throw new UnsupportedOperationException();
1086         }
1087
1088         @Override
1089         public <T> void addMetadata(Metadata data) throws ServiceException {
1090             writeSupport.addMetadata(data);
1091         }
1092
1093         @Override
1094         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1095             return writeSupport.getMetadata(clazz);
1096         }
1097
1098         @Override
1099         public TreeMap<String, byte[]> getMetadata() {
1100             return writeSupport.getMetadata();
1101         }
1102
1103         @Override
1104         public void commitDone(WriteTraits writeTraits, long csid) {
1105             writeSupport.commitDone(writeTraits, csid);
1106         }
1107
1108         @Override
1109         public void clearUndoList(WriteTraits writeTraits) {
1110             writeSupport.clearUndoList(writeTraits);
1111         }
1112         @Override
1113         public int clearMetadata() {
1114             return writeSupport.clearMetadata();
1115         }
1116
1117                 @Override
1118                 public void startUndo() {
1119                         writeSupport.startUndo();
1120                 }
1121     }
1122
1123     class VirtualWriteOnlySupport implements WriteSupport {
1124
1125 //        @Override
1126 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1127 //                Resource object) {
1128 //            throw new UnsupportedOperationException();
1129 //        }
1130
1131         @Override
1132         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1133
1134             TransientGraph impl = (TransientGraph)provider;
1135             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1136             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1137             clientChanges.claim(subject, predicate, object);
1138
1139         }
1140
1141         @Override
1142         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1143
1144             TransientGraph impl = (TransientGraph)provider;
1145             impl.claim(subject, predicate, object);
1146             getQueryProvider2().updateStatements(subject, predicate);
1147             clientChanges.claim(subject, predicate, object);
1148
1149         }
1150
1151         @Override
1152         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1153             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1154         }
1155
1156         @Override
1157         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1158             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1159             getQueryProvider2().updateValue(resource);
1160             clientChanges.claimValue(resource);
1161         }
1162
1163         @Override
1164         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1165             byte[] value = reader.readBytes(null, amount);
1166             claimValue(provider, resource, value);
1167         }
1168
1169         @Override
1170         public Resource createResource(VirtualGraph provider) {
1171             TransientGraph impl = (TransientGraph)provider;
1172             return impl.getResource(impl.newResource(false));
1173         }
1174
1175         @Override
1176         public Resource createResource(VirtualGraph provider, long clusterId) {
1177             throw new UnsupportedOperationException();
1178         }
1179
1180         @Override
1181         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1182         throws DatabaseException {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1188         throws DatabaseException {
1189             throw new UnsupportedOperationException();
1190         }
1191
1192         @Override
1193         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1194         throws ServiceException {
1195             throw new UnsupportedOperationException();
1196         }
1197
1198         @Override
1199         public Resource setDefaultClusterSet(Resource clusterSet)
1200         throws ServiceException {
1201                 return null;
1202         }
1203
1204         @Override
1205         public void denyValue(VirtualGraph provider, Resource resource) {
1206             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1207             getQueryProvider2().updateValue(querySupport.getId(resource));
1208             // NOTE: this only keeps track of value changes by-resource.
1209             clientChanges.claimValue(resource);
1210         }
1211
1212         @Override
1213         public void flush(boolean intermediate) {
1214             throw new UnsupportedOperationException();
1215         }
1216
1217         @Override
1218         public void flushCluster() {
1219             throw new UnsupportedOperationException();
1220         }
1221
1222         @Override
1223         public void flushCluster(Resource r) {
1224             throw new UnsupportedOperationException("Resource " + r);
1225         }
1226
1227         @Override
1228         public void gc() {
1229         }
1230
1231         @Override
1232         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1233             TransientGraph impl = (TransientGraph) provider;
1234             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1235             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1236             clientChanges.deny(subject, predicate, object);
1237             return true;
1238         }
1239
1240         @Override
1241         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1242             throw new UnsupportedOperationException();
1243         }
1244
1245         @Override
1246         public boolean writeOnly() {
1247             return true;
1248         }
1249
1250         @Override
1251         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T> void addMetadata(Metadata data) throws ServiceException {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1272             throw new UnsupportedOperationException();
1273         }
1274
1275         @Override
1276         public TreeMap<String, byte[]> getMetadata() {
1277             throw new UnsupportedOperationException();
1278         }
1279
1280         @Override
1281         public void commitDone(WriteTraits writeTraits, long csid) {
1282         }
1283
1284         @Override
1285         public void clearUndoList(WriteTraits writeTraits) {
1286         }
1287
1288         @Override
1289         public int clearMetadata() {
1290             return 0;
1291         }
1292
1293                 @Override
1294                 public void startUndo() {
1295                 }
1296     }
1297
1298     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1299
1300         try {
1301
1302             int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1303
1304             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1305
1306             flushCounter = 0;
1307             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1308
1309             acquireWriteOnly();
1310
1311             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1312
1313             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1314
1315             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1316             writeState = writeStateT;
1317
1318             assert (null != writer);
1319 //            writer.state.barrier.inc();
1320             long start = System.nanoTime();
1321             T result = request.perform(writer);
1322             long duration = System.nanoTime() - start;
1323             if (DEBUG) {
1324                 System.err.println("################");
1325                 System.err.println("WriteOnly duration " + 1e-9*duration);
1326             }
1327             writeStateT.setResult(result);
1328
1329             // This makes clusters available from server
1330             clusterStream.reallyFlush();
1331
1332             // This will trigger query updates
1333             releaseWriteOnly(writer);
1334             assert (null != writer);
1335
1336             handleUpdatesAndMetadata(writer);
1337
1338         } catch (CancelTransactionException e) {
1339
1340             releaseWriteOnly(writeState.getGraph());
1341
1342             clusterTable.removeWriteOnlyClusters();
1343             state.stopWriteTransaction(clusterStream);
1344
1345         } catch (Throwable e) {
1346
1347             e.printStackTrace();
1348
1349             releaseWriteOnly(writeState.getGraph());
1350
1351             clusterTable.removeWriteOnlyClusters();
1352
1353             if (callback != null)
1354                 callback.exception(new DatabaseException(e));
1355
1356             state.stopWriteTransaction(clusterStream);
1357             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1358
1359         } finally  {
1360
1361             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1362
1363         }
1364
1365     }
1366
1367     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1368         scheduleRequest(request, callback, notify, null);
1369     }
1370
1371     @Override
1372     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1373
1374         assertAlive();
1375
1376         assert (request != null);
1377
1378         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1379
1380         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1381
1382             @Override
1383             public void run(int thread) {
1384
1385                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1386
1387                     try {
1388
1389                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1390
1391                         flushCounter = 0;
1392                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1393
1394                         acquireWriteOnly();
1395
1396                         VirtualGraph vg = getProvider(request.getProvider());
1397                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1398
1399                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1400
1401                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1402
1403                             @Override
1404                             public void execute(Object result) {
1405                                 if(callback != null) callback.run(null);
1406                             }
1407
1408                             @Override
1409                             public void exception(Throwable t) {
1410                                 if(callback != null) callback.run((DatabaseException)t);
1411                             }
1412
1413                         });
1414
1415                         assert (null != writer);
1416 //                        writer.state.barrier.inc();
1417
1418                         try {
1419
1420                             request.perform(writer);
1421
1422                         } catch (Throwable e) {
1423
1424 //                            writer.state.barrier.dec();
1425 //                            writer.waitAsync(null);
1426
1427                             releaseWriteOnly(writer);
1428
1429                             clusterTable.removeWriteOnlyClusters();
1430
1431                             if(!(e instanceof CancelTransactionException)) {
1432                                 if (callback != null)
1433                                     callback.run(new DatabaseException(e));
1434                             }
1435
1436                             writeState.except(e);
1437
1438                             return;
1439
1440                         }
1441
1442                         // This makes clusters available from server
1443                         boolean empty = clusterStream.reallyFlush();
1444                         // This was needed to make WO requests call metadata listeners.
1445                         // NOTE: the calling event does not contain clientChanges information.
1446                         if (!empty && clientChanges.isEmpty())
1447                             clientChanges.setNotEmpty(true);
1448                         releaseWriteOnly(writer);
1449                         assert (null != writer);
1450                     } finally  {
1451
1452                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1453
1454                     }
1455
1456
1457                 task.finish();
1458
1459             }
1460
1461         }, combine);
1462
1463     }
1464
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466         scheduleRequest(request, callback, notify, null);
1467     }
1468
1469     /* (non-Javadoc)
1470      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1471      */
1472     @Override
1473     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1474
1475         assert (request != null);
1476
1477         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1478
1479         requestManager.scheduleWrite(new SessionTask(request, thread) {
1480
1481             @Override
1482             public void run(int thread) {
1483
1484                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1485
1486                 performWriteOnly(request, notify, callback);
1487
1488                 task.finish();
1489
1490             }
1491
1492         }, combine);
1493
1494     }
1495
1496     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1497
1498         assert (request != null);
1499         assert (procedure != null);
1500
1501         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1502
1503         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1504
1505             @Override
1506             public void run(int thread) {
1507
1508                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1509
1510                 ListenerBase listener = getListenerBase(procedure);
1511
1512                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1513
1514                 try {
1515
1516                     if (listener != null) {
1517
1518                         try {
1519                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1520
1521                                         @Override
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {
1523                                                 procedure.exception(graph, t);
1524                                                 if(throwable != null) {
1525                                                         throwable.set(t);
1526                                                 } else {
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1528                                                 }
1529                                         }
1530
1531                                         @Override
1532                                         public void execute(AsyncReadGraph graph, T t) {
1533                                                 if(result != null) result.set(t);
1534                                                 procedure.execute(graph, t);
1535                                         }
1536
1537                                 }, listener);
1538                         } catch (Throwable t) {
1539                             // This is handled by the AsyncProcedure
1540                                 //Logger.defaultLogError("Internal error", t);
1541                         }
1542
1543                     } else {
1544
1545                         try {
1546
1547 //                            newGraph.state.barrier.inc();
1548
1549                             T t = request.perform(newGraph);
1550
1551                             try {
1552
1553                                 if(result != null) result.set(t);
1554                                 procedure.execute(newGraph, t);
1555
1556                             } catch (Throwable th) {
1557
1558                                 if(throwable != null) {
1559                                     throwable.set(th);
1560                                 } else {
1561                                     Logger.defaultLogError("Unhandled exception", th);
1562                                 }
1563
1564                             }
1565
1566                         } catch (Throwable t) {
1567
1568                             if (DEBUG)
1569                                 t.printStackTrace();
1570
1571                             if(throwable != null) {
1572                                 throwable.set(t);
1573                             } else {
1574                                 Logger.defaultLogError("Unhandled exception", t);
1575                             }
1576
1577                             try {
1578
1579                                 procedure.exception(newGraph, t);
1580
1581                             } catch (Throwable t2) {
1582
1583                                 if(throwable != null) {
1584                                     throwable.set(t2);
1585                                 } else {
1586                                     Logger.defaultLogError("Unhandled exception", t2);
1587                                 }
1588
1589                             }
1590
1591                         }
1592
1593 //                        newGraph.state.barrier.dec();
1594 //                        newGraph.waitAsync(request);
1595
1596                     }
1597
1598                 } finally {
1599
1600                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1601
1602                 }
1603
1604             }
1605
1606         });
1607
1608     }
1609
1610     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1611
1612         assert (request != null);
1613         assert (procedure != null);
1614
1615         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1616
1617         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1618
1619             @Override
1620             public void run(int thread) {
1621
1622                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1623
1624                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1625
1626                 try {
1627
1628                     if (listener != null) {
1629
1630                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1631
1632 //                        newGraph.waitAsync(request);
1633
1634                     } else {
1635
1636                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1637                                 procedure, "request");
1638
1639                         try {
1640
1641 //                            newGraph.state.barrier.inc();
1642
1643                             request.perform(newGraph, wrapper);
1644
1645 //                            newGraph.waitAsync(request);
1646
1647                         } catch (Throwable t) {
1648
1649                             wrapper.exception(newGraph, t);
1650 //                            newGraph.waitAsync(request);
1651
1652
1653                         }
1654
1655                     }
1656
1657                 } finally {
1658
1659                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1660
1661                 }
1662
1663             }
1664
1665         });
1666
1667     }
1668
1669     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1670
1671         assert (request != null);
1672         assert (procedure != null);
1673
1674         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1675
1676         int sync = notify != null ? thread : -1;
1677
1678         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1679
1680             @Override
1681             public void run(int thread) {
1682
1683                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1684
1685                 ListenerBase listener = getListenerBase(procedure);
1686
1687                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1688
1689                 try {
1690
1691                     if (listener != null) {
1692
1693                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1694
1695 //                        newGraph.waitAsync(request);
1696
1697                     } else {
1698
1699                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1700
1701                         try {
1702
1703                             request.perform(newGraph, wrapper);
1704
1705                         } catch (Throwable t) {
1706
1707                             t.printStackTrace();
1708
1709                         }
1710
1711                     }
1712
1713                 } finally {
1714
1715                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1716
1717                 }
1718
1719             }
1720
1721         });
1722
1723     }
1724
1725     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1726
1727         assert (request != null);
1728         assert (procedure != null);
1729
1730         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1731
1732         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1733
1734             @Override
1735             public void run(int thread) {
1736
1737                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1738
1739                 ListenerBase listener = getListenerBase(procedure);
1740
1741                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1742
1743                 try {
1744
1745                     if (listener != null) {
1746
1747                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1748
1749                             @Override
1750                             public void exception(Throwable t) {
1751                                 procedure.exception(t);
1752                                 if(throwable != null) {
1753                                     throwable.set(t);
1754                                 }
1755                             }
1756
1757                             @Override
1758                             public void execute(T t) {
1759                                 if(result != null) result.set(t);
1760                                 procedure.execute(t);
1761                             }
1762
1763                         }, listener);
1764
1765 //                        newGraph.waitAsync(request);
1766
1767                     } else {
1768
1769 //                        newGraph.state.barrier.inc();
1770
1771                         request.register(newGraph, new Listener<T>() {
1772
1773                             @Override
1774                             public void exception(Throwable t) {
1775                                 if(throwable != null) throwable.set(t);
1776                                 procedure.exception(t);
1777 //                                newGraph.state.barrier.dec();
1778                             }
1779
1780                             @Override
1781                             public void execute(T t) {
1782                                 if(result != null) result.set(t);
1783                                 procedure.execute(t);
1784 //                                newGraph.state.barrier.dec();
1785                             }
1786
1787                             @Override
1788                             public boolean isDisposed() {
1789                                 return true;
1790                             }
1791
1792                         });
1793
1794 //                        newGraph.waitAsync(request);
1795
1796                     }
1797
1798                 } finally {
1799
1800                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1801
1802                 }
1803
1804             }
1805
1806         });
1807
1808     }
1809
1810
1811     @Override
1812     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1813
1814         scheduleRequest(request, procedure, null, null, null);
1815
1816     }
1817
1818     @Override
1819     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1820         assertNotSession();
1821         Semaphore notify = new Semaphore(0);
1822         DataContainer<Throwable> container = new DataContainer<Throwable>();
1823         DataContainer<T> result = new DataContainer<T>();
1824         scheduleRequest(request, procedure, notify, container, result);
1825         acquire(notify, request);
1826         Throwable throwable = container.get();
1827         if(throwable != null) {
1828             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1829             else throw new DatabaseException("Unexpected exception", throwable);
1830         }
1831         return result.get();
1832     }
1833
1834     @Override
1835     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1836         assertNotSession();
1837         Semaphore notify = new Semaphore(0);
1838         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1839         final DataContainer<T> resultContainer = new DataContainer<T>();
1840         scheduleRequest(request, new AsyncProcedure<T>() {
1841             @Override
1842             public void exception(AsyncReadGraph graph, Throwable throwable) {
1843                 exceptionContainer.set(throwable);
1844                 procedure.exception(graph, throwable);
1845             }
1846             @Override
1847             public void execute(AsyncReadGraph graph, T result) {
1848                 resultContainer.set(result);
1849                 procedure.execute(graph, result);
1850             }
1851         }, getListenerBase(procedure), notify);
1852         acquire(notify, request, procedure);
1853         Throwable throwable = exceptionContainer.get();
1854         if (throwable != null) {
1855             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1856             else throw new DatabaseException("Unexpected exception", throwable);
1857         }
1858         return resultContainer.get();
1859     }
1860
1861
1862     @Override
1863     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1864         assertNotSession();
1865         Semaphore notify = new Semaphore(0);
1866         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1867         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1868             @Override
1869             public void exception(AsyncReadGraph graph, Throwable throwable) {
1870                 exceptionContainer.set(throwable);
1871                 procedure.exception(graph, throwable);
1872             }
1873             @Override
1874             public void execute(AsyncReadGraph graph, T result) {
1875                 procedure.execute(graph, result);
1876             }
1877             @Override
1878             public void finished(AsyncReadGraph graph) {
1879                 procedure.finished(graph);
1880             }
1881         }, notify);
1882         acquire(notify, request, procedure);
1883         Throwable throwable = exceptionContainer.get();
1884         if (throwable != null) {
1885             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1886             else throw new DatabaseException("Unexpected exception", throwable);
1887         }
1888         // TODO: implement return value
1889         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1890         return null;
1891     }
1892
1893     @Override
1894     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1895         return syncRequest(request, new ProcedureAdapter<T>());
1896
1897
1898 //        assert(request != null);
1899 //
1900 //        final DataContainer<T> result = new DataContainer<T>();
1901 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1902 //
1903 //        syncRequest(request, new Procedure<T>() {
1904 //
1905 //            @Override
1906 //            public void execute(T t) {
1907 //                result.set(t);
1908 //            }
1909 //
1910 //            @Override
1911 //            public void exception(Throwable t) {
1912 //                exception.set(t);
1913 //            }
1914 //
1915 //        });
1916 //
1917 //        Throwable t = exception.get();
1918 //        if(t != null) {
1919 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1920 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1921 //        }
1922 //
1923 //        return result.get();
1924
1925     }
1926
1927     @Override
1928     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1929         return syncRequest(request, (Procedure<T>)procedure);
1930     }
1931
1932
1933 //    @Override
1934 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1935 //        assertNotSession();
1936 //        Semaphore notify = new Semaphore(0);
1937 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1938 //        DataContainer<T> result = new DataContainer<T>();
1939 //        scheduleRequest(request, procedure, notify, container, result);
1940 //        acquire(notify, request);
1941 //        Throwable throwable = container.get();
1942 //        if(throwable != null) {
1943 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1944 //            else throw new DatabaseException("Unexpected exception", throwable);
1945 //        }
1946 //        return result.get();
1947 //    }
1948
1949
1950     @Override
1951     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1952         assertNotSession();
1953         Semaphore notify = new Semaphore(0);
1954         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1955         final DataContainer<T> result = new DataContainer<T>();
1956         scheduleRequest(request, procedure, notify, container, result);
1957         acquire(notify, request);
1958         Throwable throwable = container.get();
1959         if (throwable != null) {
1960             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961             else throw new DatabaseException("Unexpected exception", throwable);
1962         }
1963         return result.get();
1964     }
1965
1966     @Override
1967     public void syncRequest(Write request) throws DatabaseException  {
1968         assertNotSession();
1969         assertAlive();
1970         Semaphore notify = new Semaphore(0);
1971         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1972         scheduleRequest(request, new Callback<DatabaseException>() {
1973
1974             @Override
1975             public void run(DatabaseException e) {
1976               exception.set(e);
1977             }
1978
1979         }, notify);
1980         acquire(notify, request);
1981         if(exception.get() != null) throw exception.get();
1982     }
1983
1984     @Override
1985     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1986         assertNotSession();
1987         Semaphore notify = new Semaphore(0);
1988         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1989         final DataContainer<T> result = new DataContainer<T>();
1990         scheduleRequest(request, new Procedure<T>() {
1991
1992             @Override
1993             public void exception(Throwable t) {
1994               exception.set(t);
1995             }
1996
1997             @Override
1998             public void execute(T t) {
1999               result.set(t);
2000             }
2001
2002         }, notify);
2003         acquire(notify, request);
2004         if(exception.get() != null) {
2005             Throwable t = exception.get();
2006             if(t instanceof DatabaseException) throw (DatabaseException)t;
2007             else throw new DatabaseException(t);
2008         }
2009         return result.get();
2010     }
2011
2012     @Override
2013     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2014         assertNotSession();
2015         Semaphore notify = new Semaphore(0);
2016         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2017         final DataContainer<T> result = new DataContainer<T>();
2018         scheduleRequest(request, new Procedure<T>() {
2019
2020             @Override
2021             public void exception(Throwable t) {
2022               exception.set(t);
2023             }
2024
2025             @Override
2026             public void execute(T t) {
2027               result.set(t);
2028             }
2029
2030         }, notify);
2031         acquire(notify, request);
2032         if(exception.get() != null) {
2033             Throwable t = exception.get();
2034             if(t instanceof DatabaseException) throw (DatabaseException)t;
2035             else throw new DatabaseException(t);
2036         }
2037         return result.get();
2038     }
2039
2040     @Override
2041     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2042         assertNotSession();
2043         Semaphore notify = new Semaphore(0);
2044         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2045         final DataContainer<T> result = new DataContainer<T>();
2046         scheduleRequest(request, new Procedure<T>() {
2047
2048             @Override
2049             public void exception(Throwable t) {
2050               exception.set(t);
2051             }
2052
2053             @Override
2054             public void execute(T t) {
2055               result.set(t);
2056             }
2057
2058         }, notify);
2059         acquire(notify, request);
2060         if(exception.get() != null) {
2061             Throwable t = exception.get();
2062             if(t instanceof DatabaseException) throw (DatabaseException)t;
2063             else throw new DatabaseException(t);
2064         }
2065         return result.get();
2066     }
2067
2068     @Override
2069     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2070         assertNotSession();
2071         Semaphore notify = new Semaphore(0);
2072         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2073         scheduleRequest(request, new Callback<DatabaseException>() {
2074             @Override
2075             public void run(DatabaseException e) {
2076                 exception.set(e);
2077             }
2078         }, notify);
2079         acquire(notify, request);
2080         if(exception.get() != null) throw exception.get();
2081     }
2082
2083     @Override
2084     public void syncRequest(WriteOnly request) throws DatabaseException  {
2085         assertNotSession();
2086         assertAlive();
2087         Semaphore notify = new Semaphore(0);
2088         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2089         scheduleRequest(request, new Callback<DatabaseException>() {
2090             @Override
2091             public void run(DatabaseException e) {
2092                 exception.set(e);
2093             }
2094         }, notify);
2095         acquire(notify, request);
2096         if(exception.get() != null) throw exception.get();
2097     }
2098
2099     /*
2100      *
2101      * GraphRequestProcessor interface
2102      */
2103
2104     @Override
2105     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2106
2107         scheduleRequest(request, procedure, null, null);
2108
2109     }
2110
2111     @Override
2112     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2113
2114         scheduleRequest(request, procedure, null);
2115
2116     }
2117
2118     @Override
2119     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2120
2121         scheduleRequest(request, procedure, null, null, null);
2122
2123     }
2124
2125     @Override
2126     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2127
2128         scheduleRequest(request, callback, null);
2129
2130     }
2131
2132     @Override
2133     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2134
2135         scheduleRequest(request, procedure, null);
2136
2137     }
2138
2139     @Override
2140     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2141
2142         scheduleRequest(request, procedure, null);
2143
2144     }
2145
2146     @Override
2147     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2148
2149         scheduleRequest(request, procedure, null);
2150
2151     }
2152
2153     @Override
2154     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2155
2156         scheduleRequest(request, callback, null);
2157
2158     }
2159
2160     @Override
2161     public void asyncRequest(final Write r) {
2162         asyncRequest(r, null);
2163     }
2164
2165     @Override
2166     public void asyncRequest(final DelayedWrite r) {
2167         asyncRequest(r, null);
2168     }
2169
2170     @Override
2171     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2172
2173         scheduleRequest(request, callback, null);
2174
2175     }
2176
2177     @Override
2178     public void asyncRequest(final WriteOnly request) {
2179
2180         asyncRequest(request, null);
2181
2182     }
2183
2184     @Override
2185     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2186         r.request(this, procedure);
2187     }
2188
2189     @Override
2190     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2191         r.request(this, procedure);
2192     }
2193
2194     @Override
2195     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2196         r.request(this, procedure);
2197     }
2198
2199     @Override
2200     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2201         r.request(this, procedure);
2202     }
2203
2204     @Override
2205     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2206         r.request(this, procedure);
2207     }
2208
2209     @Override
2210     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2211         r.request(this, procedure);
2212     }
2213
2214     @Override
2215     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2216         return r.request(this);
2217     }
2218
2219     @Override
2220     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2221         return r.request(this);
2222     }
2223
2224     @Override
2225     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2226         r.request(this, procedure);
2227     }
2228
2229     @Override
2230     public <T> void async(WriteInterface<T> r) {
2231         r.request(this, new ProcedureAdapter<T>());
2232     }
2233
2234     //@Override
2235     public void incAsync() {
2236         state.incAsync();
2237     }
2238
2239     //@Override
2240     public void decAsync() {
2241         state.decAsync();
2242     }
2243
2244     public long getCluster(ResourceImpl resource) {
2245         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2246         return cluster.getClusterId();
2247     }
2248
2249     public long getCluster(int id) {
2250         if (clusterTable == null)
2251             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2252         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2253     }
2254
2255     public ResourceImpl getResource(int id) {
2256         return new ResourceImpl(resourceSupport, id);
2257     }
2258
2259     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2260         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2261         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2262         int key = proxy.getClusterKey();
2263         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2264         return new ResourceImpl(resourceSupport, resourceKey);
2265     }
2266
2267     public ResourceImpl getResource2(int id) {
2268         assert (id != 0);
2269         return new ResourceImpl(resourceSupport, id);
2270     }
2271
2272     final public int getId(ResourceImpl impl) {
2273         return impl.id;
2274     }
2275
    /** The UTF-8 charset, looked up by name once when the class is initialized. */
    public static final Charset UTF8 = Charset.forName("utf-8");
2277
2278
2279
2280
2281     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2282         for(TransientGraph g : support.providers) {
2283             if(g.isPending(subject)) return false;
2284         }
2285         return true;
2286     }
2287
2288     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2289         for(TransientGraph g : support.providers) {
2290             if(g.isPending(subject, predicate)) return false;
2291         }
2292         return true;
2293     }
2294
2295     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2296
2297         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2298
2299             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2300
2301             @Override
2302             public void run(ReadGraphImpl graph) {
2303                 if(ready.decrementAndGet() == 0) {
2304                     runnable.run(graph);
2305                 }
2306             }
2307
2308         };
2309
2310         for(TransientGraph g : support.providers) {
2311             if(g.isPending(subject)) {
2312                 try {
2313                     g.load(graph, subject, composite);
2314                 } catch (DatabaseException e) {
2315                     e.printStackTrace();
2316                 }
2317             } else {
2318                 composite.run(graph);
2319             }
2320         }
2321
2322         composite.run(graph);
2323
2324     }
2325
2326     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2327
2328         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2329
2330             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2331
2332             @Override
2333             public void run(ReadGraphImpl graph) {
2334                 if(ready.decrementAndGet() == 0) {
2335                     runnable.run(graph);
2336                 }
2337             }
2338
2339         };
2340
2341         for(TransientGraph g : support.providers) {
2342             if(g.isPending(subject, predicate)) {
2343                 try {
2344                     g.load(graph, subject, predicate, composite);
2345                 } catch (DatabaseException e) {
2346                     e.printStackTrace();
2347                 }
2348             } else {
2349                 composite.run(graph);
2350             }
2351         }
2352
2353         composite.run(graph);
2354
2355     }
2356
2357 //    void dumpHeap() {
2358 //
2359 //        try {
2360 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2361 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2362 //                    "d:/heap" + flushCounter + ".txt", true);
2363 //        } catch (IOException e) {
2364 //            e.printStackTrace();
2365 //        }
2366 //
2367 //    }
2368
2369     void fireReactionsToSynchronize(ChangeSet cs) {
2370
2371         // Do not fire empty events
2372         if (cs.isEmpty())
2373             return;
2374
2375         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2376 //        g.state.barrier.inc();
2377
2378         try {
2379
2380             if (!cs.isEmpty()) {
2381                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2382                 for (ChangeListener l : changeListeners2) {
2383                     try {
2384                         l.graphChanged(e2);
2385                     } catch (Exception ex) {
2386                         ex.printStackTrace();
2387                     }
2388                 }
2389             }
2390
2391         } finally {
2392
2393 //            g.state.barrier.dec();
2394 //            g.waitAsync(null);
2395
2396         }
2397
2398     }
2399
2400     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2401         try {
2402
2403             // Do not fire empty events
2404             if (cs2.isEmpty())
2405                 return;
2406
2407 //            graph.restart();
2408 //            graph.state.barrier.inc();
2409
2410             try {
2411
2412                 if (!cs2.isEmpty()) {
2413
2414                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2415                     for (ChangeListener l : changeListeners2) {
2416                         try {
2417                             l.graphChanged(e2);
2418 //                            System.out.println("changelistener " + l);
2419                         } catch (Exception ex) {
2420                             ex.printStackTrace();
2421                         }
2422                     }
2423
2424                 }
2425
2426             } finally {
2427
2428 //                graph.state.barrier.dec();
2429 //                graph.waitAsync(null);
2430
2431             }
2432         } catch (Throwable t) {
2433             t.printStackTrace();
2434
2435         }
2436     }
2437     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2438
2439         try {
2440
2441             // Do not fire empty events
2442             if (cs2.isEmpty())
2443                 return;
2444
2445             // Do not fire on virtual requests
2446             if(graph.getProvider() != null)
2447                 return;
2448
2449             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2450
2451             try {
2452
2453                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2454                 for (ChangeListener l : metadataListeners) {
2455                     try {
2456                         l.graphChanged(e2);
2457                     } catch (Throwable ex) {
2458                         ex.printStackTrace();
2459                     }
2460                 }
2461
2462             } finally {
2463
2464             }
2465
2466         } catch (Throwable t) {
2467             t.printStackTrace();
2468         }
2469
2470     }
2471
2472
2473     /*
2474      * (non-Javadoc)
2475      *
2476      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2477      */
2478     @Override
2479     public <T> T getService(Class<T> api) {
2480         T t = peekService(api);
2481         if (t == null) {
2482             if (state.isClosed())
2483                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2484             throw new ServiceNotFoundException(this, api);
2485         }
2486         return t;
2487     }
2488
    /**
     * Supplies the object that {@link #peekService(Class)} returns when it is
     * asked for {@code ServerInformation}.
     *
     * @return the server information to hand out; presumably a cached
     *         instance, as the name suggests (how it is obtained is up to the
     *         subclass)
     */
    protected abstract ServerInformation getCachedServerInformation();
2493
        // Two-slot cache of the most recent service lookups made through
        // peekService(Class). Slot 1 holds the latest lookup; a hit on slot 2
        // swaps it into slot 1. registerService(Class, Object) refreshes a
        // slot when a service is registered for a key that is currently cached.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2498
2499     /*
2500      * (non-Javadoc)
2501      *
2502      * @see
2503      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2504      */
2505     @SuppressWarnings("unchecked")
2506     @Override
2507     public synchronized <T> T peekService(Class<T> api) {
2508
2509                 if(serviceKey1 == api) {
2510                         return (T)service1;
2511                 } else if (serviceKey2 == api) {
2512                         // Promote this key
2513                         Object result = service2;
2514                         service2 = service1;
2515                         serviceKey2 = serviceKey1;
2516                         service1 = result;
2517                         serviceKey1 = api;
2518                         return (T)result;
2519                 }
2520
2521         if (Layer0.class == api)
2522                 return (T) L0;
2523         if (ServerInformation.class == api)
2524             return (T) getCachedServerInformation();
2525         else if (WriteGraphImpl.class == api)
2526             return (T) writeState.getGraph();
2527         else if (ClusterBuilder.class == api)
2528             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2529         else if (ClusterBuilderFactory.class == api)
2530             return (T)new ClusterBuilderFactoryImpl(this);
2531
2532                 service2 = service1;
2533                 serviceKey2 = serviceKey1;
2534
2535         service1 = serviceLocator.peekService(api);
2536         serviceKey1 = api;
2537
2538         return (T)service1;
2539
2540     }
2541
2542     /*
2543      * (non-Javadoc)
2544      *
2545      * @see
2546      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2547      */
2548     @Override
2549     public boolean hasService(Class<?> api) {
2550         return serviceLocator.hasService(api);
2551     }
2552
2553     /**
2554      * @param api the api that must be implemented by the specified service
2555      * @param service the service implementation
2556      */
2557     @Override
2558     public <T> void registerService(Class<T> api, T service) {
2559         if(Layer0.class == api) {
2560                 L0 = (Layer0)service;
2561                 return;
2562         }
2563         serviceLocator.registerService(api, service);
2564         if (TransactionPolicySupport.class == api) {
2565             transactionPolicy = (TransactionPolicySupport)service;
2566             state.resetTransactionPolicy();
2567         }
2568         if (api == serviceKey1)
2569                 service1 = service;
2570         else if (api == serviceKey2)
2571                 service2 = service;
2572     }
2573
2574     // ----------------
2575     // SessionMonitor
2576     // ----------------
2577
2578     void fireSessionVariableChange(String variable) {
2579         for (MonitorHandler h : monitorHandlers) {
2580             MonitorContext ctx = monitorContexts.getLeft(h);
2581             assert ctx != null;
2582             // SafeRunner functionality repeated here to avoid dependency.
2583             try {
2584                 h.valuesChanged(ctx);
2585             } catch (Exception e) {
2586                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2587             } catch (LinkageError e) {
2588                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2589             }
2590         }
2591     }
2592
2593
2594     class ResourceSerializerImpl implements ResourceSerializer {
2595
2596         public long createRandomAccessId(int id) throws DatabaseException {
2597             if(id < 0)
2598                 return id;
2599             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2600             long cluster = getCluster(id);
2601             if (0 == cluster)
2602                 return 0; // Better to return 0 then invalid id.
2603             long result = ClusterTraitsBase.createResourceId(cluster, index);
2604             return result;
2605         }
2606
2607         @Override
2608         public long getRandomAccessId(Resource resource) throws DatabaseException {
2609
2610             ResourceImpl resourceImpl = (ResourceImpl) resource;
2611             return createRandomAccessId(resourceImpl.id);
2612
2613         }
2614
2615         @Override
2616         public int getTransientId(Resource resource) throws DatabaseException {
2617
2618             ResourceImpl resourceImpl = (ResourceImpl) resource;
2619             return resourceImpl.id;
2620
2621         }
2622
2623         @Override
2624         public String createRandomAccessId(Resource resource)
2625         throws InvalidResourceReferenceException {
2626
2627             if(resource == null) throw new IllegalArgumentException();
2628
2629             ResourceImpl resourceImpl = (ResourceImpl) resource;
2630
2631             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2632
2633             int r;
2634             try {
2635                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2636             } catch (DatabaseException e1) {
2637                 throw new InvalidResourceReferenceException(e1);
2638             }
2639             try {
2640                 // Serialize as '<resource index>_<cluster id>'
2641                 return "" + r + "_" + getCluster(resourceImpl);
2642             } catch (Throwable e) {
2643                 e.printStackTrace();
2644                 throw new InvalidResourceReferenceException(e);
2645             } finally {
2646             }
2647         }
2648
2649         public int getTransientId(long serialized) throws DatabaseException {
2650             if (serialized <= 0)
2651                 return (int)serialized;
2652             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2653             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2654             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2655             if (cluster == null)
2656                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2657             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2658             return key;
2659         }
2660
2661         @Override
2662         public Resource getResource(long randomAccessId) throws DatabaseException {
2663             return getResourceByKey(getTransientId(randomAccessId));
2664         }
2665
2666         @Override
2667         public Resource getResource(int transientId) throws DatabaseException {
2668             return getResourceByKey(transientId);
2669         }
2670
2671         @Override
2672         public Resource getResource(String randomAccessId)
2673         throws InvalidResourceReferenceException {
2674             try {
2675                 int i = randomAccessId.indexOf('_');
2676                 if (i == -1)
2677                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2678                             + randomAccessId + "'");
2679                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2680                 if(r < 0) return getResourceByKey(r);
2681                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2682                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2683                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2684                 if (cluster.hasResource(key, clusterTranslator))
2685                     return getResourceByKey(key);
2686             } catch (InvalidResourceReferenceException e) {
2687                 throw e;
2688             } catch (NumberFormatException e) {
2689                 throw new InvalidResourceReferenceException(e);
2690             } catch (Throwable e) {
2691                 e.printStackTrace();
2692                 throw new InvalidResourceReferenceException(e);
2693             } finally {
2694             }
2695             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2696         }
2697
2698         @Override
2699         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2700             try {
2701                 return true;
2702             } catch (Throwable e) {
2703                 e.printStackTrace();
2704                 throw new InvalidResourceReferenceException(e);
2705             } finally {
2706             }
2707         }
2708     }
2709
2710     // Copied from old SessionImpl
2711
2712     void check() {
2713         if (state.isClosed())
2714             throw new Error("Session closed.");
2715     }
2716
2717
2718
2719     public ResourceImpl getNewResource(long clusterId) {
2720         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2721         int newId;
2722         try {
2723             newId = cluster.createResource(clusterTranslator);
2724         } catch (DatabaseException e) {
2725             Logger.defaultLogError(e);
2726             return null;
2727         }
2728         return new ResourceImpl(resourceSupport, newId);
2729     }
2730
2731     public ResourceImpl getNewResource(Resource clusterSet)
2732     throws DatabaseException {
2733         long resourceId = clusterSet.getResourceId();
2734         Long clusterId = clusterSetsSupport.get(resourceId);
2735         if (null == clusterId)
2736             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2737         if (Constants.NewClusterId == clusterId) {
2738             clusterId = getService(ClusteringSupport.class).createCluster();
2739             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740                 createdClusters.add(clusterId);
2741             clusterSetsSupport.put(resourceId, clusterId);
2742             return getNewResource(clusterId);
2743         } else {
2744             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2745             ResourceImpl result;
2746             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2747                 clusterId = getService(ClusteringSupport.class).createCluster();
2748                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749                     createdClusters.add(clusterId);
2750                 clusterSetsSupport.put(resourceId, clusterId);
2751                 return getNewResource(clusterId);
2752             } else {
2753                 result = getNewResource(clusterId);
2754                 int resultKey = querySupport.getId(result);
2755                 long resultCluster = querySupport.getClusterId(resultKey);
2756                 if (clusterId != resultCluster)
2757                     clusterSetsSupport.put(resourceId, resultCluster);
2758                 return result;
2759             }
2760         }
2761     }
2762
2763     public void getNewClusterSet(Resource clusterSet)
2764     throws DatabaseException {
2765         if (DEBUG)
2766             System.out.println("new cluster set=" + clusterSet);
2767         long resourceId = clusterSet.getResourceId();
2768         if (clusterSetsSupport.containsKey(resourceId))
2769             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2770         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2771     }
2772     public boolean containsClusterSet(Resource clusterSet)
2773     throws ServiceException {
2774         long resourceId = clusterSet.getResourceId();
2775         return clusterSetsSupport.containsKey(resourceId);
2776     }
2777     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2778         Resource r = defaultClusterSet;
2779         defaultClusterSet = clusterSet;
2780         return r;
2781     }
2782     void printDiagnostics() {
2783
2784         if (DIAGNOSTICS) {
2785
2786             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2787             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2788             for(ClusterI cluster : clusterTable.getClusters()) {
2789                 try {
2790                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2791                         residentClusters.set(residentClusters.get() + 1);
2792                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2793                     }
2794                 } catch (DatabaseException e) {
2795                     Logger.defaultLogError(e);
2796                 }
2797             }
2798
2799             System.out.println("--------------------------------");
2800             System.out.println("Cluster information:");
2801             System.out.println("-amount of resident clusters=" + residentClusters.get());
2802             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2803
2804             for(ClusterI cluster : clusterTable.getClusters()) {
2805                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2806                 try {
2807                     if (!cluster.isLoaded())
2808                         System.out.println("not loaded]");
2809                     else if (cluster.isEmpty())
2810                         System.out.println("is empty]");
2811                     else {
2812                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2813                         System.out.print(",references=" + cluster.getReferenceCount());
2814                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2815                     }
2816                 } catch (DatabaseException e) {
2817                     Logger.defaultLogError(e);
2818                     System.out.println("is corrupted]");
2819                 }
2820             }
2821
2822             queryProvider2.printDiagnostics();
2823             System.out.println("--------------------------------");
2824         }
2825
2826     }
2827
2828     public void fireStartReadTransaction() {
2829
2830         if (DIAGNOSTICS)
2831             System.out.println("StartReadTransaction");
2832
2833         for (SessionEventListener listener : eventListeners) {
2834             try {
2835                 listener.readTransactionStarted();
2836             } catch (Throwable t) {
2837                 t.printStackTrace();
2838             }
2839         }
2840
2841     }
2842
2843     public void fireFinishReadTransaction() {
2844
2845         if (DIAGNOSTICS)
2846             System.out.println("FinishReadTransaction");
2847
2848         for (SessionEventListener listener : eventListeners) {
2849             try {
2850                 listener.readTransactionFinished();
2851             } catch (Throwable t) {
2852                 t.printStackTrace();
2853             }
2854         }
2855
2856     }
2857
2858     public void fireStartWriteTransaction() {
2859
2860         if (DIAGNOSTICS)
2861             System.out.println("StartWriteTransaction");
2862
2863         for (SessionEventListener listener : eventListeners) {
2864             try {
2865                 listener.writeTransactionStarted();
2866             } catch (Throwable t) {
2867                 t.printStackTrace();
2868             }
2869         }
2870
2871     }
2872
2873     public void fireFinishWriteTransaction() {
2874
2875         if (DIAGNOSTICS)
2876             System.out.println("FinishWriteTransaction");
2877
2878         for (SessionEventListener listener : eventListeners) {
2879             try {
2880                 listener.writeTransactionFinished();
2881             } catch (Throwable t) {
2882                 t.printStackTrace();
2883             }
2884         }
2885
2886         Indexing.resetDependenciesIndexingDisabled();
2887
2888     }
2889
2890     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2891         throw new Error("Not supported at the moment.");
2892     }
2893
2894     State getState() {
2895         return state;
2896     }
2897
2898     ClusterTable getClusterTable() {
2899         return clusterTable;
2900     }
2901
2902     public GraphSession getGraphSession() {
2903         return graphSession;
2904     }
2905
2906     public QueryProcessor getQueryProvider2() {
2907         return queryProvider2;
2908     }
2909
2910     ClientChangesImpl getClientChanges() {
2911         return clientChanges;
2912     }
2913
2914     boolean getWriteOnly() {
2915         return writeOnly;
2916     }
2917
    // Class-wide counter, initially zero.
    // NOTE(review): no use of it is visible near this declaration - confirm it
    // is still referenced before relying on it.
    static int counter = 0;
2919
2920     public void onClusterLoaded(long clusterId) {
2921
2922         clusterTable.updateSize();
2923
2924     }
2925
2926
2927
2928
2929     /*
2930      * Implementation of the interface RequestProcessor
2931      */
2932
2933     @Override
2934     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2935
2936         assertNotSession();
2937         assertAlive();
2938
2939         assert(request != null);
2940
2941         final DataContainer<T> result = new DataContainer<T>();
2942         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2943
2944         syncRequest(request, new AsyncProcedure<T>() {
2945
2946             @Override
2947             public void execute(AsyncReadGraph graph, T t) {
2948                 result.set(t);
2949             }
2950
2951             @Override
2952             public void exception(AsyncReadGraph graph, Throwable t) {
2953                 exception.set(t);
2954             }
2955
2956         });
2957
2958         Throwable t = exception.get();
2959         if(t != null) {
2960             if(t instanceof DatabaseException) throw (DatabaseException)t;
2961             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2962         }
2963
2964         return result.get();
2965
2966     }
2967
2968 //    @Override
2969 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2970 //        assertNotSession();
2971 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2972 //    }
2973
2974     @Override
2975     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2976         assertNotSession();
2977         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2978     }
2979
2980     @Override
2981     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2982         assertNotSession();
2983         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2984     }
2985
2986     @Override
2987     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2988         assertNotSession();
2989         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2990     }
2991
2992     @Override
2993     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2994         assertNotSession();
2995         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2996     }
2997
2998     @Override
2999     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3000
3001         assertNotSession();
3002
3003         assert(request != null);
3004
3005         final DataContainer<T> result = new DataContainer<T>();
3006         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3007
3008         syncRequest(request, new AsyncProcedure<T>() {
3009
3010             @Override
3011             public void execute(AsyncReadGraph graph, T t) {
3012                 result.set(t);
3013             }
3014
3015             @Override
3016             public void exception(AsyncReadGraph graph, Throwable t) {
3017                 exception.set(t);
3018             }
3019
3020         });
3021
3022         Throwable t = exception.get();
3023         if(t != null) {
3024             if(t instanceof DatabaseException) throw (DatabaseException)t;
3025             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3026         }
3027
3028         return result.get();
3029
3030     }
3031
3032     @Override
3033     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3034         assertNotSession();
3035         return syncRequest(request, (AsyncProcedure<T>)procedure);
3036     }
3037
3038     @Override
3039     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3040         assertNotSession();
3041         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3042     }
3043
3044     @Override
3045     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3046         assertNotSession();
3047         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3048     }
3049
3050     @Override
3051     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3052         assertNotSession();
3053         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3054     }
3055
3056     @Override
3057     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3058         assertNotSession();
3059         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3060     }
3061
3062     @Override
3063     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3064
3065         assertNotSession();
3066
3067         assert(request != null);
3068
3069         final ArrayList<T> result = new ArrayList<T>();
3070         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3071
3072         syncRequest(request, new AsyncMultiProcedure<T>() {
3073
3074             @Override
3075             public void execute(AsyncReadGraph graph, T t) {
3076                 synchronized(result) {
3077                     result.add(t);
3078                 }
3079             }
3080
3081             @Override
3082             public void finished(AsyncReadGraph graph) {
3083             }
3084
3085             @Override
3086             public void exception(AsyncReadGraph graph, Throwable t) {
3087                 exception.set(t);
3088             }
3089
3090
3091         });
3092
3093         Throwable t = exception.get();
3094         if(t != null) {
3095             if(t instanceof DatabaseException) throw (DatabaseException)t;
3096             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3097         }
3098
3099         return result;
3100
3101     }
3102
3103     @Override
3104     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3105         assertNotSession();
3106         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3107     }
3108
3109     @Override
3110     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3111         assertNotSession();
3112         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3113     }
3114
3115     @Override
3116     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3117         assertNotSession();
3118         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3119     }
3120
3121     @Override
3122     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3123         assertNotSession();
3124         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3125     }
3126
3127     @Override
3128     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3129         assertNotSession();
3130         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3131     }
3132
3133     @Override
3134     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3135
3136         assertNotSession();
3137
3138         assert(request != null);
3139
3140         final ArrayList<T> result = new ArrayList<T>();
3141         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3142
3143         syncRequest(request, new AsyncMultiProcedure<T>() {
3144
3145             @Override
3146             public void execute(AsyncReadGraph graph, T t) {
3147                 synchronized(result) {
3148                     result.add(t);
3149                 }
3150             }
3151
3152             @Override
3153             public void finished(AsyncReadGraph graph) {
3154             }
3155
3156             @Override
3157             public void exception(AsyncReadGraph graph, Throwable t) {
3158                 exception.set(t);
3159             }
3160
3161
3162         });
3163
3164         Throwable t = exception.get();
3165         if(t != null) {
3166             if(t instanceof DatabaseException) throw (DatabaseException)t;
3167             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3168         }
3169
3170         return result;
3171
3172     }
3173
3174     @Override
3175     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3176         assertNotSession();
3177         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3178     }
3179
3180     @Override
3181     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3182         assertNotSession();
3183         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3184     }
3185
3186     @Override
3187     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3188         assertNotSession();
3189        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3190     }
3191
3192     @Override
3193     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3194         assertNotSession();
3195         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3196     }
3197
3198     @Override
3199     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3200         assertNotSession();
3201         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3202     }
3203
3204
3205     @Override
3206     public <T> void asyncRequest(Read<T> request) {
3207
3208         asyncRequest(request, new ProcedureAdapter<T>() {
3209             @Override
3210             public void exception(Throwable t) {
3211                 t.printStackTrace();
3212             }
3213         });
3214
3215     }
3216
3217     @Override
3218     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3219         asyncRequest(request, (AsyncProcedure<T>)procedure);
3220     }
3221
3222     @Override
3223     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3224         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3225     }
3226
3227     @Override
3228     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3229         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3230     }
3231
3232     @Override
3233     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3234         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3235     }
3236
3237     @Override
3238     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3239         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3240     }
3241
3242     @Override
3243     final public <T> void asyncRequest(final AsyncRead<T> request) {
3244
3245         assert(request != null);
3246
3247         asyncRequest(request, new ProcedureAdapter<T>() {
3248             @Override
3249             public void exception(Throwable t) {
3250                 t.printStackTrace();
3251             }
3252         });
3253
3254     }
3255
3256     @Override
3257     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3258         scheduleRequest(request, procedure, procedure, null);
3259     }
3260
3261     @Override
3262     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3263         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3264     }
3265
3266     @Override
3267     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3268         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3269     }
3270
3271     @Override
3272     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3273         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3274     }
3275
3276     @Override
3277     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3278         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3279     }
3280
3281     @Override
3282     public <T> void asyncRequest(MultiRead<T> request) {
3283
3284         assert(request != null);
3285
3286         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3287             @Override
3288             public void exception(AsyncReadGraph graph, Throwable t) {
3289                 t.printStackTrace();
3290             }
3291         });
3292
3293     }
3294
3295     @Override
3296     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3297         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3303     }
3304
3305     @Override
3306     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3308     }
3309
3310     @Override
3311     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3313     }
3314
3315     @Override
3316     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3318     }
3319
3320     @Override
3321     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3322
3323         assert(request != null);
3324
3325         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3326             @Override
3327             public void exception(AsyncReadGraph graph, Throwable t) {
3328                 t.printStackTrace();
3329             }
3330         });
3331
3332     }
3333
3334     @Override
3335     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3337     }
3338
3339     @Override
3340     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3347     }
3348
3349     @Override
3350     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3352     }
3353
3354     @Override
3355     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3357     }
3358
3359     @Override
3360     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3361         assertNotSession();
3362         throw new Error("Not implemented!");
3363     }
3364
3365     @Override
3366     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3367         throw new Error("Not implemented!");
3368     }
3369
3370     @Override
3371     final public <T> void asyncRequest(final ExternalRead<T> request) {
3372
3373         assert(request != null);
3374
3375         asyncRequest(request, new ProcedureAdapter<T>() {
3376             @Override
3377             public void exception(Throwable t) {
3378                 t.printStackTrace();
3379             }
3380         });
3381
3382     }
3383
3384     @Override
3385     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3386         asyncRequest(request, (Procedure<T>)procedure);
3387     }
3388
3389
3390
3391     void check(Throwable t) throws DatabaseException {
3392         if(t != null) {
3393             if(t instanceof DatabaseException) throw (DatabaseException)t;
3394             else throw new DatabaseException("Unexpected exception", t);
3395         }
3396     }
3397
3398     void check(DataContainer<Throwable> container) throws DatabaseException {
3399         Throwable t = container.get();
3400         if(t != null) {
3401             if(t instanceof DatabaseException) throw (DatabaseException)t;
3402             else throw new DatabaseException("Unexpected exception", t);
3403         }
3404     }
3405
3406
3407
3408
3409
3410
3411     boolean sameProvider(Write request) {
3412         if(writeState.getGraph().provider != null) {
3413             return writeState.getGraph().provider.equals(request.getProvider());
3414         } else {
3415             return request.getProvider() == null;
3416         }
3417     }
3418
3419     boolean plainWrite(WriteGraphImpl graph) {
3420         if(graph == null) return false;
3421         if(graph.writeSupport.writeOnly()) return false;
3422         return true;
3423     }
3424
3425
    /**
     * Thread group named "Session Thread Group".
     * NOTE(review): presumably the group the session's own threads are created in - confirm against the thread creation code.
     */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3427     private void assertNotSession() throws DatabaseException {
3428         Thread current = Thread.currentThread();
3429         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3430     }
3431
3432     void assertAlive() {
3433         if (!state.isAlive())
3434             throw new RuntimeDatabaseException("Session has been shut down.");
3435     }
3436
3437     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3438         return querySupport.getValueStream(graph, querySupport.getId(resource));
3439     }
3440
3441     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3442         return querySupport.getValue(graph, querySupport.getId(resource));
3443     }
3444
3445     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3446         acquire(semaphore, request, null);
3447     }
3448
3449     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3450         assertAlive();
3451         try {
3452             long tryCount = 0;
3453             // Loop until the semaphore is acquired reporting requests that take a long time.
3454             while (true) {
3455
3456                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3457                     // Acquired OK.
3458                     return;
3459                 } else {
3460                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3461                         ++tryCount;
3462                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3463                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3464                                 * tryCount;
3465                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3466                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3467                     }
3468                 }
3469
3470                 assertAlive();
3471
3472             }
3473         } catch (InterruptedException e) {
3474             e.printStackTrace();
3475             // FIXME: Should perhaps do something else in this case ??
3476         }
3477     }
3478
3479
3480
3481 //    @Override
3482 //    public boolean holdOnToTransactionAfterCancel() {
3483 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3484 //    }
3485 //
3486 //    @Override
3487 //    public boolean holdOnToTransactionAfterCommit() {
3488 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3489 //    }
3490 //
3491 //    @Override
3492 //    public boolean holdOnToTransactionAfterRead() {
3493 //        return transactionPolicy.holdOnToTransactionAfterRead();
3494 //    }
3495 //
3496 //    @Override
3497 //    public void onRelinquish() {
3498 //        transactionPolicy.onRelinquish();
3499 //    }
3500 //
3501 //    @Override
3502 //    public void onRelinquishDone() {
3503 //        transactionPolicy.onRelinquishDone();
3504 //    }
3505 //
3506 //    @Override
3507 //    public void onRelinquishError() {
3508 //        transactionPolicy.onRelinquishError();
3509 //    }
3510
3511
3512     @Override
3513     public Session getSession() {
3514         return this;
3515     }
3516
3517
    /**
     * Resolves the virtual graph to use for the given virtual graph; implemented by subclasses.
     * NOTE(review): the exact mapping (and whether null is accepted or returned) is not visible here - confirm in the implementations.
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /**
     * Supplies a new resource; implemented by subclasses.
     *
     * @throws DatabaseException if the implementation fails to provide a resource
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3520
3521     public void ceased(int thread) {
3522
3523         requestManager.ceased(thread);
3524
3525     }
3526
3527     public int getAmountOfQueryThreads() {
3528         // This must be a power of two
3529         return 1;
3530 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3531     }
3532
3533     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3534
3535         return new ResourceImpl(resourceSupport, key);
3536
3537     }
3538
3539     public void acquireWriteOnly() {
3540         writeOnly = true;
3541     }
3542
3543     public void releaseWriteOnly(ReadGraphImpl graph) {
3544         writeOnly = false;
3545         queryProvider2.releaseWrite(graph);
3546     }
3547
3548     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3549
3550         long start = System.nanoTime();
3551
3552         while(dirtyPrimitives) {
3553             dirtyPrimitives = false;
3554             getQueryProvider2().performDirtyUpdates(writer);
3555             getQueryProvider2().performScheduledUpdates(writer);
3556         }
3557
3558         fireMetadataListeners(writer, clientChanges);
3559
3560         if(DebugPolicy.PERFORMANCE_DATA) {
3561             long end = System.nanoTime();
3562             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3563         }
3564
3565     }
3566
3567     void removeTemporaryData() {
3568         File platform = Platform.getLocation().toFile();
3569         File tempFiles = new File(platform, "tempFiles");
3570         File temp = new File(tempFiles, "db");
3571         if(!temp.exists()) 
3572             return;
3573         try {
3574             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3575                 @Override
3576                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3577                     try {
3578                         Files.delete(file);
3579                     } catch (IOException e) {
3580                         Logger.defaultLogError(e);
3581                     }
3582                     return FileVisitResult.CONTINUE;
3583                 }
3584             });
3585         } catch (IOException e) {
3586             Logger.defaultLogError(e);
3587         }
3588     }
3589
3590     public void handleCreatedClusters() {
3591         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3592             createdClusters.forEach(new TLongProcedure() {
3593
3594                 @Override
3595                 public boolean execute(long value) {
3596                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3597                     cluster.setImmutable(true, clusterTranslator);
3598                     return true;
3599                 }
3600             });
3601         }
3602         createdClusters.clear();
3603     }
3604
3605     @Override
3606     public Object getModificationCounter() {
3607         return queryProvider2.modificationCounter;
3608     }
3609     
3610     @Override
3611     public void markUndoPoint() {
3612         state.setCombine(false);
3613     }
3614
3615 }