]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Fixes for non-compiling code after merge 5930811a7
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.File;\r
15 import java.io.IOException;\r
16 import java.io.InputStream;\r
17 import java.nio.charset.Charset;\r
18 import java.nio.file.FileVisitResult;\r
19 import java.nio.file.Files;\r
20 import java.nio.file.Path;\r
21 import java.nio.file.SimpleFileVisitor;\r
22 import java.nio.file.attribute.BasicFileAttributes;\r
23 import java.util.ArrayList;\r
24 import java.util.Collection;\r
25 import java.util.HashSet;\r
26 import java.util.TreeMap;\r
27 import java.util.concurrent.CopyOnWriteArrayList;\r
28 import java.util.concurrent.Semaphore;\r
29 import java.util.concurrent.atomic.AtomicInteger;\r
30 \r
31 import org.eclipse.core.runtime.Platform;\r
32 import org.simantics.databoard.Bindings;\r
33 import org.simantics.db.AsyncReadGraph;\r
34 import org.simantics.db.ChangeSet;\r
35 import org.simantics.db.DevelopmentKeys;\r
36 import org.simantics.db.ExternalValueSupport;\r
37 import org.simantics.db.Metadata;\r
38 import org.simantics.db.MonitorContext;\r
39 import org.simantics.db.MonitorHandler;\r
40 import org.simantics.db.Resource;\r
41 import org.simantics.db.ResourceSerializer;\r
42 import org.simantics.db.Session;\r
43 import org.simantics.db.SessionManager;\r
44 import org.simantics.db.SessionVariables;\r
45 import org.simantics.db.VirtualGraph;\r
46 import org.simantics.db.WriteGraph;\r
47 import org.simantics.db.authentication.UserAuthenticationAgent;\r
48 import org.simantics.db.authentication.UserAuthenticator;\r
49 import org.simantics.db.common.Indexing;\r
50 import org.simantics.db.common.TransactionPolicyRelease;\r
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;\r
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;\r
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;\r
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;\r
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;\r
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;\r
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;\r
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;\r
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;\r
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;\r
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;\r
63 import org.simantics.db.common.utils.Logger;\r
64 import org.simantics.db.event.ChangeEvent;\r
65 import org.simantics.db.event.ChangeListener;\r
66 import org.simantics.db.event.SessionEventListener;\r
67 import org.simantics.db.exception.CancelTransactionException;\r
68 import org.simantics.db.exception.ClusterSetExistException;\r
69 import org.simantics.db.exception.DatabaseException;\r
70 import org.simantics.db.exception.ImmutableException;\r
71 import org.simantics.db.exception.InvalidResourceReferenceException;\r
72 import org.simantics.db.exception.ResourceNotFoundException;\r
73 import org.simantics.db.exception.RuntimeDatabaseException;\r
74 import org.simantics.db.exception.ServiceException;\r
75 import org.simantics.db.exception.ServiceNotFoundException;\r
76 import org.simantics.db.impl.ClusterBase;\r
77 import org.simantics.db.impl.ClusterI;\r
78 import org.simantics.db.impl.ClusterTraitsBase;\r
79 import org.simantics.db.impl.ClusterTranslator;\r
80 import org.simantics.db.impl.ResourceImpl;\r
81 import org.simantics.db.impl.TransientGraph;\r
82 import org.simantics.db.impl.VirtualGraphImpl;\r
83 import org.simantics.db.impl.graph.DelayedWriteGraph;\r
84 import org.simantics.db.impl.graph.ReadGraphImpl;\r
85 import org.simantics.db.impl.graph.WriteGraphImpl;\r
86 import org.simantics.db.impl.graph.WriteSupport;\r
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;\r
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;\r
90 import org.simantics.db.impl.query.QueryProcessor;\r
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
93 import org.simantics.db.impl.service.QueryDebug;\r
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;\r
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;\r
96 import org.simantics.db.procedure.AsyncListener;\r
97 import org.simantics.db.procedure.AsyncMultiListener;\r
98 import org.simantics.db.procedure.AsyncMultiProcedure;\r
99 import org.simantics.db.procedure.AsyncProcedure;\r
100 import org.simantics.db.procedure.Listener;\r
101 import org.simantics.db.procedure.ListenerBase;\r
102 import org.simantics.db.procedure.MultiListener;\r
103 import org.simantics.db.procedure.MultiProcedure;\r
104 import org.simantics.db.procedure.Procedure;\r
105 import org.simantics.db.procedure.SyncListener;\r
106 import org.simantics.db.procedure.SyncMultiListener;\r
107 import org.simantics.db.procedure.SyncMultiProcedure;\r
108 import org.simantics.db.procedure.SyncProcedure;\r
109 import org.simantics.db.procore.cluster.ClusterImpl;\r
110 import org.simantics.db.procore.cluster.ClusterTraits;\r
111 import org.simantics.db.procore.protocol.Constants;\r
112 import org.simantics.db.procore.protocol.DebugPolicy;\r
113 import org.simantics.db.request.AsyncMultiRead;\r
114 import org.simantics.db.request.AsyncRead;\r
115 import org.simantics.db.request.DelayedWrite;\r
116 import org.simantics.db.request.DelayedWriteResult;\r
117 import org.simantics.db.request.ExternalRead;\r
118 import org.simantics.db.request.MultiRead;\r
119 import org.simantics.db.request.Read;\r
120 import org.simantics.db.request.ReadInterface;\r
121 import org.simantics.db.request.UndoTraits;\r
122 import org.simantics.db.request.Write;\r
123 import org.simantics.db.request.WriteInterface;\r
124 import org.simantics.db.request.WriteOnly;\r
125 import org.simantics.db.request.WriteOnlyResult;\r
126 import org.simantics.db.request.WriteResult;\r
127 import org.simantics.db.request.WriteTraits;\r
128 import org.simantics.db.service.ByteReader;\r
129 import org.simantics.db.service.ClusterBuilder;\r
130 import org.simantics.db.service.ClusterBuilderFactory;\r
131 import org.simantics.db.service.ClusterControl;\r
132 import org.simantics.db.service.ClusterSetsSupport;\r
133 import org.simantics.db.service.ClusterUID;\r
134 import org.simantics.db.service.ClusteringSupport;\r
135 import org.simantics.db.service.CollectionSupport;\r
136 import org.simantics.db.service.DebugSupport;\r
137 import org.simantics.db.service.DirectQuerySupport;\r
138 import org.simantics.db.service.GraphChangeListenerSupport;\r
139 import org.simantics.db.service.InitSupport;\r
140 import org.simantics.db.service.LifecycleSupport;\r
141 import org.simantics.db.service.ManagementSupport;\r
142 import org.simantics.db.service.QueryControl;\r
143 import org.simantics.db.service.SerialisationSupport;\r
144 import org.simantics.db.service.ServerInformation;\r
145 import org.simantics.db.service.ServiceActivityMonitor;\r
146 import org.simantics.db.service.SessionEventSupport;\r
147 import org.simantics.db.service.SessionMonitorSupport;\r
148 import org.simantics.db.service.SessionUserSupport;\r
149 import org.simantics.db.service.StatementSupport;\r
150 import org.simantics.db.service.TransactionPolicySupport;\r
151 import org.simantics.db.service.TransactionSupport;\r
152 import org.simantics.db.service.TransferableGraphSupport;\r
153 import org.simantics.db.service.UndoRedoSupport;\r
154 import org.simantics.db.service.VirtualGraphSupport;\r
155 import org.simantics.db.service.XSupport;\r
156 import org.simantics.layer0.Layer0;\r
157 import org.simantics.utils.DataContainer;\r
158 import org.simantics.utils.Development;\r
159 import org.simantics.utils.datastructures.Callback;\r
160 import org.simantics.utils.threads.logger.ITask;\r
161 import org.simantics.utils.threads.logger.ThreadLogger;\r
162 \r
163 import gnu.trove.procedure.TLongProcedure;\r
164 import gnu.trove.set.hash.TLongHashSet;\r
165 \r
166 \r
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {\r
168 \r
169     protected static final boolean DEBUG        = false;\r
170 \r
171     private static final boolean DIAGNOSTICS       = false;\r
172 \r
173     private TransactionPolicySupport transactionPolicy;\r
174 \r
175     final private ClusterControl clusterControl;\r
176 \r
177     final protected BuiltinSupportImpl builtinSupport;\r
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;\r
179     final protected ClusterSetsSupport clusterSetsSupport;\r
180     final protected LifecycleSupportImpl lifecycleSupport;\r
181 \r
182     protected QuerySupportImpl querySupport;\r
183     protected ResourceSupportImpl resourceSupport;\r
184     protected WriteSupport writeSupport;\r
185     public ClusterTranslator clusterTranslator;\r
186 \r
187     boolean dirtyPrimitives = false;\r
188 \r
189     public static final int SERVICE_MODE_CREATE = 2;\r
190     public static final int SERVICE_MODE_ALLOW = 1;\r
191     public int serviceMode = 0;\r
192     public boolean createdImmutableClusters = false;\r
193     public TLongHashSet createdClusters = new TLongHashSet();\r
194 \r
195     private Layer0 L0;\r
196 \r
197     /**\r
198      * The service locator maintained by the workbench. These services are\r
199      * initialized during workbench during the <code>init</code> method.\r
200      */\r
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();\r
202 \r
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();\r
204 \r
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();\r
206 \r
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();\r
208 \r
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();\r
210 \r
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();\r
212 \r
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();\r
214 \r
215     final protected State                                        state              = new State();\r
216 \r
217     protected GraphSession                                       graphSession       = null;\r
218 \r
219     protected SessionManager                                     sessionManagerImpl = null;\r
220 \r
221     protected UserAuthenticationAgent                            authAgent          = null;\r
222 \r
223     protected UserAuthenticator                                  authenticator      = null;\r
224 \r
225     protected Resource                                           user               = null;\r
226 \r
227     protected ClusterStream                                      clusterStream      = null;\r
228 \r
229     protected SessionRequestManager                         requestManager = null;\r
230 \r
231     public ClusterTable                                       clusterTable       = null;\r
232 \r
233     public QueryProcessor                                     queryProvider2 = null;\r
234 \r
235     ClientChangesImpl                                  clientChanges      = null;\r
236 \r
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];\r
238 \r
239 //    protected long                                               newClusterId       = Constants.NullClusterId;\r
240 \r
241     protected int                                                flushCounter       = 0;\r
242 \r
243     protected boolean                                            writeOnly          = false;\r
244 \r
245     WriteState<?>                                                writeState = null;\r
246     WriteStateBase<?>                                            delayedWriteState = null;\r
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().\r
248 \r
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {\r
250 \r
251 //        if (authAgent == null)\r
252 //            throw new IllegalArgumentException("null authentication agent");\r
253 \r
254         File t = StaticSessionProperties.virtualGraphStoragePath;\r
255         if (null == t)\r
256             t = new File(".");\r
257         this.clusterTable = new ClusterTable(this, t);\r
258         this.builtinSupport = new BuiltinSupportImpl(this);\r
259         this.sessionManagerImpl = sessionManagerImpl;\r
260         this.user = null;\r
261 //        this.authAgent = authAgent;\r
262 \r
263         serviceLocator.registerService(Session.class, this);\r
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));\r
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));\r
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));\r
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));\r
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));\r
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));\r
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));\r
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));\r
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));\r
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));\r
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));\r
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));\r
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));\r
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));\r
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));\r
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));\r
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));\r
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());\r
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));\r
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());\r
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());\r
285         ServiceActivityUpdaterForWriteTransactions.register(this);\r
286 \r
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);\r
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);\r
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);\r
290         this.lifecycleSupport = new LifecycleSupportImpl(this);\r
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);\r
292         this.transactionPolicy = new TransactionPolicyRelease();\r
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);\r
294         this.clusterControl = new ClusterControlImpl(this);\r
295         serviceLocator.registerService(ClusterControl.class, clusterControl);\r
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.\r
297         this.clusterSetsSupport.setReadDirectory(t.toPath());\r
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());\r
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);\r
300     }\r
301 \r
302     /*\r
303      *\r
304      * Session interface\r
305      */\r
306 \r
307 \r
308     @Override\r
309     public Resource getRootLibrary() {\r
310         return queryProvider2.getRootLibraryResource();\r
311     }\r
312 \r
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {\r
314         if (!graphSession.dbSession.refreshEnabled())\r
315             return;\r
316         try {\r
317             getClusterTable().refresh(csid, this, clusterUID);\r
318         } catch (Throwable t) {\r
319             Logger.defaultLogError("Refesh failed.", t);\r
320         }\r
321     }\r
322 \r
323     static final class TaskHelper {\r
324         private final String name;\r
325         private Object result;\r
326         final Semaphore sema = new Semaphore(0);\r
327         private Throwable throwable = null;\r
328         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {\r
329             @Override\r
330             public void run(DatabaseException e) {\r
331                 synchronized (TaskHelper.this) {\r
332                     throwable = e;\r
333                 }\r
334             }\r
335         };\r
336         final Procedure<Object> proc = new Procedure<Object>() {\r
337             @Override\r
338             public void execute(Object result) {\r
339                 callback.run(null);\r
340             }\r
341             @Override\r
342             public void exception(Throwable t) {\r
343                 if (t instanceof DatabaseException)\r
344                     callback.run((DatabaseException)t);\r
345                 else\r
346                     callback.run(new DatabaseException("" + name + "operation failed.", t));\r
347             }\r
348         };\r
349         final WriteTraits writeTraits = new WriteTraits() {\r
350             @Override\r
351             public UndoTraits getUndoTraits() {\r
352                 return null;\r
353             }\r
354             @Override\r
355             public VirtualGraph getProvider() {\r
356                 return null;\r
357             }\r
358         };\r
359         TaskHelper(String name) {\r
360             this.name = name;\r
361         }\r
362         <T> T getResult() {\r
363             return (T)result;\r
364         }\r
365         void setResult(Object result) {\r
366             this.result = result;\r
367         }\r
368 //        Throwable throwableGet() {\r
369 //            return throwable;\r
370 //        }\r
371         synchronized void throwableSet(Throwable t) {\r
372             throwable = t;\r
373         }\r
374 //        void throwableSet(String t) {\r
375 //            throwable = new InternalException("" + name + " operation failed. " + t);\r
376 //        }\r
377         synchronized void throwableCheck()\r
378         throws DatabaseException {\r
379             if (null != throwable)\r
380                 if (throwable instanceof DatabaseException)\r
381                     throw (DatabaseException)throwable;\r
382                 else\r
383                     throw new DatabaseException("Undo operation failed.", throwable);\r
384         }\r
385         void throw_(String message)\r
386         throws DatabaseException {\r
387             throw new DatabaseException("" + name + " operation failed. " + message);\r
388         }\r
389     }\r
390 \r
391 \r
392 \r
393     private ListenerBase getListenerBase(Object procedure) {\r
394         if (procedure instanceof ListenerBase)\r
395             return (ListenerBase) procedure;\r
396         else\r
397             return null;\r
398     }\r
399 \r
400     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
401         scheduleRequest(request, callback, notify, null);\r
402     }\r
403 \r
404     /* (non-Javadoc)\r
405      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
406      */\r
407     @Override\r
408     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
409 \r
410         assert (request != null);\r
411 \r
412         if(Development.DEVELOPMENT) {\r
413             try {\r
414             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
415                 System.err.println("schedule write '" + request + "'");\r
416             } catch (Throwable t) {\r
417                 Logger.defaultLogError(t);\r
418             }\r
419         }\r
420 \r
421         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
422 \r
423         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
424 \r
425             @Override\r
426             public void run(int thread) {\r
427 \r
428                 if(Development.DEVELOPMENT) {\r
429                     try {\r
430                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
431                         System.err.println("perform write '" + request + "'");\r
432                     } catch (Throwable t) {\r
433                         Logger.defaultLogError(t);\r
434                     }\r
435                 }\r
436 \r
437                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
438 \r
439                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
440 \r
441                 try {\r
442 \r
443                     flushCounter = 0;\r
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
445 \r
446                     VirtualGraph vg = getProvider(request.getProvider());\r
447 \r
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
450 \r
451                         @Override\r
452                         public void execute(Object result) {\r
453                             if(callback != null) callback.run(null);\r
454                         }\r
455 \r
456                         @Override\r
457                         public void exception(Throwable t) {\r
458                             if(callback != null) callback.run((DatabaseException)t);\r
459                         }\r
460 \r
461                     });\r
462 \r
463                     assert (null != writer);\r
464 //                    writer.state.barrier.inc();\r
465 \r
466                     try {\r
467                         request.perform(writer);\r
468                         assert (null != writer);\r
469                     } catch (Throwable t) {\r
470                         if (!(t instanceof CancelTransactionException))\r
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);\r
472                         writeState.except(t);\r
473                     } finally {\r
474 //                        writer.state.barrier.dec();\r
475 //                        writer.waitAsync(request);\r
476                     }\r
477 \r
478 \r
479                     assert(!queryProvider2.dirty);\r
480 \r
481                 } catch (Throwable e) {\r
482 \r
483                     // Log it first, just to be safe that the error is always logged.\r
484                     if (!(e instanceof CancelTransactionException))\r
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
486 \r
487 //                    writeState.getGraph().state.barrier.dec();\r
488 //                    writeState.getGraph().waitAsync(request);\r
489 \r
490                     writeState.except(e);\r
491 \r
492 //                    try {\r
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
494 //                        // All we can do here is to log those, can't really pass them anywhere.\r
495 //                        if (callback != null) {\r
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);\r
497 //                            else callback.run(new DatabaseException(e));\r
498 //                        }\r
499 //                    } catch (Throwable e2) {\r
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
501 //                    }\r
502 //                    boolean empty = clusterStream.reallyFlush();\r
503 //                    int callerThread = -1;\r
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);\r
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);\r
506 //                    try {\r
507 //                        // Send all local changes to server so it can calculate correct reverse change set.\r
508 //                        // This will call clusterStream.accept().\r
509 //                        state.cancelCommit(context, clusterStream);\r
510 //                        if (!empty) {\r
511 //                            if (!context.isOk()) // this is a blocking operation\r
512 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());\r
514 //                        }\r
515 //                        state.cancelCommit2(context, clusterStream);\r
516 //                    } catch (DatabaseException e3) {\r
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);\r
518 //                    } finally {\r
519 //                        clusterStream.setOff(false);\r
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
521 //                    }\r
522 \r
523                 } finally {\r
524 \r
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
526 \r
527                 }\r
528 \r
529                 task.finish();\r
530 \r
531                 if(Development.DEVELOPMENT) {\r
532                     try {\r
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
534                         System.err.println("finish write '" + request + "'");\r
535                     } catch (Throwable t) {\r
536                         Logger.defaultLogError(t);\r
537                     }\r
538                 }\r
539 \r
540             }\r
541 \r
542         }, combine);\r
543 \r
544     }\r
545 \r
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
547         scheduleRequest(request, procedure, notify, null);\r
548     }\r
549 \r
550     /* (non-Javadoc)\r
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
552      */\r
553     @Override\r
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
555 \r
556         assert (request != null);\r
557 \r
558         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
559 \r
560         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
561 \r
562             @Override\r
563             public void run(int thread) {\r
564 \r
565                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
566 \r
567                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
568 \r
569                 flushCounter = 0;\r
570                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
571 \r
572                 VirtualGraph vg = getProvider(request.getProvider());\r
573                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
574 \r
575                 try {\r
576                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);\r
577                     writeState = writeStateT;\r
578 \r
579                     assert (null != writer);\r
580 //                    writer.state.barrier.inc();\r
581                     writeStateT.setResult(request.perform(writer));\r
582                     assert (null != writer);\r
583 \r
584 //                    writer.state.barrier.dec();\r
585 //                    writer.waitAsync(null);\r
586 \r
587                 } catch (Throwable e) {\r
588 \r
589 //                    writer.state.barrier.dec();\r
590 //                    writer.waitAsync(null);\r
591 \r
592                     writeState.except(e);\r
593 \r
594 //                  state.stopWriteTransaction(clusterStream);\r
595 //\r
596 //              } catch (Throwable e) {\r
597 //                  // Log it first, just to be safe that the error is always logged.\r
598 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
599 //\r
600 //                  try {\r
601 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
602 //                      // All we can do here is to log those, can't really pass them anywhere.\r
603 //                      if (procedure != null) {\r
604 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);\r
605 //                          else procedure.exception(new DatabaseException(e));\r
606 //                      }\r
607 //                  } catch (Throwable e2) {\r
608 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
609 //                  }\r
610 //\r
611 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
612 //\r
613 //                  state.stopWriteTransaction(clusterStream);\r
614 \r
615                 } finally {\r
616                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
617                 }\r
618 \r
619 //              if(notify != null) notify.release();\r
620 \r
621                 task.finish();\r
622 \r
623             }\r
624 \r
625         }, combine);\r
626 \r
627     }\r
628 \r
629     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
630         scheduleRequest(request, callback, notify, null);\r
631     }\r
632 \r
    /* (non-Javadoc)
     * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
     *
     * Hands a SessionTask to requestManager.scheduleWrite(...). When the task runs it works in
     * two phases:
     *   1. the request is performed against a DelayedWriteGraph built on a fresh ReadGraphImpl;
     *   2. the DelayedWriteGraph is committed through a WriteGraphImpl that uses a write-only
     *      WriteSupport (VirtualWriteOnlySupport when a virtual graph is resolved for the
     *      request, WriteOnlySupport otherwise).
     *
     * The callback is adapted to a Procedure: execute(...) forwards null to the callback,
     * exception(...) forwards the DatabaseException (wrapping any other Throwable). When no
     * callback was given, exception(...) logs the throwable instead.
     */
    @Override
    public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {

        // Profiling task covering the whole delayed write. It is finished either when phase 1
        // fails or in the finally block of phase 2.
        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");

        assert (request != null);

        // Value given to the SessionTask constructor; derived from the request's hash code.
        int thread = request.hashCode() & queryProvider2.THREAD_MASK;

        requestManager.scheduleWrite(new SessionTask(request, thread) {

            @Override
            public void run(int thread) {
                fireSessionVariableChange(SessionVariables.QUEUED_READS);

                // Adapter from the Callback<DatabaseException> API to the Procedure API
                // expected by WriteStateBase / WriteState.
                Procedure<Object> stateProcedure = new Procedure<Object>() {
                    @Override
                    public void execute(Object result) {
                        // The result object is ignored; the callback is run with null.
                        if (callback != null)
                            callback.run(null);
                    }
                    @Override
                    public void exception(Throwable t) {
                        if (callback != null) {
                            // DatabaseExceptions are passed through, everything else is wrapped.
                            if (t instanceof DatabaseException) callback.run((DatabaseException) t);
                            else callback.run(new DatabaseException(t));
                        } else
                            Logger.defaultLogError("Unhandled exception", t);
                    }
                };

                // ---- Phase 1: perform the request against a delayed write graph ----
                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
                delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
                DelayedWriteGraph dwg = null;
//                newGraph.state.barrier.inc();

                try {
                    dwg = new DelayedWriteGraph(newGraph);
                    request.perform(dwg);
                } catch (Throwable e) {
                    // Failure is reported through delayedWriteState and the task ends here;
                    // the commit phase is never entered. Note that delayedWriteState is not
                    // reset to null on this path.
                    delayedWriteState.except(e);
                    total.finish();
                    return;
                } finally {
//                    newGraph.state.barrier.dec();
//                    newGraph.waitAsync(request);
                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
                }

                delayedWriteState = null;

                // ---- Phase 2: commit the delayed graph through a write-only writer ----
                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
                fireSessionVariableChange(SessionVariables.QUEUED_WRITES);

                // Reset per-transaction bookkeeping before acquiring write-only access.
                flushCounter = 0;
                clientChanges = new ClientChangesImpl(SessionImplSocket.this);

                acquireWriteOnly();

                // A non-null resolved virtual graph selects the virtual write support.
                VirtualGraph vg = getProvider(request.getProvider());
                WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();

                WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);

                writeState = new WriteState<Object>(writer, request, notify, stateProcedure);

                assert (null != writer);
//                writer.state.barrier.inc();

                try {
                    // Cannot cancel
                    dwg.commit(writer, request);

                        // Replace defaultClusterSet with the value returned by
                        // XSupport.convertDelayedResourceToResource and record its cluster
                        // in clusterSetsSupport.
                        if(defaultClusterSet != null) {
                                XSupport xs = getService(XSupport.class);
                                defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
                                ClusteringSupport cs = getService(ClusteringSupport.class);
                                clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
                        }

                    // This makes clusters available from server
                    clusterStream.reallyFlush();

                    releaseWriteOnly(writer);
                    handleUpdatesAndMetadata(writer);

                } catch (ServiceException e) {
                    // Only ServiceException is handled here; any other throwable propagates
                    // out of run() after the finally block, without releaseWriteOnly(writer).
//                    writer.state.barrier.dec();
//                    writer.waitAsync(null);

                    // These shall be requested from server
                    clusterTable.removeWriteOnlyClusters();
                    // This makes clusters available from server
                    clusterStream.reallyFlush();

                    releaseWriteOnly(writer);
                    writeState.except(e);
                } finally {
                    // Debugging & Profiling
                    fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
                    task2.finish();
                    total.finish();
                }
            }

        }, combine);

    }
