1 package org.simantics.db.server.internal;
\r
3 import java.nio.ByteBuffer;
\r
4 import java.util.ArrayList;
\r
5 import java.util.List;
\r
6 import java.util.Map.Entry;
\r
7 import java.util.concurrent.ConcurrentHashMap;
\r
9 import org.simantics.db.ClusterCreator;
\r
10 import org.simantics.db.Database;
\r
11 import org.simantics.db.Database.Session;
\r
12 import org.simantics.db.Database.Session.ChangeSetData;
\r
13 import org.simantics.db.Database.Session.ChangeSetIds;
\r
14 import org.simantics.db.Database.Session.ChangeSetUpdate;
\r
15 import org.simantics.db.Database.Session.Cluster;
\r
16 import org.simantics.db.Database.Session.ClusterChanges;
\r
17 import org.simantics.db.Database.Session.ClusterIds;
\r
18 import org.simantics.db.Database.Session.Information;
\r
19 import org.simantics.db.Database.Session.OnChangeSetUpdate;
\r
20 import org.simantics.db.Database.Session.Refresh;
\r
21 import org.simantics.db.Database.Session.ResourceSegment;
\r
22 import org.simantics.db.Database.Session.Transaction;
\r
23 import org.simantics.db.exception.DatabaseException;
\r
24 import org.simantics.db.exception.SDBException;
\r
25 import org.simantics.db.server.ProCoreException;
\r
26 import org.simantics.db.server.protocol.AAAFunction;
\r
27 import org.simantics.db.server.protocol.AcceptCommitFunction;
\r
28 import org.simantics.db.server.protocol.AskTransactionFunction;
\r
29 import org.simantics.db.server.protocol.CancelCommitFunction;
\r
30 import org.simantics.db.server.protocol.Constants;
\r
31 import org.simantics.db.server.protocol.EndTransactionFunction;
\r
32 import org.simantics.db.server.protocol.ExecuteFunction;
\r
33 import org.simantics.db.server.protocol.GetChangeSetContextFunction;
\r
34 import org.simantics.db.server.protocol.GetChangeSetDataFunction;
\r
35 import org.simantics.db.server.protocol.GetChangeSetsFunction;
\r
36 import org.simantics.db.server.protocol.GetClusterChangesFunction;
\r
37 import org.simantics.db.server.protocol.GetClusterNewFunction;
\r
38 import org.simantics.db.server.protocol.GetRefresh2Function;
\r
39 import org.simantics.db.server.protocol.GetResourceSegmentFunction;
\r
40 import org.simantics.db.server.protocol.GetServerInfo2Function;
\r
41 import org.simantics.db.server.protocol.ListClustersFunction;
\r
42 import org.simantics.db.server.protocol.ReconnectFunction;
\r
43 import org.simantics.db.server.protocol.ReserveIdsFunction;
\r
44 import org.simantics.db.server.protocol.UndoFunction;
\r
45 import org.simantics.db.server.protocol.UpdateClusterFunction;
\r
46 import org.simantics.db.service.ClusterUID;
\r
/**
 * Session implementation whose operations are carried out through a Client.
 * NOTE(review): this listing omits some original lines (the numeric prefixes skip values),
 * so closing braces and a few statements are not visible here.
 */
