--- /dev/null
+package org.simantics.db.server.internal;\r
+\r
+import java.nio.ByteBuffer;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.Map.Entry;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+\r
+import org.simantics.db.ClusterCreator;\r
+import org.simantics.db.Database;\r
+import org.simantics.db.Database.Session;\r
+import org.simantics.db.Database.Session.ChangeSetData;\r
+import org.simantics.db.Database.Session.ChangeSetIds;\r
+import org.simantics.db.Database.Session.ChangeSetUpdate;\r
+import org.simantics.db.Database.Session.Cluster;\r
+import org.simantics.db.Database.Session.ClusterChanges;\r
+import org.simantics.db.Database.Session.ClusterIds;\r
+import org.simantics.db.Database.Session.Information;\r
+import org.simantics.db.Database.Session.OnChangeSetUpdate;\r
+import org.simantics.db.Database.Session.Refresh;\r
+import org.simantics.db.Database.Session.ResourceSegment;\r
+import org.simantics.db.Database.Session.Transaction;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.SDBException;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.server.protocol.AAAFunction;\r
+import org.simantics.db.server.protocol.AcceptCommitFunction;\r
+import org.simantics.db.server.protocol.AskTransactionFunction;\r
+import org.simantics.db.server.protocol.CancelCommitFunction;\r
+import org.simantics.db.server.protocol.Constants;\r
+import org.simantics.db.server.protocol.EndTransactionFunction;\r
+import org.simantics.db.server.protocol.ExecuteFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetContextFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetDataFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetsFunction;\r
+import org.simantics.db.server.protocol.GetClusterChangesFunction;\r
+import org.simantics.db.server.protocol.GetClusterNewFunction;\r
+import org.simantics.db.server.protocol.GetRefresh2Function;\r
+import org.simantics.db.server.protocol.GetResourceSegmentFunction;\r
+import org.simantics.db.server.protocol.GetServerInfo2Function;\r
+import org.simantics.db.server.protocol.ListClustersFunction;\r
+import org.simantics.db.server.protocol.ReconnectFunction;\r
+import org.simantics.db.server.protocol.ReserveIdsFunction;\r
+import org.simantics.db.server.protocol.UndoFunction;\r
+import org.simantics.db.server.protocol.UpdateClusterFunction;\r
+import org.simantics.db.service.ClusterUID;\r
+\r
+public class SessionI implements Session {\r
+ \r
+ protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();\r
+ \r
+ private final DatabaseI db;\r
+ private Client client;\r
+ SessionI(DatabaseI db, Client client) {\r
+ this.db = db;\r
+ this.client = client;\r
+ }\r
+ DatabaseI getDb() {\r
+ return db;\r
+ }\r
+ Client replace(Client client) throws ProCoreException {\r
+ Client t = this.client;\r
+ this.client = client;\r
+ return t;\r
+ }\r
+ @Override\r
+ public Database getDatabase() {\r
+ return db;\r
+ }\r
+ @Override\r
+ public void close() throws ProCoreException {\r
+ client.close();\r
+ }\r
+ @Override\r
+ public boolean isClosed() throws ProCoreException {\r
+ return client.isClosed();\r
+ }\r
+ @Override\r
+ public void open() throws ProCoreException {\r
+ try {\r
+ client.open();\r
+ } catch (InterruptedException e) {\r
+ throw new ProCoreException("Client.open was interrupted.", e);\r
+ }\r
+ }\r
+ public void callAAA() throws ProCoreException {\r
+ AAAFunction f = new AAAFunction();\r
+ client.call(new Method(f, null, null));\r
+ }\r
+ @Override\r
+ public Transaction askReadTransaction() throws ProCoreException {\r
+ AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);\r
+ AskTransactionMethod m = new AskTransactionMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ @Override\r
+ public Transaction askWriteTransaction(long transactionId) throws ProCoreException {\r
+ AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);\r
+ AskTransactionMethod m = new AskTransactionMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ @Override\r
+ public long endTransaction(long transactionId) throws ProCoreException {\r
+ EndTransactionFunction f = new EndTransactionFunction(transactionId);\r
+ Method m = new Method(f, null, null);\r
+ client.call(m);\r
+ return f.headChangeSetId;\r
+ }\r
+ @Override\r
+ public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {\r
+ AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);\r
+ client.call(new Method(f, null, null));\r
+ }\r
+ public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
+ CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);\r
+ client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
+ return f.headChangeSetId;\r
+}\r
+// {\r
+// OpenClientSessionFunction f = new OpenClientSessionFunction();\r
+// s.call(f);\r
+// Util.show("OpenClientSession sid=" + f.sessionId);\r
+// }\r
+// {\r
+// CloseClientSessionFunction f = new CloseClientSessionFunction(1);\r
+// s.call(f);\r
+// Util.show("CloseClientSession sid=" + f.sessionId);\r
+// }\r
+// {\r
+// EchoFunction f = new EchoFunction(new byte[1]);\r
+// s.call(f);\r
+// Util.show("Echo bytes.len=" + f.bytes.length);\r
+// }\r
+ @Override\r
+ public String execute(String command) throws ProCoreException {\r
+ ExecuteFunction f = new ExecuteFunction(command);\r
+ Method m = new Method(f, null, null);\r
+ client.call(m);\r
+ return f.out;\r
+ }\r
+ @Override\r
+ public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {\r
+ GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);\r
+ client.call(new Method(f, null, null));\r
+ return f.changeSetContext;\r
+ }\r
+ public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {\r
+ GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);\r
+ GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public ChangeSetIds getChangeSetIds() throws ProCoreException {\r
+ GetChangeSetsFunction f = new GetChangeSetsFunction();\r
+ GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public Cluster getCluster(byte[] clusterId) throws ProCoreException {\r
+ GetClusterNewFunction f = new GetClusterNewFunction(clusterId);\r
+ f.deflated = ByteBuffer.allocate(8192);\r
+ GetClusterMethod m = new GetClusterMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public ClusterIds getClusterIds() throws ProCoreException {\r
+ int all = 0;\r
+ ListClustersFunction f = new ListClustersFunction(all);\r
+ GetClusterIdsMethod m = new GetClusterIdsMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public Refresh getRefresh(long changeSetId) throws ProCoreException {\r
+ GetRefresh2Function f = new GetRefresh2Function(0);\r
+ GetRefreshMethod m = new GetRefreshMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {\r
+ GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);\r
+ GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {\r
+ GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);\r
+ GetClusterChangesMethod m = new GetClusterChangesMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public Information getInformation() throws ProCoreException {\r
+ GetServerInfo2Function f = new GetServerInfo2Function();\r
+ GetInformationMethod m = new GetInformationMethod(f);\r
+ client.call(m);\r
+ return m;\r
+ }\r
+ public void reconnect(int sessionId) throws ProCoreException {\r
+ ReconnectFunction f = new ReconnectFunction(sessionId);\r
+ client.call(new Method(f, null, null));\r
+ }\r
+ public long reserveIds(int count) throws ProCoreException {\r
+ ReserveIdsFunction f = new ReserveIdsFunction(count);\r
+ client.call(new Method(f, null, null));\r
+ return f.firstId;\r
+ }\r
+ public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
+ UndoFunction f = new UndoFunction(changeSetIds);\r
+ client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
+ return f.potentialConflicts;\r
+ }\r
+ public void updateCluster(byte[] operations) throws ProCoreException {\r
+ UpdateClusterFunction f = new UpdateClusterFunction(operations);\r
+ client.call(new Method(f, null, null));\r
+ }\r
+ @Override\r
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {\r
+ Cluster t = getCluster(uid.asBytes());\r
+\r
+ ByteBuffer deflated = t.getDeflated();\r
+ Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
+\r
+ //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
+ long[] longs = (long[]) arrays[0];\r
+ int[] ints = (int[]) arrays[1];\r
+ byte[] bytes = (byte[]) arrays[2];\r
+ return creator.create(uid, bytes, ints, longs);\r
+\r
+ }\r
+ @Override\r
+ public boolean refreshEnabled() {\r
+ return true;\r
+ }\r
+}\r
+class SessionManager {\r
+ private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();\r
+ SessionManager() {\r
+ }\r
+ public List<Client> disconnect(DatabaseI db) throws ProCoreException {\r
+ ArrayList<Client> clients = new ArrayList<Client>();\r
+ for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
+ SessionI si = i.getValue();\r
+ if (!db.equals(si.getDb()))\r
+ continue;\r
+ Client client = i.getKey();\r
+ if (client.isOpen()) {\r
+ client.close();\r
+ clients.add(client);\r
+ }\r
+ }\r
+ return clients;\r
+ }\r
+ public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {\r
+ for (Client client : clients) {\r
+ SessionI si = sessionMap.get(client);\r
+ if (null == si || !db.equals(si.getDb()))\r
+ continue;\r
+ client.open();\r
+ }\r
+// for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
+// SessionI si = i.getValue();\r
+// if (!db.equals(si.getDb()))\r
+// continue;\r
+// Client client = i.getKey();\r
+// client.open();\r
+// }\r
+ }\r
+ public SessionI newSession(DatabaseI db) throws ProCoreException {\r
+ Client client = db.newClient();\r
+ SessionI si = new SessionI(db, client);\r
+ sessionMap.put(client, si);\r
+ return si;\r
+ }\r
+}\r
+class SessionEvent {\r
+ private final Session session;\r
+ private final Throwable cause;\r
+ public SessionEvent(Session session, Throwable cause) {\r
+ this.session = session;\r
+ this.cause = cause;\r
+ }\r
+ public Session getSession() {\r
+ return session;\r
+ }\r
+ public Throwable getCause() {\r
+ return cause;\r
+ }\r
+}\r
+class AskTransactionMethod extends Method implements Transaction {\r
+ private final AskTransactionFunction function;\r
+ AskTransactionMethod(AskTransactionFunction function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public long getHeadChangeSetId() {\r
+ return function.headChangeSetId;\r
+ }\r
+ @Override\r
+ public long getTransactionId() {\r
+ return function.transactionId;\r
+ }\r
+}\r
+class GetChangeSetDataMethod extends Method implements ChangeSetData {\r
+ private final GetChangeSetDataFunction function;\r
+ GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {\r
+ super(function, evh, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public boolean isOk() {\r
+ return !function.failed;\r
+ }\r
+}\r
+class GetChangeSetIdsMethod extends Method implements ChangeSetIds {\r
+ private final GetChangeSetsFunction function;\r
+ GetChangeSetIdsMethod(GetChangeSetsFunction function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public long getFirstChangeSetId() {\r
+ if (function.changeSetIds.length > 0)\r
+ return function.changeSetIds[0];\r
+ else\r
+ return Constants.NullChangeSetId;\r
+ }\r
+ @Override\r
+ public int getCount() {\r
+ return function.changeSetIds.length;\r
+ }\r
+}\r
+class GetClusterMethod extends Method implements Cluster {\r
+ private final GetClusterNewFunction function;\r
+ GetClusterMethod(GetClusterNewFunction function) {\r
+ super(function, null, new GetClusterHandler(function));\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public int getInflateSize() {\r
+ return function.inflateSize;\r
+ }\r
+ @Override\r
+ public ByteBuffer getDeflated() {\r
+ return function.deflated;\r
+ }\r
+}\r
+class GetClusterChangesMethod extends Method implements ClusterChanges {\r
+ private final GetClusterChangesFunction function;\r
+ GetClusterChangesMethod(GetClusterChangesFunction function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public long getHeadChangeSetId() {\r
+ return function.headChangeSetId;\r
+ }\r
+ @Override\r
+ public int[] getResourceIndex() {\r
+ return function.resourceIndex;\r
+ }\r
+ @Override\r
+ public int[] getPredicateIndex() {\r
+ return function.predicateIndex;\r
+ }\r
+ @Override\r
+ public long[] getPredicateFirst() {\r
+ return function.predicateFirst;\r
+ }\r
+ @Override\r
+ public long[] getPredicateSecond() {\r
+ return function.predicateSecond;\r
+ }\r
+ @Override\r
+ public int[] getValueIndex() {\r
+ return function.valueIndex;\r
+ }\r
+}\r
+class GetClusterIdsMethod extends Method implements ClusterIds {\r
+ private final ListClustersFunction function;\r
+ GetClusterIdsMethod(ListClustersFunction function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public int getStatus() {\r
+ return function.status;\r
+ }\r
+ @Override\r
+ public long[] getFirst() {\r
+ return function.first;\r
+ }\r
+ @Override\r
+ public long[] getSecond() {\r
+ return function.second;\r
+ }\r
+}\r
+class GetInformationMethod extends Method implements Information {\r
+ private final GetServerInfo2Function function;\r
+ GetInformationMethod(GetServerInfo2Function function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public String getServerId() {\r
+ return function.serverId;\r
+ }\r
+ @Override\r
+ public String getProtocolId() {\r
+ return function.protocolId;\r
+ }\r
+ @Override\r
+ public String getDatabaseId() {\r
+ return function.databaseId;\r
+ }\r
+ @Override\r
+ public long getFirstChangeSetId() {\r
+ return function.firstChangeSetId;\r
+ }\r
+}\r
+class GetRefreshMethod extends Method implements Refresh {\r
+ private final GetRefresh2Function function;\r
+ GetRefreshMethod(GetRefresh2Function function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public long getHeadChangeSetId() {\r
+ return function.headChangeSetId;\r
+ }\r
+ @Override\r
+ public long[] getFirst() {\r
+ return function.first;\r
+ }\r
+ @Override\r
+ public long[] getSecond() {\r
+ return function.second;\r
+ }\r
+}\r
+class GetResourceSegmentMethod extends Method implements ResourceSegment {\r
+ private final GetResourceSegmentFunction function;\r
+ GetResourceSegmentMethod(GetResourceSegmentFunction function) {\r
+ super(function, null, null);\r
+ this.function = function;\r
+ }\r
+ @Override\r
+ public byte[] getClusterId() {\r
+ return function.clusterUID;\r
+ }\r
+ @Override\r
+ public int getResourceIndex() {\r
+ return function.resourceIndex;\r
+ }\r
+ @Override\r
+ public long getValueSize() {\r
+ return function.valueSize;\r
+ }\r
+ @Override\r
+ public byte[] getSegment() {\r
+ return function.segment;\r
+ }\r
+ @Override\r
+ public long getOffset() {\r
+ return function.segmentOffset;\r
+ }\r
+}\r
+class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {\r
+ private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();\r
+ private final OnChangeSetUpdate on;\r
+ private ProCoreException exception;\r
+ ChangeSetEventHandler(OnChangeSetUpdate on) {\r
+ this.on = on;\r
+ }\r
+ @Override\r
+ void on(Event event) {\r
+ event.deserialize(csuEvent);\r
+ Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);\r
+ try {\r
+ if (null != on)\r
+ on.onChangeSetUpdate(this);\r
+ } catch (SDBException e) {\r
+ if (null == exception)\r
+ exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);\r
+ else\r
+ Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);\r
+ }\r
+ }\r
+ @Override\r
+ public long getChangeSetId() {\r
+ return csuEvent.changeSetId;\r
+ }\r
+ @Override\r
+ public int getChangeSetIndex() {\r
+ return csuEvent.changeSetIndex;\r
+ }\r
+ @Override\r
+ public int getNumberOfClusterChangeSets() {\r
+ return csuEvent.numberOfClusterChangeSets;\r
+ }\r
+ @Override\r
+ public int getIndexOfClusterChangeSet() {\r
+ return csuEvent.indexOfClusterChangeSet;\r
+ }\r
+ @Override\r
+ public byte[] getClusterId() {\r
+ return csuEvent.clusterUID;\r
+ }\r
+ @Override\r
+ public boolean getNewCluster() {\r
+ return csuEvent.newCluster;\r
+ }\r
+ @Override\r
+ public byte[] getData() {\r
+ return csuEvent.data;\r
+ }\r
+}\r