]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java
DB-client fixes
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / SessionI.java
1 package org.simantics.db.server.internal;\r
2 \r
3 import java.nio.ByteBuffer;\r
4 import java.util.ArrayList;\r
5 import java.util.List;\r
6 import java.util.Map.Entry;\r
7 import java.util.concurrent.ConcurrentHashMap;\r
8 \r
9 import org.simantics.db.ClusterCreator;\r
10 import org.simantics.db.Database;\r
11 import org.simantics.db.Database.Session;\r
12 import org.simantics.db.Database.Session.ChangeSetData;\r
13 import org.simantics.db.Database.Session.ChangeSetIds;\r
14 import org.simantics.db.Database.Session.ChangeSetUpdate;\r
15 import org.simantics.db.Database.Session.Cluster;\r
16 import org.simantics.db.Database.Session.ClusterChanges;\r
17 import org.simantics.db.Database.Session.ClusterIds;\r
18 import org.simantics.db.Database.Session.Information;\r
19 import org.simantics.db.Database.Session.OnChangeSetUpdate;\r
20 import org.simantics.db.Database.Session.Refresh;\r
21 import org.simantics.db.Database.Session.ResourceSegment;\r
22 import org.simantics.db.Database.Session.Transaction;\r
23 import org.simantics.db.exception.DatabaseException;\r
24 import org.simantics.db.exception.SDBException;\r
25 import org.simantics.db.server.ProCoreException;\r
26 import org.simantics.db.server.protocol.AAAFunction;\r
27 import org.simantics.db.server.protocol.AcceptCommitFunction;\r
28 import org.simantics.db.server.protocol.AskTransactionFunction;\r
29 import org.simantics.db.server.protocol.CancelCommitFunction;\r
30 import org.simantics.db.server.protocol.Constants;\r
31 import org.simantics.db.server.protocol.EndTransactionFunction;\r
32 import org.simantics.db.server.protocol.ExecuteFunction;\r
33 import org.simantics.db.server.protocol.GetChangeSetContextFunction;\r
34 import org.simantics.db.server.protocol.GetChangeSetDataFunction;\r
35 import org.simantics.db.server.protocol.GetChangeSetsFunction;\r
36 import org.simantics.db.server.protocol.GetClusterChangesFunction;\r
37 import org.simantics.db.server.protocol.GetClusterNewFunction;\r
38 import org.simantics.db.server.protocol.GetRefresh2Function;\r
39 import org.simantics.db.server.protocol.GetResourceSegmentFunction;\r
40 import org.simantics.db.server.protocol.GetServerInfo2Function;\r
41 import org.simantics.db.server.protocol.ListClustersFunction;\r
42 import org.simantics.db.server.protocol.ReconnectFunction;\r
43 import org.simantics.db.server.protocol.ReserveIdsFunction;\r
44 import org.simantics.db.server.protocol.UndoFunction;\r
45 import org.simantics.db.server.protocol.UpdateClusterFunction;\r
46 import org.simantics.db.service.ClusterUID;\r
47 \r
48 public class SessionI implements Session {\r
49     \r
50     protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();\r
51     \r
52     private final DatabaseI db;\r
53     private Client client;\r
54     SessionI(DatabaseI db, Client client) {\r
55         this.db = db;\r
56         this.client = client;\r
57     }\r
58     DatabaseI getDb() {\r
59         return db;\r
60     }\r
61     Client replace(Client client) throws ProCoreException {\r
62         Client t = this.client;\r
63         this.client = client;\r
64         return t;\r
65     }\r
66     @Override\r
67     public Database getDatabase() {\r
68         return db;\r
69     }\r
70     @Override\r
71     public void close() throws ProCoreException {\r
72         client.close();\r
73     }\r
74     @Override\r
75     public boolean isClosed() throws ProCoreException {\r
76         return client.isClosed();\r
77     }\r
78     @Override\r
79     public void open() throws ProCoreException {\r
80         try {\r
81             client.open();\r
82         } catch (InterruptedException e) {\r
83             throw new ProCoreException("Client.open was interrupted.", e);\r
84         }\r
85     }\r
86     public void callAAA() throws ProCoreException {\r
87         AAAFunction f = new AAAFunction();\r
88         client.call(new Method(f, null, null));\r
89     }\r
90     @Override\r
91     public Transaction askReadTransaction() throws ProCoreException {\r
92         AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);\r
93         AskTransactionMethod m = new AskTransactionMethod(f);\r
94         client.call(m);\r
95         return m;\r
96     }\r
97     @Override\r
98     public Transaction askWriteTransaction(long transactionId) throws ProCoreException {\r
99         AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);\r
100         AskTransactionMethod m = new AskTransactionMethod(f);\r
101         client.call(m);\r
102         return m;\r
103     }\r
104     @Override\r
105     public long endTransaction(long transactionId) throws ProCoreException {\r
106         EndTransactionFunction f = new EndTransactionFunction(transactionId);\r
107         Method m = new Method(f, null, null);\r
108         client.call(m);\r
109         return f.headChangeSetId;\r
110     }\r
111     @Override\r
112     public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {\r
113         AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);\r
114         client.call(new Method(f, null, null));\r
115     }\r
116     public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
117         CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);\r
118         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
119         return f.headChangeSetId;\r
120 }\r
121 //    {\r
122 //        OpenClientSessionFunction f = new OpenClientSessionFunction();\r
123 //        s.call(f);\r
124 //        Util.show("OpenClientSession sid=" + f.sessionId);\r
125 //    }\r
126 //    {\r
127 //        CloseClientSessionFunction f = new CloseClientSessionFunction(1);\r
128 //        s.call(f);\r
129 //        Util.show("CloseClientSession sid=" + f.sessionId);\r
130 //    }\r
131 //    {\r
132 //        EchoFunction f = new EchoFunction(new byte[1]);\r
133 //        s.call(f);\r
134 //        Util.show("Echo bytes.len=" + f.bytes.length);\r
135 //    }\r
136     @Override\r
137     public String execute(String command) throws ProCoreException {\r
138         ExecuteFunction f = new ExecuteFunction(command);\r
139         Method m = new Method(f, null, null);\r
140         client.call(m);\r
141         return f.out;\r
142     }\r
143     @Override\r
144     public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {\r
145         GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);\r
146         client.call(new Method(f, null, null));\r
147         return f.changeSetContext;\r
148     }\r
149     public ChangeSetData getChangeSetData(long minChangeSetId, long  maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {\r
150         GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);\r
151         GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));\r
152         client.call(m);\r
153         return m;\r
154     }\r
155     public ChangeSetIds getChangeSetIds() throws ProCoreException {\r
156         GetChangeSetsFunction f = new GetChangeSetsFunction();\r
157         GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);\r
158         client.call(m);\r
159         return m;\r
160     }\r
161     public Cluster getCluster(byte[] clusterId) throws ProCoreException {\r
162         GetClusterNewFunction f = new GetClusterNewFunction(clusterId);\r
163         f.deflated = ByteBuffer.allocate(8192);\r
164         GetClusterMethod m = new GetClusterMethod(f);\r
165         client.call(m);\r
166         return m;\r
167     }\r
168     public ClusterIds getClusterIds() throws ProCoreException {\r
169         int all = 0;\r
170         ListClustersFunction f = new ListClustersFunction(all);\r
171         GetClusterIdsMethod m = new GetClusterIdsMethod(f);\r
172         client.call(m);\r
173         return m;\r
174     }\r
175     public Refresh getRefresh(long changeSetId) throws ProCoreException {\r
176         GetRefresh2Function f = new GetRefresh2Function(0);\r
177         GetRefreshMethod m = new GetRefreshMethod(f);\r
178         client.call(m);\r
179         return m;\r
180     }\r
181     public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {\r
182         GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);\r
183         GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);\r
184         client.call(m);\r
185         return m;\r
186     }\r
187     public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)  throws ProCoreException {\r
188         GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);\r
189         GetClusterChangesMethod m = new GetClusterChangesMethod(f);\r
190         client.call(m);\r
191         return m;\r
192     }\r
193     public Information getInformation() throws ProCoreException {\r
194         GetServerInfo2Function f = new GetServerInfo2Function();\r
195         GetInformationMethod m = new GetInformationMethod(f);\r
196         client.call(m);\r
197         return m;\r
198     }\r
199     public void reconnect(int sessionId) throws ProCoreException {\r
200         ReconnectFunction f = new ReconnectFunction(sessionId);\r
201         client.call(new Method(f, null, null));\r
202     }\r
203     public long reserveIds(int count) throws ProCoreException {\r
204         ReserveIdsFunction f = new ReserveIdsFunction(count);\r
205         client.call(new Method(f, null, null));\r
206         return f.firstId;\r
207     }\r
208     public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
209         UndoFunction f = new UndoFunction(changeSetIds);\r
210         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
211         return f.potentialConflicts;\r
212     }\r
213     public void updateCluster(byte[] operations) throws ProCoreException {\r
214         UpdateClusterFunction f = new UpdateClusterFunction(operations);\r
215         client.call(new Method(f, null, null));\r
216     }\r
217     @Override\r
218     public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {\r
219         Cluster t = getCluster(uid.asBytes());\r
220 \r
221         ByteBuffer deflated = t.getDeflated();\r
222         Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
223 \r
224         //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
225         long[] longs = (long[]) arrays[0];\r
226         int[] ints = (int[]) arrays[1];\r
227         byte[] bytes = (byte[]) arrays[2];\r
228         return creator.create(uid, bytes, ints, longs);\r
229 \r
230     }\r
231     @Override\r
232     public boolean refreshEnabled() {\r
233         return true;\r
234     }\r
235     @Override\r
236     public boolean rolledback() {\r
237         // TODO Auto-generated method stub\r
238         return false;\r
239     }\r
240 }\r
241 class SessionManager {\r
242     private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();\r
243     SessionManager() {\r
244     }\r
245     public List<Client> disconnect(DatabaseI db) throws ProCoreException {\r
246         ArrayList<Client> clients = new ArrayList<Client>();\r
247         for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
248             SessionI si = i.getValue();\r
249             if (!db.equals(si.getDb()))\r
250                 continue;\r
251             Client client = i.getKey();\r
252             if (client.isOpen()) {\r
253                 client.close();\r
254                 clients.add(client);\r
255             }\r
256         }\r
257         return clients;\r
258     }\r
259     public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {\r
260         for (Client client : clients) {\r
261             SessionI si = sessionMap.get(client);\r
262             if (null == si || !db.equals(si.getDb()))\r
263                 continue;\r
264             client.open();\r
265         }\r
266 //        for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
267 //            SessionI si = i.getValue();\r
268 //            if (!db.equals(si.getDb()))\r
269 //                continue;\r
270 //            Client client = i.getKey();\r
271 //            client.open();\r
272 //        }\r
273     }\r
274     public SessionI newSession(DatabaseI db) throws ProCoreException {\r
275         Client client = db.newClient();\r
276         SessionI si = new SessionI(db, client);\r
277         sessionMap.put(client,  si);\r
278         return si;\r
279     }\r
280 }\r
281 class SessionEvent {\r
282     private final Session session;\r
283     private final Throwable cause;\r
284     public SessionEvent(Session session, Throwable cause) {\r
285         this.session = session;\r
286         this.cause = cause;\r
287     }\r
288     public Session getSession() {\r
289         return session;\r
290     }\r
291     public Throwable getCause() {\r
292         return cause;\r
293     }\r
294 }\r
295 class AskTransactionMethod extends Method implements Transaction {\r
296     private final AskTransactionFunction function;\r
297     AskTransactionMethod(AskTransactionFunction function) {\r
298         super(function, null, null);\r
299         this.function = function;\r
300     }\r
301     @Override\r
302     public long getHeadChangeSetId() {\r
303         return function.headChangeSetId;\r
304     }\r
305     @Override\r
306     public long getTransactionId() {\r
307         return function.transactionId;\r
308     }\r
309 }\r
310 class GetChangeSetDataMethod extends Method implements ChangeSetData {\r
311     private final GetChangeSetDataFunction function;\r
312     GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {\r
313         super(function, evh, null);\r
314         this.function = function;\r
315     }\r
316     @Override\r
317     public boolean isOk() {\r
318         return !function.failed;\r
319     }\r
320 }\r
321 class GetChangeSetIdsMethod extends Method implements ChangeSetIds {\r
322     private final GetChangeSetsFunction function;\r
323     GetChangeSetIdsMethod(GetChangeSetsFunction function) {\r
324         super(function, null, null);\r
325         this.function = function;\r
326     }\r
327     @Override\r
328     public long getFirstChangeSetId() {\r
329         if (function.changeSetIds.length > 0)\r
330             return function.changeSetIds[0];\r
331         else\r
332             return Constants.NullChangeSetId;\r
333     }\r
334     @Override\r
335     public int getCount() {\r
336         return function.changeSetIds.length;\r
337     }\r
338 }\r
339 class GetClusterMethod extends Method implements Cluster {\r
340     private final GetClusterNewFunction function;\r
341     GetClusterMethod(GetClusterNewFunction function) {\r
342         super(function, null, new GetClusterHandler(function));\r
343         this.function = function;\r
344     }\r
345     @Override\r
346     public int getInflateSize() {\r
347         return function.inflateSize;\r
348     }\r
349     @Override\r
350     public ByteBuffer getDeflated() {\r
351         return function.deflated;\r
352     }\r
353 }\r
354 class GetClusterChangesMethod extends Method implements ClusterChanges {\r
355     private final GetClusterChangesFunction function;\r
356     GetClusterChangesMethod(GetClusterChangesFunction function) {\r
357         super(function, null, null);\r
358         this.function = function;\r
359     }\r
360     @Override\r
361     public long getHeadChangeSetId() {\r
362         return function.headChangeSetId;\r
363     }\r
364     @Override\r
365     public int[] getResourceIndex() {\r
366         return function.resourceIndex;\r
367     }\r
368     @Override\r
369     public int[] getPredicateIndex() {\r
370         return function.predicateIndex;\r
371     }\r
372     @Override\r
373     public long[] getPredicateFirst() {\r
374         return function.predicateFirst;\r
375     }\r
376     @Override\r
377     public long[] getPredicateSecond() {\r
378         return function.predicateSecond;\r
379     }\r
380     @Override\r
381     public int[] getValueIndex() {\r
382         return function.valueIndex;\r
383     }\r
384 }\r
385 class GetClusterIdsMethod extends Method implements ClusterIds {\r
386     private final ListClustersFunction function;\r
387     GetClusterIdsMethod(ListClustersFunction function) {\r
388         super(function, null, null);\r
389         this.function = function;\r
390     }\r
391     @Override\r
392     public int getStatus() {\r
393         return function.status;\r
394     }\r
395     @Override\r
396     public long[] getFirst() {\r
397         return function.first;\r
398     }\r
399     @Override\r
400     public long[] getSecond() {\r
401         return function.second;\r
402     }\r
403 }\r
404 class GetInformationMethod extends Method implements Information {\r
405     private final GetServerInfo2Function function;\r
406     GetInformationMethod(GetServerInfo2Function function) {\r
407         super(function, null, null);\r
408         this.function = function;\r
409     }\r
410     @Override\r
411     public String getServerId() {\r
412         return function.serverId;\r
413     }\r
414     @Override\r
415     public String getProtocolId() {\r
416         return function.protocolId;\r
417     }\r
418     @Override\r
419     public String getDatabaseId() {\r
420         return function.databaseId;\r
421     }\r
422     @Override\r
423     public long getFirstChangeSetId() {\r
424         return function.firstChangeSetId;\r
425     }\r
426 }\r
427 class GetRefreshMethod extends Method implements Refresh {\r
428     private final GetRefresh2Function function;\r
429     GetRefreshMethod(GetRefresh2Function function) {\r
430         super(function, null, null);\r
431         this.function = function;\r
432     }\r
433     @Override\r
434     public long getHeadChangeSetId() {\r
435         return function.headChangeSetId;\r
436     }\r
437     @Override\r
438     public long[] getFirst() {\r
439         return function.first;\r
440     }\r
441     @Override\r
442     public long[] getSecond() {\r
443         return function.second;\r
444     }\r
445 }\r
446 class GetResourceSegmentMethod extends Method implements ResourceSegment {\r
447     private final GetResourceSegmentFunction function;\r
448     GetResourceSegmentMethod(GetResourceSegmentFunction function) {\r
449         super(function, null, null);\r
450         this.function = function;\r
451     }\r
452     @Override\r
453     public byte[] getClusterId() {\r
454         return function.clusterUID;\r
455     }\r
456     @Override\r
457     public int getResourceIndex() {\r
458         return function.resourceIndex;\r
459     }\r
460     @Override\r
461     public long getValueSize() {\r
462         return function.valueSize;\r
463     }\r
464     @Override\r
465     public byte[] getSegment() {\r
466         return function.segment;\r
467     }\r
468     @Override\r
469     public long getOffset() {\r
470         return function.segmentOffset;\r
471     }\r
472 }\r
473 class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {\r
474     private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();\r
475     private final OnChangeSetUpdate on;\r
476     private ProCoreException exception;\r
477     ChangeSetEventHandler(OnChangeSetUpdate on) {\r
478         this.on = on;\r
479     }\r
480     @Override\r
481     void on(Event event) {\r
482         event.deserialize(csuEvent);\r
483         Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);\r
484         try {\r
485             if (null != on)\r
486                 on.onChangeSetUpdate(this);\r
487         } catch (SDBException e) {\r
488             if (null == exception)\r
489                 exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);\r
490             else\r
491                 Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);\r
492         }\r
493     }\r
494     @Override\r
495     public long getChangeSetId() {\r
496         return csuEvent.changeSetId;\r
497     }\r
498     @Override\r
499     public int getChangeSetIndex() {\r
500         return csuEvent.changeSetIndex;\r
501     }\r
502     @Override\r
503     public int getNumberOfClusterChangeSets() {\r
504         return csuEvent.numberOfClusterChangeSets;\r
505     }\r
506     @Override\r
507     public int getIndexOfClusterChangeSet() {\r
508         return csuEvent.indexOfClusterChangeSet;\r
509     }\r
510     @Override\r
511     public byte[] getClusterId() {\r
512         return csuEvent.clusterUID;\r
513     }\r
514     @Override\r
515     public boolean getNewCluster() {\r
516         return csuEvent.newCluster;\r
517     }\r
518     @Override\r
519     public byte[] getData() {\r
520         return csuEvent.data;\r
521     }\r
522 }\r