9efb0fc38c8eab1f7414437c40df37b5918267f4
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.File;\r
15 import java.io.IOException;\r
16 import java.io.InputStream;\r
17 import java.nio.charset.Charset;\r
18 import java.nio.file.FileVisitResult;\r
19 import java.nio.file.Files;\r
20 import java.nio.file.Path;\r
21 import java.nio.file.SimpleFileVisitor;\r
22 import java.nio.file.attribute.BasicFileAttributes;\r
23 import java.util.ArrayList;\r
24 import java.util.Collection;\r
25 import java.util.HashSet;\r
26 import java.util.TreeMap;\r
27 import java.util.concurrent.CopyOnWriteArrayList;\r
28 import java.util.concurrent.Semaphore;\r
29 import java.util.concurrent.atomic.AtomicInteger;\r
30 \r
31 import org.eclipse.core.runtime.Platform;\r
32 import org.simantics.databoard.Bindings;\r
33 import org.simantics.db.AsyncReadGraph;\r
34 import org.simantics.db.ChangeSet;\r
35 import org.simantics.db.DevelopmentKeys;\r
36 import org.simantics.db.ExternalValueSupport;\r
37 import org.simantics.db.Metadata;\r
38 import org.simantics.db.MonitorContext;\r
39 import org.simantics.db.MonitorHandler;\r
40 import org.simantics.db.Resource;\r
41 import org.simantics.db.ResourceSerializer;\r
42 import org.simantics.db.Session;\r
43 import org.simantics.db.SessionManager;\r
44 import org.simantics.db.SessionVariables;\r
45 import org.simantics.db.VirtualGraph;\r
46 import org.simantics.db.WriteGraph;\r
47 import org.simantics.db.authentication.UserAuthenticationAgent;\r
48 import org.simantics.db.authentication.UserAuthenticator;\r
49 import org.simantics.db.common.Indexing;\r
50 import org.simantics.db.common.TransactionPolicyRelease;\r
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;\r
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;\r
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;\r
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;\r
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;\r
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;\r
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;\r
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;\r
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;\r
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;\r
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;\r
63 import org.simantics.db.common.utils.Logger;\r
64 import org.simantics.db.event.ChangeEvent;\r
65 import org.simantics.db.event.ChangeListener;\r
66 import org.simantics.db.event.SessionEventListener;\r
67 import org.simantics.db.exception.CancelTransactionException;\r
68 import org.simantics.db.exception.ClusterSetExistException;\r
69 import org.simantics.db.exception.DatabaseException;\r
70 import org.simantics.db.exception.ImmutableException;\r
71 import org.simantics.db.exception.InvalidResourceReferenceException;\r
72 import org.simantics.db.exception.ResourceNotFoundException;\r
73 import org.simantics.db.exception.RuntimeDatabaseException;\r
74 import org.simantics.db.exception.ServiceException;\r
75 import org.simantics.db.exception.ServiceNotFoundException;\r
76 import org.simantics.db.impl.ClusterBase;\r
77 import org.simantics.db.impl.ClusterI;\r
78 import org.simantics.db.impl.ClusterTraitsBase;\r
79 import org.simantics.db.impl.ClusterTranslator;\r
80 import org.simantics.db.impl.ResourceImpl;\r
81 import org.simantics.db.impl.TransientGraph;\r
82 import org.simantics.db.impl.VirtualGraphImpl;\r
83 import org.simantics.db.impl.graph.DelayedWriteGraph;\r
84 import org.simantics.db.impl.graph.ReadGraphImpl;\r
85 import org.simantics.db.impl.graph.WriteGraphImpl;\r
86 import org.simantics.db.impl.graph.WriteSupport;\r
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;\r
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;\r
90 import org.simantics.db.impl.query.QueryProcessor;\r
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
93 import org.simantics.db.impl.service.QueryDebug;\r
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;\r
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;\r
96 import org.simantics.db.procedure.AsyncListener;\r
97 import org.simantics.db.procedure.AsyncMultiListener;\r
98 import org.simantics.db.procedure.AsyncMultiProcedure;\r
99 import org.simantics.db.procedure.AsyncProcedure;\r
100 import org.simantics.db.procedure.Listener;\r
101 import org.simantics.db.procedure.ListenerBase;\r
102 import org.simantics.db.procedure.MultiListener;\r
103 import org.simantics.db.procedure.MultiProcedure;\r
104 import org.simantics.db.procedure.Procedure;\r
105 import org.simantics.db.procedure.SyncListener;\r
106 import org.simantics.db.procedure.SyncMultiListener;\r
107 import org.simantics.db.procedure.SyncMultiProcedure;\r
108 import org.simantics.db.procedure.SyncProcedure;\r
109 import org.simantics.db.procore.cluster.ClusterImpl;\r
110 import org.simantics.db.procore.cluster.ClusterTraits;\r
111 import org.simantics.db.procore.protocol.Constants;\r
112 import org.simantics.db.procore.protocol.DebugPolicy;\r
113 import org.simantics.db.request.AsyncMultiRead;\r
114 import org.simantics.db.request.AsyncRead;\r
115 import org.simantics.db.request.DelayedWrite;\r
116 import org.simantics.db.request.DelayedWriteResult;\r
117 import org.simantics.db.request.ExternalRead;\r
118 import org.simantics.db.request.MultiRead;\r
119 import org.simantics.db.request.Read;\r
120 import org.simantics.db.request.ReadInterface;\r
121 import org.simantics.db.request.UndoTraits;\r
122 import org.simantics.db.request.Write;\r
123 import org.simantics.db.request.WriteInterface;\r
124 import org.simantics.db.request.WriteOnly;\r
125 import org.simantics.db.request.WriteOnlyResult;\r
126 import org.simantics.db.request.WriteResult;\r
127 import org.simantics.db.request.WriteTraits;\r
128 import org.simantics.db.service.ByteReader;\r
129 import org.simantics.db.service.ClusterBuilder;\r
130 import org.simantics.db.service.ClusterBuilderFactory;\r
131 import org.simantics.db.service.ClusterControl;\r
132 import org.simantics.db.service.ClusterSetsSupport;\r
133 import org.simantics.db.service.ClusterUID;\r
134 import org.simantics.db.service.ClusteringSupport;\r
135 import org.simantics.db.service.CollectionSupport;\r
136 import org.simantics.db.service.DebugSupport;\r
137 import org.simantics.db.service.DirectQuerySupport;\r
138 import org.simantics.db.service.GraphChangeListenerSupport;\r
139 import org.simantics.db.service.InitSupport;\r
140 import org.simantics.db.service.LifecycleSupport;\r
141 import org.simantics.db.service.ManagementSupport;\r
142 import org.simantics.db.service.QueryControl;\r
143 import org.simantics.db.service.SerialisationSupport;\r
144 import org.simantics.db.service.ServerInformation;\r
145 import org.simantics.db.service.ServiceActivityMonitor;\r
146 import org.simantics.db.service.SessionEventSupport;\r
147 import org.simantics.db.service.SessionMonitorSupport;\r
148 import org.simantics.db.service.SessionUserSupport;\r
149 import org.simantics.db.service.StatementSupport;\r
150 import org.simantics.db.service.TransactionPolicySupport;\r
151 import org.simantics.db.service.TransactionSupport;\r
152 import org.simantics.db.service.TransferableGraphSupport;\r
153 import org.simantics.db.service.UndoRedoSupport;\r
154 import org.simantics.db.service.VirtualGraphSupport;\r
155 import org.simantics.db.service.XSupport;\r
156 import org.simantics.layer0.Layer0;\r
157 import org.simantics.utils.DataContainer;\r
158 import org.simantics.utils.Development;\r
159 import org.simantics.utils.datastructures.Callback;\r
160 import org.simantics.utils.threads.logger.ITask;\r
161 import org.simantics.utils.threads.logger.ThreadLogger;\r
162 \r
163 import gnu.trove.procedure.TLongProcedure;\r
164 import gnu.trove.set.hash.TLongHashSet;\r
165 \r
166 \r
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {\r
168 \r
169     protected static final boolean DEBUG        = false;\r
170 \r
171     private static final boolean DIAGNOSTICS       = false;\r
172 \r
173     private TransactionPolicySupport transactionPolicy;\r
174 \r
175     final private ClusterControl clusterControl;\r
176 \r
177     final protected BuiltinSupportImpl builtinSupport;\r
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;\r
179     final protected ClusterSetsSupport clusterSetsSupport;\r
180     final protected LifecycleSupportImpl lifecycleSupport;\r
181 \r
182     protected QuerySupportImpl querySupport;\r
183     protected ResourceSupportImpl resourceSupport;\r
184     protected WriteSupport writeSupport;\r
185     public ClusterTranslator clusterTranslator;\r
186 \r
187     boolean dirtyPrimitives = false;\r
188 \r
189     public static final int SERVICE_MODE_CREATE = 2;\r
190     public static final int SERVICE_MODE_ALLOW = 1;\r
191     public int serviceMode = 0;\r
192     public boolean createdImmutableClusters = false;\r
193     public TLongHashSet createdClusters = new TLongHashSet();\r
194 \r
195     private Layer0 L0;\r
196 \r
197     /**\r
198      * The service locator maintained by the workbench. These services are\r
199      * initialized during workbench during the <code>init</code> method.\r
200      */\r
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();\r
202 \r
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();\r
204 \r
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();\r
206 \r
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();\r
208 \r
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();\r
210 \r
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();\r
212 \r
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();\r
214 \r
215     final protected State                                        state              = new State();\r
216 \r
217     protected GraphSession                                       graphSession       = null;\r
218 \r
219     protected SessionManager                                     sessionManagerImpl = null;\r
220 \r
221     protected UserAuthenticationAgent                            authAgent          = null;\r
222 \r
223     protected UserAuthenticator                                  authenticator      = null;\r
224 \r
225     protected Resource                                           user               = null;\r
226 \r
227     protected ClusterStream                                      clusterStream      = null;\r
228 \r
229     protected SessionRequestManager                         requestManager = null;\r
230 \r
231     public ClusterTable                                       clusterTable       = null;\r
232 \r
233     public QueryProcessor                                     queryProvider2 = null;\r
234 \r
235     ClientChangesImpl                                  clientChanges      = null;\r
236 \r
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];\r
238 \r
239 //    protected long                                               newClusterId       = Constants.NullClusterId;\r
240 \r
241     protected int                                                flushCounter       = 0;\r
242 \r
243     protected boolean                                            writeOnly          = false;\r
244 \r
245     WriteState<?>                                                writeState = null;\r
246     WriteStateBase<?>                                            delayedWriteState = null;\r
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().\r
248 \r
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {\r
250 \r
251 //        if (authAgent == null)\r
252 //            throw new IllegalArgumentException("null authentication agent");\r
253 \r
254         File t = StaticSessionProperties.virtualGraphStoragePath;\r
255         if (null == t)\r
256             t = new File(".");\r
257         this.clusterTable = new ClusterTable(this, t);\r
258         this.builtinSupport = new BuiltinSupportImpl(this);\r
259         this.sessionManagerImpl = sessionManagerImpl;\r
260         this.user = null;\r
261 //        this.authAgent = authAgent;\r
262 \r
263         serviceLocator.registerService(Session.class, this);\r
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));\r
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));\r
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));\r
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));\r
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));\r
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));\r
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));\r
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));\r
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));\r
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));\r
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));\r
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));\r
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));\r
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));\r
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));\r
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));\r
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));\r
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());\r
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));\r
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());\r
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());\r
285         ServiceActivityUpdaterForWriteTransactions.register(this);\r
286 \r
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);\r
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);\r
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);\r
290         this.lifecycleSupport = new LifecycleSupportImpl(this);\r
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);\r
292         this.transactionPolicy = new TransactionPolicyRelease();\r
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);\r
294         this.clusterControl = new ClusterControlImpl(this);\r
295         serviceLocator.registerService(ClusterControl.class, clusterControl);\r
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.\r
297         this.clusterSetsSupport.updateReadAndWriteDirectories(t.toPath(), t.toPath());\r
298         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);\r
299     }\r
300 \r
301     /*\r
302      *\r
303      * Session interface\r
304      */\r
305 \r
306 \r
307     @Override\r
308     public Resource getRootLibrary() {\r
309         return queryProvider2.getRootLibraryResource();\r
310     }\r
311 \r
312     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {\r
313         if (!graphSession.dbSession.refreshEnabled())\r
314             return;\r
315         try {\r
316             getClusterTable().refresh(csid, this, clusterUID);\r
317         } catch (Throwable t) {\r
318             Logger.defaultLogError("Refesh failed.", t);\r
319         }\r
320     }\r
321 \r
322     static final class TaskHelper {\r
323         private final String name;\r
324         private Object result;\r
325         final Semaphore sema = new Semaphore(0);\r
326         private Throwable throwable = null;\r
327         final Callback<DatabaseException> callback = new Callback<DatabaseException>() {\r
328             @Override\r
329             public void run(DatabaseException e) {\r
330                 synchronized (TaskHelper.this) {\r
331                     throwable = e;\r
332                 }\r
333             }\r
334         };\r
335         final Procedure<Object> proc = new Procedure<Object>() {\r
336             @Override\r
337             public void execute(Object result) {\r
338                 callback.run(null);\r
339             }\r
340             @Override\r
341             public void exception(Throwable t) {\r
342                 if (t instanceof DatabaseException)\r
343                     callback.run((DatabaseException)t);\r
344                 else\r
345                     callback.run(new DatabaseException("" + name + "operation failed.", t));\r
346             }\r
347         };\r
348         final WriteTraits writeTraits = new WriteTraits() {\r
349             @Override\r
350             public UndoTraits getUndoTraits() {\r
351                 return null;\r
352             }\r
353             @Override\r
354             public VirtualGraph getProvider() {\r
355                 return null;\r
356             }\r
357         };\r
358         TaskHelper(String name) {\r
359             this.name = name;\r
360         }\r
361         <T> T getResult() {\r
362             return (T)result;\r
363         }\r
364         void setResult(Object result) {\r
365             this.result = result;\r
366         }\r
367 //        Throwable throwableGet() {\r
368 //            return throwable;\r
369 //        }\r
370         synchronized void throwableSet(Throwable t) {\r
371             throwable = t;\r
372         }\r
373 //        void throwableSet(String t) {\r
374 //            throwable = new InternalException("" + name + " operation failed. " + t);\r
375 //        }\r
376         synchronized void throwableCheck()\r
377         throws DatabaseException {\r
378             if (null != throwable)\r
379                 if (throwable instanceof DatabaseException)\r
380                     throw (DatabaseException)throwable;\r
381                 else\r
382                     throw new DatabaseException("Undo operation failed.", throwable);\r
383         }\r
384         void throw_(String message)\r
385         throws DatabaseException {\r
386             throw new DatabaseException("" + name + " operation failed. " + message);\r
387         }\r
388     }\r
389 \r
390 \r
391 \r
392     private ListenerBase getListenerBase(Object procedure) {\r
393         if (procedure instanceof ListenerBase)\r
394             return (ListenerBase) procedure;\r
395         else\r
396             return null;\r
397     }\r
398 \r
399     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
400         scheduleRequest(request, callback, notify, null);\r
401     }\r
402 \r
403     /* (non-Javadoc)\r
404      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
405      */\r
406     @Override\r
407     public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
408 \r
409         assert (request != null);\r
410 \r
411         if(Development.DEVELOPMENT) {\r
412             try {\r
413             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
414                 System.err.println("schedule write '" + request + "'");\r
415             } catch (Throwable t) {\r
416                 Logger.defaultLogError(t);\r
417             }\r
418         }\r
419 \r
420         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
421 \r
422         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
423 \r
424             @Override\r
425             public void run(int thread) {\r
426 \r
427                 if(Development.DEVELOPMENT) {\r
428                     try {\r
429                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
430                         System.err.println("perform write '" + request + "'");\r
431                     } catch (Throwable t) {\r
432                         Logger.defaultLogError(t);\r
433                     }\r
434                 }\r
435 \r
436                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
437 \r
438                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
439 \r
440                 try {\r
441 \r
442                     flushCounter = 0;\r
443                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
444 \r
445                     VirtualGraph vg = getProvider(request.getProvider());\r
446 \r
447                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
448                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
449 \r
450                         @Override\r
451                         public void execute(Object result) {\r
452                             if(callback != null) callback.run(null);\r
453                         }\r
454 \r
455                         @Override\r
456                         public void exception(Throwable t) {\r
457                             if(callback != null) callback.run((DatabaseException)t);\r
458                         }\r
459 \r
460                     });\r
461 \r
462                     assert (null != writer);\r
463 //                    writer.state.barrier.inc();\r
464 \r
465                     try {\r
466                         request.perform(writer);\r
467                         assert (null != writer);\r
468                     } catch (Throwable t) {\r
469                         if (!(t instanceof CancelTransactionException))\r
470                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);\r
471                         writeState.except(t);\r
472                     } finally {\r
473 //                        writer.state.barrier.dec();\r
474 //                        writer.waitAsync(request);\r
475                     }\r
476 \r
477 \r
478                     assert(!queryProvider2.dirty);\r
479 \r
480                 } catch (Throwable e) {\r
481 \r
482                     // Log it first, just to be safe that the error is always logged.\r
483                     if (!(e instanceof CancelTransactionException))\r
484                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
485 \r
486 //                    writeState.getGraph().state.barrier.dec();\r
487 //                    writeState.getGraph().waitAsync(request);\r
488 \r
489                     writeState.except(e);\r
490 \r
491 //                    try {\r
492 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
493 //                        // All we can do here is to log those, can't really pass them anywhere.\r
494 //                        if (callback != null) {\r
495 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);\r
496 //                            else callback.run(new DatabaseException(e));\r
497 //                        }\r
498 //                    } catch (Throwable e2) {\r
499 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
500 //                    }\r
501 //                    boolean empty = clusterStream.reallyFlush();\r
502 //                    int callerThread = -1;\r
503 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);\r
504 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);\r
505 //                    try {\r
506 //                        // Send all local changes to server so it can calculate correct reverse change set.\r
507 //                        // This will call clusterStream.accept().\r
508 //                        state.cancelCommit(context, clusterStream);\r
509 //                        if (!empty) {\r
510 //                            if (!context.isOk()) // this is a blocking operation\r
511 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
512 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());\r
513 //                        }\r
514 //                        state.cancelCommit2(context, clusterStream);\r
515 //                    } catch (DatabaseException e3) {\r
516 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);\r
517 //                    } finally {\r
518 //                        clusterStream.setOff(false);\r
519 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
520 //                    }\r
521 \r
522                 } finally {\r
523 \r
524                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
525 \r
526                 }\r
527 \r
528                 task.finish();\r
529 \r
530                 if(Development.DEVELOPMENT) {\r
531                     try {\r
532                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
533                         System.err.println("finish write '" + request + "'");\r
534                     } catch (Throwable t) {\r
535                         Logger.defaultLogError(t);\r
536                     }\r
537                 }\r
538 \r
539             }\r
540 \r
541         }, combine);\r
542 \r
543     }\r
544 \r
545     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
546         scheduleRequest(request, procedure, notify, null);\r
547     }\r
548 \r
549     /* (non-Javadoc)\r
550      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
551      */\r
552     @Override\r
553     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
554 \r
555         assert (request != null);\r
556 \r
557         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
558 \r
559         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
560 \r
561             @Override\r
562             public void run(int thread) {\r
563 \r
564                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
565 \r
566                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
567 \r
568                 flushCounter = 0;\r
569                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
570 \r
571                 VirtualGraph vg = getProvider(request.getProvider());\r
572                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
573 \r
574                 try {\r
575                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);\r
576                     writeState = writeStateT;\r
577 \r
578                     assert (null != writer);\r
579 //                    writer.state.barrier.inc();\r
580                     writeStateT.setResult(request.perform(writer));\r
581                     assert (null != writer);\r
582 \r
583 //                    writer.state.barrier.dec();\r
584 //                    writer.waitAsync(null);\r
585 \r
586                 } catch (Throwable e) {\r
587 \r
588 //                    writer.state.barrier.dec();\r
589 //                    writer.waitAsync(null);\r
590 \r
591                     writeState.except(e);\r
592 \r
593 //                  state.stopWriteTransaction(clusterStream);\r
594 //\r
595 //              } catch (Throwable e) {\r
596 //                  // Log it first, just to be safe that the error is always logged.\r
597 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
598 //\r
599 //                  try {\r
600 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
601 //                      // All we can do here is to log those, can't really pass them anywhere.\r
602 //                      if (procedure != null) {\r
603 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);\r
604 //                          else procedure.exception(new DatabaseException(e));\r
605 //                      }\r
606 //                  } catch (Throwable e2) {\r
607 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
608 //                  }\r
609 //\r
610 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
611 //\r
612 //                  state.stopWriteTransaction(clusterStream);\r
613 \r
614                 } finally {\r
615                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
616                 }\r
617 \r
618 //              if(notify != null) notify.release();\r
619 \r
620                 task.finish();\r
621 \r
622             }\r
623 \r
624         }, combine);\r
625 \r
626     }\r
627 \r
628     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
629         scheduleRequest(request, callback, notify, null);\r
630     }\r
631 \r
632     /* (non-Javadoc)\r
633      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
634      */\r
635     @Override\r
636     public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
637 \r
638         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");\r
639 \r
640         assert (request != null);\r
641 \r
642         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
643 \r
644         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
645 \r
646             @Override\r
647             public void run(int thread) {\r
648                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
649 \r
650                 Procedure<Object> stateProcedure = new Procedure<Object>() {\r
651                     @Override\r
652                     public void execute(Object result) {\r
653                         if (callback != null)\r
654                             callback.run(null);\r
655                     }\r
656                     @Override\r
657                     public void exception(Throwable t) {\r
658                         if (callback != null) {\r
659                             if (t instanceof DatabaseException) callback.run((DatabaseException) t);\r
660                             else callback.run(new DatabaseException(t));\r
661                         } else\r
662                             Logger.defaultLogError("Unhandled exception", t);\r
663                     }\r
664                 };\r
665 \r
666                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
667                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);\r
668                 DelayedWriteGraph dwg = null;\r
669 //                newGraph.state.barrier.inc();\r
670 \r
671                 try {\r
672                     dwg = new DelayedWriteGraph(newGraph);\r
673                     request.perform(dwg);\r
674                 } catch (Throwable e) {\r
675                     delayedWriteState.except(e);\r
676                     total.finish();\r
677                     return;\r
678                 } finally {\r
679 //                    newGraph.state.barrier.dec();\r
680 //                    newGraph.waitAsync(request);\r
681                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
682                 }\r
683 \r
684                 delayedWriteState = null;\r
685 \r
686                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");\r
687                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
688 \r
689                 flushCounter = 0;\r
690                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
691 \r
692                 acquireWriteOnly();\r
693 \r
694                 VirtualGraph vg = getProvider(request.getProvider());\r
695                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
696 \r
697                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
698 \r
699                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);\r
700 \r
701                 assert (null != writer);\r
702 //                writer.state.barrier.inc();\r
703 \r
704                 try {\r
705                     // Cannot cancel\r
706                     dwg.commit(writer, request);\r
707 \r
708                         if(defaultClusterSet != null) {\r
709                                 XSupport xs = getService(XSupport.class);\r
710                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);\r
711                                 ClusteringSupport cs = getService(ClusteringSupport.class);\r
712                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));\r
713                         }\r
714 \r
715                     // This makes clusters available from server\r
716                     clusterStream.reallyFlush();\r
717 \r
718                     releaseWriteOnly(writer);\r
719                     handleUpdatesAndMetadata(writer);\r
720 \r
721                 } catch (ServiceException e) {\r
722 //                    writer.state.barrier.dec();\r
723 //                    writer.waitAsync(null);\r
724 \r
725                     // These shall be requested from server\r
726                     clusterTable.removeWriteOnlyClusters();\r
727                     // This makes clusters available from server\r
728                     clusterStream.reallyFlush();\r
729 \r
730                     releaseWriteOnly(writer);\r
731                     writeState.except(e);\r
732                 } finally {\r
733                     // Debugging & Profiling\r
734                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
735                     task2.finish();\r
736                     total.finish();\r
737                 }\r
738             }\r
739 \r
740         }, combine);\r
741 \r
742     }\r
743 \r
744     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
745         scheduleRequest(request, procedure, notify, null);\r
746     }\r
747 \r
748     /* (non-Javadoc)\r
749      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
750      */\r
751     @Override\r
752     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
753         throw new Error("Not implemented");\r
754     }\r
755 \r
756     protected ClusterImpl getNewResourceCluster() throws DatabaseException {\r
757         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);\r
758         if((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
759             createdClusters.add(cluster.clusterId);\r
760         }\r
761         return cluster;\r
762     }\r
763 \r
764     class WriteOnlySupport implements WriteSupport {\r
765 \r
766         ClusterStream stream;\r
767         ClusterImpl currentCluster;\r
768 \r
769         public WriteOnlySupport() {\r
770             this.stream = clusterStream;\r
771         }\r
772 \r
773         @Override\r
774         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {\r
775             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );\r
776         }\r
777 \r
778         @Override\r
779         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {\r
780 \r
781             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);\r
782 \r
783             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
784                 if(s != queryProvider2.getRootLibrary())\r
785                     throw new ImmutableException("Trying to modify immutable resource key=" + s);\r
786 \r
787             try {\r
788                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));\r
789             } catch (DatabaseException e) {\r
790                 Logger.defaultLogError(e);\r
791                 //throw e;\r
792             }\r
793 \r
794             clientChanges.invalidate(s);\r
795 \r
796             if (cluster.isWriteOnly())\r
797                 return;\r
798             queryProvider2.updateStatements(s, p);\r
799 \r
800         }\r
801 \r
802         @Override\r
803         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
804             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
805         }\r
806 \r
807         @Override\r
808         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {\r
809 \r
810             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
811             try {\r
812                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));\r
813             } catch (DatabaseException e) {\r
814                 Logger.defaultLogError(e);\r
815             }\r
816 \r
817             clientChanges.invalidate(rid);\r
818 \r
819             if (cluster.isWriteOnly())\r
820                 return;\r
821             queryProvider2.updateValue(rid);\r
822 \r
823         }\r
824 \r
825         @Override\r
826         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
827 \r
828             if(amount < 65536) {\r
829                 claimValue(provider,resource, reader.readBytes(null, amount));\r
830                 return;\r
831             }\r
832 \r
833             byte[] bytes = new byte[65536];\r
834 \r
835             int rid = ((ResourceImpl)resource).id;\r
836             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
837             try {\r
838                 int left = amount;\r
839                 while(left > 0) {\r
840                     int block = Math.min(left, 65536);\r
841                     reader.readBytes(bytes, block);\r
842                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));\r
843                     left -= block;\r
844                 }\r
845             } catch (DatabaseException e) {\r
846                 Logger.defaultLogError(e);\r
847             }\r
848 \r
849             clientChanges.invalidate(rid);\r
850 \r
851             if (cluster.isWriteOnly())\r
852                 return;\r
853             queryProvider2.updateValue(rid);\r
854 \r
855         }\r
856 \r
857         private void maintainCluster(ClusterImpl before, ClusterI after_) {\r
858             if(after_ != null && after_ != before) {\r
859                 ClusterImpl after = (ClusterImpl)after_;\r
860                 if(currentCluster == before) currentCluster = after;\r
861                 clusterTable.replaceCluster(after);\r
862             }\r
863         }\r
864 \r
865         public int createResourceKey(int foreignCounter) throws DatabaseException {\r
866             if(currentCluster == null)\r
867                 currentCluster = getNewResourceCluster();\r
868             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {\r
869                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();\r
870                 newCluster.foreignLookup = new byte[foreignCounter];\r
871                 currentCluster = newCluster;\r
872                 if (DEBUG)\r
873                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);\r
874             }\r
875             return currentCluster.createResource(clusterTranslator);\r
876         }\r
877 \r
878         @Override\r
879         public Resource createResource(VirtualGraph provider) throws DatabaseException {\r
880             if(currentCluster == null) {\r
881                 if (null != defaultClusterSet) {\r
882                         ResourceImpl result = getNewResource(defaultClusterSet);\r
883                     currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
884                     return result;\r
885                 } else {\r
886                     currentCluster = getNewResourceCluster();\r
887                 }\r
888             }\r
889             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
890                 if (null != defaultClusterSet) {\r
891                         ResourceImpl result = getNewResource(defaultClusterSet);\r
892                     currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
893                     return result;\r
894                 } else {\r
895                     currentCluster = getNewResourceCluster();\r
896                 }\r
897             }\r
898             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));\r
899         }\r
900 \r
901         @Override\r
902         public Resource createResource(VirtualGraph provider, long clusterId)\r
903         throws DatabaseException {\r
904             return getNewResource(clusterId);\r
905         }\r
906 \r
907         @Override\r
908         public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
909         throws DatabaseException {\r
910             return getNewResource(clusterSet);\r
911         }\r
912 \r
913         @Override\r
914         public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
915                 throws DatabaseException {\r
916             getNewClusterSet(clusterSet);\r
917         }\r
918 \r
919         @Override\r
920         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)\r
921         throws ServiceException {\r
922             return containsClusterSet(clusterSet);\r
923         }\r
924 \r
925         public void selectCluster(long cluster) {\r
926                 currentCluster = clusterTable.getClusterByClusterId(cluster);\r
927                 long setResourceId = clusterSetsSupport.getSet(cluster);\r
928                 clusterSetsSupport.put(setResourceId, cluster);\r
929         }\r
930 \r
931         @Override\r
932         public Resource setDefaultClusterSet(Resource clusterSet)\r
933         throws ServiceException {\r
934                 Resource result = setDefaultClusterSet4NewResource(clusterSet);\r
935                 if(clusterSet != null) {\r
936                         long id = clusterSetsSupport.get(clusterSet.getResourceId());\r
937                         currentCluster = clusterTable.getClusterByClusterId(id);\r
938                         return result;\r
939                 } else {\r
940                         currentCluster = null;\r
941                         return null;\r
942                 }\r
943         }\r
944 \r
945         @Override\r
946         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {\r
947 \r
948                  provider = getProvider(provider);\r
949              if (null == provider) {\r
950                  int key = ((ResourceImpl)resource).id;\r
951 \r
952 \r
953 \r
954                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);\r
955 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
956 //                      if(key != queryProvider2.getRootLibrary())\r
957 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);\r
958 \r
959 //                 try {\r
960 //                     cluster.removeValue(key, clusterTranslator);\r
961 //                 } catch (DatabaseException e) {\r
962 //                     Logger.defaultLogError(e);\r
963 //                     return;\r
964 //                 }\r
965 \r
966                  clusterTable.writeOnlyInvalidate(cluster);\r
967 \r
968                  try {\r
969                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);\r
970                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);\r
971                          clusterTranslator.removeValue(cluster);\r
972                  } catch (DatabaseException e) {\r
973                          Logger.defaultLogError(e);\r
974                  }\r
975 \r
976                  queryProvider2.invalidateResource(key);\r
977                  clientChanges.invalidate(key);\r
978 \r
979              } else {\r
980                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
981                  queryProvider2.updateValue(querySupport.getId(resource));\r
982                  clientChanges.claimValue(resource);\r
983              }\r
984 \r
985 \r
986         }\r
987 \r
988         @Override\r
989         public void flush(boolean intermediate) {\r
990             throw new UnsupportedOperationException();\r
991         }\r
992 \r
993         @Override\r
994         public void flushCluster() {\r
995             clusterTable.flushCluster(graphSession);\r
996             if(defaultClusterSet != null) {\r
997                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);\r
998             }\r
999             currentCluster = null;\r
1000         }\r
1001 \r
1002         @Override\r
1003         public void flushCluster(Resource r) {\r
1004             throw new UnsupportedOperationException("flushCluster resource " + r);\r
1005         }\r
1006 \r
1007         @Override\r
1008         public void gc() {\r
1009         }\r
1010 \r
1011         @Override\r
1012         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
1013                 Resource object) {\r
1014 \r
1015                 int s = ((ResourceImpl)subject).id;\r
1016                 int p = ((ResourceImpl)predicate).id;\r
1017                 int o = ((ResourceImpl)object).id;\r
1018 \r
1019                 provider = getProvider(provider);\r
1020                 if (null == provider) {\r
1021 \r
1022                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);\r
1023                         clusterTable.writeOnlyInvalidate(cluster);\r
1024 \r
1025                         try {\r
1026 \r
1027                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);\r
1028                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);\r
1029                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);\r
1030 \r
1031                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);\r
1032                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);\r
1033 \r
1034                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);\r
1035                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
1036                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
1037                                 clusterTranslator.removeStatement(cluster);\r
1038 \r
1039                                 queryProvider2.invalidateResource(s);\r
1040                                 clientChanges.invalidate(s);\r
1041 \r
1042                         } catch (DatabaseException e) {\r
1043 \r
1044                                 Logger.defaultLogError(e);\r
1045 \r
1046                         }\r
1047 \r
1048                         return true;\r
1049 \r
1050                 } else {\r
1051                         \r
1052                         ((VirtualGraphImpl)provider).deny(s, p, o);\r
1053                         queryProvider2.invalidateResource(s);\r
1054                         clientChanges.invalidate(s);\r
1055 \r
1056                         return true;\r
1057 \r
1058                 }\r
1059 \r
1060         }\r
1061 \r
1062         @Override\r
1063         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
1064             throw new UnsupportedOperationException();\r
1065         }\r
1066 \r
1067         @Override\r
1068         public boolean writeOnly() {\r
1069             return true;\r
1070         }\r
1071 \r
1072         @Override\r
1073         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
1074             writeSupport.performWriteRequest(graph, request);\r
1075         }\r
1076 \r
1077         @Override\r
1078         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
1079             throw new UnsupportedOperationException();\r
1080         }\r
1081 \r
1082         @Override\r
1083         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
1084             throw new UnsupportedOperationException();\r
1085         }\r
1086 \r
1087         @Override\r
1088         public <T> void addMetadata(Metadata data) throws ServiceException {\r
1089             writeSupport.addMetadata(data);\r
1090         }\r
1091 \r
1092         @Override\r
1093         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
1094             return writeSupport.getMetadata(clazz);\r
1095         }\r
1096 \r
1097         @Override\r
1098         public TreeMap<String, byte[]> getMetadata() {\r
1099             return writeSupport.getMetadata();\r
1100         }\r
1101 \r
1102         @Override\r
1103         public void commitDone(WriteTraits writeTraits, long csid) {\r
1104             writeSupport.commitDone(writeTraits, csid);\r
1105         }\r
1106 \r
1107         @Override\r
1108         public void clearUndoList(WriteTraits writeTraits) {\r
1109             writeSupport.clearUndoList(writeTraits);\r
1110         }\r
1111         @Override\r
1112         public int clearMetadata() {\r
1113             return writeSupport.clearMetadata();\r
1114         }\r
1115 \r
1116                 @Override\r
1117                 public void startUndo() {\r
1118                         writeSupport.startUndo();\r
1119                 }\r
1120     }\r
1121 \r
1122     class VirtualWriteOnlySupport implements WriteSupport {\r
1123 \r
1124 //        @Override\r
1125 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
1126 //                Resource object) {\r
1127 //            throw new UnsupportedOperationException();\r
1128 //        }\r
1129 \r
1130         @Override\r
1131         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1132 \r
1133             TransientGraph impl = (TransientGraph)provider;\r
1134             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1135             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1136             clientChanges.claim(subject, predicate, object);\r
1137 \r
1138         }\r
1139 \r
1140         @Override\r
1141         public void claim(VirtualGraph provider, int subject, int predicate, int object) {\r
1142 \r
1143             TransientGraph impl = (TransientGraph)provider;\r
1144             impl.claim(subject, predicate, object);\r
1145             getQueryProvider2().updateStatements(subject, predicate);\r
1146             clientChanges.claim(subject, predicate, object);\r
1147 \r
1148         }\r
1149 \r
1150         @Override\r
1151         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
1152             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
1153         }\r
1154 \r
1155         @Override\r
1156         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {\r
1157             ((VirtualGraphImpl)provider).claimValue(resource, value, length);\r
1158             getQueryProvider2().updateValue(resource);\r
1159             clientChanges.claimValue(resource);\r
1160         }\r
1161 \r
1162         @Override\r
1163         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
1164             byte[] value = reader.readBytes(null, amount);\r
1165             claimValue(provider, resource, value);\r
1166         }\r
1167 \r
1168         @Override\r
1169         public Resource createResource(VirtualGraph provider) {\r
1170             TransientGraph impl = (TransientGraph)provider;\r
1171             return impl.getResource(impl.newResource(false));\r
1172         }\r
1173 \r
1174         @Override\r
1175         public Resource createResource(VirtualGraph provider, long clusterId) {\r
1176             throw new UnsupportedOperationException();\r
1177         }\r
1178 \r
1179         @Override\r
1180         public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
1181         throws DatabaseException {\r
1182             throw new UnsupportedOperationException();\r
1183         }\r
1184 \r
1185         @Override\r
1186         public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
1187         throws DatabaseException {\r
1188             throw new UnsupportedOperationException();\r
1189         }\r
1190 \r
1191         @Override\r
1192         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)\r
1193         throws ServiceException {\r
1194             throw new UnsupportedOperationException();\r
1195         }\r
1196 \r
1197         @Override\r
1198         public Resource setDefaultClusterSet(Resource clusterSet)\r
1199         throws ServiceException {\r
1200                 return null;\r
1201         }\r
1202 \r
1203         @Override\r
1204         public void denyValue(VirtualGraph provider, Resource resource) {\r
1205             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
1206             getQueryProvider2().updateValue(querySupport.getId(resource));\r
1207             // NOTE: this only keeps track of value changes by-resource.\r
1208             clientChanges.claimValue(resource);\r
1209         }\r
1210 \r
1211         @Override\r
1212         public void flush(boolean intermediate) {\r
1213             throw new UnsupportedOperationException();\r
1214         }\r
1215 \r
1216         @Override\r
1217         public void flushCluster() {\r
1218             throw new UnsupportedOperationException();\r
1219         }\r
1220 \r
1221         @Override\r
1222         public void flushCluster(Resource r) {\r
1223             throw new UnsupportedOperationException("Resource " + r);\r
1224         }\r
1225 \r
1226         @Override\r
1227         public void gc() {\r
1228         }\r
1229 \r
1230         @Override\r
1231         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
1232             TransientGraph impl = (TransientGraph) provider;\r
1233             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
1234             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
1235             clientChanges.deny(subject, predicate, object);\r
1236             return true;\r
1237         }\r
1238 \r
1239         @Override\r
1240         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
1241             throw new UnsupportedOperationException();\r
1242         }\r
1243 \r
1244         @Override\r
1245         public boolean writeOnly() {\r
1246             return true;\r
1247         }\r
1248 \r
1249         @Override\r
1250         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
1251             throw new UnsupportedOperationException();\r
1252         }\r
1253 \r
1254         @Override\r
1255         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
1256             throw new UnsupportedOperationException();\r
1257         }\r
1258 \r
1259         @Override\r
1260         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
1261             throw new UnsupportedOperationException();\r
1262         }\r
1263 \r
1264         @Override\r
1265         public <T> void addMetadata(Metadata data) throws ServiceException {\r
1266             throw new UnsupportedOperationException();\r
1267         }\r
1268 \r
1269         @Override\r
1270         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
1271             throw new UnsupportedOperationException();\r
1272         }\r
1273 \r
1274         @Override\r
1275         public TreeMap<String, byte[]> getMetadata() {\r
1276             throw new UnsupportedOperationException();\r
1277         }\r
1278 \r
1279         @Override\r
1280         public void commitDone(WriteTraits writeTraits, long csid) {\r
1281         }\r
1282 \r
1283         @Override\r
1284         public void clearUndoList(WriteTraits writeTraits) {\r
1285         }\r
1286 \r
1287         @Override\r
1288         public int clearMetadata() {\r
1289             return 0;\r
1290         }\r
1291 \r
1292                 @Override\r
1293                 public void startUndo() {\r
1294                 }\r
1295     }\r
1296 \r
1297     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {\r
1298 \r
1299         try {\r
1300 \r
1301             int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1302 \r
1303             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1304 \r
1305             flushCounter = 0;\r
1306             clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1307 \r
1308             acquireWriteOnly();\r
1309 \r
1310             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1311 \r
1312             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());\r
1313 \r
1314             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);\r
1315             writeState = writeStateT;\r
1316 \r
1317             assert (null != writer);\r
1318 //            writer.state.barrier.inc();\r
1319             long start = System.nanoTime();\r
1320             T result = request.perform(writer);\r
1321             long duration = System.nanoTime() - start;\r
1322             if (DEBUG) {\r
1323                 System.err.println("################");\r
1324                 System.err.println("WriteOnly duration " + 1e-9*duration);\r
1325             }\r
1326             writeStateT.setResult(result);\r
1327 \r
1328             // This makes clusters available from server\r
1329             clusterStream.reallyFlush();\r
1330 \r
1331             // This will trigger query updates\r
1332             releaseWriteOnly(writer);\r
1333             assert (null != writer);\r
1334 \r
1335             handleUpdatesAndMetadata(writer);\r
1336 \r
1337         } catch (CancelTransactionException e) {\r
1338 \r
1339             releaseWriteOnly(writeState.getGraph());\r
1340 \r
1341             clusterTable.removeWriteOnlyClusters();\r
1342             state.stopWriteTransaction(clusterStream);\r
1343 \r
1344         } catch (Throwable e) {\r
1345 \r
1346             e.printStackTrace();\r
1347 \r
1348             releaseWriteOnly(writeState.getGraph());\r
1349 \r
1350             clusterTable.removeWriteOnlyClusters();\r
1351 \r
1352             if (callback != null)\r
1353                 callback.exception(new DatabaseException(e));\r
1354 \r
1355             state.stopWriteTransaction(clusterStream);\r
1356             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
1357 \r
1358         } finally  {\r
1359 \r
1360             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1361 \r
1362         }\r
1363 \r
1364     }\r
1365 \r
1366     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
1367         scheduleRequest(request, callback, notify, null);\r
1368     }\r
1369 \r
1370     @Override\r
1371     public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
1372 \r
1373         assertAlive();\r
1374 \r
1375         assert (request != null);\r
1376 \r
1377         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1378 \r
1379         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
1380 \r
1381             @Override\r
1382             public void run(int thread) {\r
1383 \r
1384                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1385 \r
1386                     try {\r
1387 \r
1388                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1389 \r
1390                         flushCounter = 0;\r
1391                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
1392 \r
1393                         acquireWriteOnly();\r
1394 \r
1395                         VirtualGraph vg = getProvider(request.getProvider());\r
1396                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
1397 \r
1398                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
1399 \r
1400                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
1401 \r
1402                             @Override\r
1403                             public void execute(Object result) {\r
1404                                 if(callback != null) callback.run(null);\r
1405                             }\r
1406 \r
1407                             @Override\r
1408                             public void exception(Throwable t) {\r
1409                                 if(callback != null) callback.run((DatabaseException)t);\r
1410                             }\r
1411 \r
1412                         });\r
1413 \r
1414                         assert (null != writer);\r
1415 //                        writer.state.barrier.inc();\r
1416 \r
1417                         try {\r
1418 \r
1419                             request.perform(writer);\r
1420 \r
1421                         } catch (Throwable e) {\r
1422 \r
1423 //                            writer.state.barrier.dec();\r
1424 //                            writer.waitAsync(null);\r
1425 \r
1426                             releaseWriteOnly(writer);\r
1427 \r
1428                             clusterTable.removeWriteOnlyClusters();\r
1429 \r
1430                             if(!(e instanceof CancelTransactionException)) {\r
1431                                 if (callback != null)\r
1432                                     callback.run(new DatabaseException(e));\r
1433                             }\r
1434 \r
1435                             writeState.except(e);\r
1436 \r
1437                             return;\r
1438 \r
1439                         }\r
1440 \r
1441                         // This makes clusters available from server\r
1442                         boolean empty = clusterStream.reallyFlush();\r
1443                         // This was needed to make WO requests call metadata listeners.\r
1444                         // NOTE: the calling event does not contain clientChanges information.\r
1445                         if (!empty && clientChanges.isEmpty())\r
1446                             clientChanges.setNotEmpty(true);\r
1447                         releaseWriteOnly(writer);\r
1448                         assert (null != writer);\r
1449                     } finally  {\r
1450 \r
1451                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
1452 \r
1453                     }\r
1454 \r
1455 \r
1456                 task.finish();\r
1457 \r
1458             }\r
1459 \r
1460         }, combine);\r
1461 \r
1462     }\r
1463 \r
1464     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {\r
1465         scheduleRequest(request, callback, notify, null);\r
1466     }\r
1467 \r
1468     /* (non-Javadoc)\r
1469      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
1470      */\r
1471     @Override\r
1472     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {\r
1473 \r
1474         assert (request != null);\r
1475 \r
1476         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1477 \r
1478         requestManager.scheduleWrite(new SessionTask(request, thread) {\r
1479 \r
1480             @Override\r
1481             public void run(int thread) {\r
1482 \r
1483                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
1484 \r
1485                 performWriteOnly(request, notify, callback);\r
1486 \r
1487                 task.finish();\r
1488 \r
1489             }\r
1490 \r
1491         }, combine);\r
1492 \r
1493     }\r
1494 \r
1495     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1496 \r
1497         assert (request != null);\r
1498         assert (procedure != null);\r
1499 \r
1500         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1501 \r
1502         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1503 \r
1504             @Override\r
1505             public void run(int thread) {\r
1506 \r
1507                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1508 \r
1509                 ListenerBase listener = getListenerBase(procedure);\r
1510 \r
1511                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1512 \r
1513                 try {\r
1514 \r
1515                     if (listener != null) {\r
1516 \r
1517                         try {\r
1518                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {\r
1519 \r
1520                                         @Override\r
1521                                         public void exception(AsyncReadGraph graph, Throwable t) {\r
1522                                                 procedure.exception(graph, t);\r
1523                                                 if(throwable != null) {\r
1524                                                         throwable.set(t);\r
1525                                                 } else {\r
1526                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);\r
1527                                                 }\r
1528                                         }\r
1529 \r
1530                                         @Override\r
1531                                         public void execute(AsyncReadGraph graph, T t) {\r
1532                                                 if(result != null) result.set(t);\r
1533                                                 procedure.execute(graph, t);\r
1534                                         }\r
1535 \r
1536                                 }, listener);\r
1537                         } catch (Throwable t) {\r
1538                             // This is handled by the AsyncProcedure\r
1539                                 //Logger.defaultLogError("Internal error", t);\r
1540                         }\r
1541 \r
1542                     } else {\r
1543 \r
1544                         try {\r
1545 \r
1546 //                            newGraph.state.barrier.inc();\r
1547 \r
1548                             T t = request.perform(newGraph);\r
1549 \r
1550                             try {\r
1551 \r
1552                                 if(result != null) result.set(t);\r
1553                                 procedure.execute(newGraph, t);\r
1554 \r
1555                             } catch (Throwable th) {\r
1556 \r
1557                                 if(throwable != null) {\r
1558                                     throwable.set(th);\r
1559                                 } else {\r
1560                                     Logger.defaultLogError("Unhandled exception", th);\r
1561                                 }\r
1562 \r
1563                             }\r
1564 \r
1565                         } catch (Throwable t) {\r
1566 \r
1567                             if (DEBUG)\r
1568                                 t.printStackTrace();\r
1569 \r
1570                             if(throwable != null) {\r
1571                                 throwable.set(t);\r
1572                             } else {\r
1573                                 Logger.defaultLogError("Unhandled exception", t);\r
1574                             }\r
1575 \r
1576                             try {\r
1577 \r
1578                                 procedure.exception(newGraph, t);\r
1579 \r
1580                             } catch (Throwable t2) {\r
1581 \r
1582                                 if(throwable != null) {\r
1583                                     throwable.set(t2);\r
1584                                 } else {\r
1585                                     Logger.defaultLogError("Unhandled exception", t2);\r
1586                                 }\r
1587 \r
1588                             }\r
1589 \r
1590                         }\r
1591 \r
1592 //                        newGraph.state.barrier.dec();\r
1593 //                        newGraph.waitAsync(request);\r
1594 \r
1595                     }\r
1596 \r
1597                 } finally {\r
1598 \r
1599                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1600 \r
1601                 }\r
1602 \r
1603             }\r
1604 \r
1605         });\r
1606 \r
1607     }\r
1608 \r
1609     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {\r
1610 \r
1611         assert (request != null);\r
1612         assert (procedure != null);\r
1613 \r
1614         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1615 \r
1616         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {\r
1617 \r
1618             @Override\r
1619             public void run(int thread) {\r
1620 \r
1621                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1622 \r
1623                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1624 \r
1625                 try {\r
1626 \r
1627                     if (listener != null) {\r
1628 \r
1629                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1630 \r
1631 //                        newGraph.waitAsync(request);\r
1632 \r
1633                     } else {\r
1634 \r
1635                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(\r
1636                                 procedure, "request");\r
1637 \r
1638                         try {\r
1639 \r
1640 //                            newGraph.state.barrier.inc();\r
1641 \r
1642                             request.perform(newGraph, wrapper);\r
1643 \r
1644 //                            newGraph.waitAsync(request);\r
1645 \r
1646                         } catch (Throwable t) {\r
1647 \r
1648                             wrapper.exception(newGraph, t);\r
1649 //                            newGraph.waitAsync(request);\r
1650 \r
1651 \r
1652                         }\r
1653 \r
1654                     }\r
1655 \r
1656                 } finally {\r
1657 \r
1658                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1659 \r
1660                 }\r
1661 \r
1662             }\r
1663 \r
1664         });\r
1665 \r
1666     }\r
1667 \r
1668     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {\r
1669 \r
1670         assert (request != null);\r
1671         assert (procedure != null);\r
1672 \r
1673         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1674 \r
1675         int sync = notify != null ? thread : -1;\r
1676 \r
1677         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {\r
1678 \r
1679             @Override\r
1680             public void run(int thread) {\r
1681 \r
1682                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1683 \r
1684                 ListenerBase listener = getListenerBase(procedure);\r
1685 \r
1686                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1687 \r
1688                 try {\r
1689 \r
1690                     if (listener != null) {\r
1691 \r
1692                         newGraph.processor.query(newGraph, request, null, procedure, listener);\r
1693 \r
1694 //                        newGraph.waitAsync(request);\r
1695 \r
1696                     } else {\r
1697 \r
1698                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);\r
1699 \r
1700                         try {\r
1701 \r
1702                             request.perform(newGraph, wrapper);\r
1703 \r
1704                         } catch (Throwable t) {\r
1705 \r
1706                             t.printStackTrace();\r
1707 \r
1708                         }\r
1709 \r
1710                     }\r
1711 \r
1712                 } finally {\r
1713 \r
1714                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1715 \r
1716                 }\r
1717 \r
1718             }\r
1719 \r
1720         });\r
1721 \r
1722     }\r
1723 \r
1724     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
1725 \r
1726         assert (request != null);\r
1727         assert (procedure != null);\r
1728 \r
1729         int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
1730 \r
1731         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
1732 \r
1733             @Override\r
1734             public void run(int thread) {\r
1735 \r
1736                 fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1737 \r
1738                 ListenerBase listener = getListenerBase(procedure);\r
1739 \r
1740                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
1741 \r
1742                 try {\r
1743 \r
1744                     if (listener != null) {\r
1745 \r
1746                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {\r
1747 \r
1748                             @Override\r
1749                             public void exception(Throwable t) {\r
1750                                 procedure.exception(t);\r
1751                                 if(throwable != null) {\r
1752                                     throwable.set(t);\r
1753                                 }\r
1754                             }\r
1755 \r
1756                             @Override\r
1757                             public void execute(T t) {\r
1758                                 if(result != null) result.set(t);\r
1759                                 procedure.execute(t);\r
1760                             }\r
1761 \r
1762                         }, listener);\r
1763 \r
1764 //                        newGraph.waitAsync(request);\r
1765 \r
1766                     } else {\r
1767 \r
1768 //                        newGraph.state.barrier.inc();\r
1769 \r
1770                         request.register(newGraph, new Listener<T>() {\r
1771 \r
1772                             @Override\r
1773                             public void exception(Throwable t) {\r
1774                                 if(throwable != null) throwable.set(t);\r
1775                                 procedure.exception(t);\r
1776 //                                newGraph.state.barrier.dec();\r
1777                             }\r
1778 \r
1779                             @Override\r
1780                             public void execute(T t) {\r
1781                                 if(result != null) result.set(t);\r
1782                                 procedure.execute(t);\r
1783 //                                newGraph.state.barrier.dec();\r
1784                             }\r
1785 \r
1786                             @Override\r
1787                             public boolean isDisposed() {\r
1788                                 return true;\r
1789                             }\r
1790 \r
1791                         });\r
1792 \r
1793 //                        newGraph.waitAsync(request);\r
1794 \r
1795                     }\r
1796 \r
1797                 } finally {\r
1798 \r
1799                     fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
1800 \r
1801                 }\r
1802 \r
1803             }\r
1804 \r
1805         });\r
1806 \r
1807     }\r
1808 \r
1809 \r
1810     @Override\r
1811     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {\r
1812 \r
1813         scheduleRequest(request, procedure, null, null, null);\r
1814 \r
1815     }\r
1816 \r
1817     @Override\r
1818     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1819         assertNotSession();\r
1820         Semaphore notify = new Semaphore(0);\r
1821         DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1822         DataContainer<T> result = new DataContainer<T>();\r
1823         scheduleRequest(request, procedure, notify, container, result);\r
1824         acquire(notify, request);\r
1825         Throwable throwable = container.get();\r
1826         if(throwable != null) {\r
1827             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1828             else throw new DatabaseException("Unexpected exception", throwable);\r
1829         }\r
1830         return result.get();\r
1831     }\r
1832 \r
1833     @Override\r
1834     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {\r
1835         assertNotSession();\r
1836         Semaphore notify = new Semaphore(0);\r
1837         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1838         final DataContainer<T> resultContainer = new DataContainer<T>();\r
1839         scheduleRequest(request, new AsyncProcedure<T>() {\r
1840             @Override\r
1841             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1842                 exceptionContainer.set(throwable);\r
1843                 procedure.exception(graph, throwable);\r
1844             }\r
1845             @Override\r
1846             public void execute(AsyncReadGraph graph, T result) {\r
1847                 resultContainer.set(result);\r
1848                 procedure.execute(graph, result);\r
1849             }\r
1850         }, getListenerBase(procedure), notify);\r
1851         acquire(notify, request, procedure);\r
1852         Throwable throwable = exceptionContainer.get();\r
1853         if (throwable != null) {\r
1854             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1855             else throw new DatabaseException("Unexpected exception", throwable);\r
1856         }\r
1857         return resultContainer.get();\r
1858     }\r
1859 \r
1860 \r
1861     @Override\r
1862     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {\r
1863         assertNotSession();\r
1864         Semaphore notify = new Semaphore(0);\r
1865         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
1866         scheduleRequest(request, new AsyncMultiProcedure<T>() {\r
1867             @Override\r
1868             public void exception(AsyncReadGraph graph, Throwable throwable) {\r
1869                 exceptionContainer.set(throwable);\r
1870                 procedure.exception(graph, throwable);\r
1871             }\r
1872             @Override\r
1873             public void execute(AsyncReadGraph graph, T result) {\r
1874                 procedure.execute(graph, result);\r
1875             }\r
1876             @Override\r
1877             public void finished(AsyncReadGraph graph) {\r
1878                 procedure.finished(graph);\r
1879             }\r
1880         }, notify);\r
1881         acquire(notify, request, procedure);\r
1882         Throwable throwable = exceptionContainer.get();\r
1883         if (throwable != null) {\r
1884             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1885             else throw new DatabaseException("Unexpected exception", throwable);\r
1886         }\r
1887         // TODO: implement return value\r
1888         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");\r
1889         return null;\r
1890     }\r
1891 \r
1892     @Override\r
1893     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {\r
1894         return syncRequest(request, new ProcedureAdapter<T>());\r
1895 \r
1896 \r
1897 //        assert(request != null);\r
1898 //\r
1899 //        final DataContainer<T> result = new DataContainer<T>();\r
1900 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1901 //\r
1902 //        syncRequest(request, new Procedure<T>() {\r
1903 //\r
1904 //            @Override\r
1905 //            public void execute(T t) {\r
1906 //                result.set(t);\r
1907 //            }\r
1908 //\r
1909 //            @Override\r
1910 //            public void exception(Throwable t) {\r
1911 //                exception.set(t);\r
1912 //            }\r
1913 //\r
1914 //        });\r
1915 //\r
1916 //        Throwable t = exception.get();\r
1917 //        if(t != null) {\r
1918 //            if(t instanceof DatabaseException) throw (DatabaseException)t;\r
1919 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);\r
1920 //        }\r
1921 //\r
1922 //        return result.get();\r
1923 \r
1924     }\r
1925 \r
1926     @Override\r
1927     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {\r
1928         return syncRequest(request, (Procedure<T>)procedure);\r
1929     }\r
1930 \r
1931 \r
1932 //    @Override\r
1933 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
1934 //        assertNotSession();\r
1935 //        Semaphore notify = new Semaphore(0);\r
1936 //        DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1937 //        DataContainer<T> result = new DataContainer<T>();\r
1938 //        scheduleRequest(request, procedure, notify, container, result);\r
1939 //        acquire(notify, request);\r
1940 //        Throwable throwable = container.get();\r
1941 //        if(throwable != null) {\r
1942 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1943 //            else throw new DatabaseException("Unexpected exception", throwable);\r
1944 //        }\r
1945 //        return result.get();\r
1946 //    }\r
1947 \r
1948 \r
1949     @Override\r
1950     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
1951         assertNotSession();\r
1952         Semaphore notify = new Semaphore(0);\r
1953         final DataContainer<Throwable> container = new DataContainer<Throwable>();\r
1954         final DataContainer<T> result = new DataContainer<T>();\r
1955         scheduleRequest(request, procedure, notify, container, result);\r
1956         acquire(notify, request);\r
1957         Throwable throwable = container.get();\r
1958         if (throwable != null) {\r
1959             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
1960             else throw new DatabaseException("Unexpected exception", throwable);\r
1961         }\r
1962         return result.get();\r
1963     }\r
1964 \r
1965     @Override\r
1966     public void syncRequest(Write request) throws DatabaseException  {\r
1967         assertNotSession();\r
1968         assertAlive();\r
1969         Semaphore notify = new Semaphore(0);\r
1970         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
1971         scheduleRequest(request, new Callback<DatabaseException>() {\r
1972 \r
1973             @Override\r
1974             public void run(DatabaseException e) {\r
1975               exception.set(e);\r
1976             }\r
1977 \r
1978         }, notify);\r
1979         acquire(notify, request);\r
1980         if(exception.get() != null) throw exception.get();\r
1981     }\r
1982 \r
1983     @Override\r
1984     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {\r
1985         assertNotSession();\r
1986         Semaphore notify = new Semaphore(0);\r
1987         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
1988         final DataContainer<T> result = new DataContainer<T>();\r
1989         scheduleRequest(request, new Procedure<T>() {\r
1990 \r
1991             @Override\r
1992             public void exception(Throwable t) {\r
1993               exception.set(t);\r
1994             }\r
1995 \r
1996             @Override\r
1997             public void execute(T t) {\r
1998               result.set(t);\r
1999             }\r
2000 \r
2001         }, notify);\r
2002         acquire(notify, request);\r
2003         if(exception.get() != null) {\r
2004             Throwable t = exception.get();\r
2005             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2006             else throw new DatabaseException(t);\r
2007         }\r
2008         return result.get();\r
2009     }\r
2010 \r
2011     @Override\r
2012     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {\r
2013         assertNotSession();\r
2014         Semaphore notify = new Semaphore(0);\r
2015         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2016         final DataContainer<T> result = new DataContainer<T>();\r
2017         scheduleRequest(request, new Procedure<T>() {\r
2018 \r
2019             @Override\r
2020             public void exception(Throwable t) {\r
2021               exception.set(t);\r
2022             }\r
2023 \r
2024             @Override\r
2025             public void execute(T t) {\r
2026               result.set(t);\r
2027             }\r
2028 \r
2029         }, notify);\r
2030         acquire(notify, request);\r
2031         if(exception.get() != null) {\r
2032             Throwable t = exception.get();\r
2033             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2034             else throw new DatabaseException(t);\r
2035         }\r
2036         return result.get();\r
2037     }\r
2038 \r
2039     @Override\r
2040     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {\r
2041         assertNotSession();\r
2042         Semaphore notify = new Semaphore(0);\r
2043         final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
2044         final DataContainer<T> result = new DataContainer<T>();\r
2045         scheduleRequest(request, new Procedure<T>() {\r
2046 \r
2047             @Override\r
2048             public void exception(Throwable t) {\r
2049               exception.set(t);\r
2050             }\r
2051 \r
2052             @Override\r
2053             public void execute(T t) {\r
2054               result.set(t);\r
2055             }\r
2056 \r
2057         }, notify);\r
2058         acquire(notify, request);\r
2059         if(exception.get() != null) {\r
2060             Throwable t = exception.get();\r
2061             if(t instanceof DatabaseException) throw (DatabaseException)t;\r
2062             else throw new DatabaseException(t);\r
2063         }\r
2064         return result.get();\r
2065     }\r
2066 \r
2067     @Override\r
2068     public void syncRequest(DelayedWrite request) throws DatabaseException  {\r
2069         assertNotSession();\r
2070         Semaphore notify = new Semaphore(0);\r
2071         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2072         scheduleRequest(request, new Callback<DatabaseException>() {\r
2073             @Override\r
2074             public void run(DatabaseException e) {\r
2075                 exception.set(e);\r
2076             }\r
2077         }, notify);\r
2078         acquire(notify, request);\r
2079         if(exception.get() != null) throw exception.get();\r
2080     }\r
2081 \r
2082     @Override\r
2083     public void syncRequest(WriteOnly request) throws DatabaseException  {\r
2084         assertNotSession();\r
2085         assertAlive();\r
2086         Semaphore notify = new Semaphore(0);\r
2087         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
2088         scheduleRequest(request, new Callback<DatabaseException>() {\r
2089             @Override\r
2090             public void run(DatabaseException e) {\r
2091                 exception.set(e);\r
2092             }\r
2093         }, notify);\r
2094         acquire(notify, request);\r
2095         if(exception.get() != null) throw exception.get();\r
2096     }\r
2097 \r
2098     /*\r
2099      *\r
2100      * GraphRequestProcessor interface\r
2101      */\r
2102 \r
2103     @Override\r
2104     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {\r
2105 \r
2106         scheduleRequest(request, procedure, null, null);\r
2107 \r
2108     }\r
2109 \r
2110     @Override\r
2111     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {\r
2112 \r
2113         scheduleRequest(request, procedure, null);\r
2114 \r
2115     }\r
2116 \r
2117     @Override\r
2118     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {\r
2119 \r
2120         scheduleRequest(request, procedure, null, null, null);\r
2121 \r
2122     }\r
2123 \r
2124     @Override\r
2125     public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {\r
2126 \r
2127         scheduleRequest(request, callback, null);\r
2128 \r
2129     }\r
2130 \r
2131     @Override\r
2132     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {\r
2133 \r
2134         scheduleRequest(request, procedure, null);\r
2135 \r
2136     }\r
2137 \r
2138     @Override\r
2139     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {\r
2140 \r
2141         scheduleRequest(request, procedure, null);\r
2142 \r
2143     }\r
2144 \r
2145     @Override\r
2146     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {\r
2147 \r
2148         scheduleRequest(request, procedure, null);\r
2149 \r
2150     }\r
2151 \r
2152     @Override\r
2153     public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {\r
2154 \r
2155         scheduleRequest(request, callback, null);\r
2156 \r
2157     }\r
2158 \r
2159     @Override\r
2160     public void asyncRequest(final Write r) {\r
2161         asyncRequest(r, null);\r
2162     }\r
2163 \r
2164     @Override\r
2165     public void asyncRequest(final DelayedWrite r) {\r
2166         asyncRequest(r, null);\r
2167     }\r
2168 \r
2169     @Override\r
2170     public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {\r
2171 \r
2172         scheduleRequest(request, callback, null);\r
2173 \r
2174     }\r
2175 \r
2176     @Override\r
2177     public void asyncRequest(final WriteOnly request) {\r
2178 \r
2179         asyncRequest(request, null);\r
2180 \r
2181     }\r
2182 \r
2183     @Override\r
2184     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {\r
2185         r.request(this, procedure);\r
2186     }\r
2187 \r
2188     @Override\r
2189     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {\r
2190         r.request(this, procedure);\r
2191     }\r
2192 \r
2193     @Override\r
2194     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {\r
2195         r.request(this, procedure);\r
2196     }\r
2197 \r
2198     @Override\r
2199     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {\r
2200         r.request(this, procedure);\r
2201     }\r
2202 \r
2203     @Override\r
2204     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {\r
2205         r.request(this, procedure);\r
2206     }\r
2207 \r
2208     @Override\r
2209     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {\r
2210         r.request(this, procedure);\r
2211     }\r
2212 \r
2213     @Override\r
2214     public <T> T sync(ReadInterface<T> r) throws DatabaseException {\r
2215         return r.request(this);\r
2216     }\r
2217 \r
2218     @Override\r
2219     public <T> T sync(WriteInterface<T> r) throws DatabaseException {\r
2220         return r.request(this);\r
2221     }\r
2222 \r
2223     @Override\r
2224     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {\r
2225         r.request(this, procedure);\r
2226     }\r
2227 \r
2228     @Override\r
2229     public <T> void async(WriteInterface<T> r) {\r
2230         r.request(this, new ProcedureAdapter<T>());\r
2231     }\r
2232 \r
2233     //@Override\r
2234     public void incAsync() {\r
2235         state.incAsync();\r
2236     }\r
2237 \r
2238     //@Override\r
2239     public void decAsync() {\r
2240         state.decAsync();\r
2241     }\r
2242 \r
2243     public long getCluster(ResourceImpl resource) {\r
2244         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);\r
2245         return cluster.getClusterId();\r
2246     }\r
2247 \r
2248     public long getCluster(int id) {\r
2249         if (clusterTable == null)\r
2250             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");\r
2251         return clusterTable.getClusterIdByResourceKeyNoThrow(id);\r
2252     }\r
2253 \r
2254     public ResourceImpl getResource(int id) {\r
2255         return new ResourceImpl(resourceSupport, id);\r
2256     }\r
2257 \r
2258     public ResourceImpl getResource(int resourceIndex, long clusterId) {\r
2259         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));\r
2260         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);\r
2261         int key = proxy.getClusterKey();\r
2262         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);\r
2263         return new ResourceImpl(resourceSupport, resourceKey);\r
2264     }\r
2265 \r
2266     public ResourceImpl getResource2(int id) {\r
2267         assert (id != 0);\r
2268         return new ResourceImpl(resourceSupport, id);\r
2269     }\r
2270 \r
2271     final public int getId(ResourceImpl impl) {\r
2272         return impl.id;\r
2273     }\r
2274 \r
2275     public static final Charset UTF8 = Charset.forName("utf-8");\r
2276 \r
2277 \r
2278 \r
2279 \r
2280     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {\r
2281         for(TransientGraph g : support.providers) {\r
2282             if(g.isPending(subject)) return false;\r
2283         }\r
2284         return true;\r
2285     }\r
2286 \r
2287     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {\r
2288         for(TransientGraph g : support.providers) {\r
2289             if(g.isPending(subject, predicate)) return false;\r
2290         }\r
2291         return true;\r
2292     }\r
2293 \r
2294     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {\r
2295 \r
2296         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2297 \r
2298             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2299 \r
2300             @Override\r
2301             public void run(ReadGraphImpl graph) {\r
2302                 if(ready.decrementAndGet() == 0) {\r
2303                     runnable.run(graph);\r
2304                 }\r
2305             }\r
2306 \r
2307         };\r
2308 \r
2309         for(TransientGraph g : support.providers) {\r
2310             if(g.isPending(subject)) {\r
2311                 try {\r
2312                     g.load(graph, subject, composite);\r
2313                 } catch (DatabaseException e) {\r
2314                     e.printStackTrace();\r
2315                 }\r
2316             } else {\r
2317                 composite.run(graph);\r
2318             }\r
2319         }\r
2320 \r
2321         composite.run(graph);\r
2322 \r
2323     }\r
2324 \r
2325     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {\r
2326 \r
2327         Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
2328 \r
2329             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
2330 \r
2331             @Override\r
2332             public void run(ReadGraphImpl graph) {\r
2333                 if(ready.decrementAndGet() == 0) {\r
2334                     runnable.run(graph);\r
2335                 }\r
2336             }\r
2337 \r
2338         };\r
2339 \r
2340         for(TransientGraph g : support.providers) {\r
2341             if(g.isPending(subject, predicate)) {\r
2342                 try {\r
2343                     g.load(graph, subject, predicate, composite);\r
2344                 } catch (DatabaseException e) {\r
2345                     e.printStackTrace();\r
2346                 }\r
2347             } else {\r
2348                 composite.run(graph);\r
2349             }\r
2350         }\r
2351 \r
2352         composite.run(graph);\r
2353 \r
2354     }\r
2355 \r
2356 //    void dumpHeap() {\r
2357 //\r
2358 //        try {\r
2359 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),\r
2360 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(\r
2361 //                    "d:/heap" + flushCounter + ".txt", true);\r
2362 //        } catch (IOException e) {\r
2363 //            e.printStackTrace();\r
2364 //        }\r
2365 //\r
2366 //    }\r
2367 \r
2368     void fireReactionsToSynchronize(ChangeSet cs) {\r
2369 \r
2370         // Do not fire empty events\r
2371         if (cs.isEmpty())\r
2372             return;\r
2373 \r
2374         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());\r
2375 //        g.state.barrier.inc();\r
2376 \r
2377         try {\r
2378 \r
2379             if (!cs.isEmpty()) {\r
2380                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);\r
2381                 for (ChangeListener l : changeListeners2) {\r
2382                     try {\r
2383                         l.graphChanged(e2);\r
2384                     } catch (Exception ex) {\r
2385                         ex.printStackTrace();\r
2386                     }\r
2387                 }\r
2388             }\r
2389 \r
2390         } finally {\r
2391 \r
2392 //            g.state.barrier.dec();\r
2393 //            g.waitAsync(null);\r
2394 \r
2395         }\r
2396 \r
2397     }\r
2398 \r
2399     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {\r
2400         try {\r
2401 \r
2402             // Do not fire empty events\r
2403             if (cs2.isEmpty())\r
2404                 return;\r
2405 \r
2406 //            graph.restart();\r
2407 //            graph.state.barrier.inc();\r
2408 \r
2409             try {\r
2410 \r
2411                 if (!cs2.isEmpty()) {\r
2412 \r
2413                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);\r
2414                     for (ChangeListener l : changeListeners2) {\r
2415                         try {\r
2416                             l.graphChanged(e2);\r
2417 //                            System.out.println("changelistener " + l);\r
2418                         } catch (Exception ex) {\r
2419                             ex.printStackTrace();\r
2420                         }\r
2421                     }\r
2422 \r
2423                 }\r
2424 \r
2425             } finally {\r
2426 \r
2427 //                graph.state.barrier.dec();\r
2428 //                graph.waitAsync(null);\r
2429 \r
2430             }\r
2431         } catch (Throwable t) {\r
2432             t.printStackTrace();\r
2433 \r
2434         }\r
2435     }\r
2436     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {\r
2437 \r
2438         try {\r
2439 \r
2440             // Do not fire empty events\r
2441             if (cs2.isEmpty())\r
2442                 return;\r
2443 \r
2444             // Do not fire on virtual requests\r
2445             if(graph.getProvider() != null)\r
2446                 return;\r
2447 \r
2448             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);\r
2449 \r
2450             try {\r
2451 \r
2452                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);\r
2453                 for (ChangeListener l : metadataListeners) {\r
2454                     try {\r
2455                         l.graphChanged(e2);\r
2456                     } catch (Throwable ex) {\r
2457                         ex.printStackTrace();\r
2458                     }\r
2459                 }\r
2460 \r
2461             } finally {\r
2462 \r
2463             }\r
2464 \r
2465         } catch (Throwable t) {\r
2466             t.printStackTrace();\r
2467         }\r
2468 \r
2469     }\r
2470 \r
2471 \r
2472     /*\r
2473      * (non-Javadoc)\r
2474      *\r
2475      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)\r
2476      */\r
2477     @Override\r
2478     public <T> T getService(Class<T> api) {\r
2479         T t = peekService(api);\r
2480         if (t == null) {\r
2481             if (state.isClosed())\r
2482                 throw new ServiceNotFoundException(this, api, "Session has been shut down");\r
2483             throw new ServiceNotFoundException(this, api);\r
2484         }\r
2485         return t;\r
2486     }\r
2487 \r
/**
 * Supplies the {@code ServerInformation} instance that {@code peekService} hands out when
 * {@code ServerInformation.class} is requested.
 *
 * @return the server information to expose as a service; presumably a cached copy, as the
 *         name suggests (to be confirmed against the implementing subclass)
 */