744 \r
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
746         scheduleRequest(request, procedure, notify, null);\r
747     }\r
748 \r
749     /* (non-Javadoc)\r
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
751      */\r
752     @Override\r
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
754         throw new Error("Not implemented");\r
755     }\r
756 \r
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {\r
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);\r
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
760             createdClusters.add(cluster.clusterId);\r
761         }\r
762         return cluster;\r
763     }\r
764 \r
    /**
     * {@link WriteSupport} used for write-only transactions on the persistent graph
     * ({@link #writeOnly()} returns {@code true}).
     *
     * Statements and values are written straight into clusters obtained from the enclosing
     * session's clusterTable, and removals are recorded through clusterTranslator. New
     * resources are allocated from {@link #currentCluster}. Metadata, undo and plain write
     * request handling are delegated to the enclosing session's writeSupport; several other
     * operations are unsupported and throw {@link UnsupportedOperationException}.
     */
    class WriteOnlySupport implements WriteSupport {

        // Set from the session's clusterStream in the constructor; not read inside this class.
        ClusterStream stream;
        // Cluster that new resources are currently created into; null until one is selected or needed.
        ClusterImpl currentCluster;

        public WriteOnlySupport() {
            this.stream = clusterStream;
        }

        /** Resource-typed convenience overload; casts to ResourceImpl and forwards the integer keys. */
        @Override
        public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
            claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
        }

        /**
         * Adds the statement (s, p, o) to the cluster that owns s.
         *
         * @throws ImmutableException when that cluster is immutable, SERVICE_MODE_ALLOW is not set
         *         in serviceMode and s is not the root library
         */
        @Override
        public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);

            if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
                if(s != queryProvider2.getRootLibrary())
                    throw new ImmutableException("Trying to modify immutable resource key=" + s);

            try {
                // addRelation may hand back a different cluster instance; maintainCluster registers it.
                maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
            } catch (DatabaseException e) {
                // Logged only; the method carries on with invalidation below.
                Logger.defaultLogError(e);
                //throw e;
            }

            clientChanges.invalidate(s);

            // Query updates are skipped for write-only clusters.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateStatements(s, p);

        }

        /** Resource-typed convenience overload; writes the whole array ({@code value.length} bytes). */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
            claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
        }

        /** Sets the value of resource key {@code rid} to the first {@code length} bytes of {@code value}. */
        @Override
        public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {

            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
            } catch (DatabaseException e) {
                // Logged only; invalidation below still happens.
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            // Query updates are skipped for write-only clusters.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Sets a value of {@code amount} bytes read from {@code reader}. Amounts below 65536 are
         * read in one piece and forwarded to the byte[] overload; larger values are written in
         * blocks of at most 65536 bytes through modiValueEx.
         */
        @Override
        public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {

            if(amount < 65536) {
                claimValue(provider,resource, reader.readBytes(null, amount));
                return;
            }

            // Buffer reused for every block.
            byte[] bytes = new byte[65536];

            int rid = ((ResourceImpl)resource).id;
            ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
            try {
                int left = amount;
                while(left > 0) {
                    int block = Math.min(left, 65536);
                    reader.readBytes(bytes, block);
                    // amount-left is the offset of this block within the complete value.
                    maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
                    left -= block;
                }
            } catch (DatabaseException e) {
                // Logged only; invalidation below still happens.
                Logger.defaultLogError(e);
            }

            clientChanges.invalidate(rid);

            // Query updates are skipped for write-only clusters.
            if (cluster.isWriteOnly())
                return;
            queryProvider2.updateValue(rid);

        }

        /**
         * Handles a cluster operation that returned a cluster instance different from the one it
         * was invoked on: the new instance is put into the cluster table and, when the old one
         * was the current cluster, becomes the current cluster.
         */
        private void maintainCluster(ClusterImpl before, ClusterI after_) {
            if(after_ != null && after_ != before) {
                ClusterImpl after = (ClusterImpl)after_;
                if(currentCluster == before) currentCluster = after;
                clusterTable.replaceCluster(after);
            }
        }

        /**
         * Creates a new resource in the current cluster and returns what the cluster's
         * createResource call returns. A new cluster is taken into use when there is no current
         * cluster or when the current one holds exactly CLUSTER_FILL_SIZE resources; in the
         * latter case the new cluster is cast to ClusterWriteOnly and given a foreignLookup
         * array of {@code foreignCounter} bytes.
         */
        public int createResourceKey(int foreignCounter) throws DatabaseException {
            if(currentCluster == null)
                currentCluster = getNewResourceCluster();
            // NOTE(review): uses == here while createResource(VirtualGraph) uses >= for the same limit.
            if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
                ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
                newCluster.foreignLookup = new byte[foreignCounter];
                currentCluster = newCluster;
                if (DEBUG)
                    System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
            }
            return currentCluster.createResource(clusterTranslator);
        }

        /**
         * Creates a new resource. When there is no current cluster, or the current cluster has
         * reached CLUSTER_FILL_SIZE, the resource comes from the default cluster set if one is
         * defined (and its cluster becomes current); otherwise a new resource cluster is taken
         * into use and the resource is created there.
         */
        @Override
        public Resource createResource(VirtualGraph provider) throws DatabaseException {
            if(currentCluster == null) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
                if (null != defaultClusterSet) {
                        ResourceImpl result = getNewResource(defaultClusterSet);
                    currentCluster = clusterTable.getClusterByResourceKey(result.id);
                    return result;
                } else {
                    currentCluster = getNewResourceCluster();
                }
            }
            return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
        }

        /** Delegates to the session's getNewResource(long); {@code provider} is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, long clusterId)
        throws DatabaseException {
            return getNewResource(clusterId);
        }

        /** Delegates to the session's getNewResource(Resource); {@code provider} is not used. */
        @Override
        public Resource createResource(VirtualGraph provider, Resource clusterSet)
        throws DatabaseException {
            return getNewResource(clusterSet);
        }

        /** Delegates to the session's getNewClusterSet; {@code provider} is not used. */
        @Override
        public void createClusterSet(VirtualGraph provider, Resource clusterSet)
                throws DatabaseException {
            getNewClusterSet(clusterSet);
        }

        /** Delegates to the session's containsClusterSet; {@code dummy} is not used. */
        @Override
        public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
        throws ServiceException {
            return containsClusterSet(clusterSet);
        }

        /**
         * Makes the cluster with the given cluster id current and stores it in
         * clusterSetsSupport under the set resource id that clusterSetsSupport.getSet reports
         * for it.
         */
        public void selectCluster(long cluster) {
                currentCluster = clusterTable.getClusterByClusterId(cluster);
                long setResourceId = clusterSetsSupport.getSet(cluster);
                clusterSetsSupport.put(setResourceId, cluster);
        }

        /**
         * Sets the default cluster set through setDefaultClusterSet4NewResource and adjusts the
         * current cluster: for a non-null set, to the cluster recorded for it in
         * clusterSetsSupport; for null, to no cluster at all.
         *
         * @return the value from setDefaultClusterSet4NewResource when {@code clusterSet} is
         *         non-null, otherwise {@code null} (the delegate's return value is discarded)
         */
        @Override
        public Resource setDefaultClusterSet(Resource clusterSet)
        throws ServiceException {
                Resource result = setDefaultClusterSet4NewResource(clusterSet);
                if(clusterSet != null) {
                        long id = clusterSetsSupport.get(clusterSet.getResourceId());
                        currentCluster = clusterTable.getClusterByClusterId(id);
                        return result;
                } else {
                        currentCluster = null;
                        return null;
                }
        }

        /**
         * Removes the value of {@code resource}. With no virtual graph resolved for
         * {@code provider}, the removal is recorded through clusterTranslator (the cluster object
         * itself is only invalidated, not modified); otherwise the removal is forwarded to the
         * virtual graph.
         */
        @Override
        public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {

                 provider = getProvider(provider);
             if (null == provider) {
                 int key = ((ResourceImpl)resource).id;



                 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
//                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
//                      if(key != queryProvider2.getRootLibrary())
//                              throw new ImmutableException("Trying to modify immutable resource key=" + key);

//                 try {
//                     cluster.removeValue(key, clusterTranslator);
//                 } catch (DatabaseException e) {
//                     Logger.defaultLogError(e);
//                     return;
//                 }

                 clusterTable.writeOnlyInvalidate(cluster);

                 try {
                         int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
                         // NOTE(review): DELETE_OPERATION here, while removeStatement uses REMOVE_OPERATION - confirm intended.
                         clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
                         clusterTranslator.removeValue(cluster);
                 } catch (DatabaseException e) {
                         // Logged only; invalidation below still happens.
                         Logger.defaultLogError(e);
                 }

                 queryProvider2.invalidateResource(key);
                 clientChanges.invalidate(key);

             } else {
                 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
                 queryProvider2.updateValue(querySupport.getId(resource));
                 clientChanges.claimValue(resource);
             }


        }

        /** Not supported by this write support. */
        @Override
        public void flush(boolean intermediate) {
            throw new UnsupportedOperationException();
        }

        /**
         * Flushes the cluster table, maps the default cluster set (if any) to
         * Constants.NewClusterId in clusterSetsSupport and forgets the current cluster.
         */
        @Override
        public void flushCluster() {
            clusterTable.flushCluster(graphSession);
            if(defaultClusterSet != null) {
                clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
            }
            currentCluster = null;
        }

        /** Not supported by this write support. */
        @Override
        public void flushCluster(Resource r) {
            throw new UnsupportedOperationException("flushCluster resource " + r);
        }

        /** Does nothing. */
        @Override
        public void gc() {
        }

        /**
         * Removes the statement (subject, predicate, object). With no virtual graph resolved for
         * {@code provider}, the removal is recorded through clusterTranslator after invalidating
         * the subject's cluster; otherwise it is forwarded to the virtual graph.
         *
         * @return always {@code true}, also when recording the removal failed and was only logged
         */
        @Override
        public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
                Resource object) {

                int s = ((ResourceImpl)subject).id;
                int p = ((ResourceImpl)predicate).id;
                int o = ((ResourceImpl)object).id;

                provider = getProvider(provider);
                if (null == provider) {

                        ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
                        clusterTable.writeOnlyInvalidate(cluster);

                        try {

                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
                                int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
                                int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);

                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);

                                // Subject, predicate and object are each registered against the
                                // subject's cluster before removeStatement is issued.
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);

                                queryProvider2.invalidateResource(s);
                                clientChanges.invalidate(s);

                        } catch (DatabaseException e) {

                                // Logged only; the method still reports success.
                                Logger.defaultLogError(e);

                        }

                        return true;

                } else {

                        ((VirtualGraphImpl)provider).deny(s, p, o);
                        queryProvider2.invalidateResource(s);
                        clientChanges.invalidate(s);

                        return true;

                }

        }

        /** Not supported by this write support. */
        @Override
        public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
            throw new UnsupportedOperationException();
        }

        /** @return always {@code true} */
        @Override
        public boolean writeOnly() {
            return true;
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
            writeSupport.performWriteRequest(graph, request);
        }

        /** Not supported by this write support. */
        @Override
        public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this write support. */
        @Override
        public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
            throw new UnsupportedOperationException();
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public <T> void addMetadata(Metadata data) throws ServiceException {
            writeSupport.addMetadata(data);
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
            return writeSupport.getMetadata(clazz);
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public TreeMap<String, byte[]> getMetadata() {
            return writeSupport.getMetadata();
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public void commitDone(WriteTraits writeTraits, long csid) {
            writeSupport.commitDone(writeTraits, csid);
        }

        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public void clearUndoList(WriteTraits writeTraits) {
            writeSupport.clearUndoList(writeTraits);
        }
        /** Delegates to the enclosing session's writeSupport. */
        @Override
        public int clearMetadata() {
            return writeSupport.clearMetadata();
        }

                /** Delegates to the enclosing session's writeSupport. */
                @Override
                public void startUndo() {
                        writeSupport.startUndo();
                }
    }
1122 \r
1123     class VirtualWriteOnlySupport implements WriteSupport {\r
1124 \r
1125 //        @Override\r
1126 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
1127 //                Resource object) {\r
1128 //            throw new UnsupportedOperationException();\r
1129 //        }\r
1130 \r
1131         @Override\r
1132         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1133 \r
1134             TransientGraph impl = (TransientGraph)provider;\r
1135             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1136             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1137             clientChanges.claim(subject, predicate, object);\r
1138 \r
1139         }\r
1140 \r
1141         @Override\r
1142         public void claim(VirtualGraph provider, int subject, int predicate, int object) {\r
1143 \r
1144             TransientGraph impl = (TransientGraph)provider;\r
1145             impl.claim(subject, predicate, object);\r
1146             getQueryProvider2().updateStatements(subject, predicate);\r
1147             clientChanges.claim(subject, predicate, object);\r
1148 \r
1149         }\r
1150 \r
1151         @Override\r
1152         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
1153             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
1154         }\r
1155 \r
1156         @Override\r
1157         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {\r
1158             ((VirtualGraphImpl)provider).claimValue(resource, value, length);\r
1159             getQueryProvider2().updateValue(resource);\r
1160             clientChanges.claimValue(resource);\r
1161         }\r
1162 \r
1163         @Override\r
1164         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
1165             byte[] value = reader.readBytes(null, amount);\r
1166             claimValue(provider, resource, value);\r
1167         }\r
1168 \r
1169         @Override\r
1170         public Resource createResource(VirtualGraph provider) {\r
1171             TransientGraph impl = (TransientGraph)provider;\r
1172             return impl.getResource(impl.newResource(false));\r
1173         }\r
1174 \r
1175         @Override\r
1176         public Resource createResource(VirtualGraph provider, long clusterId) {\r
1177             throw new UnsupportedOperationException();\r
1178         }\r
1179 \r
1180         @Override\r
1181         public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
1182         throws DatabaseException {\r
1183             throw new UnsupportedOperationException();\r
1184         }\r
1185 \r
1186         @Override\r
1187         public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
1188         throws DatabaseException {\r
1189             throw new UnsupportedOperationException();\r
1190         }\r
1191 \r
1192         @Override\r
1193         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)\r
1194         throws ServiceException {\r
1195             throw new UnsupportedOperationException();\r
1196         }\r
1197 \r
1198         @Override\r
1199         public Resource setDefaultClusterSet(Resource clusterSet)\r
1200         throws ServiceException {\r
1201                 return null;\r
1202         }\r
1203 \r
1204         @Override\r
1205         public void denyValue(VirtualGraph provider, Resource resource) {\r
1206             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
1207             getQueryProvider2().updateValue(querySupport.getId(resource));\r
1208             // NOTE: this only keeps track of value changes by-resource.\r
1209             clientChanges.claimValue(resource);\r
1210         }\r
1211 \r
1212         @Override\r
1213         public void flush(boolean intermediate) {\r
1214             throw new UnsupportedOperationException();\r
1215         }\r
1216 \r
1217         @Override\r
1218         public void flushCluster() {\r
1219             throw new UnsupportedOperationException();\r
1220         }\r
1221 \r
1222         @Override\r
1223         public void flushCluster(Resource r) {\r
1224             throw new UnsupportedOperationException("Resource " + r);\r
1225         }\r
1226 \r
1227         @Override\r
1228         public void gc() {\r
1229         }\r
1230 \r
1231         @Override\r
1232         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1233             TransientGraph impl = (TransientGraph) provider;\r
1234             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1235             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1236             clientChanges.deny(subject, predicate, object);\r
1237             return true;\r
1238         }\r
1239 \r
1240         @Override\r
1241         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
1242             throw new UnsupportedOperationException();\r
1243         }\r
1244 \r
1245         @Override\r
1246         public boolean writeOnly() {\r
1247             return true;\r
1248         }\r
1249 \r
1250         @Override\r
1251         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
1252             throw new UnsupportedOperationException();\r
1253         }\r
1254 \r
1255         @Override\r
1256         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
1257             throw new UnsupportedOperationException();\r
1258         }\r
1259 \r
1260         @Override\r
1261         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
1262             throw new UnsupportedOperationException();\r
1263         }\r
1264 \r
1265         @Override\r
1266         public <T> void addMetadata(Metadata data) throws ServiceException {\r
1267             throw new UnsupportedOperationException();\r
1268         }\r
1269 \r
1270         @Override\r
1271         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
1272             throw new UnsupportedOperationException();\r
1273         }\r
1274 \r
1275         @Override\r
1276         public TreeMap<String, byte[]> getMetadata() {\r
1277             throw new UnsupportedOperationException();\r
1278         }\r
1279 \r
1280         @Override\r
1281         public void commitDone(WriteTraits writeTraits, long csid) {\r
1282         }\r
1283 \r
1284         @Override\r
1285         public void clearUndoList(WriteTraits writeTraits) {\r
1286         }\r
1287 \r
1288         @Override\r
1289         public int clearMetadata() {\r
1290             return 0;\r
1291         }\r
1292 \r
1293                 @Override\r
1294                 public void startUndo() {\r
1295                 }\r
1296     }\r
1297 \r
1298     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {\r
1299 \r
1300         try {\r
1301 \r
1302             int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1303 \r
1304             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1305 \r
1306             flushCounter = 0;\r
1307             clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1308 \r
1309             acquireWriteOnly();\r
1310 \r
1311             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1312 \r
1313             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());\r
1314 \r
1315             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);\r
1316             writeState = writeStateT;\r
1317 \r
1318             assert (null != writer);\r
1319 //            writer.state.barrier.inc();\r
1320             long start = System.nanoTime();\r
1321             T result = request.perform(writer);\r
1322             long duration = System.nanoTime() - start;\r
1323             if (DEBUG) {\r
1324                 System.err.println("################");\r
1325                 System.err.println("WriteOnly duration " + 1e-9*duration);\r
1326             }\r
1327             writeStateT.setResult(result);\r
1328 \r
1329             // This makes clusters available from server\r
1330             clusterStream.reallyFlush();\r
1331 \r
1332             // This will trigger query updates\r
1333             releaseWriteOnly(writer);\r
1334             assert (null != writer);\r
1335 \r
1336             handleUpdatesAndMetadata(writer);\r
1337 \r
1338         } catch (CancelTransactionException e) {\r
1339 \r
1340             releaseWriteOnly(writeState.getGraph());\r
1341 \r
1342             clusterTable.removeWriteOnlyClusters();\r
1343             state.stopWriteTransaction(clusterStream);\r
1344 \r
1345         } catch (Throwable e) {\r
1346 \r
1347             e.printStackTrace();\r
1348 \r
1349             releaseWriteOnly(writeState.getGraph());\r
1350 \r
1351             clusterTable.removeWriteOnlyClusters();\r
1352 \r
1353             if (callback != null)\r
1354                 callback.exception(new DatabaseException(e));\r
1355 \r
1356             state.stopWriteTransaction(clusterStream);\r
1357             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
1358 \r
1359         } finally  {\r
1360 \r
1361             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1362 \r
1363         }\r
1364 \r
1365     }\r
1366 \r
1367     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
1368         scheduleRequest(request, callback, notify, null);\r
1369     }\r
1370 \r
1371     @Override\r
1372     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
1373 \r
1374         assertAlive();\r
1375 \r
1376         assert (request != null);\r
1377 \r
1378         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1379 \r
1380         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
1381 \r
1382             @Override\r
1383             public void run(int thread) {\r
1384 \r
1385                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1386 \r
1387                     try {\r
1388 \r
1389                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1390 \r
1391                         flushCounter = 0;\r
1392                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1393 \r
1394                         acquireWriteOnly();\r
1395 \r
1396                         VirtualGraph vg = getProvider(request.getProvider());\r
1397                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1398 \r
1399                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
1400 \r
1401                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
1402 \r
1403                             @Override\r
1404                             public void execute(Object result) {\r
1405                                 if(callback != null) callback.run(null);\r
1406                             }\r
1407 \r
1408                             @Override\r
1409                             public void exception(Throwable t) {\r
1410                                 if(callback != null) callback.run((DatabaseException)t);\r
1411                             }\r
1412 \r
1413                         });\r
1414 \r
1415                         assert (null != writer);\r
1416 //                        writer.state.barrier.inc();\r
1417 \r
1418                         try {\r
1419 \r
1420                             request.perform(writer);\r
1421 \r
1422                         } catch (Throwable e) {\r
1423 \r
1424 //                            writer.state.barrier.dec();\r
1425 //                            writer.waitAsync(null);\r
1426 \r
1427                             releaseWriteOnly(writer);\r
1428 \r
1429                             clusterTable.removeWriteOnlyClusters();\r
1430 \r
1431                             if(!(e instanceof CancelTransactionException)) {\r
1432                                 if (callback != null)\r
1433                                     callback.run(new DatabaseException(e));\r
1434                             }\r
1435 \r
1436                             writeState.except(e);\r
1437 \r
1438                             return;\r
1439 \r
1440                         }\r
1441 \r
1442                         // This makes clusters available from server\r
1443                         boolean empty = clusterStream.reallyFlush();\r
1444                         // This was needed to make WO requests call metadata listeners.\r
1445                         // NOTE: the calling event does not contain clientChanges information.\r
1446                         if (!empty && clientChanges.isEmpty())\r
1447                             clientChanges.setNotEmpty(true);\r
1448                         releaseWriteOnly(writer);\r
1449                         assert (null != writer);\r
1450                     } finally  {\r
1451 \r
1452                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1453 \r
1454                     }\r
1455 \r
1456 \r
1457                 task.finish();\r
1458 \r
1459             }\r
1460 \r
1461         }, combine);\r
1462 \r
1463     }\r
1464 \r
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {\r
1466         scheduleRequest(request, callback, notify, null);\r
1467     }\r
1468 \r
1469     /* (non-Javadoc)\r
1470      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
1471      */\r
1472     @Override\r
1473     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {\r
1474 \r
1475         assert (request != null);\r
1476 \r
1477         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1478 \r
1479         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
1480 \r
1481             @Override\r
1482             public void run(int thread) {\r
1483 \r
1484                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1485 \r
1486                 performWriteOnly(request, notify, callback);\r
1487 \r
1488                 task.finish();\r
1489 \r
1490             }\r
1491 \r
1492         }, combine);\r
1493 \r
1494     }\r
1495 \r
1496     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1497 \r
1498         assert (request != null);\r
1499         assert (procedure != null);\r
1500 \r
1501         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1502 \r
1503         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1504 \r
1505             @Override\r
1506             public void run(int thread) {\r
1507 \r
1508                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1509 \r
1510                 ListenerBase listener = getListenerBase(procedure);\r
1511 \r
1512                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1513 \r
1514                 try {\r
1515 \r
1516                     if (listener != null) {\r
1517 \r
1518                         try {\r
1519                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {\r
1520 \r
1521                                         @Override\r
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {\r
1523                                                 procedure.exception(graph, t);\r
1524                                                 if(throwable != null) {\r
1525                                                         throwable.set(t);\r
1526                                                 } else {\r
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);\r
1528                                                 }\r
1529                                         }\r
1530 \r
1531                                         @Override\r
1532                                         public void execute(AsyncReadGraph graph, T t) {\r
1533                                                 if(result != null) result.set(t);\r
1534                                                 procedure.execute(graph, t);\r
1535                                         }\r
1536 \r
1537                                 }, listener);\r
1538                         } catch (Throwable t) {\r
1539                             // This is handled by the AsyncProcedure\r
1540                                 //Logger.defaultLogError("Internal error", t);\r
1541                         }\r
1542 \r
1543                     } else {\r
1544 \r
1545                         try {\r
1546 \r
1547 //                            newGraph.state.barrier.inc();\r
1548 \r
1549                             T t = request.perform(newGraph);\r
1550 \r
1551                             try {\r
1552 \r
1553                                 if(result != null) result.set(t);\r
1554                                 procedure.execute(newGraph, t);\r
1555 \r
1556                             } catch (Throwable th) {\r
1557 \r
1558                                 if(throwable != null) {\r
1559                                     throwable.set(th);\r
1560                                 } else {\r
1561                                     Logger.defaultLogError("Unhandled exception", th);\r
1562                                 }\r
1563 \r
1564                             }\r
1565 \r
1566                         } catch (Throwable t) {\r
1567 \r
1568                             if (DEBUG)\r
1569                                 t.printStackTrace();\r
1570 \r
1571                             if(throwable != null) {\r
1572                                 throwable.set(t);\r
1573                             } else {\r
1574                                 Logger.defaultLogError("Unhandled exception", t);\r
1575                             }\r
1576 \r
1577                             try {\r
1578 \r
1579                                 procedure.exception(newGraph, t);\r
1580 \r
1581                             } catch (Throwable t2) {\r
1582 \r
1583                                 if(throwable != null) {\r
1584                                     throwable.set(t2);\r
1585                                 } else {\r
1586                                     Logger.defaultLogError("Unhandled exception", t2);\r
1587                                 }\r
1588 \r
1589                             }\r
1590 \r
1591                         }\r
1592 \r
1593 //                        newGraph.state.barrier.dec();\r
1594 //                        newGraph.waitAsync(request);\r
1595 \r
1596                     }\r
1597 \r
1598                 } finally {\r
1599 \r
1600                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1601 \r
1602                 }\r
1603 \r
1604             }\r
1605 \r
1606         });\r
1607 \r
1608     }\r
1609 \r
1610     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {\r
1611 \r
1612         assert (request != null);\r
1613         assert (procedure != null);\r
1614 \r
1615         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1616 \r
1617         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {\r
1618 \r
1619             @Override\r
1620             public void run(int thread) {\r
1621 \r
1622                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1623 \r
1624                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1625 \r
1626                 try {\r
1627 \r
1628                     if (listener != null) {\r
1629 \r
1630                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1631 \r
1632 //                        newGraph.waitAsync(request);\r
1633 \r
1634                     } else {\r
1635 \r
1636                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(\r
1637                                 procedure, "request");\r
1638 \r
1639                         try {\r
1640 \r
1641 //                            newGraph.state.barrier.inc();\r
1642 \r
1643                             request.perform(newGraph, wrapper);\r
1644 \r
1645 //                            newGraph.waitAsync(request);\r
1646 \r
1647                         } catch (Throwable t) {\r
1648 \r
1649                             wrapper.exception(newGraph, t);\r
1650 //                            newGraph.waitAsync(request);\r
1651 \r
1652 \r
1653                         }\r
1654 \r
1655                     }\r
1656 \r
1657                 } finally {\r
1658 \r
1659                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1660 \r
1661                 }\r
1662 \r
1663             }\r
1664 \r
1665         });\r
1666 \r
1667     }\r
1668 \r
1669     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {\r
1670 \r
1671         assert (request != null);\r
1672         assert (procedure != null);\r
1673 \r
1674         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1675 \r
1676         int sync = notify != null ? thread : -1;\r
1677 \r
1678         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {\r
1679 \r
1680             @Override\r
1681             public void run(int thread) {\r
1682 \r
1683                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1684 \r
1685                 ListenerBase listener = getListenerBase(procedure);\r
1686 \r
1687                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1688 \r
1689                 try {\r
1690 \r
1691                     if (listener != null) {\r
1692 \r
1693                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1694 \r
1695 //                        newGraph.waitAsync(request);\r
1696 \r
1697                     } else {\r
1698 \r
1699                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);\r
1700 \r
1701                         try {\r
1702 \r
1703                             request.perform(newGraph, wrapper);\r
1704 \r
1705                         } catch (Throwable t) {\r
1706 \r
1707                             t.printStackTrace();\r
1708 \r
1709                         }\r
1710 \r
1711                     }\r
1712 \r
1713                 } finally {\r
1714 \r
1715                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1716 \r
1717                 }\r
1718 \r
1719             }\r
1720 \r
1721         });\r
1722 \r
1723     }\r
1724 \r
1725     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1726 \r
1727         assert (request != null);\r
1728         assert (procedure != null);\r
1729 \r
1730         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1731 \r
1732         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1733 \r
1734             @Override\r
1735             public void run(int thread) {\r
1736 \r
1737                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1738 \r
1739                 ListenerBase listener = getListenerBase(procedure);\r
1740 \r
1741                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1742 \r
1743                 try {\r
1744 \r
1745                     if (listener != null) {\r
1746 \r
1747                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {\r
1748 \r
1749                             @Override\r
1750                             public void exception(Throwable t) {\r
1751                                 procedure.exception(t);\r
1752                                 if(throwable != null) {\r
1753                                     throwable.set(t);\r
1754                                 }\r
1755                             }\r
1756 \r
1757                             @Override\r
1758                             public void execute(T t) {\r
1759                                 if(result != null) result.set(t);\r
1760                                 procedure.execute(t);\r
1761                             }\r
1762 \r
1763                         }, listener);\r
1764 \r
1765 //                        newGraph.waitAsync(request);\r
1766 \r
1767                     } else {\r
1768 \r
1769 //                        newGraph.state.barrier.inc();\r
1770 \r
1771                         request.register(newGraph, new Listener<T>() {\r
1772 \r
1773                             @Override\r
1774                             public void exception(Throwable t) {\r
1775                                 if(throwable != null) throwable.set(t);\r
1776                                 procedure.exception(t);\r
1777 //                                newGraph.state.barrier.dec();\r
1778                             }\r
1779 \r
1780                             @Override\r
1781                             public void execute(T t) {\r
1782                                 if(result != null) result.set(t);\r
1783                                 procedure.execute(t);\r
1784 //                                newGraph.state.barrier.dec();\r
1785                             }\r
1786 \r
1787                             @Override\r
1788                             public boolean isDisposed() {\r
1789                                 return true;\r
1790                             }\r
1791 \r
1792                         });\r
1793 \r
1794 //                        newGraph.waitAsync(request);\r
1795 \r
1796                     }\r
1797 \r
1798                 } finally {\r
1799 \r
1800                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1801 \r
1802                 }\r
1803 \r
1804             }\r
1805 \r
1806         });\r
1807 \r
1808     }\r
1809 \r
1810 \r
1811     @Override\r
1812     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {\r
1813 \r
1814         scheduleRequest(request, procedure, null, null, null);\r
1815 \r
1816     }\r
1817 \r
1818     @Override\r
1819     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1820         assertNotSession();\r
1821         Semaphore notify = new Semaphore(0);\r
1822         DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1823         DataContainer<T> result = new DataContainer<T>();\r
1824         scheduleRequest(request, procedure, notify, container, result);\r
1825         acquire(notify, request);\r
1826         Throwable throwable = container.get();\r
1827         if(throwable != null) {\r
1828             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1829             else throw new DatabaseException("Unexpected exception", throwable);\r
1830         }\r
1831         return result.get();\r
1832     }\r
1833 \r
1834     @Override\r
1835     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {\r
1836         assertNotSession();\r
1837         Semaphore notify = new Semaphore(0);\r
1838         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1839         final DataContainer<T> resultContainer = new DataContainer<T>();\r
1840         scheduleRequest(request, new AsyncProcedure<T>() {\r
1841             @Override\r
1842             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1843                 exceptionContainer.set(throwable);\r
1844                 procedure.exception(graph, throwable);\r
1845             }\r
1846             @Override\r
1847             public void execute(AsyncReadGraph graph, T result) {\r
1848                 resultContainer.set(result);\r
1849                 procedure.execute(graph, result);\r
1850             }\r
1851         }, getListenerBase(procedure), notify);\r
1852         acquire(notify, request, procedure);\r
1853         Throwable throwable = exceptionContainer.get();\r
1854         if (throwable != null) {\r
1855             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1856             else throw new DatabaseException("Unexpected exception", throwable);\r
1857         }\r
1858         return resultContainer.get();\r
1859     }\r
1860 \r
1861 \r
1862     @Override\r
1863     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {\r
1864         assertNotSession();\r
1865         Semaphore notify = new Semaphore(0);\r
1866         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1867         scheduleRequest(request, new AsyncMultiProcedure<T>() {\r
1868             @Override\r
1869             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1870                 exceptionContainer.set(throwable);\r
1871                 procedure.exception(graph, throwable);\r
1872             }\r
1873             @Override\r
1874             public void execute(AsyncReadGraph graph, T result) {\r
1875                 procedure.execute(graph, result);\r
1876             }\r
1877             @Override\r
1878             public void finished(AsyncReadGraph graph) {\r
1879                 procedure.finished(graph);\r
1880             }\r
1881         }, notify);\r
1882         acquire(notify, request, procedure);\r
1883         Throwable throwable = exceptionContainer.get();\r
1884         if (throwable != null) {\r
1885             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1886             else throw new DatabaseException("Unexpected exception", throwable);\r
1887         }\r
1888         // TODO: implement return value\r
1889         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");\r
1890         return null;\r
1891     }\r
1892 \r
1893     @Override\r
1894     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {\r
1895         return syncRequest(request, new ProcedureAdapter<T>());\r
1896 \r
1897 \r
1898 //        assert(request != null);\r
1899 //\r
1900 //        final DataContainer<T> result = new DataContainer<T>();\r
1901 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1902 //\r
1903 //        syncRequest(request, new Procedure<T>() {\r
1904 //\r
1905 //            @Override\r
1906 //            public void execute(T t) {\r
1907 //                result.set(t);\r
1908 //            }\r
1909 //\r
1910 //            @Override\r
1911 //            public void exception(Throwable t) {\r
1912 //                exception.set(t);\r
1913 //            }\r
1914 //\r
1915 //        });\r
1916 //\r
1917 //        Throwable t = exception.get();\r
1918 //        if(t != null) {\r
1919 //            if(t instanceof DatabaseException) throw (DatabaseException)t;\r
1920 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);\r
1921 //        }\r
1922 //\r
1923 //        return result.get();\r
1924 \r
1925     }\r
1926 \r
1927     @Override\r
1928     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {\r
1929         return syncRequest(request, (Procedure<T>)procedure);\r
1930     }\r
1931 \r
1932 \r
1933 //    @Override\r
1934 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1935 //        assertNotSession();\r
1936 //        Semaphore notify = new Semaphore(0);\r
1937 //        DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1938 //        DataContainer<T> result = new DataContainer<T>();\r
1939 //        scheduleRequest(request, procedure, notify, container, result);\r
1940 //        acquire(notify, request);\r
1941 //        Throwable throwable = container.get();\r
1942 //        if(throwable != null) {\r
1943 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1944 //            else throw new DatabaseException("Unexpected exception", throwable);\r
1945 //        }\r
1946 //        return result.get();\r
1947 //    }\r
1948 \r
1949 \r
1950     @Override\r
1951     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
1952         assertNotSession();\r
1953         Semaphore notify = new Semaphore(0);\r
1954         final DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1955         final DataContainer<T> result = new DataContainer<T>();\r
1956         scheduleRequest(request, procedure, notify, container, result);\r
1957         acquire(notify, request);\r
1958         Throwable throwable = container.get();\r
1959         if (throwable != null) {\r
1960             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1961             else throw new DatabaseException("Unexpected exception", throwable);\r
1962         }\r
1963         return result.get();\r
1964     }\r
1965 \r
1966     @Override\r
1967     public void syncRequest(Write request) throws DatabaseException  {\r
1968         assertNotSession();\r
1969         assertAlive();\r
1970         Semaphore notify = new Semaphore(0);\r
1971         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
1972         scheduleRequest(request, new Callback<DatabaseException>() {\r
1973 \r
1974             @Override\r
1975             public void run(DatabaseException e) {\r
1976               exception.set(e);\r
1977             }\r
1978 \r
1979         }, notify);\r
1980         acquire(notify, request);\r
1981         if(exception.get() != null) throw exception.get();\r
1982     }\r
1983 \r
1984     @Override\r
1985     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {\r
1986         assertNotSession();\r
1987         Semaphore notify = new Semaphore(0);\r
1988         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1989         final DataContainer<T> result = new DataContainer<T>();\r
1990         scheduleRequest(request, new Procedure<T>() {\r
1991 \r
1992             @Override\r
1993             public void exception(Throwable t) {\r
1994               exception.set(t);\r
1995             }\r
1996 \r
1997             @Override\r
1998             public void execute(T t) {\r
1999               result.set(t);\r
2000             }\r
2001 \r
2002         }, notify);\r
2003         acquire(notify, request);\r
2004         if(exception.get() != null) {\r
2005             Throwable t = exception.get();\r
2006             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2007             else throw new DatabaseException(t);\r
2008         }\r
2009         return result.get();\r
2010     }\r
2011 \r
2012     @Override\r
2013     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {\r
2014         assertNotSession();\r
2015         Semaphore notify = new Semaphore(0);\r
2016         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2017         final DataContainer<T> result = new DataContainer<T>();\r
2018         scheduleRequest(request, new Procedure<T>() {\r
2019 \r
2020             @Override\r
2021             public void exception(Throwable t) {\r
2022               exception.set(t);\r
2023             }\r
2024 \r
2025             @Override\r
2026             public void execute(T t) {\r
2027               result.set(t);\r
2028             }\r
2029 \r
2030         }, notify);\r
2031         acquire(notify, request);\r
2032         if(exception.get() != null) {\r
2033             Throwable t = exception.get();\r
2034             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2035             else throw new DatabaseException(t);\r
2036         }\r
2037         return result.get();\r
2038     }\r
2039 \r
2040     @Override\r
2041     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {\r
2042         assertNotSession();\r
2043         Semaphore notify = new Semaphore(0);\r
2044         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2045         final DataContainer<T> result = new DataContainer<T>();\r
2046         scheduleRequest(request, new Procedure<T>() {\r
2047 \r
2048             @Override\r
2049             public void exception(Throwable t) {\r
2050               exception.set(t);\r
2051             }\r
2052 \r
2053             @Override\r
2054             public void execute(T t) {\r
2055               result.set(t);\r
2056             }\r
2057 \r
2058         }, notify);\r
2059         acquire(notify, request);\r
2060         if(exception.get() != null) {\r
2061             Throwable t = exception.get();\r
2062             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2063             else throw new DatabaseException(t);\r
2064         }\r
2065         return result.get();\r
2066     }\r
2067 \r
2068     @Override\r
2069     public void syncRequest(DelayedWrite request) throws DatabaseException  {\r
2070         assertNotSession();\r
2071         Semaphore notify = new Semaphore(0);\r
2072         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2073         scheduleRequest(request, new Callback<DatabaseException>() {\r
2074             @Override\r
2075             public void run(DatabaseException e) {\r
2076                 exception.set(e);\r
2077             }\r
2078         }, notify);\r
2079         acquire(notify, request);\r
2080         if(exception.get() != null) throw exception.get();\r
2081     }\r
2082 \r
2083     @Override\r
2084     public void syncRequest(WriteOnly request) throws DatabaseException  {\r
2085         assertNotSession();\r
2086         assertAlive();\r
2087         Semaphore notify = new Semaphore(0);\r
2088         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2089         scheduleRequest(request, new Callback<DatabaseException>() {\r
2090             @Override\r
2091             public void run(DatabaseException e) {\r
2092                 exception.set(e);\r
2093             }\r
2094         }, notify);\r
2095         acquire(notify, request);\r
2096         if(exception.get() != null) throw exception.get();\r
2097     }\r
2098 \r
2099     /*\r
2100      *\r
2101      * GraphRequestProcessor interface\r
2102      */\r
2103 \r
2104     @Override\r
2105     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {\r
2106 \r
2107         scheduleRequest(request, procedure, null, null);\r
2108 \r
2109     }\r
2110 \r
2111     @Override\r
2112     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {\r
2113 \r
2114         scheduleRequest(request, procedure, null);\r
2115 \r
2116     }\r
2117 \r
2118     @Override\r
2119     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {\r
2120 \r
2121         scheduleRequest(request, procedure, null, null, null);\r
2122 \r
2123     }\r
2124 \r
2125     @Override\r
2126     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {\r
2127 \r
2128         scheduleRequest(request, callback, null);\r
2129 \r
2130     }\r
2131 \r
2132     @Override\r
2133     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {\r
2134 \r
2135         scheduleRequest(request, procedure, null);\r
2136 \r
2137     }\r
2138 \r
2139     @Override\r
2140     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {\r
2141 \r
2142         scheduleRequest(request, procedure, null);\r
2143 \r
2144     }\r
2145 \r
2146     @Override\r
2147     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {\r
2148 \r
2149         scheduleRequest(request, procedure, null);\r
2150 \r
2151     }\r
2152 \r
2153     @Override\r
2154     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {\r
2155 \r
2156         scheduleRequest(request, callback, null);\r
2157 \r
2158     }\r
2159 \r
2160     @Override\r
2161     public void asyncRequest(final Write r) {\r
2162         asyncRequest(r, null);\r
2163     }\r
2164 \r
2165     @Override\r
2166     public void asyncRequest(final DelayedWrite r) {\r
2167         asyncRequest(r, null);\r
2168     }\r
2169 \r
2170     @Override\r
2171     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {\r
2172 \r
2173         scheduleRequest(request, callback, null);\r
2174 \r
2175     }\r
2176 \r
2177     @Override\r
2178     public void asyncRequest(final WriteOnly request) {\r
2179 \r
2180         asyncRequest(request, null);\r
2181 \r
2182     }\r
2183 \r
2184     @Override\r
2185     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {\r
2186         r.request(this, procedure);\r
2187     }\r
2188 \r
2189     @Override\r
2190     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {\r
2191         r.request(this, procedure);\r
2192     }\r
2193 \r
2194     @Override\r
2195     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {\r
2196         r.request(this, procedure);\r
2197     }\r
2198 \r
2199     @Override\r
2200     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {\r
2201         r.request(this, procedure);\r
2202     }\r
2203 \r
2204     @Override\r
2205     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {\r
2206         r.request(this, procedure);\r
2207     }\r
2208 \r
2209     @Override\r
2210     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {\r
2211         r.request(this, procedure);\r
2212     }\r
2213 \r
2214     @Override\r
2215     public <T> T sync(ReadInterface<T> r) throws DatabaseException {\r
2216         return r.request(this);\r
2217     }\r
2218 \r
2219     @Override\r
2220     public <T> T sync(WriteInterface<T> r) throws DatabaseException {\r
2221         return r.request(this);\r
2222     }\r
2223 \r
2224     @Override\r
2225     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {\r
2226         r.request(this, procedure);\r
2227     }\r
2228 \r
2229     @Override\r
2230     public <T> void async(WriteInterface<T> r) {\r
2231         r.request(this, new ProcedureAdapter<T>());\r
2232     }\r
2233 \r
2234     //@Override\r
2235     public void incAsync() {\r
2236         state.incAsync();\r
2237     }\r
2238 \r
2239     //@Override\r
2240     public void decAsync() {\r
2241         state.decAsync();\r
2242     }\r
2243 \r
2244     public long getCluster(ResourceImpl resource) {\r
2245         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);\r
2246         return cluster.getClusterId();\r
2247     }\r
2248 \r
2249     public long getCluster(int id) {\r
2250         if (clusterTable == null)\r
2251             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");\r
2252         return clusterTable.getClusterIdByResourceKeyNoThrow(id);\r
2253     }\r
2254 \r
2255     public ResourceImpl getResource(int id) {\r
2256         return new ResourceImpl(resourceSupport, id);\r
2257     }\r
2258 \r
2259     public ResourceImpl getResource(int resourceIndex, long clusterId) {\r
2260         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));\r
2261         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);\r
2262         int key = proxy.getClusterKey();\r
2263         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);\r
2264         return new ResourceImpl(resourceSupport, resourceKey);\r
2265     }\r
2266 \r
2267     public ResourceImpl getResource2(int id) {\r
2268         assert (id != 0);\r
2269         return new ResourceImpl(resourceSupport, id);\r
2270     }\r
2271 \r
2272     final public int getId(ResourceImpl impl) {\r
2273         return impl.id;\r
2274     }\r
2275 \r
2276     public static final Charset UTF8 = Charset.forName("utf-8");\r
2277 \r
2278 \r
2279 \r
2280 \r
2281     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {\r
2282         for(TransientGraph g : support.providers) {\r
2283             if(g.isPending(subject)) return false;\r
2284         }\r
2285         return true;\r
2286     }\r
2287 \r
2288     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {\r
2289         for(TransientGraph g : support.providers) {\r
2290             if(g.isPending(subject, predicate)) return false;\r
2291         }\r
2292         return true;\r
2293     }\r
2294 \r
2295     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {\r
2296 \r
2297         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2298 \r
2299             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2300 \r
2301             @Override\r
2302             public void run(ReadGraphImpl graph) {\r
2303                 if(ready.decrementAndGet() == 0) {\r
2304                     runnable.run(graph);\r
2305                 }\r
2306             }\r
2307 \r
2308         };\r
2309 \r
2310         for(TransientGraph g : support.providers) {\r
2311             if(g.isPending(subject)) {\r
2312                 try {\r
2313                     g.load(graph, subject, composite);\r
2314                 } catch (DatabaseException e) {\r
2315                     e.printStackTrace();\r
2316                 }\r
2317             } else {\r
2318                 composite.run(graph);\r
2319             }\r
2320         }\r
2321 \r
2322         composite.run(graph);\r
2323 \r
2324     }\r
2325 \r
2326     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {\r
2327 \r
2328         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2329 \r
2330             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2331 \r
2332             @Override\r
2333             public void run(ReadGraphImpl graph) {\r
2334                 if(ready.decrementAndGet() == 0) {\r
2335                     runnable.run(graph);\r
2336                 }\r
2337             }\r
2338 \r
2339         };\r
2340 \r
2341         for(TransientGraph g : support.providers) {\r
2342             if(g.isPending(subject, predicate)) {\r
2343                 try {\r
2344                     g.load(graph, subject, predicate, composite);\r
2345                 } catch (DatabaseException e) {\r
2346                     e.printStackTrace();\r
2347                 }\r
2348             } else {\r
2349                 composite.run(graph);\r
2350             }\r
2351         }\r
2352 \r
2353         composite.run(graph);\r
2354 \r
2355     }\r
2356 \r
2357 //    void dumpHeap() {\r
2358 //\r
2359 //        try {\r
2360 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),\r
2361 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(\r
2362 //                    "d:/heap" + flushCounter + ".txt", true);\r
2363 //        } catch (IOException e) {\r
2364 //            e.printStackTrace();\r
2365 //        }\r
2366 //\r
2367 //    }\r
2368 \r
2369     void fireReactionsToSynchronize(ChangeSet cs) {\r
2370 \r
2371         // Do not fire empty events\r
2372         if (cs.isEmpty())\r
2373             return;\r
2374 \r
2375         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());\r
2376 //        g.state.barrier.inc();\r
2377 \r
2378         try {\r
2379 \r
2380             if (!cs.isEmpty()) {\r
2381                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);\r
2382                 for (ChangeListener l : changeListeners2) {\r
2383                     try {\r
2384                         l.graphChanged(e2);\r
2385                     } catch (Exception ex) {\r
2386                         ex.printStackTrace();\r
2387                     }\r
2388                 }\r
2389             }\r
2390 \r
2391         } finally {\r
2392 \r
2393 //            g.state.barrier.dec();\r
2394 //            g.waitAsync(null);\r
2395 \r
2396         }\r
2397 \r
2398     }\r
2399 \r
2400     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {\r
2401         try {\r
2402 \r
2403             // Do not fire empty events\r
2404             if (cs2.isEmpty())\r
2405                 return;\r
2406 \r
2407 //            graph.restart();\r
2408 //            graph.state.barrier.inc();\r
2409 \r
2410             try {\r
2411 \r
2412                 if (!cs2.isEmpty()) {\r
2413 \r
2414                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);\r
2415                     for (ChangeListener l : changeListeners2) {\r
2416                         try {\r
2417                             l.graphChanged(e2);\r
2418 //                            System.out.println("changelistener " + l);\r
2419                         } catch (Exception ex) {\r
2420                             ex.printStackTrace();\r
2421                         }\r
2422                     }\r
2423 \r
2424                 }\r
2425 \r
2426             } finally {\r
2427 \r
2428 //                graph.state.barrier.dec();\r
2429 //                graph.waitAsync(null);\r
2430 \r
2431             }\r
2432         } catch (Throwable t) {\r
2433             t.printStackTrace();\r
2434 \r
2435         }\r
2436     }\r
2437     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {\r
2438 \r
2439         try {\r
2440 \r
2441             // Do not fire empty events\r
2442             if (cs2.isEmpty())\r
2443                 return;\r
2444 \r
2445             // Do not fire on virtual requests\r
2446             if(graph.getProvider() != null)\r
2447                 return;\r
2448 \r
2449             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);\r
2450 \r
2451             try {\r
2452 \r
2453                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);\r
2454                 for (ChangeListener l : metadataListeners) {\r
2455                     try {\r
2456                         l.graphChanged(e2);\r
2457                     } catch (Throwable ex) {\r
2458                         ex.printStackTrace();\r
2459                     }\r
2460                 }\r
2461 \r
2462             } finally {\r
2463 \r
2464             }\r
2465 \r
2466         } catch (Throwable t) {\r
2467             t.printStackTrace();\r
2468         }\r
2469 \r
2470     }\r
2471 \r
2472 \r
2473     /*\r
2474      * (non-Javadoc)\r
2475      *\r
2476      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)\r
2477      */\r
2478     @Override\r
2479     public <T> T getService(Class<T> api) {\r
2480         T t = peekService(api);\r
2481         if (t == null) {\r
2482             if (state.isClosed())\r
2483                 throw new ServiceNotFoundException(this, api, "Session has been shut down");\r
2484             throw new ServiceNotFoundException(this, api);\r
2485         }\r
2486         return t;\r
2487     }\r
2488 \r
    /**
     * Supplies the object that {@code peekService} hands out when
     * {@code ServerInformation.class} is requested.
     *
     * @return the server information provided by the concrete session implementation
     */
    protected abstract ServerInformation getCachedServerInformation();
