-package org.simantics.db.server.internal;\r
-\r
-import java.nio.ByteBuffer;\r
-import java.util.ArrayList;\r
-import java.util.List;\r
-import java.util.Map.Entry;\r
-import java.util.concurrent.ConcurrentHashMap;\r
-\r
-import org.simantics.db.ClusterCreator;\r
-import org.simantics.db.Database;\r
-import org.simantics.db.Database.Session;\r
-import org.simantics.db.Database.Session.ChangeSetData;\r
-import org.simantics.db.Database.Session.ChangeSetIds;\r
-import org.simantics.db.Database.Session.ChangeSetUpdate;\r
-import org.simantics.db.Database.Session.Cluster;\r
-import org.simantics.db.Database.Session.ClusterChanges;\r
-import org.simantics.db.Database.Session.ClusterIds;\r
-import org.simantics.db.Database.Session.Information;\r
-import org.simantics.db.Database.Session.OnChangeSetUpdate;\r
-import org.simantics.db.Database.Session.Refresh;\r
-import org.simantics.db.Database.Session.ResourceSegment;\r
-import org.simantics.db.Database.Session.Transaction;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.SDBException;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.server.protocol.AAAFunction;\r
-import org.simantics.db.server.protocol.AcceptCommitFunction;\r
-import org.simantics.db.server.protocol.AskTransactionFunction;\r
-import org.simantics.db.server.protocol.CancelCommitFunction;\r
-import org.simantics.db.server.protocol.Constants;\r
-import org.simantics.db.server.protocol.EndTransactionFunction;\r
-import org.simantics.db.server.protocol.ExecuteFunction;\r
-import org.simantics.db.server.protocol.GetChangeSetContextFunction;\r
-import org.simantics.db.server.protocol.GetChangeSetDataFunction;\r
-import org.simantics.db.server.protocol.GetChangeSetsFunction;\r
-import org.simantics.db.server.protocol.GetClusterChangesFunction;\r
-import org.simantics.db.server.protocol.GetClusterNewFunction;\r
-import org.simantics.db.server.protocol.GetRefresh2Function;\r
-import org.simantics.db.server.protocol.GetResourceSegmentFunction;\r
-import org.simantics.db.server.protocol.GetServerInfo2Function;\r
-import org.simantics.db.server.protocol.ListClustersFunction;\r
-import org.simantics.db.server.protocol.ReconnectFunction;\r
-import org.simantics.db.server.protocol.ReserveIdsFunction;\r
-import org.simantics.db.server.protocol.UndoFunction;\r
-import org.simantics.db.server.protocol.UpdateClusterFunction;\r
-import org.simantics.db.service.ClusterUID;\r
-\r
-public class SessionI implements Session {\r
- \r
- protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();\r
- \r
- private final DatabaseI db;\r
- private Client client;\r
- SessionI(DatabaseI db, Client client) {\r
- this.db = db;\r
- this.client = client;\r
- }\r
- DatabaseI getDb() {\r
- return db;\r
- }\r
- Client replace(Client client) throws ProCoreException {\r
- Client t = this.client;\r
- this.client = client;\r
- return t;\r
- }\r
- @Override\r
- public Database getDatabase() {\r
- return db;\r
- }\r
- @Override\r
- public void close() throws ProCoreException {\r
- client.close();\r
- }\r
- @Override\r
- public boolean isClosed() throws ProCoreException {\r
- return client.isClosed();\r
- }\r
- @Override\r
- public void open() throws ProCoreException {\r
- try {\r
- client.open();\r
- } catch (InterruptedException e) {\r
- throw new ProCoreException("Client.open was interrupted.", e);\r
- }\r
- }\r
- public void callAAA() throws ProCoreException {\r
- AAAFunction f = new AAAFunction();\r
- client.call(new Method(f, null, null));\r
- }\r
- @Override\r
- public Transaction askReadTransaction() throws ProCoreException {\r
- AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);\r
- AskTransactionMethod m = new AskTransactionMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- @Override\r
- public Transaction askWriteTransaction(long transactionId) throws ProCoreException {\r
- AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);\r
- AskTransactionMethod m = new AskTransactionMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- @Override\r
- public long endTransaction(long transactionId) throws ProCoreException {\r
- EndTransactionFunction f = new EndTransactionFunction(transactionId);\r
- Method m = new Method(f, null, null);\r
- client.call(m);\r
- return f.headChangeSetId;\r
- }\r
- @Override\r
- public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {\r
- AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);\r
- client.call(new Method(f, null, null));\r
- }\r
- public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
- CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);\r
- client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
- return f.headChangeSetId;\r
-}\r
-// {\r
-// OpenClientSessionFunction f = new OpenClientSessionFunction();\r
-// s.call(f);\r
-// Util.show("OpenClientSession sid=" + f.sessionId);\r
-// }\r
-// {\r
-// CloseClientSessionFunction f = new CloseClientSessionFunction(1);\r
-// s.call(f);\r
-// Util.show("CloseClientSession sid=" + f.sessionId);\r
-// }\r
-// {\r
-// EchoFunction f = new EchoFunction(new byte[1]);\r
-// s.call(f);\r
-// Util.show("Echo bytes.len=" + f.bytes.length);\r
-// }\r
- @Override\r
- public String execute(String command) throws ProCoreException {\r
- ExecuteFunction f = new ExecuteFunction(command);\r
- Method m = new Method(f, null, null);\r
- client.call(m);\r
- return f.out;\r
- }\r
- @Override\r
- public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {\r
- GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);\r
- client.call(new Method(f, null, null));\r
- return f.changeSetContext;\r
- }\r
- public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {\r
- GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);\r
- GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));\r
- client.call(m);\r
- return m;\r
- }\r
- public ChangeSetIds getChangeSetIds() throws ProCoreException {\r
- GetChangeSetsFunction f = new GetChangeSetsFunction();\r
- GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public Cluster getCluster(byte[] clusterId) throws ProCoreException {\r
- GetClusterNewFunction f = new GetClusterNewFunction(clusterId);\r
- f.deflated = ByteBuffer.allocate(8192);\r
- GetClusterMethod m = new GetClusterMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public ClusterIds getClusterIds() throws ProCoreException {\r
- int all = 0;\r
- ListClustersFunction f = new ListClustersFunction(all);\r
- GetClusterIdsMethod m = new GetClusterIdsMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public Refresh getRefresh(long changeSetId) throws ProCoreException {\r
- GetRefresh2Function f = new GetRefresh2Function(0);\r
- GetRefreshMethod m = new GetRefreshMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {\r
- GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);\r
- GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {\r
- GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);\r
- GetClusterChangesMethod m = new GetClusterChangesMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public Information getInformation() throws ProCoreException {\r
- GetServerInfo2Function f = new GetServerInfo2Function();\r
- GetInformationMethod m = new GetInformationMethod(f);\r
- client.call(m);\r
- return m;\r
- }\r
- public void reconnect(int sessionId) throws ProCoreException {\r
- ReconnectFunction f = new ReconnectFunction(sessionId);\r
- client.call(new Method(f, null, null));\r
- }\r
- public long reserveIds(int count) throws ProCoreException {\r
- ReserveIdsFunction f = new ReserveIdsFunction(count);\r
- client.call(new Method(f, null, null));\r
- return f.firstId;\r
- }\r
- public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
- UndoFunction f = new UndoFunction(changeSetIds);\r
- client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
- return f.potentialConflicts;\r
- }\r
- public void updateCluster(byte[] operations) throws ProCoreException {\r
- UpdateClusterFunction f = new UpdateClusterFunction(operations);\r
- client.call(new Method(f, null, null));\r
- }\r
- @Override\r
- public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {\r
- Cluster t = getCluster(uid.asBytes());\r
-\r
- ByteBuffer deflated = t.getDeflated();\r
- Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
-\r
- //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
- long[] longs = (long[]) arrays[0];\r
- int[] ints = (int[]) arrays[1];\r
- byte[] bytes = (byte[]) arrays[2];\r
- return creator.create(uid, bytes, ints, longs);\r
-\r
- }\r
- @Override\r
- public boolean refreshEnabled() {\r
- return true;\r
- }\r
- @Override\r
- public boolean rolledback() {\r
- // TODO Auto-generated method stub\r
- return false;\r
- }\r
-}\r
-class SessionManager {\r
- private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();\r
- SessionManager() {\r
- }\r
- public List<Client> disconnect(DatabaseI db) throws ProCoreException {\r
- ArrayList<Client> clients = new ArrayList<Client>();\r
- for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
- SessionI si = i.getValue();\r
- if (!db.equals(si.getDb()))\r
- continue;\r
- Client client = i.getKey();\r
- if (client.isOpen()) {\r
- client.close();\r
- clients.add(client);\r
- }\r
- }\r
- return clients;\r
- }\r
- public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {\r
- for (Client client : clients) {\r
- SessionI si = sessionMap.get(client);\r
- if (null == si || !db.equals(si.getDb()))\r
- continue;\r
- client.open();\r
- }\r
-// for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
-// SessionI si = i.getValue();\r
-// if (!db.equals(si.getDb()))\r
-// continue;\r
-// Client client = i.getKey();\r
-// client.open();\r
-// }\r
- }\r
- public SessionI newSession(DatabaseI db) throws ProCoreException {\r
- Client client = db.newClient();\r
- SessionI si = new SessionI(db, client);\r
- sessionMap.put(client, si);\r
- return si;\r
- }\r
-}\r
-class SessionEvent {\r
- private final Session session;\r
- private final Throwable cause;\r
- public SessionEvent(Session session, Throwable cause) {\r
- this.session = session;\r
- this.cause = cause;\r
- }\r
- public Session getSession() {\r
- return session;\r
- }\r
- public Throwable getCause() {\r
- return cause;\r
- }\r
-}\r
-class AskTransactionMethod extends Method implements Transaction {\r
- private final AskTransactionFunction function;\r
- AskTransactionMethod(AskTransactionFunction function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public long getHeadChangeSetId() {\r
- return function.headChangeSetId;\r
- }\r
- @Override\r
- public long getTransactionId() {\r
- return function.transactionId;\r
- }\r
-}\r
-class GetChangeSetDataMethod extends Method implements ChangeSetData {\r
- private final GetChangeSetDataFunction function;\r
- GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {\r
- super(function, evh, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public boolean isOk() {\r
- return !function.failed;\r
- }\r
-}\r
-class GetChangeSetIdsMethod extends Method implements ChangeSetIds {\r
- private final GetChangeSetsFunction function;\r
- GetChangeSetIdsMethod(GetChangeSetsFunction function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public long getFirstChangeSetId() {\r
- if (function.changeSetIds.length > 0)\r
- return function.changeSetIds[0];\r
- else\r
- return Constants.NullChangeSetId;\r
- }\r
- @Override\r
- public int getCount() {\r
- return function.changeSetIds.length;\r
- }\r
-}\r
-class GetClusterMethod extends Method implements Cluster {\r
- private final GetClusterNewFunction function;\r
- GetClusterMethod(GetClusterNewFunction function) {\r
- super(function, null, new GetClusterHandler(function));\r
- this.function = function;\r
- }\r
- @Override\r
- public int getInflateSize() {\r
- return function.inflateSize;\r
- }\r
- @Override\r
- public ByteBuffer getDeflated() {\r
- return function.deflated;\r
- }\r
-}\r
-class GetClusterChangesMethod extends Method implements ClusterChanges {\r
- private final GetClusterChangesFunction function;\r
- GetClusterChangesMethod(GetClusterChangesFunction function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public long getHeadChangeSetId() {\r
- return function.headChangeSetId;\r
- }\r
- @Override\r
- public int[] getResourceIndex() {\r
- return function.resourceIndex;\r
- }\r
- @Override\r
- public int[] getPredicateIndex() {\r
- return function.predicateIndex;\r
- }\r
- @Override\r
- public long[] getPredicateFirst() {\r
- return function.predicateFirst;\r
- }\r
- @Override\r
- public long[] getPredicateSecond() {\r
- return function.predicateSecond;\r
- }\r
- @Override\r
- public int[] getValueIndex() {\r
- return function.valueIndex;\r
- }\r
-}\r
-class GetClusterIdsMethod extends Method implements ClusterIds {\r
- private final ListClustersFunction function;\r
- GetClusterIdsMethod(ListClustersFunction function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public int getStatus() {\r
- return function.status;\r
- }\r
- @Override\r
- public long[] getFirst() {\r
- return function.first;\r
- }\r
- @Override\r
- public long[] getSecond() {\r
- return function.second;\r
- }\r
-}\r
-class GetInformationMethod extends Method implements Information {\r
- private final GetServerInfo2Function function;\r
- GetInformationMethod(GetServerInfo2Function function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public String getServerId() {\r
- return function.serverId;\r
- }\r
- @Override\r
- public String getProtocolId() {\r
- return function.protocolId;\r
- }\r
- @Override\r
- public String getDatabaseId() {\r
- return function.databaseId;\r
- }\r
- @Override\r
- public long getFirstChangeSetId() {\r
- return function.firstChangeSetId;\r
- }\r
-}\r
-class GetRefreshMethod extends Method implements Refresh {\r
- private final GetRefresh2Function function;\r
- GetRefreshMethod(GetRefresh2Function function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public long getHeadChangeSetId() {\r
- return function.headChangeSetId;\r
- }\r
- @Override\r
- public long[] getFirst() {\r
- return function.first;\r
- }\r
- @Override\r
- public long[] getSecond() {\r
- return function.second;\r
- }\r
-}\r
-class GetResourceSegmentMethod extends Method implements ResourceSegment {\r
- private final GetResourceSegmentFunction function;\r
- GetResourceSegmentMethod(GetResourceSegmentFunction function) {\r
- super(function, null, null);\r
- this.function = function;\r
- }\r
- @Override\r
- public byte[] getClusterId() {\r
- return function.clusterUID;\r
- }\r
- @Override\r
- public int getResourceIndex() {\r
- return function.resourceIndex;\r
- }\r
- @Override\r
- public long getValueSize() {\r
- return function.valueSize;\r
- }\r
- @Override\r
- public byte[] getSegment() {\r
- return function.segment;\r
- }\r
- @Override\r
- public long getOffset() {\r
- return function.segmentOffset;\r
- }\r
-}\r
-class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {\r
- private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();\r
- private final OnChangeSetUpdate on;\r
- private ProCoreException exception;\r
- ChangeSetEventHandler(OnChangeSetUpdate on) {\r
- this.on = on;\r
- }\r
- @Override\r
- void on(Event event) {\r
- event.deserialize(csuEvent);\r
- Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);\r
- try {\r
- if (null != on)\r
- on.onChangeSetUpdate(this);\r
- } catch (SDBException e) {\r
- if (null == exception)\r
- exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);\r
- else\r
- Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);\r
- }\r
- }\r
- @Override\r
- public long getChangeSetId() {\r
- return csuEvent.changeSetId;\r
- }\r
- @Override\r
- public int getChangeSetIndex() {\r
- return csuEvent.changeSetIndex;\r
- }\r
- @Override\r
- public int getNumberOfClusterChangeSets() {\r
- return csuEvent.numberOfClusterChangeSets;\r
- }\r
- @Override\r
- public int getIndexOfClusterChangeSet() {\r
- return csuEvent.indexOfClusterChangeSet;\r
- }\r
- @Override\r
- public byte[] getClusterId() {\r
- return csuEvent.clusterUID;\r
- }\r
- @Override\r
- public boolean getNewCluster() {\r
- return csuEvent.newCluster;\r
- }\r
- @Override\r
- public byte[] getData() {\r
- return csuEvent.data;\r
- }\r
-}\r
+package org.simantics.db.server.internal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database;
+import org.simantics.db.Database.Session;
+import org.simantics.db.Database.Session.ChangeSetData;
+import org.simantics.db.Database.Session.ChangeSetIds;
+import org.simantics.db.Database.Session.ChangeSetUpdate;
+import org.simantics.db.Database.Session.Cluster;
+import org.simantics.db.Database.Session.ClusterChanges;
+import org.simantics.db.Database.Session.ClusterIds;
+import org.simantics.db.Database.Session.Information;
+import org.simantics.db.Database.Session.OnChangeSetUpdate;
+import org.simantics.db.Database.Session.Refresh;
+import org.simantics.db.Database.Session.ResourceSegment;
+import org.simantics.db.Database.Session.Transaction;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.SDBException;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.server.protocol.AAAFunction;
+import org.simantics.db.server.protocol.AcceptCommitFunction;
+import org.simantics.db.server.protocol.AskTransactionFunction;
+import org.simantics.db.server.protocol.CancelCommitFunction;
+import org.simantics.db.server.protocol.Constants;
+import org.simantics.db.server.protocol.EndTransactionFunction;
+import org.simantics.db.server.protocol.ExecuteFunction;
+import org.simantics.db.server.protocol.GetChangeSetContextFunction;
+import org.simantics.db.server.protocol.GetChangeSetDataFunction;
+import org.simantics.db.server.protocol.GetChangeSetsFunction;
+import org.simantics.db.server.protocol.GetClusterChangesFunction;
+import org.simantics.db.server.protocol.GetClusterNewFunction;
+import org.simantics.db.server.protocol.GetRefresh2Function;
+import org.simantics.db.server.protocol.GetResourceSegmentFunction;
+import org.simantics.db.server.protocol.GetServerInfo2Function;
+import org.simantics.db.server.protocol.ListClustersFunction;
+import org.simantics.db.server.protocol.ReconnectFunction;
+import org.simantics.db.server.protocol.ReserveIdsFunction;
+import org.simantics.db.server.protocol.UndoFunction;
+import org.simantics.db.server.protocol.UpdateClusterFunction;
+import org.simantics.db.service.ClusterUID;
+
+public class SessionI implements Session {
+
+ protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();
+
+ private final DatabaseI db;
+ private Client client;
+ SessionI(DatabaseI db, Client client) {
+ this.db = db;
+ this.client = client;
+ }
+ DatabaseI getDb() {
+ return db;
+ }
+ Client replace(Client client) throws ProCoreException {
+ Client t = this.client;
+ this.client = client;
+ return t;
+ }
+ @Override
+ public Database getDatabase() {
+ return db;
+ }
+ @Override
+ public void close() throws ProCoreException {
+ client.close();
+ }
+ @Override
+ public boolean isClosed() throws ProCoreException {
+ return client.isClosed();
+ }
+ @Override
+ public void open() throws ProCoreException {
+ try {
+ client.open();
+ } catch (InterruptedException e) {
+ throw new ProCoreException("Client.open was interrupted.", e);
+ }
+ }
+ public void callAAA() throws ProCoreException {
+ AAAFunction f = new AAAFunction();
+ client.call(new Method(f, null, null));
+ }
+ @Override
+ public Transaction askReadTransaction() throws ProCoreException {
+ AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);
+ AskTransactionMethod m = new AskTransactionMethod(f);
+ client.call(m);
+ return m;
+ }
+ @Override
+ public Transaction askWriteTransaction(long transactionId) throws ProCoreException {
+ AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);
+ AskTransactionMethod m = new AskTransactionMethod(f);
+ client.call(m);
+ return m;
+ }
+ @Override
+ public long endTransaction(long transactionId) throws ProCoreException {
+ EndTransactionFunction f = new EndTransactionFunction(transactionId);
+ Method m = new Method(f, null, null);
+ client.call(m);
+ return f.headChangeSetId;
+ }
+ @Override
+ public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
+ AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);
+ client.call(new Method(f, null, null));
+ }
+ public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
+ CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);
+ client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
+ return f.headChangeSetId;
+}
+// {
+// OpenClientSessionFunction f = new OpenClientSessionFunction();
+// s.call(f);
+// Util.show("OpenClientSession sid=" + f.sessionId);
+// }
+// {
+// CloseClientSessionFunction f = new CloseClientSessionFunction(1);
+// s.call(f);
+// Util.show("CloseClientSession sid=" + f.sessionId);
+// }
+// {
+// EchoFunction f = new EchoFunction(new byte[1]);
+// s.call(f);
+// Util.show("Echo bytes.len=" + f.bytes.length);
+// }
+ @Override
+ public String execute(String command) throws ProCoreException {
+ ExecuteFunction f = new ExecuteFunction(command);
+ Method m = new Method(f, null, null);
+ client.call(m);
+ return f.out;
+ }
+ @Override
+ public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
+ GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);
+ client.call(new Method(f, null, null));
+ return f.changeSetContext;
+ }
+ public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {
+ GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);
+ GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));
+ client.call(m);
+ return m;
+ }
+ public ChangeSetIds getChangeSetIds() throws ProCoreException {
+ GetChangeSetsFunction f = new GetChangeSetsFunction();
+ GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);
+ client.call(m);
+ return m;
+ }
+ public Cluster getCluster(byte[] clusterId) throws ProCoreException {
+ GetClusterNewFunction f = new GetClusterNewFunction(clusterId);
+ f.deflated = ByteBuffer.allocate(8192);
+ GetClusterMethod m = new GetClusterMethod(f);
+ client.call(m);
+ return m;
+ }
+ public ClusterIds getClusterIds() throws ProCoreException {
+ int all = 0;
+ ListClustersFunction f = new ListClustersFunction(all);
+ GetClusterIdsMethod m = new GetClusterIdsMethod(f);
+ client.call(m);
+ return m;
+ }
+ public Refresh getRefresh(long changeSetId) throws ProCoreException {
+ GetRefresh2Function f = new GetRefresh2Function(0);
+ GetRefreshMethod m = new GetRefreshMethod(f);
+ client.call(m);
+ return m;
+ }
+ public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {
+ GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);
+ GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);
+ client.call(m);
+ return m;
+ }
+ public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException {
+ GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);
+ GetClusterChangesMethod m = new GetClusterChangesMethod(f);
+ client.call(m);
+ return m;
+ }
+ public Information getInformation() throws ProCoreException {
+ GetServerInfo2Function f = new GetServerInfo2Function();
+ GetInformationMethod m = new GetInformationMethod(f);
+ client.call(m);
+ return m;
+ }
+ public void reconnect(int sessionId) throws ProCoreException {
+ ReconnectFunction f = new ReconnectFunction(sessionId);
+ client.call(new Method(f, null, null));
+ }
+ public long reserveIds(int count) throws ProCoreException {
+ ReserveIdsFunction f = new ReserveIdsFunction(count);
+ client.call(new Method(f, null, null));
+ return f.firstId;
+ }
+ public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
+ UndoFunction f = new UndoFunction(changeSetIds);
+ client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));
+ return f.potentialConflicts;
+ }
+ public void updateCluster(byte[] operations) throws ProCoreException {
+ UpdateClusterFunction f = new UpdateClusterFunction(operations);
+ client.call(new Method(f, null, null));
+ }
+ @Override
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+ Cluster t = getCluster(uid.asBytes());
+
+ ByteBuffer deflated = t.getDeflated();
+ Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
+
+ //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
+ long[] longs = (long[]) arrays[0];
+ int[] ints = (int[]) arrays[1];
+ byte[] bytes = (byte[]) arrays[2];
+ return creator.create(uid, bytes, ints, longs);
+
+ }
+ @Override
+ public boolean refreshEnabled() {
+ return true;
+ }
+ @Override
+ public boolean rolledback() {
+ return false;
+ }
+}
+class SessionManager {
+ private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();
+ SessionManager() {
+ }
+ public List<Client> disconnect(DatabaseI db) throws ProCoreException {
+ ArrayList<Client> clients = new ArrayList<Client>();
+ for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
+ SessionI si = i.getValue();
+ if (!db.equals(si.getDb()))
+ continue;
+ Client client = i.getKey();
+ if (client.isOpen()) {
+ client.close();
+ clients.add(client);
+ }
+ }
+ return clients;
+ }
+ public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {
+ for (Client client : clients) {
+ SessionI si = sessionMap.get(client);
+ if (null == si || !db.equals(si.getDb()))
+ continue;
+ client.open();
+ }
+// for (Entry<Client, SessionI> i : sessionMap.entrySet()) {
+// SessionI si = i.getValue();
+// if (!db.equals(si.getDb()))
+// continue;
+// Client client = i.getKey();
+// client.open();
+// }
+ }
+ public SessionI newSession(DatabaseI db) throws ProCoreException {
+ Client client = db.newClient();
+ SessionI si = new SessionI(db, client);
+ sessionMap.put(client, si);
+ return si;
+ }
+}
+class SessionEvent {
+ private final Session session;
+ private final Throwable cause;
+ public SessionEvent(Session session, Throwable cause) {
+ this.session = session;
+ this.cause = cause;
+ }
+ public Session getSession() {
+ return session;
+ }
+ public Throwable getCause() {
+ return cause;
+ }
+}
+class AskTransactionMethod extends Method implements Transaction {
+ private final AskTransactionFunction function;
+ AskTransactionMethod(AskTransactionFunction function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public long getHeadChangeSetId() {
+ return function.headChangeSetId;
+ }
+ @Override
+ public long getTransactionId() {
+ return function.transactionId;
+ }
+}
+class GetChangeSetDataMethod extends Method implements ChangeSetData {
+ private final GetChangeSetDataFunction function;
+ GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {
+ super(function, evh, null);
+ this.function = function;
+ }
+ @Override
+ public boolean isOk() {
+ return !function.failed;
+ }
+}
+class GetChangeSetIdsMethod extends Method implements ChangeSetIds {
+ private final GetChangeSetsFunction function;
+ GetChangeSetIdsMethod(GetChangeSetsFunction function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public long getFirstChangeSetId() {
+ if (function.changeSetIds.length > 0)
+ return function.changeSetIds[0];
+ else
+ return Constants.NullChangeSetId;
+ }
+ @Override
+ public int getCount() {
+ return function.changeSetIds.length;
+ }
+}
+class GetClusterMethod extends Method implements Cluster {
+ private final GetClusterNewFunction function;
+ GetClusterMethod(GetClusterNewFunction function) {
+ super(function, null, new GetClusterHandler(function));
+ this.function = function;
+ }
+ @Override
+ public int getInflateSize() {
+ return function.inflateSize;
+ }
+ @Override
+ public ByteBuffer getDeflated() {
+ return function.deflated;
+ }
+}
+class GetClusterChangesMethod extends Method implements ClusterChanges {
+ private final GetClusterChangesFunction function;
+ GetClusterChangesMethod(GetClusterChangesFunction function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public long getHeadChangeSetId() {
+ return function.headChangeSetId;
+ }
+ @Override
+ public int[] getResourceIndex() {
+ return function.resourceIndex;
+ }
+ @Override
+ public int[] getPredicateIndex() {
+ return function.predicateIndex;
+ }
+ @Override
+ public long[] getPredicateFirst() {
+ return function.predicateFirst;
+ }
+ @Override
+ public long[] getPredicateSecond() {
+ return function.predicateSecond;
+ }
+ @Override
+ public int[] getValueIndex() {
+ return function.valueIndex;
+ }
+}
+class GetClusterIdsMethod extends Method implements ClusterIds {
+ private final ListClustersFunction function;
+ GetClusterIdsMethod(ListClustersFunction function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public int getStatus() {
+ return function.status;
+ }
+ @Override
+ public long[] getFirst() {
+ return function.first;
+ }
+ @Override
+ public long[] getSecond() {
+ return function.second;
+ }
+}
+class GetInformationMethod extends Method implements Information {
+ private final GetServerInfo2Function function;
+ GetInformationMethod(GetServerInfo2Function function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public String getServerId() {
+ return function.serverId;
+ }
+ @Override
+ public String getProtocolId() {
+ return function.protocolId;
+ }
+ @Override
+ public String getDatabaseId() {
+ return function.databaseId;
+ }
+ @Override
+ public long getFirstChangeSetId() {
+ return function.firstChangeSetId;
+ }
+}
+class GetRefreshMethod extends Method implements Refresh {
+ private final GetRefresh2Function function;
+ GetRefreshMethod(GetRefresh2Function function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public long getHeadChangeSetId() {
+ return function.headChangeSetId;
+ }
+ @Override
+ public long[] getFirst() {
+ return function.first;
+ }
+ @Override
+ public long[] getSecond() {
+ return function.second;
+ }
+}
+class GetResourceSegmentMethod extends Method implements ResourceSegment {
+ private final GetResourceSegmentFunction function;
+ GetResourceSegmentMethod(GetResourceSegmentFunction function) {
+ super(function, null, null);
+ this.function = function;
+ }
+ @Override
+ public byte[] getClusterId() {
+ return function.clusterUID;
+ }
+ @Override
+ public int getResourceIndex() {
+ return function.resourceIndex;
+ }
+ @Override
+ public long getValueSize() {
+ return function.valueSize;
+ }
+ @Override
+ public byte[] getSegment() {
+ return function.segment;
+ }
+ @Override
+ public long getOffset() {
+ return function.segmentOffset;
+ }
+}
+class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {
+ private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();
+ private final OnChangeSetUpdate on;
+ private ProCoreException exception;
+ ChangeSetEventHandler(OnChangeSetUpdate on) {
+ this.on = on;
+ }
+ @Override
+ void on(Event event) {
+ event.deserialize(csuEvent);
+ Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);
+ try {
+ if (null != on)
+ on.onChangeSetUpdate(this);
+ } catch (SDBException e) {
+ if (null == exception)
+ exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);
+ else
+ Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);
+ }
+ }
+ @Override
+ public long getChangeSetId() {
+ return csuEvent.changeSetId;
+ }
+ @Override
+ public int getChangeSetIndex() {
+ return csuEvent.changeSetIndex;
+ }
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return csuEvent.numberOfClusterChangeSets;
+ }
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return csuEvent.indexOfClusterChangeSet;
+ }
+ @Override
+ public byte[] getClusterId() {
+ return csuEvent.clusterUID;
+ }
+ @Override
+ public boolean getNewCluster() {
+ return csuEvent.newCluster;
+ }
+ @Override
+ public byte[] getData() {
+ return csuEvent.data;
+ }
+}