DB-client fixes
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.File;\r
15 import java.io.IOException;\r
16 import java.io.InputStream;\r
17 import java.nio.charset.Charset;\r
18 import java.nio.file.FileVisitResult;\r
19 import java.nio.file.Files;\r
20 import java.nio.file.Path;\r
21 import java.nio.file.SimpleFileVisitor;\r
22 import java.nio.file.attribute.BasicFileAttributes;\r
23 import java.util.ArrayList;\r
24 import java.util.Collection;\r
25 import java.util.HashSet;\r
26 import java.util.TreeMap;\r
27 import java.util.concurrent.CopyOnWriteArrayList;\r
28 import java.util.concurrent.Semaphore;\r
29 import java.util.concurrent.atomic.AtomicInteger;\r
30 \r
31 import org.eclipse.core.runtime.Platform;\r
32 import org.simantics.databoard.Bindings;\r
33 import org.simantics.db.AsyncReadGraph;\r
34 import org.simantics.db.ChangeSet;\r
35 import org.simantics.db.DevelopmentKeys;\r
36 import org.simantics.db.ExternalValueSupport;\r
37 import org.simantics.db.Metadata;\r
38 import org.simantics.db.MonitorContext;\r
39 import org.simantics.db.MonitorHandler;\r
40 import org.simantics.db.Resource;\r
41 import org.simantics.db.ResourceSerializer;\r
42 import org.simantics.db.Session;\r
43 import org.simantics.db.SessionManager;\r
44 import org.simantics.db.SessionVariables;\r
45 import org.simantics.db.VirtualGraph;\r
46 import org.simantics.db.WriteGraph;\r
47 import org.simantics.db.authentication.UserAuthenticationAgent;\r
48 import org.simantics.db.authentication.UserAuthenticator;\r
49 import org.simantics.db.common.Indexing;\r
50 import org.simantics.db.common.TransactionPolicyRelease;\r
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;\r
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;\r
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;\r
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;\r
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;\r
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;\r
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;\r
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;\r
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;\r
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;\r
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;\r
63 import org.simantics.db.common.utils.Logger;\r
64 import org.simantics.db.event.ChangeEvent;\r
65 import org.simantics.db.event.ChangeListener;\r
66 import org.simantics.db.event.SessionEventListener;\r
67 import org.simantics.db.exception.CancelTransactionException;\r
68 import org.simantics.db.exception.ClusterSetExistException;\r
69 import org.simantics.db.exception.DatabaseException;\r
70 import org.simantics.db.exception.ImmutableException;\r
71 import org.simantics.db.exception.InvalidResourceReferenceException;\r
72 import org.simantics.db.exception.ResourceNotFoundException;\r
73 import org.simantics.db.exception.RuntimeDatabaseException;\r
74 import org.simantics.db.exception.ServiceException;\r
75 import org.simantics.db.exception.ServiceNotFoundException;\r
76 import org.simantics.db.impl.ClusterBase;\r
77 import org.simantics.db.impl.ClusterI;\r
78 import org.simantics.db.impl.ClusterTraitsBase;\r
79 import org.simantics.db.impl.ClusterTranslator;\r
80 import org.simantics.db.impl.ResourceImpl;\r
81 import org.simantics.db.impl.TransientGraph;\r
82 import org.simantics.db.impl.VirtualGraphImpl;\r
83 import org.simantics.db.impl.graph.DelayedWriteGraph;\r
84 import org.simantics.db.impl.graph.ReadGraphImpl;\r
85 import org.simantics.db.impl.graph.WriteGraphImpl;\r
86 import org.simantics.db.impl.graph.WriteSupport;\r
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;\r
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;\r
90 import org.simantics.db.impl.query.QueryProcessor;\r
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
93 import org.simantics.db.impl.service.QueryDebug;\r
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;\r
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;\r
96 import org.simantics.db.procedure.AsyncListener;\r
97 import org.simantics.db.procedure.AsyncMultiListener;\r
98 import org.simantics.db.procedure.AsyncMultiProcedure;\r
99 import org.simantics.db.procedure.AsyncProcedure;\r
100 import org.simantics.db.procedure.Listener;\r
101 import org.simantics.db.procedure.ListenerBase;\r
102 import org.simantics.db.procedure.MultiListener;\r
103 import org.simantics.db.procedure.MultiProcedure;\r
104 import org.simantics.db.procedure.Procedure;\r
105 import org.simantics.db.procedure.SyncListener;\r
106 import org.simantics.db.procedure.SyncMultiListener;\r
107 import org.simantics.db.procedure.SyncMultiProcedure;\r
108 import org.simantics.db.procedure.SyncProcedure;\r
109 import org.simantics.db.procore.cluster.ClusterImpl;\r
110 import org.simantics.db.procore.cluster.ClusterTraits;\r
111 import org.simantics.db.procore.protocol.Constants;\r
112 import org.simantics.db.procore.protocol.DebugPolicy;\r
113 import org.simantics.db.request.AsyncMultiRead;\r
114 import org.simantics.db.request.AsyncRead;\r
115 import org.simantics.db.request.DelayedWrite;\r
116 import org.simantics.db.request.DelayedWriteResult;\r
117 import org.simantics.db.request.ExternalRead;\r
118 import org.simantics.db.request.MultiRead;\r
119 import org.simantics.db.request.Read;\r
120 import org.simantics.db.request.ReadInterface;\r
121 import org.simantics.db.request.UndoTraits;\r
122 import org.simantics.db.request.Write;\r
123 import org.simantics.db.request.WriteInterface;\r
124 import org.simantics.db.request.WriteOnly;\r
125 import org.simantics.db.request.WriteOnlyResult;\r
126 import org.simantics.db.request.WriteResult;\r
127 import org.simantics.db.request.WriteTraits;\r
128 import org.simantics.db.service.ByteReader;\r
129 import org.simantics.db.service.ClusterBuilder;\r
130 import org.simantics.db.service.ClusterBuilderFactory;\r
131 import org.simantics.db.service.ClusterControl;\r
132 import org.simantics.db.service.ClusterSetsSupport;\r
133 import org.simantics.db.service.ClusterUID;\r
134 import org.simantics.db.service.ClusteringSupport;\r
135 import org.simantics.db.service.CollectionSupport;\r
136 import org.simantics.db.service.DebugSupport;\r
137 import org.simantics.db.service.DirectQuerySupport;\r
138 import org.simantics.db.service.GraphChangeListenerSupport;\r
139 import org.simantics.db.service.InitSupport;\r
140 import org.simantics.db.service.LifecycleSupport;\r
141 import org.simantics.db.service.ManagementSupport;\r
142 import org.simantics.db.service.QueryControl;\r
143 import org.simantics.db.service.SerialisationSupport;\r
144 import org.simantics.db.service.ServerInformation;\r
145 import org.simantics.db.service.ServiceActivityMonitor;\r
146 import org.simantics.db.service.SessionEventSupport;\r
147 import org.simantics.db.service.SessionMonitorSupport;\r
148 import org.simantics.db.service.SessionUserSupport;\r
149 import org.simantics.db.service.StatementSupport;\r
150 import org.simantics.db.service.TransactionPolicySupport;\r
151 import org.simantics.db.service.TransactionSupport;\r
152 import org.simantics.db.service.TransferableGraphSupport;\r
153 import org.simantics.db.service.UndoRedoSupport;\r
154 import org.simantics.db.service.VirtualGraphSupport;\r
155 import org.simantics.db.service.XSupport;\r
156 import org.simantics.layer0.Layer0;\r
157 import org.simantics.utils.DataContainer;\r
158 import org.simantics.utils.Development;\r
159 import org.simantics.utils.datastructures.Callback;\r
160 import org.simantics.utils.threads.logger.ITask;\r
161 import org.simantics.utils.threads.logger.ThreadLogger;\r
162 \r
163 import gnu.trove.procedure.TLongProcedure;\r
164 import gnu.trove.set.hash.TLongHashSet;\r
165 \r
166 \r
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {\r
168 \r
169     protected static final boolean DEBUG        = false;\r
170 \r
171     private static final boolean DIAGNOSTICS       = false;\r
172 \r
173     private TransactionPolicySupport transactionPolicy;\r
174 \r
175     final private ClusterControl clusterControl;\r
176 \r
177     final protected BuiltinSupportImpl builtinSupport;\r
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;\r
179     final protected ClusterSetsSupport clusterSetsSupport;\r
180     final protected LifecycleSupportImpl lifecycleSupport;\r
181 \r
182     protected QuerySupportImpl querySupport;\r
183     protected ResourceSupportImpl resourceSupport;\r
184     protected WriteSupport writeSupport;\r
185     public ClusterTranslator clusterTranslator;\r
186 \r
187     boolean dirtyPrimitives = false;\r
188 \r
189     public static final int SERVICE_MODE_CREATE = 2;\r
190     public static final int SERVICE_MODE_ALLOW = 1;\r
191     public int serviceMode = 0;\r
192     public boolean createdImmutableClusters = false;\r
193     public TLongHashSet createdClusters = new TLongHashSet();\r
194 \r
195     private Layer0 L0;\r
196 \r
197     /**\r
198      * The service locator maintained by the workbench. These services are\r
199      * initialized during workbench during the <code>init</code> method.\r
200      */\r
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();\r
202 \r
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();\r
204 \r
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();\r
206 \r
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();\r
208 \r
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();\r
210 \r
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();\r
212 \r
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();\r
214 \r
215     final protected State                                        state              = new State();\r
216 \r
217     protected GraphSession                                       graphSession       = null;\r
218 \r
219     protected SessionManager                                     sessionManagerImpl = null;\r
220 \r
221     protected UserAuthenticationAgent                            authAgent          = null;\r
222 \r
223     protected UserAuthenticator                                  authenticator      = null;\r
224 \r
225     protected Resource                                           user               = null;\r
226 \r
227     protected ClusterStream                                      clusterStream      = null;\r
228 \r
229     protected SessionRequestManager                         requestManager = null;\r
230 \r
231     public ClusterTable                                       clusterTable       = null;\r
232 \r
233     public QueryProcessor                                     queryProvider2 = null;\r
234 \r
235     ClientChangesImpl                                  clientChanges      = null;\r
236 \r
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];\r
238 \r
239 //    protected long                                               newClusterId       = Constants.NullClusterId;\r
240 \r
241     protected int                                                flushCounter       = 0;\r
242 \r
243     protected boolean                                            writeOnly          = false;\r
244 \r
245     WriteState<?>                                                writeState = null;\r
246     WriteStateBase<?>                                            delayedWriteState = null;\r
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().\r
248 \r
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {\r
250 \r
251 //        if (authAgent == null)\r
252 //            throw new IllegalArgumentException("null authentication agent");\r
253 \r
254         File t = StaticSessionProperties.virtualGraphStoragePath;\r
255         if (null == t)\r
256             t = new File(".");\r
257         this.clusterTable = new ClusterTable(this, t);\r
258         this.builtinSupport = new BuiltinSupportImpl(this);\r
259         this.sessionManagerImpl = sessionManagerImpl;\r
260         this.user = null;\r
261 //        this.authAgent = authAgent;\r
262 \r
263         serviceLocator.registerService(Session.class, this);\r
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));\r
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));\r
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));\r
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));\r
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));\r
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));\r
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));\r
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));\r
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));\r
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));\r
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));\r
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));\r
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));\r
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));\r
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));\r
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));\r
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));\r
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());\r
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));\r
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());\r
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());\r
285         ServiceActivityUpdaterForWriteTransactions.register(this);\r
286 \r
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);\r
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);\r
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);\r
290         this.lifecycleSupport = new LifecycleSupportImpl(this);\r
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);\r
292         this.transactionPolicy = new TransactionPolicyRelease();\r
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);\r
294         this.clusterControl = new ClusterControlImpl(this);\r
295         serviceLocator.registerService(ClusterControl.class, clusterControl);\r
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.\r
297         this.clusterSetsSupport.setReadDirectory(t.toPath());\r
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());\r
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);\r
300     }\r
301 \r
302     /*\r
303      *\r
304      * Session interface\r
305      */\r
306 \r
307 \r
308     @Override\r
309     public Resource getRootLibrary() {\r
310         return queryProvider2.getRootLibraryResource();\r
311     }\r
312 \r
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {\r
314         if (!graphSession.dbSession.refreshEnabled())\r
315             return;\r
316         try {\r
317             getClusterTable().refresh(csid, this, clusterUID);\r
318         } catch (Throwable t) {\r
319             Logger.defaultLogError("Refesh failed.", t);\r
320         }\r
321     }\r
322 \r
323     static final class TaskHelper {\r
324         private final String name;\r
325         private Object result;\r
326         final Semaphore sema = new Semaphore(0);\r
327         private Throwable throwable = null;\r
328         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {\r
329             @Override\r
330             public void run(DatabaseException e) {\r
331                 synchronized (TaskHelper.this) {\r
332                     throwable = e;\r
333                 }\r
334             }\r
335         };\r
336         final Procedure<Object> proc = new Procedure<Object>() {\r
337             @Override\r
338             public void execute(Object result) {\r
339                 callback.run(null);\r
340             }\r
341             @Override\r
342             public void exception(Throwable t) {\r
343                 if (t instanceof DatabaseException)\r
344                     callback.run((DatabaseException)t);\r
345                 else\r
346                     callback.run(new DatabaseException("" + name + "operation failed.", t));\r
347             }\r
348         };\r
349         final WriteTraits writeTraits = new WriteTraits() {\r
350             @Override\r
351             public UndoTraits getUndoTraits() {\r
352                 return null;\r
353             }\r
354             @Override\r
355             public VirtualGraph getProvider() {\r
356                 return null;\r
357             }\r
358         };\r
359         TaskHelper(String name) {\r
360             this.name = name;\r
361         }\r
362         <T> T getResult() {\r
363             return (T)result;\r
364         }\r
365         void setResult(Object result) {\r
366             this.result = result;\r
367         }\r
368 //        Throwable throwableGet() {\r
369 //            return throwable;\r
370 //        }\r
371         synchronized void throwableSet(Throwable t) {\r
372             throwable = t;\r
373         }\r
374 //        void throwableSet(String t) {\r
375 //            throwable = new InternalException("" + name + " operation failed. " + t);\r
376 //        }\r
377         synchronized void throwableCheck()\r
378         throws DatabaseException {\r
379             if (null != throwable)\r
380                 if (throwable instanceof DatabaseException)\r
381                     throw (DatabaseException)throwable;\r
382                 else\r
383                     throw new DatabaseException("Undo operation failed.", throwable);\r
384         }\r
385         void throw_(String message)\r
386         throws DatabaseException {\r
387             throw new DatabaseException("" + name + " operation failed. " + message);\r
388         }\r
389     }\r
390 \r
391 \r
392 \r
393     private ListenerBase getListenerBase(Object procedure) {\r
394         if (procedure instanceof ListenerBase)\r
395             return (ListenerBase) procedure;\r
396         else\r
397             return null;\r
398     }\r
399 \r
400     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
401         scheduleRequest(request, callback, notify, null);\r
402     }\r
403 \r
404     /* (non-Javadoc)\r
405      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
406      */\r
407     @Override\r
408     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
409 \r
410         assert (request != null);\r
411 \r
412         if(Development.DEVELOPMENT) {\r
413             try {\r
414             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
415                 System.err.println("schedule write '" + request + "'");\r
416             } catch (Throwable t) {\r
417                 Logger.defaultLogError(t);\r
418             }\r
419         }\r
420 \r
421         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
422 \r
423         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
424 \r
425             @Override\r
426             public void run(int thread) {\r
427 \r
428                 if(Development.DEVELOPMENT) {\r
429                     try {\r
430                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
431                         System.err.println("perform write '" + request + "'");\r
432                     } catch (Throwable t) {\r
433                         Logger.defaultLogError(t);\r
434                     }\r
435                 }\r
436 \r
437                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
438 \r
439                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
440 \r
441                 try {\r
442 \r
443                     flushCounter = 0;\r
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
445 \r
446                     VirtualGraph vg = getProvider(request.getProvider());\r
447 \r
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
450 \r
451                         @Override\r
452                         public void execute(Object result) {\r
453                             if(callback != null) callback.run(null);\r
454                         }\r
455 \r
456                         @Override\r
457                         public void exception(Throwable t) {\r
458                             if(callback != null) callback.run((DatabaseException)t);\r
459                         }\r
460 \r
461                     });\r
462 \r
463                     assert (null != writer);\r
464 //                    writer.state.barrier.inc();\r
465 \r
466                     try {\r
467                         request.perform(writer);\r
468                         assert (null != writer);\r
469                     } catch (Throwable t) {\r
470                         if (!(t instanceof CancelTransactionException))\r
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);\r
472                         writeState.except(t);\r
473                     } finally {\r
474 //                        writer.state.barrier.dec();\r
475 //                        writer.waitAsync(request);\r
476                     }\r
477 \r
478 \r
479                     assert(!queryProvider2.dirty);\r
480 \r
481                 } catch (Throwable e) {\r
482 \r
483                     // Log it first, just to be safe that the error is always logged.\r
484                     if (!(e instanceof CancelTransactionException))\r
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
486 \r
487 //                    writeState.getGraph().state.barrier.dec();\r
488 //                    writeState.getGraph().waitAsync(request);\r
489 \r
490                     writeState.except(e);\r
491 \r
492 //                    try {\r
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
494 //                        // All we can do here is to log those, can't really pass them anywhere.\r
495 //                        if (callback != null) {\r
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);\r
497 //                            else callback.run(new DatabaseException(e));\r
498 //                        }\r
499 //                    } catch (Throwable e2) {\r
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
501 //                    }\r
502 //                    boolean empty = clusterStream.reallyFlush();\r
503 //                    int callerThread = -1;\r
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);\r
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);\r
506 //                    try {\r
507 //                        // Send all local changes to server so it can calculate correct reverse change set.\r
508 //                        // This will call clusterStream.accept().\r
509 //                        state.cancelCommit(context, clusterStream);\r
510 //                        if (!empty) {\r
511 //                            if (!context.isOk()) // this is a blocking operation\r
512 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());\r
514 //                        }\r
515 //                        state.cancelCommit2(context, clusterStream);\r
516 //                    } catch (DatabaseException e3) {\r
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);\r
518 //                    } finally {\r
519 //                        clusterStream.setOff(false);\r
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
521 //                    }\r
522 \r
523                 } finally {\r
524 \r
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
526 \r
527                 }\r
528 \r
529                 task.finish();\r
530 \r
531                 if(Development.DEVELOPMENT) {\r
532                     try {\r
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
534                         System.err.println("finish write '" + request + "'");\r
535                     } catch (Throwable t) {\r
536                         Logger.defaultLogError(t);\r
537                     }\r
538                 }\r
539 \r
540             }\r
541 \r
542         }, combine);\r
543 \r
544     }\r
545 \r
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
547         scheduleRequest(request, procedure, notify, null);\r
548     }\r
549 \r
550     /* (non-Javadoc)\r
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
552      */\r
553     @Override\r
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
555 \r
556         assert (request != null);\r
557 \r
558         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
559 \r
560         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
561 \r
562             @Override\r
563             public void run(int thread) {\r
564 \r
565                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
566 \r
567                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
568 \r
569                 flushCounter = 0;\r
570                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
571 \r
572                 VirtualGraph vg = getProvider(request.getProvider());\r
573                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
574 \r
575                 try {\r
576                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);\r
577                     writeState = writeStateT;\r
578 \r
579                     assert (null != writer);\r
580 //                    writer.state.barrier.inc();\r
581                     writeStateT.setResult(request.perform(writer));\r
582                     assert (null != writer);\r
583 \r
584 //                    writer.state.barrier.dec();\r
585 //                    writer.waitAsync(null);\r
586 \r
587                 } catch (Throwable e) {\r
588 \r
589 //                    writer.state.barrier.dec();\r
590 //                    writer.waitAsync(null);\r
591 \r
592                     writeState.except(e);\r
593 \r
594 //                  state.stopWriteTransaction(clusterStream);\r
595 //\r
596 //              } catch (Throwable e) {\r
597 //                  // Log it first, just to be safe that the error is always logged.\r
598 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
599 //\r
600 //                  try {\r
601 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
602 //                      // All we can do here is to log those, can't really pass them anywhere.\r
603 //                      if (procedure != null) {\r
604 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);\r
605 //                          else procedure.exception(new DatabaseException(e));\r
606 //                      }\r
607 //                  } catch (Throwable e2) {\r
608 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
609 //                  }\r
610 //\r
611 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
612 //\r
613 //                  state.stopWriteTransaction(clusterStream);\r
614 \r
615                 } finally {\r
616                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
617                 }\r
618 \r
619 //              if(notify != null) notify.release();\r
620 \r
621                 task.finish();\r
622 \r
623             }\r
624 \r
625         }, combine);\r
626 \r
627     }\r
628 \r
629     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
630         scheduleRequest(request, callback, notify, null);\r
631     }\r
632 \r
633     /* (non-Javadoc)\r
634      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
635      */\r
636     @Override\r
637     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
638 \r
639         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");\r
640 \r
641         assert (request != null);\r
642 \r
643         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
644 \r
645         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
646 \r
647             @Override\r
648             public void run(int thread) {\r
649                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
650 \r
651                 Procedure<Object> stateProcedure = new Procedure<Object>() {\r
652                     @Override\r
653                     public void execute(Object result) {\r
654                         if (callback != null)\r
655                             callback.run(null);\r
656                     }\r
657                     @Override\r
658                     public void exception(Throwable t) {\r
659                         if (callback != null) {\r
660                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);\r
661                             else callback.run(new DatabaseException(t));\r
662                         } else\r
663                             Logger.defaultLogError("Unhandled exception", t);\r
664                     }\r
665                 };\r
666 \r
667                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
668                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);\r
669                 DelayedWriteGraph dwg = null;\r
670 //                newGraph.state.barrier.inc();\r
671 \r
672                 try {\r
673                     dwg = new DelayedWriteGraph(newGraph);\r
674                     request.perform(dwg);\r
675                 } catch (Throwable e) {\r
676                     delayedWriteState.except(e);\r
677                     total.finish();\r
678                     return;\r
679                 } finally {\r
680 //                    newGraph.state.barrier.dec();\r
681 //                    newGraph.waitAsync(request);\r
682                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
683                 }\r
684 \r
685                 delayedWriteState = null;\r
686 \r
687                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");\r
688                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
689 \r
690                 flushCounter = 0;\r
691                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
692 \r
693                 acquireWriteOnly();\r
694 \r
695                 VirtualGraph vg = getProvider(request.getProvider());\r
696                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
697 \r
698                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
699 \r
700                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);\r
701 \r
702                 assert (null != writer);\r
703 //                writer.state.barrier.inc();\r
704 \r
705                 try {\r
706                     // Cannot cancel\r
707                     dwg.commit(writer, request);\r
708 \r
709                         if(defaultClusterSet != null) {\r
710                                 XSupport xs = getService(XSupport.class);\r
711                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);\r
712                                 ClusteringSupport cs = getService(ClusteringSupport.class);\r
713                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));\r
714                         }\r
715 \r
716                     // This makes clusters available from server\r
717                     clusterStream.reallyFlush();\r
718 \r
719                     releaseWriteOnly(writer);\r
720                     handleUpdatesAndMetadata(writer);\r
721 \r
722                 } catch (ServiceException e) {\r
723 //                    writer.state.barrier.dec();\r
724 //                    writer.waitAsync(null);\r
725 \r
726                     // These shall be requested from server\r
727                     clusterTable.removeWriteOnlyClusters();\r
728                     // This makes clusters available from server\r
729                     clusterStream.reallyFlush();\r
730 \r
731                     releaseWriteOnly(writer);\r
732                     writeState.except(e);\r
733                 } finally {\r
734                     // Debugging & Profiling\r
735                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
736                     task2.finish();\r
737                     total.finish();\r
738                 }\r
739             }\r
740 \r
741         }, combine);\r
742 \r
743     }\r
744 \r
745     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
746         scheduleRequest(request, procedure, notify, null);\r
747     }\r
748 \r
749     /* (non-Javadoc)\r
750      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
751      */\r
752     @Override\r
753     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
754         throw new Error("Not implemented");\r
755     }\r
756 \r
757     protected ClusterImpl getNewResourceCluster() throws DatabaseException {\r
758         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);\r
759         if((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
760             createdClusters.add(cluster.clusterId);\r
761         }\r
762         return cluster;\r
763     }\r
764 \r
765     class WriteOnlySupport implements WriteSupport {\r
766 \r
767         ClusterStream stream;\r
768         ClusterImpl currentCluster;\r
769 \r
770         public WriteOnlySupport() {\r
771             this.stream = clusterStream;\r
772         }\r
773 \r
774         @Override\r
775         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {\r
776             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );\r
777         }\r
778 \r
779         @Override\r
780         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {\r
781 \r
782             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);\r
783 \r
784             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
785                 if(s != queryProvider2.getRootLibrary())\r
786                     throw new ImmutableException("Trying to modify immutable resource key=" + s);\r
787 \r
788             try {\r
789                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));\r
790             } catch (DatabaseException e) {\r
791                 Logger.defaultLogError(e);\r
792                 //throw e;\r
793             }\r
794 \r
795             clientChanges.invalidate(s);\r
796 \r
797             if (cluster.isWriteOnly())\r
798                 return;\r
799             queryProvider2.updateStatements(s, p);\r
800 \r
801         }\r
802 \r
803         @Override\r
804         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
805             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
806         }\r
807 \r
808         @Override\r
809         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {\r
810 \r
811             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
812             try {\r
813                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));\r
814             } catch (DatabaseException e) {\r
815                 Logger.defaultLogError(e);\r
816             }\r
817 \r
818             clientChanges.invalidate(rid);\r
819 \r
820             if (cluster.isWriteOnly())\r
821                 return;\r
822             queryProvider2.updateValue(rid);\r
823 \r
824         }\r
825 \r
826         @Override\r
827         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
828 \r
829             if(amount < 65536) {\r
830                 claimValue(provider,resource, reader.readBytes(null, amount));\r
831                 return;\r
832             }\r
833 \r
834             byte[] bytes = new byte[65536];\r
835 \r
836             int rid = ((ResourceImpl)resource).id;\r
837             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
838             try {\r
839                 int left = amount;\r
840                 while(left > 0) {\r
841                     int block = Math.min(left, 65536);\r
842                     reader.readBytes(bytes, block);\r
843                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));\r
844                     left -= block;\r
845                 }\r
846             } catch (DatabaseException e) {\r
847                 Logger.defaultLogError(e);\r
848             }\r
849 \r
850             clientChanges.invalidate(rid);\r
851 \r
852             if (cluster.isWriteOnly())\r
853                 return;\r
854             queryProvider2.updateValue(rid);\r
855 \r
856         }\r
857 \r
858         private void maintainCluster(ClusterImpl before, ClusterI after_) {\r
859             if(after_ != null && after_ != before) {\r
860                 ClusterImpl after = (ClusterImpl)after_;\r
861                 if(currentCluster == before) currentCluster = after;\r
862                 clusterTable.replaceCluster(after);\r
863             }\r
864         }\r
865 \r
866         public int createResourceKey(int foreignCounter) throws DatabaseException {\r
867             if(currentCluster == null)\r
868                 currentCluster = getNewResourceCluster();\r
869             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {\r
870                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();\r
871                 newCluster.foreignLookup = new byte[foreignCounter];\r
872                 currentCluster = newCluster;\r
873                 if (DEBUG)\r
874                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);\r
875             }\r
876             return currentCluster.createResource(clusterTranslator);\r
877         }\r
878 \r
879         @Override\r
880         public Resource createResource(VirtualGraph provider) throws DatabaseException {\r
881             if(currentCluster == null) {\r
882                 if (null != defaultClusterSet) {\r
883                         ResourceImpl result = getNewResource(defaultClusterSet);\r
884                     currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
885                     return result;\r
886                 } else {\r
887                     currentCluster = getNewResourceCluster();\r
888                 }\r
889             }\r
890             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
891                 if (null != defaultClusterSet) {\r
892                         ResourceImpl result = getNewResource(defaultClusterSet);\r
893                     currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
894                     return result;\r
895                 } else {\r
896                     currentCluster = getNewResourceCluster();\r
897                 }\r
898             }\r
899             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));\r
900         }\r
901 \r
902         @Override\r
903         public Resource createResource(VirtualGraph provider, long clusterId)\r
904         throws DatabaseException {\r
905             return getNewResource(clusterId);\r
906         }\r
907 \r
908         @Override\r
909         public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
910         throws DatabaseException {\r
911             return getNewResource(clusterSet);\r
912         }\r
913 \r
914         @Override\r
915         public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
916                 throws DatabaseException {\r
917             getNewClusterSet(clusterSet);\r
918         }\r
919 \r
920         @Override\r
921         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)\r
922         throws ServiceException {\r
923             return containsClusterSet(clusterSet);\r
924         }\r
925 \r
926         public void selectCluster(long cluster) {\r
927                 currentCluster = clusterTable.getClusterByClusterId(cluster);\r
928                 long setResourceId = clusterSetsSupport.getSet(cluster);\r
929                 clusterSetsSupport.put(setResourceId, cluster);\r
930         }\r
931 \r
932         @Override\r
933         public Resource setDefaultClusterSet(Resource clusterSet)\r
934         throws ServiceException {\r
935                 Resource result = setDefaultClusterSet4NewResource(clusterSet);\r
936                 if(clusterSet != null) {\r
937                         long id = clusterSetsSupport.get(clusterSet.getResourceId());\r
938                         currentCluster = clusterTable.getClusterByClusterId(id);\r
939                         return result;\r
940                 } else {\r
941                         currentCluster = null;\r
942                         return null;\r
943                 }\r
944         }\r
945 \r
946         @Override\r
947         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {\r
948 \r
949                  provider = getProvider(provider);\r
950              if (null == provider) {\r
951                  int key = ((ResourceImpl)resource).id;\r
952 \r
953 \r
954 \r
955                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);\r
956 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
957 //                      if(key != queryProvider2.getRootLibrary())\r
958 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);\r
959 \r
960 //                 try {\r
961 //                     cluster.removeValue(key, clusterTranslator);\r
962 //                 } catch (DatabaseException e) {\r
963 //                     Logger.defaultLogError(e);\r
964 //                     return;\r
965 //                 }\r
966 \r
967                  clusterTable.writeOnlyInvalidate(cluster);\r
968 \r
969                  try {\r
970                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);\r
971                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);\r
972                          clusterTranslator.removeValue(cluster);\r
973                  } catch (DatabaseException e) {\r
974                          Logger.defaultLogError(e);\r
975                  }\r
976 \r
977                  queryProvider2.invalidateResource(key);\r
978                  clientChanges.invalidate(key);\r
979 \r
980              } else {\r
981                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
982                  queryProvider2.updateValue(querySupport.getId(resource));\r
983                  clientChanges.claimValue(resource);\r
984              }\r
985 \r
986 \r
987         }\r
988 \r
989         @Override\r
990         public void flush(boolean intermediate) {\r
991             throw new UnsupportedOperationException();\r
992         }\r
993 \r
994         @Override\r
995         public void flushCluster() {\r
996             clusterTable.flushCluster(graphSession);\r
997             if(defaultClusterSet != null) {\r
998                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);\r
999             }\r
1000             currentCluster = null;\r
1001         }\r
1002 \r
1003         @Override\r
1004         public void flushCluster(Resource r) {\r
1005             throw new UnsupportedOperationException("flushCluster resource " + r);\r
1006         }\r
1007 \r
1008         @Override\r
1009         public void gc() {\r
1010         }\r
1011 \r
1012         @Override\r
1013         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
1014                 Resource object) {\r
1015 \r
1016                 int s = ((ResourceImpl)subject).id;\r
1017                 int p = ((ResourceImpl)predicate).id;\r
1018                 int o = ((ResourceImpl)object).id;\r
1019 \r
1020                 provider = getProvider(provider);\r
1021                 if (null == provider) {\r
1022 \r
1023                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);\r
1024                         clusterTable.writeOnlyInvalidate(cluster);\r
1025 \r
1026                         try {\r
1027 \r
1028                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);\r
1029                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);\r
1030                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);\r
1031 \r
1032                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);\r
1033                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);\r
1034 \r
1035                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);\r
1036                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
1037                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
1038                                 clusterTranslator.removeStatement(cluster);\r
1039 \r
1040                                 queryProvider2.invalidateResource(s);\r
1041                                 clientChanges.invalidate(s);\r
1042 \r
1043                         } catch (DatabaseException e) {\r
1044 \r
1045                                 Logger.defaultLogError(e);\r
1046 \r
1047                         }\r
1048 \r
1049                         return true;\r
1050 \r
1051                 } else {\r
1052                         \r
1053                         ((VirtualGraphImpl)provider).deny(s, p, o);\r
1054                         queryProvider2.invalidateResource(s);\r
1055                         clientChanges.invalidate(s);\r
1056 \r
1057                         return true;\r
1058 \r
1059                 }\r
1060 \r
1061         }\r
1062 \r
1063         @Override\r
1064         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
1065             throw new UnsupportedOperationException();\r
1066         }\r
1067 \r
1068         @Override\r
1069         public boolean writeOnly() {\r
1070             return true;\r
1071         }\r
1072 \r
1073         @Override\r
1074         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
1075             writeSupport.performWriteRequest(graph, request);\r
1076         }\r
1077 \r
1078         @Override\r
1079         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
1080             throw new UnsupportedOperationException();\r
1081         }\r
1082 \r
1083         @Override\r
1084         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
1085             throw new UnsupportedOperationException();\r
1086         }\r
1087 \r
1088         @Override\r
1089         public <T> void addMetadata(Metadata data) throws ServiceException {\r
1090             writeSupport.addMetadata(data);\r
1091         }\r
1092 \r
1093         @Override\r
1094         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
1095             return writeSupport.getMetadata(clazz);\r
1096         }\r
1097 \r
1098         @Override\r
1099         public TreeMap<String, byte[]> getMetadata() {\r
1100             return writeSupport.getMetadata();\r
1101         }\r
1102 \r
1103         @Override\r
1104         public void commitDone(WriteTraits writeTraits, long csid) {\r
1105             writeSupport.commitDone(writeTraits, csid);\r
1106         }\r
1107 \r
1108         @Override\r
1109         public void clearUndoList(WriteTraits writeTraits) {\r
1110             writeSupport.clearUndoList(writeTraits);\r
1111         }\r
1112         @Override\r
1113         public int clearMetadata() {\r
1114             return writeSupport.clearMetadata();\r
1115         }\r
1116 \r
1117                 @Override\r
1118                 public void startUndo() {\r
1119                         writeSupport.startUndo();\r
1120                 }\r
1121     }\r
1122 \r
1123     class VirtualWriteOnlySupport implements WriteSupport {\r
1124 \r
1125 //        @Override\r
1126 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
1127 //                Resource object) {\r
1128 //            throw new UnsupportedOperationException();\r
1129 //        }\r
1130 \r
1131         @Override\r
1132         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1133 \r
1134             TransientGraph impl = (TransientGraph)provider;\r
1135             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1136             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1137             clientChanges.claim(subject, predicate, object);\r
1138 \r
1139         }\r
1140 \r
1141         @Override\r
1142         public void claim(VirtualGraph provider, int subject, int predicate, int object) {\r
1143 \r
1144             TransientGraph impl = (TransientGraph)provider;\r
1145             impl.claim(subject, predicate, object);\r
1146             getQueryProvider2().updateStatements(subject, predicate);\r
1147             clientChanges.claim(subject, predicate, object);\r
1148 \r
1149         }\r
1150 \r
1151         @Override\r
1152         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
1153             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
1154         }\r
1155 \r
1156         @Override\r
1157         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {\r
1158             ((VirtualGraphImpl)provider).claimValue(resource, value, length);\r
1159             getQueryProvider2().updateValue(resource);\r
1160             clientChanges.claimValue(resource);\r
1161         }\r
1162 \r
1163         @Override\r
1164         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
1165             byte[] value = reader.readBytes(null, amount);\r
1166             claimValue(provider, resource, value);\r
1167         }\r
1168 \r
1169         @Override\r
1170         public Resource createResource(VirtualGraph provider) {\r
1171             TransientGraph impl = (TransientGraph)provider;\r
1172             return impl.getResource(impl.newResource(false));\r
1173         }\r
1174 \r
1175         @Override\r
1176         public Resource createResource(VirtualGraph provider, long clusterId) {\r
1177             throw new UnsupportedOperationException();\r
1178         }\r
1179 \r
1180         @Override\r
1181         public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
1182         throws DatabaseException {\r
1183             throw new UnsupportedOperationException();\r
1184         }\r
1185 \r
1186         @Override\r
1187         public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
1188         throws DatabaseException {\r
1189             throw new UnsupportedOperationException();\r
1190         }\r
1191 \r
1192         @Override\r
1193         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)\r
1194         throws ServiceException {\r
1195             throw new UnsupportedOperationException();\r
1196         }\r
1197 \r
1198         @Override\r
1199         public Resource setDefaultClusterSet(Resource clusterSet)\r
1200         throws ServiceException {\r
1201                 return null;\r
1202         }\r
1203 \r
1204         @Override\r
1205         public void denyValue(VirtualGraph provider, Resource resource) {\r
1206             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
1207             getQueryProvider2().updateValue(querySupport.getId(resource));\r
1208             // NOTE: this only keeps track of value changes by-resource.\r
1209             clientChanges.claimValue(resource);\r
1210         }\r
1211 \r
1212         @Override\r
1213         public void flush(boolean intermediate) {\r
1214             throw new UnsupportedOperationException();\r
1215         }\r
1216 \r
1217         @Override\r
1218         public void flushCluster() {\r
1219             throw new UnsupportedOperationException();\r
1220         }\r
1221 \r
1222         @Override\r
1223         public void flushCluster(Resource r) {\r
1224             throw new UnsupportedOperationException("Resource " + r);\r
1225         }\r
1226 \r
1227         @Override\r
1228         public void gc() {\r
1229         }\r
1230 \r
1231         @Override\r
1232         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1233             TransientGraph impl = (TransientGraph) provider;\r
1234             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1235             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1236             clientChanges.deny(subject, predicate, object);\r
1237             return true;\r
1238         }\r
1239 \r
1240         @Override\r
1241         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
1242             throw new UnsupportedOperationException();\r
1243         }\r
1244 \r
1245         @Override\r
1246         public boolean writeOnly() {\r
1247             return true;\r
1248         }\r
1249 \r
1250         @Override\r
1251         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
1252             throw new UnsupportedOperationException();\r
1253         }\r
1254 \r
1255         @Override\r
1256         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
1257             throw new UnsupportedOperationException();\r
1258         }\r
1259 \r
1260         @Override\r
1261         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
1262             throw new UnsupportedOperationException();\r
1263         }\r
1264 \r
1265         @Override\r
1266         public <T> void addMetadata(Metadata data) throws ServiceException {\r
1267             throw new UnsupportedOperationException();\r
1268         }\r
1269 \r
1270         @Override\r
1271         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
1272             throw new UnsupportedOperationException();\r
1273         }\r
1274 \r
1275         @Override\r
1276         public TreeMap<String, byte[]> getMetadata() {\r
1277             throw new UnsupportedOperationException();\r
1278         }\r
1279 \r
1280         @Override\r
1281         public void commitDone(WriteTraits writeTraits, long csid) {\r
1282         }\r
1283 \r
1284         @Override\r
1285         public void clearUndoList(WriteTraits writeTraits) {\r
1286         }\r
1287 \r
1288         @Override\r
1289         public int clearMetadata() {\r
1290             return 0;\r
1291         }\r
1292 \r
1293                 @Override\r
1294                 public void startUndo() {\r
1295                 }\r
1296     }\r
1297 \r
1298     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {\r
1299 \r
1300         try {\r
1301 \r
1302             int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1303 \r
1304             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1305 \r
1306             flushCounter = 0;\r
1307             clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1308 \r
1309             acquireWriteOnly();\r
1310 \r
1311             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1312 \r
1313             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());\r
1314 \r
1315             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);\r
1316             writeState = writeStateT;\r
1317 \r
1318             assert (null != writer);\r
1319 //            writer.state.barrier.inc();\r
1320             long start = System.nanoTime();\r
1321             T result = request.perform(writer);\r
1322             long duration = System.nanoTime() - start;\r
1323             if (DEBUG) {\r
1324                 System.err.println("################");\r
1325                 System.err.println("WriteOnly duration " + 1e-9*duration);\r
1326             }\r
1327             writeStateT.setResult(result);\r
1328 \r
1329             // This makes clusters available from server\r
1330             clusterStream.reallyFlush();\r
1331 \r
1332             // This will trigger query updates\r
1333             releaseWriteOnly(writer);\r
1334             assert (null != writer);\r
1335 \r
1336             handleUpdatesAndMetadata(writer);\r
1337 \r
1338         } catch (CancelTransactionException e) {\r
1339 \r
1340             releaseWriteOnly(writeState.getGraph());\r
1341 \r
1342             clusterTable.removeWriteOnlyClusters();\r
1343             state.stopWriteTransaction(clusterStream);\r
1344 \r
1345         } catch (Throwable e) {\r
1346 \r
1347             e.printStackTrace();\r
1348 \r
1349             releaseWriteOnly(writeState.getGraph());\r
1350 \r
1351             clusterTable.removeWriteOnlyClusters();\r
1352 \r
1353             if (callback != null)\r
1354                 callback.exception(new DatabaseException(e));\r
1355 \r
1356             state.stopWriteTransaction(clusterStream);\r
1357             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
1358 \r
1359         } finally  {\r
1360 \r
1361             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1362 \r
1363         }\r
1364 \r
1365     }\r
1366 \r
1367     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
1368         scheduleRequest(request, callback, notify, null);\r
1369     }\r
1370 \r
1371     @Override\r
1372     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
1373 \r
1374         assertAlive();\r
1375 \r
1376         assert (request != null);\r
1377 \r
1378         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1379 \r
1380         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
1381 \r
1382             @Override\r
1383             public void run(int thread) {\r
1384 \r
1385                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1386 \r
1387                     try {\r
1388 \r
1389                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1390 \r
1391                         flushCounter = 0;\r
1392                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1393 \r
1394                         acquireWriteOnly();\r
1395 \r
1396                         VirtualGraph vg = getProvider(request.getProvider());\r
1397                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1398 \r
1399                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
1400 \r
1401                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
1402 \r
1403                             @Override\r
1404                             public void execute(Object result) {\r
1405                                 if(callback != null) callback.run(null);\r
1406                             }\r
1407 \r
1408                             @Override\r
1409                             public void exception(Throwable t) {\r
1410                                 if(callback != null) callback.run((DatabaseException)t);\r
1411                             }\r
1412 \r
1413                         });\r
1414 \r
1415                         assert (null != writer);\r
1416 //                        writer.state.barrier.inc();\r
1417 \r
1418                         try {\r
1419 \r
1420                             request.perform(writer);\r
1421 \r
1422                         } catch (Throwable e) {\r
1423 \r
1424 //                            writer.state.barrier.dec();\r
1425 //                            writer.waitAsync(null);\r
1426 \r
1427                             releaseWriteOnly(writer);\r
1428 \r
1429                             clusterTable.removeWriteOnlyClusters();\r
1430 \r
1431                             if(!(e instanceof CancelTransactionException)) {\r
1432                                 if (callback != null)\r
1433                                     callback.run(new DatabaseException(e));\r
1434                             }\r
1435 \r
1436                             writeState.except(e);\r
1437 \r
1438                             return;\r
1439 \r
1440                         }\r
1441 \r
1442                         // This makes clusters available from server\r
1443                         boolean empty = clusterStream.reallyFlush();\r
1444                         // This was needed to make WO requests call metadata listeners.\r
1445                         // NOTE: the calling event does not contain clientChanges information.\r
1446                         if (!empty && clientChanges.isEmpty())\r
1447                             clientChanges.setNotEmpty(true);\r
1448                         releaseWriteOnly(writer);\r
1449                         assert (null != writer);\r
1450                     } finally  {\r
1451 \r
1452                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1453 \r
1454                     }\r
1455 \r
1456 \r
1457                 task.finish();\r
1458 \r
1459             }\r
1460 \r
1461         }, combine);\r
1462 \r
1463     }\r
1464 \r
1465     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {\r
1466         scheduleRequest(request, callback, notify, null);\r
1467     }\r
1468 \r
1469     /* (non-Javadoc)\r
1470      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
1471      */\r
1472     @Override\r
1473     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {\r
1474 \r
1475         assert (request != null);\r
1476 \r
1477         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1478 \r
1479         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
1480 \r
1481             @Override\r
1482             public void run(int thread) {\r
1483 \r
1484                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1485 \r
1486                 performWriteOnly(request, notify, callback);\r
1487 \r
1488                 task.finish();\r
1489 \r
1490             }\r
1491 \r
1492         }, combine);\r
1493 \r
1494     }\r
1495 \r
1496     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1497 \r
1498         assert (request != null);\r
1499         assert (procedure != null);\r
1500 \r
1501         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1502 \r
1503         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1504 \r
1505             @Override\r
1506             public void run(int thread) {\r
1507 \r
1508                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1509 \r
1510                 ListenerBase listener = getListenerBase(procedure);\r
1511 \r
1512                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1513 \r
1514                 try {\r
1515 \r
1516                     if (listener != null) {\r
1517 \r
1518                         try {\r
1519                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {\r
1520 \r
1521                                         @Override\r
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {\r
1523                                                 procedure.exception(graph, t);\r
1524                                                 if(throwable != null) {\r
1525                                                         throwable.set(t);\r
1526                                                 } else {\r
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);\r
1528                                                 }\r
1529                                         }\r
1530 \r
1531                                         @Override\r
1532                                         public void execute(AsyncReadGraph graph, T t) {\r
1533                                                 if(result != null) result.set(t);\r
1534                                                 procedure.execute(graph, t);\r
1535                                         }\r
1536 \r
1537                                 }, listener);\r
1538                         } catch (Throwable t) {\r
1539                             // This is handled by the AsyncProcedure\r
1540                                 //Logger.defaultLogError("Internal error", t);\r
1541                         }\r
1542 \r
1543                     } else {\r
1544 \r
1545                         try {\r
1546 \r
1547 //                            newGraph.state.barrier.inc();\r
1548 \r
1549                             T t = request.perform(newGraph);\r
1550 \r
1551                             try {\r
1552 \r
1553                                 if(result != null) result.set(t);\r
1554                                 procedure.execute(newGraph, t);\r
1555 \r
1556                             } catch (Throwable th) {\r
1557 \r
1558                                 if(throwable != null) {\r
1559                                     throwable.set(th);\r
1560                                 } else {\r
1561                                     Logger.defaultLogError("Unhandled exception", th);\r
1562                                 }\r
1563 \r
1564                             }\r
1565 \r
1566                         } catch (Throwable t) {\r
1567 \r
1568                             if (DEBUG)\r
1569                                 t.printStackTrace();\r
1570 \r
1571                             if(throwable != null) {\r
1572                                 throwable.set(t);\r
1573                             } else {\r
1574                                 Logger.defaultLogError("Unhandled exception", t);\r
1575                             }\r
1576 \r
1577                             try {\r
1578 \r
1579                                 procedure.exception(newGraph, t);\r
1580 \r
1581                             } catch (Throwable t2) {\r
1582 \r
1583                                 if(throwable != null) {\r
1584                                     throwable.set(t2);\r
1585                                 } else {\r
1586                                     Logger.defaultLogError("Unhandled exception", t2);\r
1587                                 }\r
1588 \r
1589                             }\r
1590 \r
1591                         }\r
1592 \r
1593 //                        newGraph.state.barrier.dec();\r
1594 //                        newGraph.waitAsync(request);\r
1595 \r
1596                     }\r
1597 \r
1598                 } finally {\r
1599 \r
1600                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1601 \r
1602                 }\r
1603 \r
1604             }\r
1605 \r
1606         });\r
1607 \r
1608     }\r
1609 \r
1610     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {\r
1611 \r
1612         assert (request != null);\r
1613         assert (procedure != null);\r
1614 \r
1615         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1616 \r
1617         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {\r
1618 \r
1619             @Override\r
1620             public void run(int thread) {\r
1621 \r
1622                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1623 \r
1624                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1625 \r
1626                 try {\r
1627 \r
1628                     if (listener != null) {\r
1629 \r
1630                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1631 \r
1632 //                        newGraph.waitAsync(request);\r
1633 \r
1634                     } else {\r
1635 \r
1636                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(\r
1637                                 procedure, "request");\r
1638 \r
1639                         try {\r
1640 \r
1641 //                            newGraph.state.barrier.inc();\r
1642 \r
1643                             request.perform(newGraph, wrapper);\r
1644 \r
1645 //                            newGraph.waitAsync(request);\r
1646 \r
1647                         } catch (Throwable t) {\r
1648 \r
1649                             wrapper.exception(newGraph, t);\r
1650 //                            newGraph.waitAsync(request);\r
1651 \r
1652 \r
1653                         }\r
1654 \r
1655                     }\r
1656 \r
1657                 } finally {\r
1658 \r
1659                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1660 \r
1661                 }\r
1662 \r
1663             }\r
1664 \r
1665         });\r
1666 \r
1667     }\r
1668 \r
1669     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {\r
1670 \r
1671         assert (request != null);\r
1672         assert (procedure != null);\r
1673 \r
1674         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1675 \r
1676         int sync = notify != null ? thread : -1;\r
1677 \r
1678         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {\r
1679 \r
1680             @Override\r
1681             public void run(int thread) {\r
1682 \r
1683                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1684 \r
1685                 ListenerBase listener = getListenerBase(procedure);\r
1686 \r
1687                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1688 \r
1689                 try {\r
1690 \r
1691                     if (listener != null) {\r
1692 \r
1693                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1694 \r
1695 //                        newGraph.waitAsync(request);\r
1696 \r
1697                     } else {\r
1698 \r
1699                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);\r
1700 \r
1701                         try {\r
1702 \r
1703                             request.perform(newGraph, wrapper);\r
1704 \r
1705                         } catch (Throwable t) {\r
1706 \r
1707                             t.printStackTrace();\r
1708 \r
1709                         }\r
1710 \r
1711                     }\r
1712 \r
1713                 } finally {\r
1714 \r
1715                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1716 \r
1717                 }\r
1718 \r
1719             }\r
1720 \r
1721         });\r
1722 \r
1723     }\r
1724 \r
1725     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1726 \r
1727         assert (request != null);\r
1728         assert (procedure != null);\r
1729 \r
1730         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1731 \r
1732         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1733 \r
1734             @Override\r
1735             public void run(int thread) {\r
1736 \r
1737                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1738 \r
1739                 ListenerBase listener = getListenerBase(procedure);\r
1740 \r
1741                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1742 \r
1743                 try {\r
1744 \r
1745                     if (listener != null) {\r
1746 \r
1747                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {\r
1748 \r
1749                             @Override\r
1750                             public void exception(Throwable t) {\r
1751                                 procedure.exception(t);\r
1752                                 if(throwable != null) {\r
1753                                     throwable.set(t);\r
1754                                 }\r
1755                             }\r
1756 \r
1757                             @Override\r
1758                             public void execute(T t) {\r
1759                                 if(result != null) result.set(t);\r
1760                                 procedure.execute(t);\r
1761                             }\r
1762 \r
1763                         }, listener);\r
1764 \r
1765 //                        newGraph.waitAsync(request);\r
1766 \r
1767                     } else {\r
1768 \r
1769 //                        newGraph.state.barrier.inc();\r
1770 \r
1771                         request.register(newGraph, new Listener<T>() {\r
1772 \r
1773                             @Override\r
1774                             public void exception(Throwable t) {\r
1775                                 if(throwable != null) throwable.set(t);\r
1776                                 procedure.exception(t);\r
1777 //                                newGraph.state.barrier.dec();\r
1778                             }\r
1779 \r
1780                             @Override\r
1781                             public void execute(T t) {\r
1782                                 if(result != null) result.set(t);\r
1783                                 procedure.execute(t);\r
1784 //                                newGraph.state.barrier.dec();\r
1785                             }\r
1786 \r
1787                             @Override\r
1788                             public boolean isDisposed() {\r
1789                                 return true;\r
1790                             }\r
1791 \r
1792                         });\r
1793 \r
1794 //                        newGraph.waitAsync(request);\r
1795 \r
1796                     }\r
1797 \r
1798                 } finally {\r
1799 \r
1800                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1801 \r
1802                 }\r
1803 \r
1804             }\r
1805 \r
1806         });\r
1807 \r
1808     }\r
1809 \r
1810 \r
1811     @Override\r
1812     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {\r
1813 \r
1814         scheduleRequest(request, procedure, null, null, null);\r
1815 \r
1816     }\r
1817 \r
1818     @Override\r
1819     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1820         assertNotSession();\r
1821         Semaphore notify = new Semaphore(0);\r
1822         DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1823         DataContainer<T> result = new DataContainer<T>();\r
1824         scheduleRequest(request, procedure, notify, container, result);\r
1825         acquire(notify, request);\r
1826         Throwable throwable = container.get();\r
1827         if(throwable != null) {\r
1828             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1829             else throw new DatabaseException("Unexpected exception", throwable);\r
1830         }\r
1831         return result.get();\r
1832     }\r
1833 \r
1834     @Override\r
1835     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {\r
1836         assertNotSession();\r
1837         Semaphore notify = new Semaphore(0);\r
1838         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1839         final DataContainer<T> resultContainer = new DataContainer<T>();\r
1840         scheduleRequest(request, new AsyncProcedure<T>() {\r
1841             @Override\r
1842             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1843                 exceptionContainer.set(throwable);\r
1844                 procedure.exception(graph, throwable);\r
1845             }\r
1846             @Override\r
1847             public void execute(AsyncReadGraph graph, T result) {\r
1848                 resultContainer.set(result);\r
1849                 procedure.execute(graph, result);\r
1850             }\r
1851         }, getListenerBase(procedure), notify);\r
1852         acquire(notify, request, procedure);\r
1853         Throwable throwable = exceptionContainer.get();\r
1854         if (throwable != null) {\r
1855             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1856             else throw new DatabaseException("Unexpected exception", throwable);\r
1857         }\r
1858         return resultContainer.get();\r
1859     }\r
1860 \r
1861 \r
1862     @Override\r
1863     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {\r
1864         assertNotSession();\r
1865         Semaphore notify = new Semaphore(0);\r
1866         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1867         scheduleRequest(request, new AsyncMultiProcedure<T>() {\r
1868             @Override\r
1869             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1870                 exceptionContainer.set(throwable);\r
1871                 procedure.exception(graph, throwable);\r
1872             }\r
1873             @Override\r
1874             public void execute(AsyncReadGraph graph, T result) {\r
1875                 procedure.execute(graph, result);\r
1876             }\r
1877             @Override\r
1878             public void finished(AsyncReadGraph graph) {\r
1879                 procedure.finished(graph);\r
1880             }\r
1881         }, notify);\r
1882         acquire(notify, request, procedure);\r
1883         Throwable throwable = exceptionContainer.get();\r
1884         if (throwable != null) {\r
1885             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1886             else throw new DatabaseException("Unexpected exception", throwable);\r
1887         }\r
1888         // TODO: implement return value\r
1889         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");\r
1890         return null;\r
1891     }\r
1892 \r
1893     @Override\r
1894     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {\r
1895         return syncRequest(request, new ProcedureAdapter<T>());\r
1896 \r
1897 \r
1898 //        assert(request != null);\r
1899 //\r
1900 //        final DataContainer<T> result = new DataContainer<T>();\r
1901 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1902 //\r
1903 //        syncRequest(request, new Procedure<T>() {\r
1904 //\r
1905 //            @Override\r
1906 //            public void execute(T t) {\r
1907 //                result.set(t);\r
1908 //            }\r
1909 //\r
1910 //            @Override\r
1911 //            public void exception(Throwable t) {\r
1912 //                exception.set(t);\r
1913 //            }\r
1914 //\r
1915 //        });\r
1916 //\r
1917 //        Throwable t = exception.get();\r
1918 //        if(t != null) {\r
1919 //            if(t instanceof DatabaseException) throw (DatabaseException)t;\r
1920 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);\r
1921 //        }\r
1922 //\r
1923 //        return result.get();\r
1924 \r
1925     }\r
1926 \r
1927     @Override\r
1928     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {\r
1929         return syncRequest(request, (Procedure<T>)procedure);\r
1930     }\r
1931 \r
1932 \r
1933 //    @Override\r
1934 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1935 //        assertNotSession();\r
1936 //        Semaphore notify = new Semaphore(0);\r
1937 //        DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1938 //        DataContainer<T> result = new DataContainer<T>();\r
1939 //        scheduleRequest(request, procedure, notify, container, result);\r
1940 //        acquire(notify, request);\r
1941 //        Throwable throwable = container.get();\r
1942 //        if(throwable != null) {\r
1943 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1944 //            else throw new DatabaseException("Unexpected exception", throwable);\r
1945 //        }\r
1946 //        return result.get();\r
1947 //    }\r
1948 \r
1949 \r
1950     @Override\r
1951     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
1952         assertNotSession();\r
1953         Semaphore notify = new Semaphore(0);\r
1954         final DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1955         final DataContainer<T> result = new DataContainer<T>();\r
1956         scheduleRequest(request, procedure, notify, container, result);\r
1957         acquire(notify, request);\r
1958         Throwable throwable = container.get();\r
1959         if (throwable != null) {\r
1960             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1961             else throw new DatabaseException("Unexpected exception", throwable);\r
1962         }\r
1963         return result.get();\r
1964     }\r
1965 \r
1966     @Override\r
1967     public void syncRequest(Write request) throws DatabaseException  {\r
1968         assertNotSession();\r
1969         assertAlive();\r
1970         Semaphore notify = new Semaphore(0);\r
1971         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
1972         scheduleRequest(request, new Callback<DatabaseException>() {\r
1973 \r
1974             @Override\r
1975             public void run(DatabaseException e) {\r
1976               exception.set(e);\r
1977             }\r
1978 \r
1979         }, notify);\r
1980         acquire(notify, request);\r
1981         if(exception.get() != null) throw exception.get();\r
1982     }\r
1983 \r
1984     @Override\r
1985     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {\r
1986         assertNotSession();\r
1987         Semaphore notify = new Semaphore(0);\r
1988         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1989         final DataContainer<T> result = new DataContainer<T>();\r
1990         scheduleRequest(request, new Procedure<T>() {\r
1991 \r
1992             @Override\r
1993             public void exception(Throwable t) {\r
1994               exception.set(t);\r
1995             }\r
1996 \r
1997             @Override\r
1998             public void execute(T t) {\r
1999               result.set(t);\r
2000             }\r
2001 \r
2002         }, notify);\r
2003         acquire(notify, request);\r
2004         if(exception.get() != null) {\r
2005             Throwable t = exception.get();\r
2006             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2007             else throw new DatabaseException(t);\r
2008         }\r
2009         return result.get();\r
2010     }\r
2011 \r
2012     @Override\r
2013     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {\r
2014         assertNotSession();\r
2015         Semaphore notify = new Semaphore(0);\r
2016         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2017         final DataContainer<T> result = new DataContainer<T>();\r
2018         scheduleRequest(request, new Procedure<T>() {\r
2019 \r
2020             @Override\r
2021             public void exception(Throwable t) {\r
2022               exception.set(t);\r
2023             }\r
2024 \r
2025             @Override\r
2026             public void execute(T t) {\r
2027               result.set(t);\r
2028             }\r
2029 \r
2030         }, notify);\r
2031         acquire(notify, request);\r
2032         if(exception.get() != null) {\r
2033             Throwable t = exception.get();\r
2034             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2035             else throw new DatabaseException(t);\r
2036         }\r
2037         return result.get();\r
2038     }\r
2039 \r
2040     @Override\r
2041     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {\r
2042         assertNotSession();\r
2043         Semaphore notify = new Semaphore(0);\r
2044         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2045         final DataContainer<T> result = new DataContainer<T>();\r
2046         scheduleRequest(request, new Procedure<T>() {\r
2047 \r
2048             @Override\r
2049             public void exception(Throwable t) {\r
2050               exception.set(t);\r
2051             }\r
2052 \r
2053             @Override\r
2054             public void execute(T t) {\r
2055               result.set(t);\r
2056             }\r
2057 \r
2058         }, notify);\r
2059         acquire(notify, request);\r
2060         if(exception.get() != null) {\r
2061             Throwable t = exception.get();\r
2062             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2063             else throw new DatabaseException(t);\r
2064         }\r
2065         return result.get();\r
2066     }\r
2067 \r
2068     @Override\r
2069     public void syncRequest(DelayedWrite request) throws DatabaseException  {\r
2070         assertNotSession();\r
2071         Semaphore notify = new Semaphore(0);\r
2072         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2073         scheduleRequest(request, new Callback<DatabaseException>() {\r
2074             @Override\r
2075             public void run(DatabaseException e) {\r
2076                 exception.set(e);\r
2077             }\r
2078         }, notify);\r
2079         acquire(notify, request);\r
2080         if(exception.get() != null) throw exception.get();\r
2081     }\r
2082 \r
2083     @Override\r
2084     public void syncRequest(WriteOnly request) throws DatabaseException  {\r
2085         assertNotSession();\r
2086         assertAlive();\r
2087         Semaphore notify = new Semaphore(0);\r
2088         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2089         scheduleRequest(request, new Callback<DatabaseException>() {\r
2090             @Override\r
2091             public void run(DatabaseException e) {\r
2092                 exception.set(e);\r
2093             }\r
2094         }, notify);\r
2095         acquire(notify, request);\r
2096         if(exception.get() != null) throw exception.get();\r
2097     }\r
2098 \r
2099     /*\r
2100      *\r
2101      * GraphRequestProcessor interface\r
2102      */\r
2103 \r
2104     @Override\r
2105     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {\r
2106 \r
2107         scheduleRequest(request, procedure, null, null);\r
2108 \r
2109     }\r
2110 \r
2111     @Override\r
2112     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {\r
2113 \r
2114         scheduleRequest(request, procedure, null);\r
2115 \r
2116     }\r
2117 \r
2118     @Override\r
2119     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {\r
2120 \r
2121         scheduleRequest(request, procedure, null, null, null);\r
2122 \r
2123     }\r
2124 \r
2125     @Override\r
2126     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {\r
2127 \r
2128         scheduleRequest(request, callback, null);\r
2129 \r
2130     }\r
2131 \r
2132     @Override\r
2133     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {\r
2134 \r
2135         scheduleRequest(request, procedure, null);\r
2136 \r
2137     }\r
2138 \r
2139     @Override\r
2140     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {\r
2141 \r
2142         scheduleRequest(request, procedure, null);\r
2143 \r
2144     }\r
2145 \r
2146     @Override\r
2147     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {\r
2148 \r
2149         scheduleRequest(request, procedure, null);\r
2150 \r
2151     }\r
2152 \r
2153     @Override\r
2154     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {\r
2155 \r
2156         scheduleRequest(request, callback, null);\r
2157 \r
2158     }\r
2159 \r
2160     @Override\r
2161     public void asyncRequest(final Write r) {\r
2162         asyncRequest(r, null);\r
2163     }\r
2164 \r
2165     @Override\r
2166     public void asyncRequest(final DelayedWrite r) {\r
2167         asyncRequest(r, null);\r
2168     }\r
2169 \r
2170     @Override\r
2171     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {\r
2172 \r
2173         scheduleRequest(request, callback, null);\r
2174 \r
2175     }\r
2176 \r
2177     @Override\r
2178     public void asyncRequest(final WriteOnly request) {\r
2179 \r
2180         asyncRequest(request, null);\r
2181 \r
2182     }\r
2183 \r
2184     @Override\r
2185     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {\r
2186         r.request(this, procedure);\r
2187     }\r
2188 \r
2189     @Override\r
2190     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {\r
2191         r.request(this, procedure);\r
2192     }\r
2193 \r
2194     @Override\r
2195     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {\r
2196         r.request(this, procedure);\r
2197     }\r
2198 \r
2199     @Override\r
2200     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {\r
2201         r.request(this, procedure);\r
2202     }\r
2203 \r
2204     @Override\r
2205     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {\r
2206         r.request(this, procedure);\r
2207     }\r
2208 \r
2209     @Override\r
2210     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {\r
2211         r.request(this, procedure);\r
2212     }\r
2213 \r
2214     @Override\r
2215     public <T> T sync(ReadInterface<T> r) throws DatabaseException {\r
2216         return r.request(this);\r
2217     }\r
2218 \r
2219     @Override\r
2220     public <T> T sync(WriteInterface<T> r) throws DatabaseException {\r
2221         return r.request(this);\r
2222     }\r
2223 \r
2224     @Override\r
2225     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {\r
2226         r.request(this, procedure);\r
2227     }\r
2228 \r
2229     @Override\r
2230     public <T> void async(WriteInterface<T> r) {\r
2231         r.request(this, new ProcedureAdapter<T>());\r
2232     }\r
2233 \r
2234     //@Override\r
2235     public void incAsync() {\r
2236         state.incAsync();\r
2237     }\r
2238 \r
2239     //@Override\r
2240     public void decAsync() {\r
2241         state.decAsync();\r
2242     }\r
2243 \r
2244     public long getCluster(ResourceImpl resource) {\r
2245         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);\r
2246         return cluster.getClusterId();\r
2247     }\r
2248 \r
2249     public long getCluster(int id) {\r
2250         if (clusterTable == null)\r
2251             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");\r
2252         return clusterTable.getClusterIdByResourceKeyNoThrow(id);\r
2253     }\r
2254 \r
2255     public ResourceImpl getResource(int id) {\r
2256         return new ResourceImpl(resourceSupport, id);\r
2257     }\r
2258 \r
2259     public ResourceImpl getResource(int resourceIndex, long clusterId) {\r
2260         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));\r
2261         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);\r
2262         int key = proxy.getClusterKey();\r
2263         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);\r
2264         return new ResourceImpl(resourceSupport, resourceKey);\r
2265     }\r
2266 \r
2267     public ResourceImpl getResource2(int id) {\r
2268         assert (id != 0);\r
2269         return new ResourceImpl(resourceSupport, id);\r
2270     }\r
2271 \r
2272     final public int getId(ResourceImpl impl) {\r
2273         return impl.id;\r
2274     }\r
2275 \r
2276     public static final Charset UTF8 = Charset.forName("utf-8");\r
2277 \r
2278 \r
2279 \r
2280 \r
2281     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {\r
2282         for(TransientGraph g : support.providers) {\r
2283             if(g.isPending(subject)) return false;\r
2284         }\r
2285         return true;\r
2286     }\r
2287 \r
2288     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {\r
2289         for(TransientGraph g : support.providers) {\r
2290             if(g.isPending(subject, predicate)) return false;\r
2291         }\r
2292         return true;\r
2293     }\r
2294 \r
2295     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {\r
2296 \r
2297         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2298 \r
2299             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2300 \r
2301             @Override\r
2302             public void run(ReadGraphImpl graph) {\r
2303                 if(ready.decrementAndGet() == 0) {\r
2304                     runnable.run(graph);\r
2305                 }\r
2306             }\r
2307 \r
2308         };\r
2309 \r
2310         for(TransientGraph g : support.providers) {\r
2311             if(g.isPending(subject)) {\r
2312                 try {\r
2313                     g.load(graph, subject, composite);\r
2314                 } catch (DatabaseException e) {\r
2315                     e.printStackTrace();\r
2316                 }\r
2317             } else {\r
2318                 composite.run(graph);\r
2319             }\r
2320         }\r
2321 \r
2322         composite.run(graph);\r
2323 \r
2324     }\r
2325 \r
2326     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {\r
2327 \r
2328         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2329 \r
2330             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2331 \r
2332             @Override\r
2333             public void run(ReadGraphImpl graph) {\r
2334                 if(ready.decrementAndGet() == 0) {\r
2335                     runnable.run(graph);\r
2336                 }\r
2337             }\r
2338 \r
2339         };\r
2340 \r
2341         for(TransientGraph g : support.providers) {\r
2342             if(g.isPending(subject, predicate)) {\r
2343                 try {\r
2344                     g.load(graph, subject, predicate, composite);\r
2345                 } catch (DatabaseException e) {\r
2346                     e.printStackTrace();\r
2347                 }\r
2348             } else {\r
2349                 composite.run(graph);\r
2350             }\r
2351         }\r
2352 \r
2353         composite.run(graph);\r
2354 \r
2355     }\r
2356 \r
2357 //    void dumpHeap() {\r
2358 //\r
2359 //        try {\r
2360 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),\r
2361 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(\r
2362 //                    "d:/heap" + flushCounter + ".txt", true);\r
2363 //        } catch (IOException e) {\r
2364 //            e.printStackTrace();\r
2365 //        }\r
2366 //\r
2367 //    }\r
2368 \r
2369     void fireReactionsToSynchronize(ChangeSet cs) {\r
2370 \r
2371         // Do not fire empty events\r
2372         if (cs.isEmpty())\r
2373             return;\r
2374 \r
2375         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());\r
2376 //        g.state.barrier.inc();\r
2377 \r
2378         try {\r
2379 \r
2380             if (!cs.isEmpty()) {\r
2381                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);\r
2382                 for (ChangeListener l : changeListeners2) {\r
2383                     try {\r
2384                         l.graphChanged(e2);\r
2385                     } catch (Exception ex) {\r
2386                         ex.printStackTrace();\r
2387                     }\r
2388                 }\r
2389             }\r
2390 \r
2391         } finally {\r
2392 \r
2393 //            g.state.barrier.dec();\r
2394 //            g.waitAsync(null);\r
2395 \r
2396         }\r
2397 \r
2398     }\r
2399 \r
2400     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {\r
2401         try {\r
2402 \r
2403             // Do not fire empty events\r
2404             if (cs2.isEmpty())\r
2405                 return;\r
2406 \r
2407 //            graph.restart();\r
2408 //            graph.state.barrier.inc();\r
2409 \r
2410             try {\r
2411 \r
2412                 if (!cs2.isEmpty()) {\r
2413 \r
2414                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);\r
2415                     for (ChangeListener l : changeListeners2) {\r
2416                         try {\r
2417                             l.graphChanged(e2);\r
2418 //                            System.out.println("changelistener " + l);\r
2419                         } catch (Exception ex) {\r
2420                             ex.printStackTrace();\r
2421                         }\r
2422                     }\r
2423 \r
2424                 }\r
2425 \r
2426             } finally {\r
2427 \r
2428 //                graph.state.barrier.dec();\r
2429 //                graph.waitAsync(null);\r
2430 \r
2431             }\r
2432         } catch (Throwable t) {\r
2433             t.printStackTrace();\r
2434 \r
2435         }\r
2436     }\r
2437     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {\r
2438 \r
2439         try {\r
2440 \r
2441             // Do not fire empty events\r
2442             if (cs2.isEmpty())\r
2443                 return;\r
2444 \r
2445             // Do not fire on virtual requests\r
2446             if(graph.getProvider() != null)\r
2447                 return;\r
2448 \r
2449             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);\r
2450 \r
2451             try {\r
2452 \r
2453                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);\r
2454                 for (ChangeListener l : metadataListeners) {\r
2455                     try {\r
2456                         l.graphChanged(e2);\r
2457                     } catch (Throwable ex) {\r
2458                         ex.printStackTrace();\r
2459                     }\r
2460                 }\r
2461 \r
2462             } finally {\r
2463 \r
2464             }\r
2465 \r
2466         } catch (Throwable t) {\r
2467             t.printStackTrace();\r
2468         }\r
2469 \r
2470     }\r
2471 \r
2472 \r
2473     /*\r
2474      * (non-Javadoc)\r
2475      *\r
2476      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)\r
2477      */\r
2478     @Override\r