2493 \r
        // Two-slot cache of the most recent serviceLocator lookups, maintained by
        // peekService (slot 1 is the most recently used) and refreshed by registerService.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2498 \r
2499     /*\r
2500      * (non-Javadoc)\r
2501      *\r
2502      * @see\r
2503      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)\r
2504      */\r
2505     @SuppressWarnings("unchecked")\r
2506     @Override\r
2507     public synchronized <T> T peekService(Class<T> api) {\r
2508 \r
2509                 if(serviceKey1 == api) {\r
2510                         return (T)service1;\r
2511                 } else if (serviceKey2 == api) {\r
2512                         // Promote this key\r
2513                         Object result = service2;\r
2514                         service2 = service1;\r
2515                         serviceKey2 = serviceKey1;\r
2516                         service1 = result;\r
2517                         serviceKey1 = api;\r
2518                         return (T)result;\r
2519                 }\r
2520 \r
2521         if (Layer0.class == api)\r
2522                 return (T) L0;\r
2523         if (ServerInformation.class == api)\r
2524             return (T) getCachedServerInformation();\r
2525         else if (WriteGraphImpl.class == api)\r
2526             return (T) writeState.getGraph();\r
2527         else if (ClusterBuilder.class == api)\r
2528             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);\r
2529         else if (ClusterBuilderFactory.class == api)\r
2530             return (T)new ClusterBuilderFactoryImpl(this);\r
2531 \r
2532                 service2 = service1;\r
2533                 serviceKey2 = serviceKey1;\r
2534 \r
2535         service1 = serviceLocator.peekService(api);\r
2536         serviceKey1 = api;\r
2537 \r
2538         return (T)service1;\r
2539 \r
2540     }\r
2541 \r
2542     /*\r
2543      * (non-Javadoc)\r
2544      *\r
2545      * @see\r
2546      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)\r
2547      */\r
2548     @Override\r
2549     public boolean hasService(Class<?> api) {\r
2550         return serviceLocator.hasService(api);\r
2551     }\r
2552 \r
2553     /**\r
2554      * @param api the api that must be implemented by the specified service\r
2555      * @param service the service implementation\r
2556      */\r
2557     @Override\r
2558     public <T> void registerService(Class<T> api, T service) {\r
2559         if(Layer0.class == api) {\r
2560                 L0 = (Layer0)service;\r
2561                 return;\r
2562         }\r
2563         serviceLocator.registerService(api, service);\r
2564         if (TransactionPolicySupport.class == api) {\r
2565             transactionPolicy = (TransactionPolicySupport)service;\r
2566             state.resetTransactionPolicy();\r
2567         }\r
2568         if (api == serviceKey1)\r
2569                 service1 = service;\r
2570         else if (api == serviceKey2)\r
2571                 service2 = service;\r
2572     }\r
2573 \r
2574     // ----------------\r
2575     // SessionMonitor\r
2576     // ----------------\r
2577 \r
2578     void fireSessionVariableChange(String variable) {\r
2579         for (MonitorHandler h : monitorHandlers) {\r
2580             MonitorContext ctx = monitorContexts.getLeft(h);\r
2581             assert ctx != null;\r
2582             // SafeRunner functionality repeated here to avoid dependency.\r
2583             try {\r
2584                 h.valuesChanged(ctx);\r
2585             } catch (Exception e) {\r
2586                 Logger.defaultLogError("monitor handler notification produced the following exception", e);\r
2587             } catch (LinkageError e) {\r
2588                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);\r
2589             }\r
2590         }\r
2591     }\r
2592 \r
2593 \r
2594     class ResourceSerializerImpl implements ResourceSerializer {\r
2595 \r
2596         public long createRandomAccessId(int id) throws DatabaseException {\r
2597             if(id < 0)\r
2598                 return id;\r
2599             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);\r
2600             long cluster = getCluster(id);\r
2601             if (0 == cluster)\r
2602                 return 0; // Better to return 0 then invalid id.\r
2603             long result = ClusterTraitsBase.createResourceId(cluster, index);\r
2604             return result;\r
2605         }\r
2606 \r
2607         @Override\r
2608         public long getRandomAccessId(Resource resource) throws DatabaseException {\r
2609 \r
2610             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2611             return createRandomAccessId(resourceImpl.id);\r
2612 \r
2613         }\r
2614 \r
2615         @Override\r
2616         public int getTransientId(Resource resource) throws DatabaseException {\r
2617 \r
2618             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2619             return resourceImpl.id;\r
2620 \r
2621         }\r
2622 \r
2623         @Override\r
2624         public String createRandomAccessId(Resource resource)\r
2625         throws InvalidResourceReferenceException {\r
2626 \r
2627             if(resource == null) throw new IllegalArgumentException();\r
2628 \r
2629             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2630 \r
2631             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";\r
2632 \r
2633             int r;\r
2634             try {\r
2635                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);\r
2636             } catch (DatabaseException e1) {\r
2637                 throw new InvalidResourceReferenceException(e1);\r
2638             }\r
2639             try {\r
2640                 // Serialize as '<resource index>_<cluster id>'\r
2641                 return "" + r + "_" + getCluster(resourceImpl);\r
2642             } catch (Throwable e) {\r
2643                 e.printStackTrace();\r
2644                 throw new InvalidResourceReferenceException(e);\r
2645             } finally {\r
2646             }\r
2647         }\r
2648 \r
2649         public int getTransientId(long serialized) throws DatabaseException {\r
2650             if (serialized <= 0)\r
2651                 return (int)serialized;\r
2652             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);\r
2653             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);\r
2654             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
2655             if (cluster == null)\r
2656                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);\r
2657             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);\r
2658             return key;\r
2659         }\r
2660 \r
2661         @Override\r
2662         public Resource getResource(long randomAccessId) throws DatabaseException {\r
2663             return getResourceByKey(getTransientId(randomAccessId));\r
2664         }\r
2665 \r
2666         @Override\r
2667         public Resource getResource(int transientId) throws DatabaseException {\r
2668             return getResourceByKey(transientId);\r
2669         }\r
2670 \r
2671         @Override\r
2672         public Resource getResource(String randomAccessId)\r
2673         throws InvalidResourceReferenceException {\r
2674             try {\r
2675                 int i = randomAccessId.indexOf('_');\r
2676                 if (i == -1)\r
2677                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"\r
2678                             + randomAccessId + "'");\r
2679                 int r = Integer.parseInt(randomAccessId.substring(0, i));\r
2680                 if(r < 0) return getResourceByKey(r);\r
2681                 long c = Long.parseLong(randomAccessId.substring(i + 1));\r
2682                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
2683                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);\r
2684                 if (cluster.hasResource(key, clusterTranslator))\r
2685                     return getResourceByKey(key);\r
2686             } catch (InvalidResourceReferenceException e) {\r
2687                 throw e;\r
2688             } catch (NumberFormatException e) {\r
2689                 throw new InvalidResourceReferenceException(e);\r
2690             } catch (Throwable e) {\r
2691                 e.printStackTrace();\r
2692                 throw new InvalidResourceReferenceException(e);\r
2693             } finally {\r
2694             }\r
2695             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);\r
2696         }\r
2697 \r
2698         @Override\r
2699         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {\r
2700             try {\r
2701                 return true;\r
2702             } catch (Throwable e) {\r
2703                 e.printStackTrace();\r
2704                 throw new InvalidResourceReferenceException(e);\r
2705             } finally {\r
2706             }\r
2707         }\r
2708     }\r
2709 \r
2710     // Copied from old SessionImpl\r
2711 \r
2712     void check() {\r
2713         if (state.isClosed())\r
2714             throw new Error("Session closed.");\r
2715     }\r
2716 \r
2717 \r
2718 \r
2719     public ResourceImpl getNewResource(long clusterId) {\r
2720         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);\r
2721         int newId;\r
2722         try {\r
2723             newId = cluster.createResource(clusterTranslator);\r
2724         } catch (DatabaseException e) {\r
2725             Logger.defaultLogError(e);\r
2726             return null;\r
2727         }\r
2728         return new ResourceImpl(resourceSupport, newId);\r
2729     }\r
2730 \r
2731     public ResourceImpl getNewResource(Resource clusterSet)\r
2732     throws DatabaseException {\r
2733         long resourceId = clusterSet.getResourceId();\r
2734         Long clusterId = clusterSetsSupport.get(resourceId);\r
2735         if (null == clusterId)\r
2736             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);\r
2737         if (Constants.NewClusterId == clusterId) {\r
2738             clusterId = getService(ClusteringSupport.class).createCluster();\r
2739             if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
2740                 createdClusters.add(clusterId);\r
2741             clusterSetsSupport.put(resourceId, clusterId);\r
2742             return getNewResource(clusterId);\r
2743         } else {\r
2744             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);\r
2745             ResourceImpl result;\r
2746             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
2747                 clusterId = getService(ClusteringSupport.class).createCluster();\r
2748                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
2749                     createdClusters.add(clusterId);\r
2750                 clusterSetsSupport.put(resourceId, clusterId);\r
2751                 return getNewResource(clusterId);\r
2752             } else {\r
2753                 result = getNewResource(clusterId);\r
2754                 int resultKey = querySupport.getId(result);\r
2755                 long resultCluster = querySupport.getClusterId(resultKey);\r
2756                 if (clusterId != resultCluster)\r
2757                     clusterSetsSupport.put(resourceId, resultCluster);\r
2758                 return result;\r
2759             }\r
2760         }\r
2761     }\r
2762 \r
2763     public void getNewClusterSet(Resource clusterSet)\r
2764     throws DatabaseException {\r
2765         if (DEBUG)\r
2766             System.out.println("new cluster set=" + clusterSet);\r
2767         long resourceId = clusterSet.getResourceId();\r
2768         if (clusterSetsSupport.containsKey(resourceId))\r
2769             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);\r
2770         clusterSetsSupport.put(resourceId, Constants.NewClusterId);\r
2771     }\r
2772     public boolean containsClusterSet(Resource clusterSet)\r
2773     throws ServiceException {\r
2774         long resourceId = clusterSet.getResourceId();\r
2775         return clusterSetsSupport.containsKey(resourceId);\r
2776     }\r
2777     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {\r
2778         Resource r = defaultClusterSet;\r
2779         defaultClusterSet = clusterSet;\r
2780         return r;\r
2781     }\r
2782     void printDiagnostics() {\r
2783 \r
2784         if (DIAGNOSTICS) {\r
2785 \r
2786             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);\r
2787             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);\r
2788             for(ClusterI cluster : clusterTable.getClusters()) {\r
2789                 try {\r
2790                     if (cluster.isLoaded() && !cluster.isEmpty()) {\r
2791                         residentClusters.set(residentClusters.get() + 1);\r
2792                         totalMem.set(totalMem.get() + cluster.getUsedSpace());\r
2793                     }\r
2794                 } catch (DatabaseException e) {\r
2795                     Logger.defaultLogError(e);\r
2796                 }\r
2797             }\r
2798 \r
2799             System.out.println("--------------------------------");\r
2800             System.out.println("Cluster information:");\r
2801             System.out.println("-amount of resident clusters=" + residentClusters.get());\r
2802             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");\r
2803 \r
2804             for(ClusterI cluster : clusterTable.getClusters()) {\r
2805                 System.out.print("Cluster " + cluster.getClusterId() + " [");\r
2806                 try {\r
2807                     if (!cluster.isLoaded())\r
2808                         System.out.println("not loaded]");\r
2809                     else if (cluster.isEmpty())\r
2810                         System.out.println("is empty]");\r
2811                     else {\r
2812                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));\r
2813                         System.out.print(",references=" + cluster.getReferenceCount());\r
2814                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");\r
2815                     }\r
2816                 } catch (DatabaseException e) {\r
2817                     Logger.defaultLogError(e);\r
2818                     System.out.println("is corrupted]");\r
2819                 }\r
2820             }\r
2821 \r
2822             queryProvider2.printDiagnostics();\r
2823             System.out.println("--------------------------------");\r
2824         }\r
2825 \r
2826     }\r
2827 \r
2828     public void fireStartReadTransaction() {\r
2829 \r
2830         if (DIAGNOSTICS)\r
2831             System.out.println("StartReadTransaction");\r
2832 \r
2833         for (SessionEventListener listener : eventListeners) {\r
2834             try {\r
2835                 listener.readTransactionStarted();\r
2836             } catch (Throwable t) {\r
2837                 t.printStackTrace();\r
2838             }\r
2839         }\r
2840 \r
2841     }\r
2842 \r
2843     public void fireFinishReadTransaction() {\r
2844 \r
2845         if (DIAGNOSTICS)\r
2846             System.out.println("FinishReadTransaction");\r
2847 \r
2848         for (SessionEventListener listener : eventListeners) {\r
2849             try {\r
2850                 listener.readTransactionFinished();\r
2851             } catch (Throwable t) {\r
2852                 t.printStackTrace();\r
2853             }\r
2854         }\r
2855 \r
2856     }\r
2857 \r
2858     public void fireStartWriteTransaction() {\r
2859 \r
2860         if (DIAGNOSTICS)\r
2861             System.out.println("StartWriteTransaction");\r
2862 \r
2863         for (SessionEventListener listener : eventListeners) {\r
2864             try {\r
2865                 listener.writeTransactionStarted();\r
2866             } catch (Throwable t) {\r
2867                 t.printStackTrace();\r
2868             }\r
2869         }\r
2870 \r
2871     }\r
2872 \r
2873     public void fireFinishWriteTransaction() {\r
2874 \r
2875         if (DIAGNOSTICS)\r
2876             System.out.println("FinishWriteTransaction");\r
2877 \r
2878         for (SessionEventListener listener : eventListeners) {\r
2879             try {\r
2880                 listener.writeTransactionFinished();\r
2881             } catch (Throwable t) {\r
2882                 t.printStackTrace();\r
2883             }\r
2884         }\r
2885 \r
2886         Indexing.resetDependenciesIndexingDisabled();\r
2887 \r
2888     }\r
2889 \r
2890     Resource deserialize(final long resourceId, final long clusterId) throws IOException {\r
2891         throw new Error("Not supported at the moment.");\r
2892     }\r
2893 \r
2894     State getState() {\r
2895         return state;\r
2896     }\r
2897 \r
2898     ClusterTable getClusterTable() {\r
2899         return clusterTable;\r
2900     }\r
2901 \r
2902     public GraphSession getGraphSession() {\r
2903         return graphSession;\r
2904     }\r
2905 \r
2906     public QueryProcessor getQueryProvider2() {\r
2907         return queryProvider2;\r
2908     }\r
2909 \r
2910     ClientChangesImpl getClientChanges() {\r
2911         return clientChanges;\r
2912     }\r
2913 \r
2914     boolean getWriteOnly() {\r
2915         return writeOnly;\r
2916     }\r
2917 \r
    // Class-wide counter; it is not referenced in the visible part of this file.
    // NOTE(review): possibly unused - confirm before removing.
    static int counter = 0;