protected abstract ServerInformation getCachedServerInformation();
2492 \r
// Two-entry lookup cache used by peekService: slot 1 holds the most recently resolved
// API key and its service, slot 2 the pair that was in slot 1 before it.
private Class<?> serviceKey1 = null;
private Class<?> serviceKey2 = null;
private Object service1 = null;
private Object service2 = null;
2497 \r
2498     /*\r
2499      * (non-Javadoc)\r
2500      *\r
2501      * @see\r
2502      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)\r
2503      */\r
2504     @SuppressWarnings("unchecked")\r
2505     @Override\r
2506     public synchronized <T> T peekService(Class<T> api) {\r
2507 \r
2508                 if(serviceKey1 == api) {\r
2509                         return (T)service1;\r
2510                 } else if (serviceKey2 == api) {\r
2511                         // Promote this key\r
2512                         Object result = service2;\r
2513                         service2 = service1;\r
2514                         serviceKey2 = serviceKey1;\r
2515                         service1 = result;\r
2516                         serviceKey1 = api;\r
2517                         return (T)result;\r
2518                 }\r
2519 \r
2520         if (Layer0.class == api)\r
2521                 return (T) L0;\r
2522         if (ServerInformation.class == api)\r
2523             return (T) getCachedServerInformation();\r
2524         else if (WriteGraphImpl.class == api)\r
2525             return (T) writeState.getGraph();\r
2526         else if (ClusterBuilder.class == api)\r
2527             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);\r
2528         else if (ClusterBuilderFactory.class == api)\r
2529             return (T)new ClusterBuilderFactoryImpl(this);\r
2530 \r
2531                 service2 = service1;\r
2532                 serviceKey2 = serviceKey1;\r
2533 \r
2534         service1 = serviceLocator.peekService(api);\r
2535         serviceKey1 = api;\r
2536 \r
2537         return (T)service1;\r
2538 \r
2539     }\r
2540 \r
2541     /*\r
2542      * (non-Javadoc)\r
2543      *\r
2544      * @see\r
2545      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)\r
2546      */\r
2547     @Override\r
2548     public boolean hasService(Class<?> api) {\r
2549         return serviceLocator.hasService(api);\r
2550     }\r
2551 \r
2552     /**\r
2553      * @param api the api that must be implemented by the specified service\r
2554      * @param service the service implementation\r
2555      */\r
2556     @Override\r
2557     public <T> void registerService(Class<T> api, T service) {\r
2558         if(Layer0.class == api) {\r
2559                 L0 = (Layer0)service;\r
2560                 return;\r
2561         }\r
2562         serviceLocator.registerService(api, service);\r
2563         if (TransactionPolicySupport.class == api) {\r
2564             transactionPolicy = (TransactionPolicySupport)service;\r
2565             state.resetTransactionPolicy();\r
2566         }\r
2567         if (api == serviceKey1)\r
2568                 service1 = service;\r
2569         else if (api == serviceKey2)\r
2570                 service2 = service;\r
2571     }\r
2572 \r
2573     // ----------------\r
2574     // SessionMonitor\r
2575     // ----------------\r
2576 \r
2577     void fireSessionVariableChange(String variable) {\r
2578         for (MonitorHandler h : monitorHandlers) {\r
2579             MonitorContext ctx = monitorContexts.getLeft(h);\r
2580             assert ctx != null;\r
2581             // SafeRunner functionality repeated here to avoid dependency.\r
2582             try {\r
2583                 h.valuesChanged(ctx);\r
2584             } catch (Exception e) {\r
2585                 Logger.defaultLogError("monitor handler notification produced the following exception", e);\r
2586             } catch (LinkageError e) {\r
2587                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);\r
2588             }\r
2589         }\r
2590     }\r
2591 \r
2592 \r
2593     class ResourceSerializerImpl implements ResourceSerializer {\r
2594 \r
2595         public long createRandomAccessId(int id) throws DatabaseException {\r
2596             if(id < 0)\r
2597                 return id;\r
2598             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);\r
2599             long cluster = getCluster(id);\r
2600             if (0 == cluster)\r
2601                 return 0; // Better to return 0 then invalid id.\r
2602             long result = ClusterTraitsBase.createResourceId(cluster, index);\r
2603             return result;\r
2604         }\r
2605 \r
2606         @Override\r
2607         public long getRandomAccessId(Resource resource) throws DatabaseException {\r
2608 \r
2609             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2610             return createRandomAccessId(resourceImpl.id);\r
2611 \r
2612         }\r
2613 \r
2614         @Override\r
2615         public int getTransientId(Resource resource) throws DatabaseException {\r
2616 \r
2617             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2618             return resourceImpl.id;\r
2619 \r
2620         }\r
2621 \r
2622         @Override\r
2623         public String createRandomAccessId(Resource resource)\r
2624         throws InvalidResourceReferenceException {\r
2625 \r
2626             if(resource == null) throw new IllegalArgumentException();\r
2627 \r
2628             ResourceImpl resourceImpl = (ResourceImpl) resource;\r
2629 \r
2630             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";\r
2631 \r
2632             int r;\r
2633             try {\r
2634                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);\r
2635             } catch (DatabaseException e1) {\r
2636                 throw new InvalidResourceReferenceException(e1);\r
2637             }\r
2638             try {\r
2639                 // Serialize as '<resource index>_<cluster id>'\r
2640                 return "" + r + "_" + getCluster(resourceImpl);\r
2641             } catch (Throwable e) {\r
2642                 e.printStackTrace();\r
2643                 throw new InvalidResourceReferenceException(e);\r
2644             } finally {\r
2645             }\r
2646         }\r
2647 \r
2648         public int getTransientId(long serialized) throws DatabaseException {\r
2649             if (serialized <= 0)\r
2650                 return (int)serialized;\r
2651             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);\r
2652             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);\r
2653             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
2654             if (cluster == null)\r
2655                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);\r
2656             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);\r
2657             return key;\r
2658         }\r
2659 \r
2660         @Override\r
2661         public Resource getResource(long randomAccessId) throws DatabaseException {\r
2662             return getResourceByKey(getTransientId(randomAccessId));\r
2663         }\r
2664 \r
2665         @Override\r
2666         public Resource getResource(int transientId) throws DatabaseException {\r
2667             return getResourceByKey(transientId);\r
2668         }\r
2669 \r
2670         @Override\r
2671         public Resource getResource(String randomAccessId)\r
2672         throws InvalidResourceReferenceException {\r
2673             try {\r
2674                 int i = randomAccessId.indexOf('_');\r
2675                 if (i == -1)\r
2676                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"\r
2677                             + randomAccessId + "'");\r
2678                 int r = Integer.parseInt(randomAccessId.substring(0, i));\r
2679                 if(r < 0) return getResourceByKey(r);\r
2680                 long c = Long.parseLong(randomAccessId.substring(i + 1));\r
2681                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
2682                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);\r
2683                 if (cluster.hasResource(key, clusterTranslator))\r
2684                     return getResourceByKey(key);\r
2685             } catch (InvalidResourceReferenceException e) {\r
2686                 throw e;\r
2687             } catch (NumberFormatException e) {\r
2688                 throw new InvalidResourceReferenceException(e);\r
2689             } catch (Throwable e) {\r
2690                 e.printStackTrace();\r
2691                 throw new InvalidResourceReferenceException(e);\r
2692             } finally {\r
2693             }\r
2694             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);\r
2695         }\r
2696 \r
2697         @Override\r
2698         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {\r
2699             try {\r
2700                 return true;\r
2701             } catch (Throwable e) {\r
2702                 e.printStackTrace();\r
2703                 throw new InvalidResourceReferenceException(e);\r
2704             } finally {\r
2705             }\r
2706         }\r
2707     }\r
2708 \r
2709     // Copied from old SessionImpl\r
2710 \r
2711     void check() {\r
2712         if (state.isClosed())\r
2713             throw new Error("Session closed.");\r
2714     }\r
2715 \r
2716 \r
2717 \r
2718     public ResourceImpl getNewResource(long clusterId) {\r
2719         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);\r
2720         int newId;\r
2721         try {\r
2722             newId = cluster.createResource(clusterTranslator);\r
2723         } catch (DatabaseException e) {\r
2724             Logger.defaultLogError(e);\r
2725             return null;\r
2726         }\r
2727         return new ResourceImpl(resourceSupport, newId);\r
2728     }\r
2729 \r
2730     public ResourceImpl getNewResource(Resource clusterSet)\r
2731     throws DatabaseException {\r
2732         long resourceId = clusterSet.getResourceId();\r
2733         Long clusterId = clusterSetsSupport.get(resourceId);\r
2734         if (null == clusterId)\r
2735             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);\r
2736         if (Constants.NewClusterId == clusterId) {\r
2737             clusterId = getService(ClusteringSupport.class).createCluster();\r
2738             if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
2739                 createdClusters.add(clusterId);\r
2740             clusterSetsSupport.put(resourceId, clusterId);\r
2741             return getNewResource(clusterId);\r
2742         } else {\r
2743             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);\r
2744             ResourceImpl result;\r
2745             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
2746                 clusterId = getService(ClusteringSupport.class).createCluster();\r
2747                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
2748                     createdClusters.add(clusterId);\r
2749                 clusterSetsSupport.put(resourceId, clusterId);\r
2750                 return getNewResource(clusterId);\r
2751             } else {\r
2752                 result = getNewResource(clusterId);\r
2753                 int resultKey = querySupport.getId(result);\r
2754                 long resultCluster = querySupport.getClusterId(resultKey);\r
2755                 if (clusterId != resultCluster)\r
2756                     clusterSetsSupport.put(resourceId, resultCluster);\r
2757                 return result;\r
2758             }\r
2759         }\r
2760     }\r
2761 \r
2762     public void getNewClusterSet(Resource clusterSet)\r
2763     throws DatabaseException {\r
2764         if (DEBUG)\r
2765             System.out.println("new cluster set=" + clusterSet);\r
2766         long resourceId = clusterSet.getResourceId();\r
2767         if (clusterSetsSupport.containsKey(resourceId))\r
2768             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);\r
2769         clusterSetsSupport.put(resourceId, Constants.NewClusterId);\r
2770     }\r
2771     public boolean&nbs