1 package org.simantics.db.server.internal;
3 import java.nio.ByteBuffer;
4 import java.util.ArrayList;
6 import java.util.Map.Entry;
7 import java.util.concurrent.ConcurrentHashMap;
9 import org.simantics.db.ClusterCreator;
10 import org.simantics.db.Database;
11 import org.simantics.db.Database.Session;
12 import org.simantics.db.Database.Session.ChangeSetData;
13 import org.simantics.db.Database.Session.ChangeSetIds;
14 import org.simantics.db.Database.Session.ChangeSetUpdate;
15 import org.simantics.db.Database.Session.Cluster;
16 import org.simantics.db.Database.Session.ClusterChanges;
17 import org.simantics.db.Database.Session.ClusterIds;
18 import org.simantics.db.Database.Session.Information;
19 import org.simantics.db.Database.Session.OnChangeSetUpdate;
20 import org.simantics.db.Database.Session.Refresh;
21 import org.simantics.db.Database.Session.ResourceSegment;
22 import org.simantics.db.Database.Session.Transaction;
23 import org.simantics.db.exception.DatabaseException;
24 import org.simantics.db.exception.SDBException;
25 import org.simantics.db.server.ProCoreException;
26 import org.simantics.db.server.protocol.AAAFunction;
27 import org.simantics.db.server.protocol.AcceptCommitFunction;
28 import org.simantics.db.server.protocol.AskTransactionFunction;
29 import org.simantics.db.server.protocol.CancelCommitFunction;
30 import org.simantics.db.server.protocol.Constants;
31 import org.simantics.db.server.protocol.EndTransactionFunction;
32 import org.simantics.db.server.protocol.ExecuteFunction;
33 import org.simantics.db.server.protocol.GetChangeSetContextFunction;
34 import org.simantics.db.server.protocol.GetChangeSetDataFunction;
35 import org.simantics.db.server.protocol.GetChangeSetsFunction;
36 import org.simantics.db.server.protocol.GetClusterChangesFunction;
37 import org.simantics.db.server.protocol.GetClusterNewFunction;
38 import org.simantics.db.server.protocol.GetRefresh2Function;
39 import org.simantics.db.server.protocol.GetResourceSegmentFunction;
40 import org.simantics.db.server.protocol.GetServerInfo2Function;
41 import org.simantics.db.server.protocol.ListClustersFunction;
42 import org.simantics.db.server.protocol.ReconnectFunction;
43 import org.simantics.db.server.protocol.ReserveIdsFunction;
44 import org.simantics.db.server.protocol.UndoFunction;
45 import org.simantics.db.server.protocol.UpdateClusterFunction;
46 import org.simantics.db.service.ClusterUID;
// Session implementation backed by a Client. In the statements visible here,
// each operation builds a protocol *Function object, wraps it in a Method
// (or a Method subclass that also implements the returned interface) and
// hands it to client.call(...).
48 public class SessionI implements Session {
// Decompressor used by clone(...) to inflate a cluster's deflated bytes.
50 protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();
// Database this session was created for.
52 private final DatabaseI db;
// Client that requests are sent through; not final (see replace(...)).
53 private Client client;
// Package-private constructor taking the owning database and the client.
54 SessionI(DatabaseI db, Client client) {
// Captures the current client in a local before working with the given one.
// NOTE(review): presumably swaps in the new client and returns the old one - confirm.
61 Client replace(Client client) throws ProCoreException {
62 Client t = this.client;
// Accessor for the database of this session.
67 public Database getDatabase() {
// Closes this session.
71 public void close() throws ProCoreException {
// Reports whether the underlying client is closed.
75 public boolean isClosed() throws ProCoreException {
76 return client.isClosed();
// Opens this session. An InterruptedException is wrapped into a
// ProCoreException that keeps the original exception as its cause.
// NOTE(review): the visible lines do not restore the thread's interrupt
// status before rethrowing - confirm whether that is intended.
79 public void open() throws ProCoreException {
82 } catch (InterruptedException e) {
83 throw new ProCoreException("Client.open was interrupted.", e);
// Sends an AAAFunction through the client with no handlers attached.
86 public void callAAA() throws ProCoreException {
87 AAAFunction f = new AAAFunction();
88 client.call(new Method(f, null, null));
// Builds a read-transaction request; no existing transaction id is supplied
// (Constants.NullTransactionId).
91 public Transaction askReadTransaction() throws ProCoreException {
92 AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);
93 AskTransactionMethod m = new AskTransactionMethod(f);
// Builds a write-transaction request carrying the given transaction id.
98 public Transaction askWriteTransaction(long transactionId) throws ProCoreException {
99 AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);
100 AskTransactionMethod m = new AskTransactionMethod(f);
// Ends the given transaction and returns the function's headChangeSetId.
105 public long endTransaction(long transactionId) throws ProCoreException {
106 EndTransactionFunction f = new EndTransactionFunction(transactionId);
107 Method m = new Method(f, null, null);
109 return f.headChangeSetId;
// Sends an AcceptCommitFunction for the given transaction, change set and metadata.
112 public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
113 AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);
114 client.call(new Method(f, null, null));
// Sends a CancelCommitFunction; the supplied callback is wrapped in a
// ChangeSetEventHandler and passed as the Method's event handler.
// Returns the function's headChangeSetId.
116 public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
117 CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);
118 client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
119 return f.headChangeSetId;
122 // OpenClientSessionFunction f = new OpenClientSessionFunction();
124 // Util.show("OpenClientSession sid=" + f.sessionId);
127 // CloseClientSessionFunction f = new CloseClientSessionFunction(1);
129 // Util.show("CloseClientSession sid=" + f.sessionId);
132 // EchoFunction f = new EchoFunction(new byte[1]);
134 // Util.show("Echo bytes.len=" + f.bytes.length);
// Builds an ExecuteFunction for the given command string.
137 public String execute(String command) throws ProCoreException {
138 ExecuteFunction f = new ExecuteFunction(command);
139 Method m = new Method(f, null, null);
// Requests the context of the given change set and returns
// the function's changeSetContext bytes.
144 public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
145 GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);
146 client.call(new Method(f, null, null));
147 return f.changeSetContext;
// Builds a change set data request for the given id range; the callback is
// wrapped in a ChangeSetEventHandler and given to the method object.
149 public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {
150 GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);
151 GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));
// Builds a request for the change set ids.
155 public ChangeSetIds getChangeSetIds() throws ProCoreException {
156 GetChangeSetsFunction f = new GetChangeSetsFunction();
157 GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);
// Builds a cluster request for the given cluster id. The function's
// deflated buffer is pre-allocated with a capacity of 8192 bytes.
161 public Cluster getCluster(byte[] clusterId) throws ProCoreException {
162 GetClusterNewFunction f = new GetClusterNewFunction(clusterId);
163 f.deflated = ByteBuffer.allocate(8192);
164 GetClusterMethod m = new GetClusterMethod(f);
// Builds a cluster listing request. The value of "all" is defined outside
// the lines shown here.
168 public ClusterIds getClusterIds() throws ProCoreException {
170 ListClustersFunction f = new ListClustersFunction(all);
171 GetClusterIdsMethod m = new GetClusterIdsMethod(f);
// Builds a refresh request.
// NOTE(review): the function is constructed with the literal 0 and the
// changeSetId parameter is not used in the visible lines - confirm intent.
175 public Refresh getRefresh(long changeSetId) throws ProCoreException {
176 GetRefresh2Function f = new GetRefresh2Function(0);
177 GetRefreshMethod m = new GetRefreshMethod(f);
// Builds a resource segment request from the cluster id, resource index,
// offset and size arguments.
181 public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {
182 GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);
183 GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);
// Builds a cluster changes request for the given change set and cluster.
187 public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {
188 GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);
189 GetClusterChangesMethod m = new GetClusterChangesMethod(f);
// Builds a server information request.
193 public Information getInformation() throws ProCoreException {
194 GetServerInfo2Function f = new GetServerInfo2Function();
195 GetInformationMethod m = new GetInformationMethod(f);
// Sends a ReconnectFunction for the given session id.
199 public void reconnect(int sessionId) throws ProCoreException {
200 ReconnectFunction f = new ReconnectFunction(sessionId);
201 client.call(new Method(f, null, null));
// Sends a ReserveIdsFunction for the given count.
203 public long reserveIds(int count) throws ProCoreException {
204 ReserveIdsFunction f = new ReserveIdsFunction(count);
205 client.call(new Method(f, null, null));
// Sends an UndoFunction for the given change set ids; the callback is
// wrapped in a ChangeSetEventHandler. Returns the function's
// potentialConflicts flag.
208 public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
209 UndoFunction f = new UndoFunction(changeSetIds);
210 client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
211 return f.potentialConflicts;
// Sends an UpdateClusterFunction carrying the given operations bytes.
213 public void updateCluster(byte[] operations) throws ProCoreException {
214 UpdateClusterFunction f = new UpdateClusterFunction(operations);
215 client.call(new Method(f, null, null));
// Fetches the cluster identified by uid, inflates its deflated buffer and
// passes the resulting arrays to the creator.
218 public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
219 Cluster t = getCluster(uid.asBytes());
221 ByteBuffer deflated = t.getDeflated();
222 Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
224 //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
// The inflated result is read as [0] = long[], [1] = int[], [2] = byte[];
// note that creator.create takes them in the order bytes, ints, longs.
225 long[] longs = (long[]) arrays[0];
226 int[] ints = (int[]) arrays[1];
227 byte[] bytes = (byte[]) arrays[2];
228 return creator.create(uid, bytes, ints, longs);
// Boolean query named refreshEnabled.
232 public boolean refreshEnabled() {
// Boolean query named rolledback.
236 public boolean rolledback() {
// Keeps track of the sessions created through newSession(...), keyed by
// the Client each session was created with.
240 class SessionManager {
// Client -> session registry.
241 private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();
// Walks every registered session, compares its database with db and checks
// whether the client is open; collects clients into a list.
// NOTE(review): presumably sessions of other databases are skipped and the
// open clients are closed and returned - confirm.
244 public List<Client> disconnect(DatabaseI db) throws ProCoreException {
245 ArrayList<Client> clients = new ArrayList<Client>();
246 for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
247 SessionI si = i.getValue();
248 if (!db.equals(si.getDb()))
250 Client client = i.getKey();
251 if (client.isOpen()) {
// Looks up the session registered for each given client and tests whether
// it is missing or belongs to a different database.
// NOTE(review): presumably such clients are skipped and the rest are
// reopened - confirm.
258 public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {
259 for (Client client : clients) {
260 SessionI si = sessionMap.get(client);
261 if (null == si || !db.equals(si.getDb()))
265 // for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
266 // SessionI si = i.getValue();
267 // if (!db.equals(si.getDb()))
269 // Client client = i.getKey();
// Obtains a new client from the database, creates a SessionI around it and
// registers the pair in sessionMap.
273 public SessionI newSession(DatabaseI db) throws ProCoreException {
274 Client client = db.newClient();
275 SessionI si = new SessionI(db, client);
276 sessionMap.put(client, si);
// Session this event refers to.
281 private final Session session;
// Throwable associated with this event.
282 private final Throwable cause;
// Creates an event for the given session and cause.
283 public SessionEvent(Session session, Throwable cause) {
284 this.session = session;
// Accessor for the session.
287 public Session getSession() {
// Accessor for the cause.
290 public Throwable getCause() {
// Method wrapper around an AskTransactionFunction that doubles as the
// Transaction result: its getters read fields of the wrapped function.
294 class AskTransactionMethod extends Method implements Transaction {
295 private final AskTransactionFunction function;
// No handlers are passed to the Method base class.
296 AskTransactionMethod(AskTransactionFunction function) {
297 super(function, null, null);
298 this.function = function;
// Returns the wrapped function's headChangeSetId.
301 public long getHeadChangeSetId() {
302 return function.headChangeSetId;
// Returns the wrapped function's transactionId.
305 public long getTransactionId() {
306 return function.transactionId;
// Method wrapper around a GetChangeSetDataFunction that doubles as the
// ChangeSetData result.
309 class GetChangeSetDataMethod extends Method implements ChangeSetData {
310 private final GetChangeSetDataFunction function;
// The given event handler is forwarded to the Method base class.
311 GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {
312 super(function, evh, null);
313 this.function = function;
// True when the wrapped function's failed flag is not set.
316 public boolean isOk() {
317 return !function.failed;
// Method wrapper around a GetChangeSetsFunction that doubles as the
// ChangeSetIds result.
320 class GetChangeSetIdsMethod extends Method implements ChangeSetIds {
321 private final GetChangeSetsFunction function;
// No handlers are passed to the Method base class.
322 GetChangeSetIdsMethod(GetChangeSetsFunction function) {
323 super(function, null, null);
324 this.function = function;
// Returns the first element of function.changeSetIds, or
// Constants.NullChangeSetId when the array is empty.
// NOTE(review): assumes function.changeSetIds is non-null - confirm.
327 public long getFirstChangeSetId() {
328 if (function.changeSetIds.length > 0)
329 return function.changeSetIds[0];
331 return Constants.NullChangeSetId;
// Returns the length of function.changeSetIds.
334 public int getCount() {
335 return function.changeSetIds.length;
// Method wrapper around a GetClusterNewFunction that doubles as the
// Cluster result.
338 class GetClusterMethod extends Method implements Cluster {
339 private final GetClusterNewFunction function;
// Unlike the other wrappers in this file, this one passes a
// GetClusterHandler built on the same function as the third Method argument.
340 GetClusterMethod(GetClusterNewFunction function) {
341 super(function, null, new GetClusterHandler(function));
342 this.function = function;
// Returns the wrapped function's inflateSize.
345 public int getInflateSize() {
346 return function.inflateSize;
// Returns the wrapped function's deflated buffer.
349 public ByteBuffer getDeflated() {
350 return function.deflated;
// Method wrapper around a GetClusterChangesFunction that doubles as the
// ClusterChanges result; every getter returns the same-named function field.
353 class GetClusterChangesMethod extends Method implements ClusterChanges {
354 private final GetClusterChangesFunction function;
// No handlers are passed to the Method base class.
355 GetClusterChangesMethod(GetClusterChangesFunction function) {
356 super(function, null, null);
357 this.function = function;
// Returns function.headChangeSetId.
360 public long getHeadChangeSetId() {
361 return function.headChangeSetId;
// Returns function.resourceIndex.
364 public int[] getResourceIndex() {
365 return function.resourceIndex;
// Returns function.predicateIndex.
368 public int[] getPredicateIndex() {
369 return function.predicateIndex;
// Returns function.predicateFirst.
372 public long[] getPredicateFirst() {
373 return function.predicateFirst;
// Returns function.predicateSecond.
376 public long[] getPredicateSecond() {
377 return function.predicateSecond;
// Returns function.valueIndex.
380 public int[] getValueIndex() {
381 return function.valueIndex;
// Method wrapper around a ListClustersFunction that doubles as the
// ClusterIds result.
384 class GetClusterIdsMethod extends Method implements ClusterIds {
385 private final ListClustersFunction function;
// No handlers are passed to the Method base class.
386 GetClusterIdsMethod(ListClustersFunction function) {
387 super(function, null, null);
388 this.function = function;
// Returns function.status.
391 public int getStatus() {
392 return function.status;
// Returns function.first.
395 public long[] getFirst() {
396 return function.first;
// Returns function.second.
399 public long[] getSecond() {
400 return function.second;
// Method wrapper around a GetServerInfo2Function that doubles as the
// Information result.
403 class GetInformationMethod extends Method implements Information {
404 private final GetServerInfo2Function function;
// No handlers are passed to the Method base class.
405 GetInformationMethod(GetServerInfo2Function function) {
406 super(function, null, null);
407 this.function = function;
// Returns function.serverId.
410 public String getServerId() {
411 return function.serverId;
// Returns function.protocolId.
414 public String getProtocolId() {
415 return function.protocolId;
// Returns function.databaseId.
418 public String getDatabaseId() {
419 return function.databaseId;
// Returns function.firstChangeSetId.
422 public long getFirstChangeSetId() {
423 return function.firstChangeSetId;
// Method wrapper around a GetRefresh2Function that doubles as the
// Refresh result.
426 class GetRefreshMethod extends Method implements Refresh {
427 private final GetRefresh2Function function;
// No handlers are passed to the Method base class.
428 GetRefreshMethod(GetRefresh2Function function) {
429 super(function, null, null);
430 this.function = function;
// Returns function.headChangeSetId.
433 public long getHeadChangeSetId() {
434 return function.headChangeSetId;
// Returns function.first.
437 public long[] getFirst() {
438 return function.first;
// Returns function.second.
441 public long[] getSecond() {
442 return function.second;
// Method wrapper around a GetResourceSegmentFunction that doubles as the
// ResourceSegment result.
445 class GetResourceSegmentMethod extends Method implements ResourceSegment {
446 private final GetResourceSegmentFunction function;
// No handlers are passed to the Method base class.
447 GetResourceSegmentMethod(GetResourceSegmentFunction function) {
448 super(function, null, null);
449 this.function = function;
// Returns function.clusterUID.
452 public byte[] getClusterId() {
453 return function.clusterUID;
// Returns function.resourceIndex.
456 public int getResourceIndex() {
457 return function.resourceIndex;
// Returns function.valueSize.
460 public long getValueSize() {
461 return function.valueSize;
// Returns function.segment.
464 public byte[] getSegment() {
465 return function.segment;
// Returns function.segmentOffset.
468 public long getOffset() {
469 return function.segmentOffset;
// Event handler that deserializes incoming events into a ChangeSetUpdateEvent
// and passes itself, as a ChangeSetUpdate view of that event, to an
// OnChangeSetUpdate callback.
472 class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {
// Event object that every incoming event is deserialized into; the getters
// below read from it.
473 private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();
// Callback to notify.
474 private final OnChangeSetUpdate on;
// Holds a ProCoreException created from a callback failure (see on(Event)).
475 private ProCoreException exception;
// Takes the callback to notify.
476 ChangeSetEventHandler(OnChangeSetUpdate on) {
// Deserializes the event into csuEvent, logs its change set id and invokes
// the callback with this handler. When the callback throws an SDBException
// and no exception has been stored yet, the failure is wrapped and stored.
// NOTE(review): the "(2)" error log presumably covers failures after the
// first stored one - confirm.
480 void on(Event event) {
481 event.deserialize(csuEvent);
482 Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);
485 on.onChangeSetUpdate(this);
486 } catch (SDBException e) {
487 if (null == exception)
488 exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);
490 Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);
// Returns csuEvent.changeSetId.
494 public long getChangeSetId() {
495 return csuEvent.changeSetId;
// Returns csuEvent.changeSetIndex.
498 public int getChangeSetIndex() {
499 return csuEvent.changeSetIndex;
// Returns csuEvent.numberOfClusterChangeSets.
502 public int getNumberOfClusterChangeSets() {
503 return csuEvent.numberOfClusterChangeSets;
// Returns csuEvent.indexOfClusterChangeSet.
506 public int getIndexOfClusterChangeSet() {
507 return csuEvent.indexOfClusterChangeSet;
// Returns csuEvent.clusterUID.
510 public byte[] getClusterId() {
511 return csuEvent.clusterUID;
// Returns csuEvent.newCluster.
514 public boolean getNewCluster() {
515 return csuEvent.newCluster;
// Returns csuEvent.data.
518 public byte[] getData() {
519 return csuEvent.data;