2919 \r
2920     public void onClusterLoaded(long clusterId) {\r
2921 \r
2922         clusterTable.updateSize();\r
2923 \r
2924     }\r
2925 \r
2926 \r
2927 \r
2928 \r
2929     /*\r
2930      * Implementation of the interface RequestProcessor\r
2931      */\r
2932 \r
2933     @Override\r
2934     public <T> T syncRequest(final Read<T> request) throws DatabaseException {\r
2935 \r
2936         assertNotSession();\r
2937         assertAlive();\r
2938 \r
2939         assert(request != null);\r
2940 \r
2941         final DataContainer<T> result = new DataContainer<T>();\r
2942         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2943 \r
2944         syncRequest(request, new AsyncProcedure<T>() {\r
2945 \r
2946             @Override\r
2947             public void execute(AsyncReadGraph graph, T t) {\r
2948                 result.set(t);\r
2949             }\r
2950 \r
2951             @Override\r
2952             public void exception(AsyncReadGraph graph, Throwable t) {\r
2953                 exception.set(t);\r
2954             }\r
2955 \r
2956         });\r
2957 \r
2958         Throwable t = exception.get();\r
2959         if(t != null) {\r
2960             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2961             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
2962         }\r
2963 \r
2964         return result.get();\r
2965 \r
2966     }\r
2967 \r
2968 //    @Override\r
2969 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {\r
2970 //        assertNotSession();\r
2971 //        return syncRequest(request, (AsyncProcedure<T>)procedure);\r
2972 //    }\r
2973 \r
2974     @Override\r
2975     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {\r
2976         assertNotSession();\r
2977         return syncRequest(request, new SyncToAsyncListener<T>(procedure));\r
2978     }\r
2979 \r
2980     @Override\r
2981     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {\r
2982         assertNotSession();\r
2983         return syncRequest(request, new NoneToAsyncListener<T>(procedure));\r
2984     }\r
2985 \r
2986     @Override\r
2987     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {\r
2988         assertNotSession();\r
2989         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
2990     }\r
2991 \r
2992     @Override\r
2993     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {\r
2994         assertNotSession();\r
2995         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
2996     }\r
2997 \r
2998     @Override\r
2999     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {\r
3000 \r
3001         assertNotSession();\r
3002 \r
3003         assert(request != null);\r
3004 \r
3005         final DataContainer<T> result = new DataContainer<T>();\r
3006         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
3007 \r
3008         syncRequest(request, new AsyncProcedure<T>() {\r
3009 \r
3010             @Override\r
3011             public void execute(AsyncReadGraph graph, T t) {\r
3012                 result.set(t);\r
3013             }\r
3014 \r
3015             @Override\r
3016             public void exception(AsyncReadGraph graph, Throwable t) {\r
3017                 exception.set(t);\r
3018             }\r
3019 \r
3020         });\r
3021 \r
3022         Throwable t = exception.get();\r
3023         if(t != null) {\r
3024             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
3025             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
3026         }\r
3027 \r
3028         return result.get();\r
3029 \r
3030     }\r
3031 \r
3032     @Override\r
3033     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {\r
3034         assertNotSession();\r
3035         return syncRequest(request, (AsyncProcedure<T>)procedure);\r
3036     }\r
3037 \r
3038     @Override\r
3039     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {\r
3040         assertNotSession();\r
3041         return syncRequest(request, new SyncToAsyncListener<T>(procedure));\r
3042     }\r
3043 \r
3044     @Override\r
3045     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {\r
3046         assertNotSession();\r
3047         return syncRequest(request, new NoneToAsyncListener<T>(procedure));\r
3048     }\r
3049 \r
3050     @Override\r
3051     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {\r
3052         assertNotSession();\r
3053         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
3054     }\r
3055 \r
3056     @Override\r
3057     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
3058         assertNotSession();\r
3059         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
3060     }\r
3061 \r
3062     @Override\r
3063     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {\r
3064 \r
3065         assertNotSession();\r
3066 \r
3067         assert(request != null);\r
3068 \r
3069         final ArrayList<T> result = new ArrayList<T>();\r
3070         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
3071 \r
3072         syncRequest(request, new AsyncMultiProcedure<T>() {\r
3073 \r
3074             @Override\r
3075             public void execute(AsyncReadGraph graph, T t) {\r
3076                 synchronized(result) {\r
3077                     result.add(t);\r
3078                 }\r
3079             }\r
3080 \r
3081             @Override\r
3082             public void finished(AsyncReadGraph graph) {\r
3083             }\r
3084 \r
3085             @Override\r
3086             public void exception(AsyncReadGraph graph, Throwable t) {\r
3087                 exception.set(t);\r
3088             }\r
3089 \r
3090 \r
3091         });\r
3092 \r
3093         Throwable t = exception.get();\r
3094         if(t != null) {\r
3095             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
3096             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
3097         }\r
3098 \r
3099         return result;\r
3100 \r
3101     }\r
3102 \r
3103     @Override\r
3104     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {\r
3105         assertNotSession();\r
3106         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
3107     }\r
3108 \r
3109     @Override\r
3110     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {\r
3111         assertNotSession();\r
3112         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
3113     }\r
3114 \r
3115     @Override\r
3116     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {\r
3117         assertNotSession();\r
3118         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
3119     }\r
3120 \r
3121     @Override\r
3122     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {\r
3123         assertNotSession();\r
3124         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
3125     }\r
3126 \r
3127     @Override\r
3128     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {\r
3129         assertNotSession();\r
3130         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
3131     }\r
3132 \r
3133     @Override\r
3134     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {\r
3135 \r
3136         assertNotSession();\r
3137 \r
3138         assert(request != null);\r
3139 \r
3140         final ArrayList<T> result = new ArrayList<T>();\r
3141         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
3142 \r
3143         syncRequest(request, new AsyncMultiProcedure<T>() {\r
3144 \r
3145             @Override\r
3146             public void execute(AsyncReadGraph graph, T t) {\r
3147                 synchronized(result) {\r
3148                     result.add(t);\r
3149                 }\r
3150             }\r
3151 \r
3152             @Override\r
3153             public void finished(AsyncReadGraph graph) {\r
3154             }\r
3155 \r
3156             @Override\r
3157             public void exception(AsyncReadGraph graph, Throwable t) {\r
3158                 exception.set(t);\r
3159             }\r
3160 \r
3161 \r
3162         });\r
3163 \r
3164         Throwable t = exception.get();\r
3165         if(t != null) {\r
3166             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
3167             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
3168         }\r
3169 \r
3170         return result;\r
3171 \r
3172     }\r
3173 \r
3174     @Override\r
3175     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {\r
3176         assertNotSession();\r
3177         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
3178     }\r
3179 \r
3180     @Override\r
3181     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {\r
3182         assertNotSession();\r
3183         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
3184     }\r
3185 \r
3186     @Override\r
3187     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {\r
3188         assertNotSession();\r
3189        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
3190     }\r
3191 \r
3192     @Override\r
3193     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {\r
3194         assertNotSession();\r
3195         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
3196     }\r
3197 \r
3198     @Override\r
3199     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {\r
3200         assertNotSession();\r
3201         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
3202     }\r
3203 \r
3204 \r
3205     @Override\r
3206     public <T> void asyncRequest(Read<T> request) {\r
3207 \r
3208         asyncRequest(request, new ProcedureAdapter<T>() {\r
3209             @Override\r
3210             public void exception(Throwable t) {\r
3211                 t.printStackTrace();\r
3212             }\r
3213         });\r
3214 \r
3215     }\r
3216 \r
3217     @Override\r
3218     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {\r
3219         asyncRequest(request, (AsyncProcedure<T>)procedure);\r
3220     }\r
3221 \r
3222     @Override\r
3223     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {\r
3224         asyncRequest(request, new SyncToAsyncListener<T>(procedure));\r
3225     }\r
3226 \r
3227     @Override\r
3228     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {\r
3229         asyncRequest(request, new NoneToAsyncListener<T>(procedure));\r
3230     }\r
3231 \r
3232     @Override\r
3233     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {\r
3234         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
3235     }\r
3236 \r
3237     @Override\r
3238     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {\r
3239         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
3240     }\r
3241 \r
3242     @Override\r
3243     final public <T> void asyncRequest(final AsyncRead<T> request) {\r
3244 \r
3245         assert(request != null);\r
3246 \r
3247         asyncRequest(request, new ProcedureAdapter<T>() {\r
3248             @Override\r
3249             public void exception(Throwable t) {\r
3250                 t.printStackTrace();\r
3251             }\r
3252         });\r
3253 \r
3254     }\r
3255 \r
3256     @Override\r
3257     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {\r
3258         scheduleRequest(request, procedure, procedure, null);\r
3259     }\r
3260 \r
3261     @Override\r
3262     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {\r
3263         asyncRequest(request, new SyncToAsyncListener<T>(procedure));\r
3264     }\r
3265 \r
3266     @Override\r
3267     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {\r
3268         asyncRequest(request, new NoneToAsyncListener<T>(procedure));\r
3269     }\r
3270 \r
3271     @Override\r
3272     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {\r
3273         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
3274     }\r
3275 \r
3276     @Override\r
3277     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {\r
3278         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
3279     }\r
3280 \r
3281     @Override\r
3282     public <T> void asyncRequest(MultiRead<T> request) {\r
3283 \r
3284         assert(request != null);\r
3285 \r
3286         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
3287             @Override\r
3288             public void exception(AsyncReadGraph graph, Throwable t) {\r
3289                 t.printStackTrace();\r
3290             }\r
3291         });\r
3292 \r
3293     }\r
3294 \r
3295     @Override\r
3296     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {\r
3297         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
3298     }\r
3299 \r
3300     @Override\r
3301     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {\r
3302         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
3303     }\r
3304 \r
3305     @Override\r
3306     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {\r
3307         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
3308     }\r
3309 \r
3310     @Override\r
3311     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {\r
3312         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
3313     }\r
3314 \r
3315     @Override\r
3316     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {\r
3317         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
3318     }\r
3319 \r
3320     @Override\r
3321     final public <T> void asyncRequest(AsyncMultiRead<T> request) {\r
3322 \r
3323         assert(request != null);\r
3324 \r
3325         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
3326             @Override\r
3327             public void exception(AsyncReadGraph graph, Throwable t) {\r
3328                 t.printStackTrace();\r
3329             }\r
3330         });\r
3331 \r
3332     }\r
3333 \r
3334     @Override\r
3335     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {\r
3336         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
3337     }\r
3338 \r
3339     @Override\r
3340     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {\r
3341         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
3342     }\r
3343 \r
3344     @Override\r
3345     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {\r
3346         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
3347     }\r
3348 \r
3349     @Override\r
3350     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {\r
3351         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
3352     }\r
3353 \r
3354     @Override\r
3355     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {\r
3356         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
3357     }\r
3358 \r
3359     @Override\r
3360     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {\r
3361         assertNotSession();\r
3362         throw new Error("Not implemented!");\r
3363     }\r
3364 \r
3365     @Override\r
3366     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {\r
3367         throw new Error("Not implemented!");\r
3368     }\r
3369 \r
3370     @Override\r
3371     final public <T> void asyncRequest(final ExternalRead<T> request) {\r
3372 \r
3373         assert(request != null);\r
3374 \r
3375         asyncRequest(request, new ProcedureAdapter<T>() {\r
3376             @Override\r
3377             public void exception(Throwable t) {\r
3378                 t.printStackTrace();\r
3379             }\r
3380         });\r
3381 \r
3382     }\r
3383 \r
3384     @Override\r
3385     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {\r
3386         asyncRequest(request, (Procedure<T>)procedure);\r
3387     }\r
3388 \r
3389 \r
3390 \r
3391     void check(Throwable t) throws DatabaseException {\r
3392         if(t != null) {\r
3393             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
3394             else throw new DatabaseException("Unexpected exception", t);\r
3395         }\r
3396     }\r
3397 \r
3398     void check(DataContainer<Throwable> container) throws DatabaseException {\r
3399         Throwable t = container.get();\r
3400         if(t != null) {\r
3401             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
3402             else throw new DatabaseException("Unexpected exception", t);\r
3403         }\r
3404     }\r
3405 \r
3406 \r
3407 \r
3408 \r
3409 \r
3410 \r
3411     boolean sameProvider(Write request) {\r
3412         if(writeState.getGraph().provider != null) {\r
3413             return writeState.getGraph().provider.equals(request.getProvider());\r
3414         } else {\r
3415             return request.getProvider() == null;\r
3416         }\r
3417     }\r
3418 \r
3419     boolean plainWrite(WriteGraphImpl graph) {\r
3420         if(graph == null) return false;\r
3421         if(graph.writeSupport.writeOnly()) return false;\r
3422         return true;\r
3423     }\r
3424 \r
3425 \r
3426     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");\r
3427     private void assertNotSession() throws DatabaseException {\r
3428         Thread current = Thread.currentThread();\r
3429         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");\r
3430     }\r
3431 \r
3432     void assertAlive() {\r
3433         if (!state.isAlive())\r
3434             throw new RuntimeDatabaseException("Session has been shut down.");\r
3435     }\r
3436 \r
3437     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {\r
3438         return querySupport.getValueStream(graph, querySupport.getId(resource));\r
3439     }\r
3440 \r
3441     public byte[] getValue(ReadGraphImpl graph, Resource resource) {\r
3442         return querySupport.getValue(graph, querySupport.getId(resource));\r
3443     }\r
3444 \r
3445     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {\r
3446         acquire(semaphore, request, null);\r
3447     }\r
3448 \r
3449     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {\r
3450         assertAlive();\r
3451         try {\r
3452             long tryCount = 0;\r
3453             // Loop until the semaphore is acquired reporting requests that take a long time.\r
3454             while (true) {\r
3455 \r
3456                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {\r
3457                     // Acquired OK.\r
3458                     return;\r
3459                 } else {\r
3460                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {\r
3461                         ++tryCount;\r
3462                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT\r
3463                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)\r
3464                                 * tryCount;\r
3465                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "\r
3466                                 + (waitTime) + " ms. (procedure=" + procedure + ")");\r
3467                     }\r
3468                 }\r
3469 \r
3470                 assertAlive();\r
3471 \r
3472             }\r
3473         } catch (InterruptedException e) {\r
3474             e.printStackTrace();\r
3475             // FIXME: Should perhaps do something else in this case ??\r
3476         }\r
3477     }\r
3478 \r
3479 \r
3480 \r
3481 //    @Override\r
3482 //    public boolean holdOnToTransactionAfterCancel() {\r
3483 //        return transactionPolicy.holdOnToTransactionAfterCancel();\r
3484 //    }\r
3485 //\r
3486 //    @Override\r
3487 //    public boolean holdOnToTransactionAfterCommit() {\r
3488 //        return transactionPolicy.holdOnToTransactionAfterCommit();\r
3489 //    }\r
3490 //\r
3491 //    @Override\r
3492 //    public boolean holdOnToTransactionAfterRead() {\r
3493 //        return transactionPolicy.holdOnToTransactionAfterRead();\r
3494 //    }\r
3495 //\r
3496 //    @Override\r
3497 //    public void onRelinquish() {\r
3498 //        transactionPolicy.onRelinquish();\r
3499 //    }\r
3500 //\r
3501 //    @Override\r
3502 //    public void onRelinquishDone() {\r
3503 //        transactionPolicy.onRelinquishDone();\r
3504 //    }\r
3505 //\r
3506 //    @Override\r
3507 //    public void onRelinquishError() {\r
3508 //        transactionPolicy.onRelinquishError();\r
3509 //    }\r
3510 \r
3511 \r
3512     @Override\r
3513     public Session getSession() {\r
3514         return this;\r
3515     }\r
3516 \r
3517 \r
3518     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);\r
3519     protected abstract ResourceImpl getNewResource() throws DatabaseException;\r
3520 \r
3521     public void ceased(int thread) {\r
3522 \r
3523         requestManager.ceased(thread);\r
3524 \r
3525     }\r
3526 \r
3527     public int getAmountOfQueryThreads() {\r
3528         // This must be a power of two\r
3529         return 1;\r
3530 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());\r
3531     }\r
3532 \r
3533     public Resource getResourceByKey(int key) throws ResourceNotFoundException {\r
3534 \r
3535         return new ResourceImpl(resourceSupport, key);\r
3536 \r
3537     }\r
3538 \r
3539     public void acquireWriteOnly() {\r
3540         writeOnly = true;\r
3541     }\r
3542 \r
3543     public void releaseWriteOnly(ReadGraphImpl graph) {\r
3544         writeOnly = false;\r
3545         queryProvider2.releaseWrite(graph);\r
3546     }\r
3547 \r
3548     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {\r
3549 \r
3550         long start = System.nanoTime();\r
3551 \r
3552         while(dirtyPrimitives) {\r
3553             dirtyPrimitives = false;\r
3554             getQueryProvider2().performDirtyUpdates(writer);\r
3555             getQueryProvider2().performScheduledUpdates(writer);\r
3556         }\r
3557 \r
3558         fireMetadataListeners(writer, clientChanges);\r
3559 \r
3560         if(DebugPolicy.PERFORMANCE_DATA) {\r
3561             long end = System.nanoTime();\r
3562             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));\r
3563         }\r
3564 \r
3565     }\r
3566 \r
3567     void removeTemporaryData() {\r
3568         File platform = Platform.getLocation().toFile();\r
3569         File tempFiles = new File(platform, "tempFiles");\r
3570         File temp = new File(tempFiles, "db");\r
3571         if(!temp.exists()) \r
3572             return;\r
3573         try {\r
3574             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {\r
3575                 @Override\r
3576                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {\r
3577                     try {\r
3578                         Files.delete(file);\r
3579                     } catch (IOException e) {\r
3580                         Logger.defaultLogError(e);\r
3581                     }\r
3582                     return FileVisitResult.CONTINUE;\r
3583                 }\r
3584             });\r
3585         } catch (IOException e) {\r
3586             Logger.defaultLogError(e);\r
3587         }\r
3588     }\r
3589 \r
3590     public void handleCreatedClusters() {\r
3591         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
3592             createdClusters.forEach(new TLongProcedure() {\r
3593 \r
3594                 @Override\r
3595                 public boolean execute(long value) {\r
3596                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);\r
3597                     cluster.setImmutable(true, clusterTranslator);\r
3598                     return true;\r
3599                 }\r
3600             });\r
3601         }\r
3602         createdClusters.clear();\r
3603     }\r
3604 \r
3605     @Override\r
3606     public Object getModificationCounter() {\r
3607         return queryProvider2.modificationCounter;\r
3608     }\r
3609     \r
3610     @Override\r
3611     public void markUndoPoint() {\r
3612         state.setCombine(false);\r
3613     }\r
3614 \r
3615 }\r