bb2cdb04059fc21a94c311c01b8d6051f496fe95
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / SessionI.java
1 package org.simantics.db.server.internal;\r
2 \r
3 import java.nio.ByteBuffer;\r
4 import java.util.ArrayList;\r
5 import java.util.List;\r
6 import java.util.Map.Entry;\r
7 import java.util.concurrent.ConcurrentHashMap;\r
8 \r
9 import org.simantics.db.ClusterCreator;\r
10 import org.simantics.db.Database;\r
11 import org.simantics.db.Database.Session;\r
12 import org.simantics.db.Database.Session.ChangeSetData;\r
13 import org.simantics.db.Database.Session.ChangeSetIds;\r
14 import org.simantics.db.Database.Session.ChangeSetUpdate;\r
15 import org.simantics.db.Database.Session.Cluster;\r
16 import org.simantics.db.Database.Session.ClusterChanges;\r
17 import org.simantics.db.Database.Session.ClusterIds;\r
18 import org.simantics.db.Database.Session.Information;\r
19 import org.simantics.db.Database.Session.OnChangeSetUpdate;\r
20 import org.simantics.db.Database.Session.Refresh;\r
21 import org.simantics.db.Database.Session.ResourceSegment;\r
22 import org.simantics.db.Database.Session.Transaction;\r
23 import org.simantics.db.exception.DatabaseException;\r
24 import org.simantics.db.exception.SDBException;\r
25 import org.simantics.db.server.ProCoreException;\r
26 import org.simantics.db.server.protocol.AAAFunction;\r
27 import org.simantics.db.server.protocol.AcceptCommitFunction;\r
28 import org.simantics.db.server.protocol.AskTransactionFunction;\r
29 import org.simantics.db.server.protocol.CancelCommitFunction;\r
30 import org.simantics.db.server.protocol.Constants;\r
31 import org.simantics.db.server.protocol.EndTransactionFunction;\r
32 import org.simantics.db.server.protocol.ExecuteFunction;\r
33 import org.simantics.db.server.protocol.GetChangeSetContextFunction;\r
34 import org.simantics.db.server.protocol.GetChangeSetDataFunction;\r
35 import org.simantics.db.server.protocol.GetChangeSetsFunction;\r
36 import org.simantics.db.server.protocol.GetClusterChangesFunction;\r
37 import org.simantics.db.server.protocol.GetClusterNewFunction;\r
38 import org.simantics.db.server.protocol.GetRefresh2Function;\r
39 import org.simantics.db.server.protocol.GetResourceSegmentFunction;\r
40 import org.simantics.db.server.protocol.GetServerInfo2Function;\r
41 import org.simantics.db.server.protocol.ListClustersFunction;\r
42 import org.simantics.db.server.protocol.ReconnectFunction;\r
43 import org.simantics.db.server.protocol.ReserveIdsFunction;\r
44 import org.simantics.db.server.protocol.UndoFunction;\r
45 import org.simantics.db.server.protocol.UpdateClusterFunction;\r
46 import org.simantics.db.service.ClusterUID;\r
47 \r
48 public class SessionI implements Session {\r
49     \r
50     protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();\r
51     \r
52     private final DatabaseI db;\r
53     private Client client;\r
54     SessionI(DatabaseI db, Client client) {\r
55         this.db = db;\r
56         this.client = client;\r
57     }\r
58     DatabaseI getDb() {\r
59         return db;\r
60     }\r
61     Client replace(Client client) throws ProCoreException {\r
62         Client t = this.client;\r
63         this.client = client;\r
64         return t;\r
65     }\r
66     @Override\r
67     public Database getDatabase() {\r
68         return db;\r
69     }\r
70     @Override\r
71     public void close() throws ProCoreException {\r
72         client.close();\r
73     }\r
74     @Override\r
75     public boolean isClosed() throws ProCoreException {\r
76         return client.isClosed();\r
77     }\r
78     @Override\r
79     public void open() throws ProCoreException {\r
80         try {\r
81             client.open();\r
82         } catch (InterruptedException e) {\r
83             throw new ProCoreException("Client.open was interrupted.", e);\r
84         }\r
85     }\r
86     public void callAAA() throws ProCoreException {\r
87         AAAFunction f = new AAAFunction();\r
88         client.call(new Method(f, null, null));\r
89     }\r
90     @Override\r
91     public Transaction askReadTransaction() throws ProCoreException {\r
92         AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);\r
93         AskTransactionMethod m = new AskTransactionMethod(f);\r
94         client.call(m);\r
95         return m;\r
96     }\r
97     @Override\r
98     public Transaction askWriteTransaction(long transactionId) throws ProCoreException {\r
99         AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);\r
100         AskTransactionMethod m = new AskTransactionMethod(f);\r
101         client.call(m);\r
102         return m;\r
103     }\r
104     @Override\r
105     public long endTransaction(long transactionId) throws ProCoreException {\r
106         EndTransactionFunction f = new EndTransactionFunction(transactionId);\r
107         Method m = new Method(f, null, null);\r
108         client.call(m);\r
109         return f.headChangeSetId;\r
110     }\r
111     @Override\r
112     public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {\r
113         AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);\r
114         client.call(new Method(f, null, null));\r
115     }\r
116     public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
117         CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);\r
118         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
119         return f.headChangeSetId;\r
120 }\r
121 //    {\r
122 //        OpenClientSessionFunction f = new OpenClientSessionFunction();\r
123 //        s.call(f);\r
124 //        Util.show("OpenClientSession sid=" + f.sessionId);\r
125 //    }\r
126 //    {\r
127 //        CloseClientSessionFunction f = new CloseClientSessionFunction(1);\r
128 //        s.call(f);\r
129 //        Util.show("CloseClientSession sid=" + f.sessionId);\r
130 //    }\r
131 //    {\r
132 //        EchoFunction f = new EchoFunction(new byte[1]);\r
133 //        s.call(f);\r
134 //        Util.show("Echo bytes.len=" + f.bytes.length);\r
135 //    }\r
136     @Override\r
137     public String execute(String command) throws ProCoreException {\r
138         ExecuteFunction f = new ExecuteFunction(command);\r
139         Method m = new Method(f, null, null);\r
140         client.call(m);\r
141         return f.out;\r
142     }\r
143     @Override\r
144     public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {\r
145         GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);\r
146         client.call(new Method(f, null, null));\r
147         return f.changeSetContext;\r
148     }\r
149     public ChangeSetData getChangeSetData(long minChangeSetId, long  maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {\r
150         GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);\r
151         GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));\r
152         client.call(m);\r
153         return m;\r
154     }\r
155     public ChangeSetIds getChangeSetIds() throws ProCoreException {\r
156         GetChangeSetsFunction f = new GetChangeSetsFunction();\r
157         GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);\r
158         client.call(m);\r
159         return m;\r
160     }\r
161     public Cluster getCluster(byte[] clusterId) throws ProCoreException {\r
162         GetClusterNewFunction f = new GetClusterNewFunction(clusterId);\r
163         f.deflated = ByteBuffer.allocate(8192);\r
164         GetClusterMethod m = new GetClusterMethod(f);\r
165         client.call(m);\r
166         return m;\r
167     }\r
168     public ClusterIds getClusterIds() throws ProCoreException {\r
169         int all = 0;\r
170         ListClustersFunction f = new ListClustersFunction(all);\r
171         GetClusterIdsMethod m = new GetClusterIdsMethod(f);\r
172         client.call(m);\r
173         return m;\r
174     }\r
175     public Refresh getRefresh(long changeSetId) throws ProCoreException {\r
176         GetRefresh2Function f = new GetRefresh2Function(0);\r
177         GetRefreshMethod m = new GetRefreshMethod(f);\r
178         client.call(m);\r
179         return m;\r
180     }\r
181     public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {\r
182         GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);\r
183         GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);\r
184         client.call(m);\r
185         return m;\r
186     }\r
187     public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)  throws ProCoreException {\r
188         GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);\r
189         GetClusterChangesMethod m = new GetClusterChangesMethod(f);\r
190         client.call(m);\r
191         return m;\r
192     }\r
193     public Information getInformation() throws ProCoreException {\r
194         GetServerInfo2Function f = new GetServerInfo2Function();\r
195         GetInformationMethod m = new GetInformationMethod(f);\r
196         client.call(m);\r
197         return m;\r
198     }\r
199     public void reconnect(int sessionId) throws ProCoreException {\r
200         ReconnectFunction f = new ReconnectFunction(sessionId);\r
201         client.call(new Method(f, null, null));\r
202     }\r
203     public long reserveIds(int count) throws ProCoreException {\r
204         ReserveIdsFunction f = new ReserveIdsFunction(count);\r
205         client.call(new Method(f, null, null));\r
206         return f.firstId;\r
207     }\r
208     public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
209         UndoFunction f = new UndoFunction(changeSetIds);\r
210         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
211         return f.potentialConflicts;\r
212     }\r
213     public void updateCluster(byte[] operations) throws ProCoreException {\r
214         UpdateClusterFunction f = new UpdateClusterFunction(operations);\r
215         client.call(new Method(f, null, null));\r
216     }\r
217     @Override\r
218     public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {\r
219         Cluster t = getCluster(uid.asBytes());\r
220 \r
221         ByteBuffer deflated = t.getDeflated();\r
222         Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
223 \r
224         //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
225         long[] longs = (long[]) arrays[0];\r
226         int[] ints = (int[]) arrays[1];\r
227         byte[] bytes = (byte[]) arrays[2];\r
228         return creator.create(uid, bytes, ints, longs);\r
229 \r
230     }\r
231     @Override\r
232     public boolean refreshEnabled() {\r
233         return true;\r
234     }\r
235 }\r
236 class SessionManager {\r
237     private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();\r
238     SessionManager() {\r
239     }\r
240     public List<Client> disconnect(DatabaseI db) throws ProCoreException {\r
241         ArrayList<Client> clients = new ArrayList<Client>();\r
242         for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
243             SessionI si = i.getValue();\r
244             if (!db.equals(si.getDb()))\r
245                 continue;\r
246             Client client = i.getKey();\r
247             if (client.isOpen()) {\r
248                 client.close();\r
249                 clients.add(client);\r
250             }\r
251         }\r
252         return clients;\r
253     }\r
254     public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {\r
255         for (Client client : clients) {\r
256             SessionI si = sessionMap.get(client);\r
257             if (null == si || !db.equals(si.getDb()))\r
258                 continue;\r
259             client.open();\r
260         }\r
261 //        for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
262 //            SessionI si = i.getValue();\r
263 //            if (!db.equals(si.getDb()))\r
264 //                continue;\r
265 //            Client client = i.getKey();\r
266 //            client.open();\r
267 //        }\r
268     }\r
269     public SessionI newSession(DatabaseI db) throws ProCoreException {\r
270         Client client = db.newClient();\r
271         SessionI si = new SessionI(db, client);\r
272         sessionMap.put(client,  si);\r
273         return si;\r
274     }\r
275 }\r
276 class SessionEvent {\r
277     private final Session session;\r
278     private final Throwable cause;\r
279     public SessionEvent(Session session, Throwable cause) {\r
280         this.session = session;\r
281         this.cause = cause;\r
282     }\r
283     public Session getSession() {\r
284         return session;\r
285     }\r
286     public Throwable getCause() {\r
287         return cause;\r
288     }\r
289 }\r
290 class AskTransactionMethod extends Method implements Transaction {\r
291     private final AskTransactionFunction function;\r
292     AskTransactionMethod(AskTransactionFunction function) {\r
293         super(function, null, null);\r
294         this.function = function;\r
295     }\r
296     @Override\r
297     public long getHeadChangeSetId() {\r
298         return function.headChangeSetId;\r
299     }\r
300     @Override\r
301     public long getTransactionId() {\r
302         return function.transactionId;\r
303     }\r
304 }\r
305 class GetChangeSetDataMethod extends Method implements ChangeSetData {\r
306     private final GetChangeSetDataFunction function;\r
307     GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {\r
308         super(function, evh, null);\r
309         this.function = function;\r
310     }\r
311     @Override\r
312     public boolean isOk() {\r
313         return !function.failed;\r
314     }\r
315 }\r
316 class GetChangeSetIdsMethod extends Method implements ChangeSetIds {\r
317     private final GetChangeSetsFunction function;\r
318     GetChangeSetIdsMethod(GetChangeSetsFunction function) {\r
319         super(function, null, null);\r
320         this.function = function;\r
321     }\r
322     @Override\r
323     public long getFirstChangeSetId() {\r
324         if (function.changeSetIds.length > 0)\r
325             return function.changeSetIds[0];\r
326         else\r
327             return Constants.NullChangeSetId;\r
328     }\r
329     @Override\r
330     public int getCount() {\r
331         return function.changeSetIds.length;\r
332     }\r
333 }\r
334 class GetClusterMethod extends Method implements Cluster {\r
335     private final GetClusterNewFunction function;\r
336     GetClusterMethod(GetClusterNewFunction function) {\r
337         super(function, null, new GetClusterHandler(function));\r
338         this.function = function;\r
339     }\r
340     @Override\r
341     public int getInflateSize() {\r
342         return function.inflateSize;\r
343     }\r
344     @Override\r
345     public ByteBuffer getDeflated() {\r
346         return function.deflated;\r
347     }\r
348 }\r
349 class GetClusterChangesMethod extends Method implements ClusterChanges {\r
350     private final GetClusterChangesFunction function;\r
351     GetClusterChangesMethod(GetClusterChangesFunction function) {\r
352         super(function, null, null);\r
353         this.function = function;\r
354     }\r
355     @Override\r
356     public long getHeadChangeSetId() {\r
357         return function.headChangeSetId;\r
358     }\r
359     @Override\r
360     public int[] getResourceIndex() {\r
361         return function.resourceIndex;\r
362     }\r
363     @Override\r
364     public int[] getPredicateIndex() {\r
365         return function.predicateIndex;\r
366     }\r
367     @Override\r
368     public long[] getPredicateFirst() {\r
369         return function.predicateFirst;\r
370     }\r
371     @Override\r
372     public long[] getPredicateSecond() {\r
373         return function.predicateSecond;\r
374     }\r
375     @Override\r
376     public int[] getValueIndex() {\r
377         return function.valueIndex;\r
378     }\r
379 }\r
380 class GetClusterIdsMethod extends Method implements ClusterIds {\r
381     private final ListClustersFunction function;\r
382     GetClusterIdsMethod(ListClustersFunction function) {\r
383         super(function, null, null);\r
384         this.function = function;\r
385     }\r
386     @Override\r
387     public int getStatus() {\r
388         return function.status;\r
389     }\r
390     @Override\r
391     public long[] getFirst() {\r
392         return function.first;\r
393     }\r
394     @Override\r
395     public long[] getSecond() {\r
396         return function.second;\r
397     }\r
398 }\r
399 class GetInformationMethod extends Method implements Information {\r
400     private final GetServerInfo2Function function;\r
401     GetInformationMethod(GetServerInfo2Function function) {\r
402         super(function, null, null);\r
403         this.function = function;\r
404     }\r
405     @Override\r
406     public String getServerId() {\r
407         return function.serverId;\r
408     }\r
409     @Override\r
410     public String getProtocolId() {\r
411         return function.protocolId;\r
412     }\r
413     @Override\r
414     public String getDatabaseId() {\r
415         return function.databaseId;\r
416     }\r
417     @Override\r
418     public long getFirstChangeSetId() {\r
419         return function.firstChangeSetId;\r
420     }\r
421 }\r
422 class GetRefreshMethod extends Method implements Refresh {\r
423     private final GetRefresh2Function function;\r
424     GetRefreshMethod(GetRefresh2Function function) {\r
425         super(function, null, null);\r
426         this.function = function;\r
427     }\r
428     @Override\r
429     public long getHeadChangeSetId() {\r
430         return function.headChangeSetId;\r
431     }\r
432     @Override\r
433     public long[] getFirst() {\r
434         return function.first;\r
435     }\r
436     @Override\r
437     public long[] getSecond() {\r
438         return function.second;\r
439     }\r
440 }\r
441 class GetResourceSegmentMethod extends Method implements ResourceSegment {\r
442     private final GetResourceSegmentFunction function;\r
443     GetResourceSegmentMethod(GetResourceSegmentFunction function) {\r
444         super(function, null, null);\r
445         this.function = function;\r
446     }\r
447     @Override\r
448     public byte[] getClusterId() {\r
449         return function.clusterUID;\r
450     }\r
451     @Override\r
452     public int getResourceIndex() {\r
453         return function.resourceIndex;\r
454     }\r
455     @Override\r
456     public long getValueSize() {\r
457         return function.valueSize;\r
458     }\r
459     @Override\r
460     public byte[] getSegment() {\r
461         return function.segment;\r
462     }\r
463     @Override\r
464     public long getOffset() {\r
465         return function.segmentOffset;\r
466     }\r
467 }\r
468 class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {\r
469     private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();\r
470     private final OnChangeSetUpdate on;\r
471     private ProCoreException exception;\r
472     ChangeSetEventHandler(OnChangeSetUpdate on) {\r
473         this.on = on;\r
474     }\r
475     @Override\r
476     void on(Event event) {\r
477         event.deserialize(csuEvent);\r
478         Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);\r
479         try {\r
480             if (null != on)\r
481                 on.onChangeSetUpdate(this);\r
482         } catch (SDBException e) {\r
483             if (null == exception)\r
484                 exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);\r
485             else\r
486                 Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);\r
487         }\r
488     }\r
489     @Override\r
490     public long getChangeSetId() {\r
491         return csuEvent.changeSetId;\r
492     }\r
493     @Override\r
494     public int getChangeSetIndex() {\r
495         return csuEvent.changeSetIndex;\r
496     }\r
497     @Override\r
498     public int getNumberOfClusterChangeSets() {\r
499         return csuEvent.numberOfClusterChangeSets;\r
500     }\r
501     @Override\r
502     public int getIndexOfClusterChangeSet() {\r
503         return csuEvent.indexOfClusterChangeSet;\r
504     }\r
505     @Override\r
506     public byte[] getClusterId() {\r
507         return csuEvent.clusterUID;\r
508     }\r
509     @Override\r
510     public boolean getNewCluster() {\r
511         return csuEvent.newCluster;\r
512     }\r
513     @Override\r
514     public byte[] getData() {\r
515         return csuEvent.data;\r
516     }\r
517 }\r