]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / SessionI.java
1 package org.simantics.db.server.internal;
2
3 import java.nio.ByteBuffer;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.Map.Entry;
7 import java.util.concurrent.ConcurrentHashMap;
8
9 import org.simantics.db.ClusterCreator;
10 import org.simantics.db.Database;
11 import org.simantics.db.Database.Session;
12 import org.simantics.db.Database.Session.ChangeSetData;
13 import org.simantics.db.Database.Session.ChangeSetIds;
14 import org.simantics.db.Database.Session.ChangeSetUpdate;
15 import org.simantics.db.Database.Session.Cluster;
16 import org.simantics.db.Database.Session.ClusterChanges;
17 import org.simantics.db.Database.Session.ClusterIds;
18 import org.simantics.db.Database.Session.Information;
19 import org.simantics.db.Database.Session.OnChangeSetUpdate;
20 import org.simantics.db.Database.Session.Refresh;
21 import org.simantics.db.Database.Session.ResourceSegment;
22 import org.simantics.db.Database.Session.Transaction;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.exception.SDBException;
25 import org.simantics.db.server.ProCoreException;
26 import org.simantics.db.server.protocol.AAAFunction;
27 import org.simantics.db.server.protocol.AcceptCommitFunction;
28 import org.simantics.db.server.protocol.AskTransactionFunction;
29 import org.simantics.db.server.protocol.CancelCommitFunction;
30 import org.simantics.db.server.protocol.Constants;
31 import org.simantics.db.server.protocol.EndTransactionFunction;
32 import org.simantics.db.server.protocol.ExecuteFunction;
33 import org.simantics.db.server.protocol.GetChangeSetContextFunction;
34 import org.simantics.db.server.protocol.GetChangeSetDataFunction;
35 import org.simantics.db.server.protocol.GetChangeSetsFunction;
36 import org.simantics.db.server.protocol.GetClusterChangesFunction;
37 import org.simantics.db.server.protocol.GetClusterNewFunction;
38 import org.simantics.db.server.protocol.GetRefresh2Function;
39 import org.simantics.db.server.protocol.GetResourceSegmentFunction;
40 import org.simantics.db.server.protocol.GetServerInfo2Function;
41 import org.simantics.db.server.protocol.ListClustersFunction;
42 import org.simantics.db.server.protocol.ReconnectFunction;
43 import org.simantics.db.server.protocol.ReserveIdsFunction;
44 import org.simantics.db.server.protocol.UndoFunction;
45 import org.simantics.db.server.protocol.UpdateClusterFunction;
46 import org.simantics.db.service.ClusterUID;
47
48 public class SessionI implements Session {
49     
50     protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();
51     
52     private final DatabaseI db;
53     private Client client;
54     SessionI(DatabaseI db, Client client) {
55         this.db = db;
56         this.client = client;
57     }
58     DatabaseI getDb() {
59         return db;
60     }
61     Client replace(Client client) throws ProCoreException {
62         Client t = this.client;
63         this.client = client;
64         return t;
65     }
66     @Override
67     public Database getDatabase() {
68         return db;
69     }
70     @Override
71     public void close() throws ProCoreException {
72         client.close();
73     }
74     @Override
75     public boolean isClosed() throws ProCoreException {
76         return client.isClosed();
77     }
78     @Override
79     public void open() throws ProCoreException {
80         try {
81             client.open();
82         } catch (InterruptedException e) {
83             throw new ProCoreException("Client.open was interrupted.", e);
84         }
85     }
86     public void callAAA() throws ProCoreException {
87         AAAFunction f = new AAAFunction();
88         client.call(new Method(f, null, null));
89     }
90     @Override
91     public Transaction askReadTransaction() throws ProCoreException {
92         AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);
93         AskTransactionMethod m = new AskTransactionMethod(f);
94         client.call(m);
95         return m;
96     }
97     @Override
98     public Transaction askWriteTransaction(long transactionId) throws ProCoreException {
99         AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);
100         AskTransactionMethod m = new AskTransactionMethod(f);
101         client.call(m);
102         return m;
103     }
104     @Override
105     public long endTransaction(long transactionId) throws ProCoreException {
106         EndTransactionFunction f = new EndTransactionFunction(transactionId);
107         Method m = new Method(f, null, null);
108         client.call(m);
109         return f.headChangeSetId;
110     }
111     @Override
112     public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
113         AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);
114         client.call(new Method(f, null, null));
115     }
116     public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
117         CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);
118         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
119         return f.headChangeSetId;
120 }
121 //    {
122 //        OpenClientSessionFunction f = new OpenClientSessionFunction();
123 //        s.call(f);
124 //        Util.show("OpenClientSession sid=" + f.sessionId);
125 //    }
126 //    {
127 //        CloseClientSessionFunction f = new CloseClientSessionFunction(1);
128 //        s.call(f);
129 //        Util.show("CloseClientSession sid=" + f.sessionId);
130 //    }
131 //    {
132 //        EchoFunction f = new EchoFunction(new byte[1]);
133 //        s.call(f);
134 //        Util.show("Echo bytes.len=" + f.bytes.length);
135 //    }
136     @Override
137     public String execute(String command) throws ProCoreException {
138         ExecuteFunction f = new ExecuteFunction(command);
139         Method m = new Method(f, null, null);
140         client.call(m);
141         return f.out;
142     }
143     @Override
144     public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
145         GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);
146         client.call(new Method(f, null, null));
147         return f.changeSetContext;
148     }
149     public ChangeSetData getChangeSetData(long minChangeSetId, long  maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {
150         GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);
151         GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));
152         client.call(m);
153         return m;
154     }
155     public ChangeSetIds getChangeSetIds() throws ProCoreException {
156         GetChangeSetsFunction f = new GetChangeSetsFunction();
157         GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);
158         client.call(m);
159         return m;
160     }
161     public Cluster getCluster(byte[] clusterId) throws ProCoreException {
162         GetClusterNewFunction f = new GetClusterNewFunction(clusterId);
163         f.deflated = ByteBuffer.allocate(8192);
164         GetClusterMethod m = new GetClusterMethod(f);
165         client.call(m);
166         return m;
167     }
168     public ClusterIds getClusterIds() throws ProCoreException {
169         int all = 0;
170         ListClustersFunction f = new ListClustersFunction(all);
171         GetClusterIdsMethod m = new GetClusterIdsMethod(f);
172         client.call(m);
173         return m;
174     }
175     public Refresh getRefresh(long changeSetId) throws ProCoreException {
176         GetRefresh2Function f = new GetRefresh2Function(0);
177         GetRefreshMethod m = new GetRefreshMethod(f);
178         client.call(m);
179         return m;
180     }
181     public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {
182         GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);
183         GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);
184         client.call(m);
185         return m;
186     }
187     public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)  throws ProCoreException {
188         GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);
189         GetClusterChangesMethod m = new GetClusterChangesMethod(f);
190         client.call(m);
191         return m;
192     }
193     public Information getInformation() throws ProCoreException {
194         GetServerInfo2Function f = new GetServerInfo2Function();
195         GetInformationMethod m = new GetInformationMethod(f);
196         client.call(m);
197         return m;
198     }
199     public void reconnect(int sessionId) throws ProCoreException {
200         ReconnectFunction f = new ReconnectFunction(sessionId);
201         client.call(new Method(f, null, null));
202     }
203     public long reserveIds(int count) throws ProCoreException {
204         ReserveIdsFunction f = new ReserveIdsFunction(count);
205         client.call(new Method(f, null, null));
206         return f.firstId;
207     }
208     public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
209         UndoFunction f = new UndoFunction(changeSetIds);
210         client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
211         return f.potentialConflicts;
212     }
213     public void updateCluster(byte[] operations) throws ProCoreException {
214         UpdateClusterFunction f = new UpdateClusterFunction(operations);
215         client.call(new Method(f, null, null));
216     }
217     @Override
218     public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
219         Cluster t = getCluster(uid.asBytes());
220
221         ByteBuffer deflated = t.getDeflated();
222         Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
223
224         //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
225         long[] longs = (long[]) arrays[0];
226         int[] ints = (int[]) arrays[1];
227         byte[] bytes = (byte[]) arrays[2];
228         return creator.create(uid, bytes, ints, longs);
229
230     }
231     @Override
232     public boolean refreshEnabled() {
233         return true;
234     }
235     @Override
236     public boolean rolledback() {
237         return false;
238     }
239 }
240 class SessionManager {
241     private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();
242     SessionManager() {
243     }
244     public List<Client> disconnect(DatabaseI db) throws ProCoreException {
245         ArrayList<Client> clients = new ArrayList<Client>();
246         for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
247             SessionI si = i.getValue();
248             if (!db.equals(si.getDb()))
249                 continue;
250             Client client = i.getKey();
251             if (client.isOpen()) {
252                 client.close();
253                 clients.add(client);
254             }
255         }
256         return clients;
257     }
258     public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {
259         for (Client client : clients) {
260             SessionI si = sessionMap.get(client);
261             if (null == si || !db.equals(si.getDb()))
262                 continue;
263             client.open();
264         }
265 //        for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
266 //            SessionI si = i.getValue();
267 //            if (!db.equals(si.getDb()))
268 //                continue;
269 //            Client client = i.getKey();
270 //            client.open();
271 //        }
272     }
273     public SessionI newSession(DatabaseI db) throws ProCoreException {
274         Client client = db.newClient();
275         SessionI si = new SessionI(db, client);
276         sessionMap.put(client,  si);
277         return si;
278     }
279 }
280 class SessionEvent {
281     private final Session session;
282     private final Throwable cause;
283     public SessionEvent(Session session, Throwable cause) {
284         this.session = session;
285         this.cause = cause;
286     }
287     public Session getSession() {
288         return session;
289     }
290     public Throwable getCause() {
291         return cause;
292     }
293 }
294 class AskTransactionMethod extends Method implements Transaction {
295     private final AskTransactionFunction function;
296     AskTransactionMethod(AskTransactionFunction function) {
297         super(function, null, null);
298         this.function = function;
299     }
300     @Override
301     public long getHeadChangeSetId() {
302         return function.headChangeSetId;
303     }
304     @Override
305     public long getTransactionId() {
306         return function.transactionId;
307     }
308 }
309 class GetChangeSetDataMethod extends Method implements ChangeSetData {
310     private final GetChangeSetDataFunction function;
311     GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {
312         super(function, evh, null);
313         this.function = function;
314     }
315     @Override
316     public boolean isOk() {
317         return !function.failed;
318     }
319 }
320 class GetChangeSetIdsMethod extends Method implements ChangeSetIds {
321     private final GetChangeSetsFunction function;
322     GetChangeSetIdsMethod(GetChangeSetsFunction function) {
323         super(function, null, null);
324         this.function = function;
325     }
326     @Override
327     public long getFirstChangeSetId() {
328         if (function.changeSetIds.length > 0)
329             return function.changeSetIds[0];
330         else
331             return Constants.NullChangeSetId;
332     }
333     @Override
334     public int getCount() {
335         return function.changeSetIds.length;
336     }
337 }
338 class GetClusterMethod extends Method implements Cluster {
339     private final GetClusterNewFunction function;
340     GetClusterMethod(GetClusterNewFunction function) {
341         super(function, null, new GetClusterHandler(function));
342         this.function = function;
343     }
344     @Override
345     public int getInflateSize() {
346         return function.inflateSize;
347     }
348     @Override
349     public ByteBuffer getDeflated() {
350         return function.deflated;
351     }
352 }
353 class GetClusterChangesMethod extends Method implements ClusterChanges {
354     private final GetClusterChangesFunction function;
355     GetClusterChangesMethod(GetClusterChangesFunction function) {
356         super(function, null, null);
357         this.function = function;
358     }
359     @Override
360     public long getHeadChangeSetId() {
361         return function.headChangeSetId;
362     }
363     @Override
364     public int[] getResourceIndex() {
365         return function.resourceIndex;
366     }
367     @Override
368     public int[] getPredicateIndex() {
369         return function.predicateIndex;
370     }
371     @Override
372     public long[] getPredicateFirst() {
373         return function.predicateFirst;
374     }
375     @Override
376     public long[] getPredicateSecond() {
377         return function.predicateSecond;
378     }
379     @Override
380     public int[] getValueIndex() {
381         return function.valueIndex;
382     }
383 }
384 class GetClusterIdsMethod extends Method implements ClusterIds {
385     private final ListClustersFunction function;
386     GetClusterIdsMethod(ListClustersFunction function) {
387         super(function, null, null);
388         this.function = function;
389     }
390     @Override
391     public int getStatus() {
392         return function.status;
393     }
394     @Override
395     public long[] getFirst() {
396         return function.first;
397     }
398     @Override
399     public long[] getSecond() {
400         return function.second;
401     }
402 }
403 class GetInformationMethod extends Method implements Information {
404     private final GetServerInfo2Function function;
405     GetInformationMethod(GetServerInfo2Function function) {
406         super(function, null, null);
407         this.function = function;
408     }
409     @Override
410     public String getServerId() {
411         return function.serverId;
412     }
413     @Override
414     public String getProtocolId() {
415         return function.protocolId;
416     }
417     @Override
418     public String getDatabaseId() {
419         return function.databaseId;
420     }
421     @Override
422     public long getFirstChangeSetId() {
423         return function.firstChangeSetId;
424     }
425 }
426 class GetRefreshMethod extends Method implements Refresh {
427     private final GetRefresh2Function function;
428     GetRefreshMethod(GetRefresh2Function function) {
429         super(function, null, null);
430         this.function = function;
431     }
432     @Override
433     public long getHeadChangeSetId() {
434         return function.headChangeSetId;
435     }
436     @Override
437     public long[] getFirst() {
438         return function.first;
439     }
440     @Override
441     public long[] getSecond() {
442         return function.second;
443     }
444 }
445 class GetResourceSegmentMethod extends Method implements ResourceSegment {
446     private final GetResourceSegmentFunction function;
447     GetResourceSegmentMethod(GetResourceSegmentFunction function) {
448         super(function, null, null);
449         this.function = function;
450     }
451     @Override
452     public byte[] getClusterId() {
453         return function.clusterUID;
454     }
455     @Override
456     public int getResourceIndex() {
457         return function.resourceIndex;
458     }
459     @Override
460     public long getValueSize() {
461         return function.valueSize;
462     }
463     @Override
464     public byte[] getSegment() {
465         return function.segment;
466     }
467     @Override
468     public long getOffset() {
469         return function.segmentOffset;
470     }
471 }
472 class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {
473     private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();
474     private final OnChangeSetUpdate on;
475     private ProCoreException exception;
476     ChangeSetEventHandler(OnChangeSetUpdate on) {
477         this.on = on;
478     }
479     @Override
480     void on(Event event) {
481         event.deserialize(csuEvent);
482         Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);
483         try {
484             if (null != on)
485                 on.onChangeSetUpdate(this);
486         } catch (SDBException e) {
487             if (null == exception)
488                 exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);
489             else
490                 Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);
491         }
492     }
493     @Override
494     public long getChangeSetId() {
495         return csuEvent.changeSetId;
496     }
497     @Override
498     public int getChangeSetIndex() {
499         return csuEvent.changeSetIndex;
500     }
501     @Override
502     public int getNumberOfClusterChangeSets() {
503         return csuEvent.numberOfClusterChangeSets;
504     }
505     @Override
506     public int getIndexOfClusterChangeSet() {
507         return csuEvent.indexOfClusterChangeSet;
508     }
509     @Override
510     public byte[] getClusterId() {
511         return csuEvent.clusterUID;
512     }
513     @Override
514     public boolean getNewCluster() {
515         return csuEvent.newCluster;
516     }
517     @Override
518     public byte[] getData() {
519         return csuEvent.data;
520     }
521 }