48 public class SessionI implements Session {
\r
// Used by clone(...) to inflate the deflated cluster data.
50 protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();
\r
52 private final DatabaseI db;
\r
// Not final: replace(...) assigns a different client to this field.
53 private Client client;
\r
// NOTE(review): the assignment of the final field db (original line 55) is not visible in this listing - confirm.
54 SessionI(DatabaseI db, Client client) {
\r
56 this.client = client;
\r
/**
 * Installs the given client as this session's client.
 * The current client is saved in a local before the field is overwritten.
 * NOTE(review): the return statement is not visible in this listing; presumably the
 * previous client (t) is returned - confirm.
 */
61 Client replace(Client client) throws ProCoreException {
\r
62 Client t = this.client;
\r
63 this.client = client;
\r
/**
 * Returns the Database of this session.
 * NOTE(review): the body is not visible in this listing; presumably it returns the db field - confirm.
 */
67 public Database getDatabase() {
\r
/**
 * Closes this session.
 * NOTE(review): the body is not visible in this listing; presumably it closes the client - confirm.
 */
71 public void close() throws ProCoreException {
\r
/**
 * Reports whether this session is closed.
 *
 * @return the value of client.isClosed()
 */
75 public boolean isClosed() throws ProCoreException {
\r
76 return client.isClosed();
\r
/**
 * Opens this session.
 * An InterruptedException is rethrown as a ProCoreException that keeps the original as its cause.
 * NOTE(review): the try body is not visible in this listing; judging by the message it calls
 * Client.open - confirm.
 * NOTE(review): the visible lines do not restore the thread's interrupt status before
 * rethrowing - confirm whether that is intended.
 */
79 public void open() throws ProCoreException {
\r
82 } catch (InterruptedException e) {
\r
83 throw new ProCoreException("Client.open was interrupted.", e);
\r
/**
 * Creates an AAAFunction and calls it through the client.
 * The Method wrapper is given no event handler and no third handler argument (both null).
 */
86 public void callAAA() throws ProCoreException {
\r
87 AAAFunction f = new AAAFunction();
\r
88 client.call(new Method(f, null, null));
\r
/**
 * Prepares a request for a read transaction.
 * The function is built with Constants.readTransaction and Constants.NullTransactionId.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the Transaction - confirm.
 */
91 public Transaction askReadTransaction() throws ProCoreException {
\r
92 AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);
\r
93 AskTransactionMethod m = new AskTransactionMethod(f);
\r
/**
 * Prepares a request for a write transaction.
 * The function is built with Constants.writeTransaction and the given transaction id.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the Transaction - confirm.
 *
 * @param transactionId passed unchanged to AskTransactionFunction
 */
98 public Transaction askWriteTransaction(long transactionId) throws ProCoreException {
\r
99 AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);
\r
100 AskTransactionMethod m = new AskTransactionMethod(f);
\r
/**
 * Ends the transaction with the given id.
 * NOTE(review): original line 108 is not visible in this listing; presumably it sends m
 * through the client - confirm.
 *
 * @param transactionId passed unchanged to EndTransactionFunction
 * @return the headChangeSetId field of the function object
 */
105 public long endTransaction(long transactionId) throws ProCoreException {
\r
106 EndTransactionFunction f = new EndTransactionFunction(transactionId);
\r
107 Method m = new Method(f, null, null);
\r
109 return f.headChangeSetId;
\r
/**
 * Creates an AcceptCommitFunction from the arguments and calls it through the client.
 * No event handler is attached to the call.
 *
 * @param transactionId passed unchanged to AcceptCommitFunction
 * @param changeSetId passed unchanged to AcceptCommitFunction
 * @param metadata passed unchanged to AcceptCommitFunction
 */
112 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
\r
113 AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);
\r
114 client.call(new Method(f, null, null));
\r
/**
 * Creates a CancelCommitFunction from the arguments and calls it through the client.
 * The callback is wrapped in a ChangeSetEventHandler and attached to the call as its event handler.
 *
 * @param onChangeSetUpdate callback handed to the ChangeSetEventHandler
 * @return the headChangeSetId field of the function object after the call
 */
116 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
\r
117 CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);
\r
118 client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
\r
119 return f.headChangeSetId;
\r
122 // OpenClientSessionFunction f = new OpenClientSessionFunction();
\r
124 // Util.show("OpenClientSession sid=" + f.sessionId);
\r
127 // CloseClientSessionFunction f = new CloseClientSessionFunction(1);
\r
129 // Util.show("CloseClientSession sid=" + f.sessionId);
\r
132 // EchoFunction f = new EchoFunction(new byte[1]);
\r
134 // Util.show("Echo bytes.len=" + f.bytes.length);
\r
/**
 * Prepares an ExecuteFunction for the given command string.
 * NOTE(review): the lines that send m and produce the returned String are not visible in
 * this listing - confirm what is returned.
 *
 * @param command passed unchanged to ExecuteFunction
 */
137 public String execute(String command) throws ProCoreException {
\r
138 ExecuteFunction f = new ExecuteFunction(command);
\r
139 Method m = new Method(f, null, null);
\r
/**
 * Creates a GetChangeSetContextFunction for the given change set id and calls it through the client.
 *
 * @param changeSetId passed unchanged to GetChangeSetContextFunction
 * @return the changeSetContext field of the function object after the call
 */
144 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
\r
145 GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);
\r
146 client.call(new Method(f, null, null));
\r
147 return f.changeSetContext;
\r
/**
 * Prepares a request for change set data between the two given ids.
 * The callback is wrapped in a ChangeSetEventHandler and given to the method as its event handler.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the ChangeSetData - confirm.
 *
 * @param on callback handed to the ChangeSetEventHandler
 */
149 public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {
\r
150 GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);
\r
151 GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));
\r
/**
 * Prepares a GetChangeSetsFunction wrapped in a GetChangeSetIdsMethod.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the ChangeSetIds - confirm.
 */
155 public ChangeSetIds getChangeSetIds() throws ProCoreException {
\r
156 GetChangeSetsFunction f = new GetChangeSetsFunction();
\r
157 GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);
\r
/**
 * Prepares a request for the cluster with the given id.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the Cluster - confirm.
 *
 * @param clusterId passed unchanged to GetClusterNewFunction
 */
161 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
\r
162 GetClusterNewFunction f = new GetClusterNewFunction(clusterId);
\r
// The deflated buffer is set to a fresh 8192-byte ByteBuffer before the method object is created.
// NOTE(review): how larger payloads are handled is not visible here - confirm.
163 f.deflated = ByteBuffer.allocate(8192);
\r
164 GetClusterMethod m = new GetClusterMethod(f);
\r
/**
 * Prepares a ListClustersFunction wrapped in a GetClusterIdsMethod.
 * NOTE(review): the declaration of "all" (original line 169) and the lines that send the
 * method and return are not visible in this listing - confirm.
 */
168 public ClusterIds getClusterIds() throws ProCoreException {
\r
170 ListClustersFunction f = new ListClustersFunction(all);
\r
171 GetClusterIdsMethod m = new GetClusterIdsMethod(f);
\r
/**
 * Prepares a GetRefresh2Function wrapped in a GetRefreshMethod.
 * NOTE(review): in the visible lines the changeSetId parameter is never used and the function
 * is constructed with the literal 0 - confirm whether changeSetId should be passed instead.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the Refresh - confirm.
 */
175 public Refresh getRefresh(long changeSetId) throws ProCoreException {
\r
176 GetRefresh2Function f = new GetRefresh2Function(0);
\r
177 GetRefreshMethod m = new GetRefreshMethod(f);
\r
/**
 * Prepares a GetResourceSegmentFunction from the four arguments, wrapped in a GetResourceSegmentMethod.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the ResourceSegment - confirm.
 */
181 public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {
\r
182 GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);
\r
183 GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);
\r
/**
 * Prepares a GetClusterChangesFunction from the two arguments, wrapped in a GetClusterChangesMethod.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the ClusterChanges - confirm.
 */
187 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {
\r
188 GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);
\r
189 GetClusterChangesMethod m = new GetClusterChangesMethod(f);
\r
/**
 * Prepares a GetServerInfo2Function wrapped in a GetInformationMethod.
 * NOTE(review): the lines that send the method and return are not visible in this listing;
 * presumably m is passed to client.call and returned as the Information - confirm.
 */
193 public Information getInformation() throws ProCoreException {
\r
194 GetServerInfo2Function f = new GetServerInfo2Function();
\r
195 GetInformationMethod m = new GetInformationMethod(f);
\r
/**
 * Creates a ReconnectFunction for the given session id and calls it through the client.
 * No event handler is attached to the call.
 *
 * @param sessionId passed unchanged to ReconnectFunction
 */
199 public void reconnect(int sessionId) throws ProCoreException {
\r
200 ReconnectFunction f = new ReconnectFunction(sessionId);
\r
201 client.call(new Method(f, null, null));
\r
/**
 * Creates a ReserveIdsFunction for the given count and calls it through the client.
 * NOTE(review): the return statement is not visible in this listing - confirm which field
 * of the function object is returned.
 *
 * @param count passed unchanged to ReserveIdsFunction
 */
203 public long reserveIds(int count) throws ProCoreException {
\r
204 ReserveIdsFunction f = new ReserveIdsFunction(count);
\r
205 client.call(new Method(f, null, null));
\r
/**
 * Creates an UndoFunction for the given change set ids and calls it through the client.
 * The callback is wrapped in a ChangeSetEventHandler and attached to the call as its event handler.
 *
 * @param changeSetIds passed unchanged to UndoFunction
 * @param onChangeSetUpdate callback handed to the ChangeSetEventHandler
 * @return the potentialConflicts field of the function object after the call
 */
208 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
\r
209 UndoFunction f = new UndoFunction(changeSetIds);
\r
210 client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
\r
211 return f.potentialConflicts;
\r
/**
 * Creates an UpdateClusterFunction for the given operations and calls it through the client.
 * No event handler is attached to the call.
 *
 * @param operations passed unchanged to UpdateClusterFunction
 */
213 public void updateCluster(byte[] operations) throws ProCoreException {
\r
214 UpdateClusterFunction f = new UpdateClusterFunction(operations);
\r
215 client.call(new Method(f, null, null));
\r
/**
 * Fetches the cluster identified by uid, inflates it and hands the resulting arrays to the creator.
 * NOTE(review): original lines 220 and 223 are not visible in this listing; the commented-out
 * print below suggests they measure the inflate duration - confirm.
 *
 * @param uid cluster identifier; its byte form is used for the getCluster request
 * @param creator receives the uid and the inflated byte, int and long arrays
 * @return whatever creator.create(...) returns
 */
218 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
\r
219 Cluster t = getCluster(uid.asBytes());
\r
221 ByteBuffer deflated = t.getDeflated();
\r
222 Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
\r
224 //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
\r
// The inflated result is unpacked by position: index 0 is cast to long[], 1 to int[], 2 to byte[].
225 long[] longs = (long[]) arrays[0];
\r
226 int[] ints = (int[]) arrays[1];
\r
227 byte[] bytes = (byte[]) arrays[2];
\r
// Note the argument order differs from the array order: bytes, ints, longs.
228 return creator.create(uid, bytes, ints, longs);
\r
/**
 * Reports whether refresh is enabled for this session.
 * NOTE(review): the body is not visible in this listing - confirm the returned value.
 */
232 public boolean refreshEnabled() {
\r
/**
 * Keeps a map from each Client to the SessionI that was created for it.
 * NOTE(review): several original lines are absent from this listing, so the loop bodies
 * below are only partly visible.
 */
236 class SessionManager {
\r
// Filled by newSession(...); read by disconnect(...) and connect(...).
237 private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();
\r
/**
 * Iterates over all registered sessions and adds clients that report isOpen() to a list.
 * NOTE(review): the statement guarded by the db comparison (original line 245) is not visible;
 * presumably sessions of other databases are skipped - confirm.
 * NOTE(review): original line 248 is not visible; presumably the open client is closed there - confirm.
 * NOTE(review): the return statement is not visible; presumably the collected list is returned - confirm.
 */
240 public List<Client> disconnect(DatabaseI db) throws ProCoreException {
\r
241 ArrayList<Client> clients = new ArrayList<Client>();
\r
242 for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
\r
243 SessionI si = i.getValue();
\r
244 if (!db.equals(si.getDb()))
\r
246 Client client = i.getKey();
\r
247 if (client.isOpen()) {
\r
249 clients.add(client);
\r
/**
 * Iterates over the given clients and looks up the session registered for each one.
 * NOTE(review): the statement guarded by the null/db check (original line 258) and the rest of
 * the loop body are not visible in this listing - confirm what is done per client.
 */
254 public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {
\r
255 for (Client client : clients) {
\r
256 SessionI si = sessionMap.get(client);
\r
257 if (null == si || !db.equals(si.getDb()))
\r
261 // for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
\r
262 // SessionI si = i.getValue();
\r
263 // if (!db.equals(si.getDb()))
\r
265 // Client client = i.getKey();
\r
/**
 * Obtains a new client from db, creates a SessionI for it and registers the pair in sessionMap.
 * NOTE(review): the return statement is not visible; presumably the new session is returned - confirm.
 */
269 public SessionI newSession(DatabaseI db) throws ProCoreException {
\r
270 Client client = db.newClient();
\r
271 SessionI si = new SessionI(db, client);
\r
272 sessionMap.put(client, si);
\r
/**
 * Immutable pair of a Session and a Throwable cause.
 * NOTE(review): the getter bodies are not visible in this listing; presumably each returns
 * the field of the same name - confirm.
 */
276 class SessionEvent {
\r
277 private final Session session;
\r
278 private final Throwable cause;
\r
// Stores both arguments in the final fields.
279 public SessionEvent(Session session, Throwable cause) {
\r
280 this.session = session;
\r
281 this.cause = cause;
\r
283 public Session getSession() {
\r
286 public Throwable getCause() {
\r
/**
 * Method wrapper around an AskTransactionFunction that exposes the function's
 * fields through the Transaction interface.
 */
290 class AskTransactionMethod extends Method implements Transaction {
\r
291 private final AskTransactionFunction function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
292 AskTransactionMethod(AskTransactionFunction function) {
\r
293 super(function, null, null);
\r
294 this.function = function;
\r
// Returns the headChangeSetId field of the wrapped function.
297 public long getHeadChangeSetId() {
\r
298 return function.headChangeSetId;
\r
// Returns the transactionId field of the wrapped function.
301 public long getTransactionId() {
\r
302 return function.transactionId;
\r
/**
 * Method wrapper around a GetChangeSetDataFunction that exposes the function's
 * outcome through the ChangeSetData interface.
 */
305 class GetChangeSetDataMethod extends Method implements ChangeSetData {
\r
306 private final GetChangeSetDataFunction function;
\r
// Passes the function and the event handler to Method (third argument null) and keeps a typed reference.
307 GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {
\r
308 super(function, evh, null);
\r
309 this.function = function;
\r
// True when the wrapped function's failed flag is not set.
312 public boolean isOk() {
\r
313 return !function.failed;
\r
/**
 * Method wrapper around a GetChangeSetsFunction that exposes the function's
 * changeSetIds array through the ChangeSetIds interface.
 */
316 class GetChangeSetIdsMethod extends Method implements ChangeSetIds {
\r
317 private final GetChangeSetsFunction function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
318 GetChangeSetIdsMethod(GetChangeSetsFunction function) {
\r
319 super(function, null, null);
\r
320 this.function = function;
\r
// Returns the first element of changeSetIds when the array is non-empty.
// NOTE(review): original line 326 is not visible; presumably an else, so that
// Constants.NullChangeSetId is returned for an empty array - confirm.
323 public long getFirstChangeSetId() {
\r
324 if (function.changeSetIds.length > 0)
\r
325 return function.changeSetIds[0];
\r
327 return Constants.NullChangeSetId;
\r
// Returns the length of the changeSetIds array.
330 public int getCount() {
\r
331 return function.changeSetIds.length;
\r
/**
 * Method wrapper around a GetClusterNewFunction that exposes the function's
 * fields through the Cluster interface.
 */
334 class GetClusterMethod extends Method implements Cluster {
\r
335 private final GetClusterNewFunction function;
\r
// Unlike the other wrappers in this file, a GetClusterHandler built from the same function
// is passed as the third Method argument; the event handler argument is null.
336 GetClusterMethod(GetClusterNewFunction function) {
\r
337 super(function, null, new GetClusterHandler(function));
\r
338 this.function = function;
\r
// Returns the inflateSize field of the wrapped function.
341 public int getInflateSize() {
\r
342 return function.inflateSize;
\r
// Returns the function's deflated buffer itself (no copy is made here).
345 public ByteBuffer getDeflated() {
\r
346 return function.deflated;
\r
/**
 * Method wrapper around a GetClusterChangesFunction that exposes the function's
 * fields through the ClusterChanges interface.
 * Each getter returns the corresponding field of the wrapped function directly (arrays are not copied here).
 */
349 class GetClusterChangesMethod extends Method implements ClusterChanges {
\r
350 private final GetClusterChangesFunction function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
351 GetClusterChangesMethod(GetClusterChangesFunction function) {
\r
352 super(function, null, null);
\r
353 this.function = function;
\r
356 public long getHeadChangeSetId() {
\r
357 return function.headChangeSetId;
\r
360 public int[] getResourceIndex() {
\r
361 return function.resourceIndex;
\r
364 public int[] getPredicateIndex() {
\r
365 return function.predicateIndex;
\r
368 public long[] getPredicateFirst() {
\r
369 return function.predicateFirst;
\r
372 public long[] getPredicateSecond() {
\r
373 return function.predicateSecond;
\r
376 public int[] getValueIndex() {
\r
377 return function.valueIndex;
\r
/**
 * Method wrapper around a ListClustersFunction that exposes the function's
 * fields through the ClusterIds interface.
 * Each getter returns the corresponding field of the wrapped function directly (arrays are not copied here).
 */
380 class GetClusterIdsMethod extends Method implements ClusterIds {
\r
381 private final ListClustersFunction function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
382 GetClusterIdsMethod(ListClustersFunction function) {
\r
383 super(function, null, null);
\r
384 this.function = function;
\r
387 public int getStatus() {
\r
388 return function.status;
\r
391 public long[] getFirst() {
\r
392 return function.first;
\r
395 public long[] getSecond() {
\r
396 return function.second;
\r
/**
 * Method wrapper around a GetServerInfo2Function that exposes the function's
 * fields through the Information interface.
 * Each getter returns the corresponding field of the wrapped function.
 */
399 class GetInformationMethod extends Method implements Information {
\r
400 private final GetServerInfo2Function function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
401 GetInformationMethod(GetServerInfo2Function function) {
\r
402 super(function, null, null);
\r
403 this.function = function;
\r
406 public String getServerId() {
\r
407 return function.serverId;
\r
410 public String getProtocolId() {
\r
411 return function.protocolId;
\r
414 public String getDatabaseId() {
\r
415 return function.databaseId;
\r
418 public long getFirstChangeSetId() {
\r
419 return function.firstChangeSetId;
\r
/**
 * Method wrapper around a GetRefresh2Function that exposes the function's
 * fields through the Refresh interface.
 * Each getter returns the corresponding field of the wrapped function directly (arrays are not copied here).
 */
422 class GetRefreshMethod extends Method implements Refresh {
\r
423 private final GetRefresh2Function function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
424 GetRefreshMethod(GetRefresh2Function function) {
\r
425 super(function, null, null);
\r
426 this.function = function;
\r
429 public long getHeadChangeSetId() {
\r
430 return function.headChangeSetId;
\r
433 public long[] getFirst() {
\r
434 return function.first;
\r
437 public long[] getSecond() {
\r
438 return function.second;
\r
/**
 * Method wrapper around a GetResourceSegmentFunction that exposes the function's
 * fields through the ResourceSegment interface.
 * Each getter returns a field of the wrapped function directly (arrays are not copied here).
 */
441 class GetResourceSegmentMethod extends Method implements ResourceSegment {
\r
442 private final GetResourceSegmentFunction function;
\r
// Passes the function to Method with both handler arguments null and keeps a typed reference to it.
443 GetResourceSegmentMethod(GetResourceSegmentFunction function) {
\r
444 super(function, null, null);
\r
445 this.function = function;
\r
// The interface calls this the cluster id; the backing field is named clusterUID.
448 public byte[] getClusterId() {
\r
449 return function.clusterUID;
\r
452 public int getResourceIndex() {
\r
453 return function.resourceIndex;
\r
456 public long getValueSize() {
\r
457 return function.valueSize;
\r
460 public byte[] getSegment() {
\r
461 return function.segment;
\r
// The backing field for the offset is named segmentOffset.
464 public long getOffset() {
\r
465 return function.segmentOffset;
\r
/**
 * Event handler that deserializes incoming events into a ChangeSetUpdateEvent and passes
 * itself, as a ChangeSetUpdate view of that event, to an OnChangeSetUpdate callback.
 * NOTE(review): several original lines are absent from this listing (constructor body,
 * the try opening and an else branch in on(...)).
 */
468 class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {
\r
// A single event object is reused: every call to on(...) deserializes into it, and the getters read from it.
469 private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();
\r
470 private final OnChangeSetUpdate on;
\r
// Holds the wrapped failure of the first callback that threw; assigned only while it is still null.
471 private ProCoreException exception;
\r
// NOTE(review): the constructor body is not visible; presumably it assigns the final field on - confirm.
472 ChangeSetEventHandler(OnChangeSetUpdate on) {
\r
/**
 * Deserializes the event into csuEvent, logs its change set id and invokes the callback with this handler.
 * An SDBException from the callback is not rethrown in the visible lines: when no exception has
 * been recorded yet it is wrapped in a ProCoreException and stored in the exception field.
 * NOTE(review): original line 485 is not visible; presumably an else, so that later failures
 * are only logged via Util.logError - confirm.
 */
476 void on(Event event) {
\r
477 event.deserialize(csuEvent);
\r
478 Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);
\r
481 on.onChangeSetUpdate(this);
\r
482 } catch (SDBException e) {
\r
483 if (null == exception)
\r
484 exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);
\r
486 Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);
\r
// The getters below return fields of the most recently deserialized event.
490 public long getChangeSetId() {
\r
491 return csuEvent.changeSetId;
\r
494 public int getChangeSetIndex() {
\r
495 return csuEvent.changeSetIndex;
\r
498 public int getNumberOfClusterChangeSets() {
\r
499 return csuEvent.numberOfClusterChangeSets;
\r
502 public int getIndexOfClusterChangeSet() {
\r
503 return csuEvent.indexOfClusterChangeSet;
\r
// The interface calls this the cluster id; the backing field is named clusterUID.
506 public byte[] getClusterId() {
\r
507 return csuEvent.clusterUID;
\r
510 public boolean getNewCluster() {
\r
511 return csuEvent.newCluster;
\r
514 public byte[] getData() {
\r
515 return csuEvent.data;
\r