X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.server%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fserver%2Finternal%2FSessionI.java;h=6f0ddac50df935d7538745ae91cfcb1625f36e44;hb=HEAD;hp=2d08de4a68734399a461b7684f98c93f799ae8ff;hpb=54473e30e21abd83d47c14b4252a688ad944142e;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java index 2d08de4a6..6f0ddac50 100644 --- a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java +++ b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java @@ -1,522 +1,521 @@ -package org.simantics.db.server.internal; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - -import org.simantics.db.ClusterCreator; -import org.simantics.db.Database; -import org.simantics.db.Database.Session; -import org.simantics.db.Database.Session.ChangeSetData; -import org.simantics.db.Database.Session.ChangeSetIds; -import org.simantics.db.Database.Session.ChangeSetUpdate; -import org.simantics.db.Database.Session.Cluster; -import org.simantics.db.Database.Session.ClusterChanges; -import org.simantics.db.Database.Session.ClusterIds; -import org.simantics.db.Database.Session.Information; -import org.simantics.db.Database.Session.OnChangeSetUpdate; -import org.simantics.db.Database.Session.Refresh; -import org.simantics.db.Database.Session.ResourceSegment; -import org.simantics.db.Database.Session.Transaction; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.SDBException; -import org.simantics.db.server.ProCoreException; -import org.simantics.db.server.protocol.AAAFunction; -import org.simantics.db.server.protocol.AcceptCommitFunction; -import org.simantics.db.server.protocol.AskTransactionFunction; -import org.simantics.db.server.protocol.CancelCommitFunction; -import org.simantics.db.server.protocol.Constants; -import org.simantics.db.server.protocol.EndTransactionFunction; -import org.simantics.db.server.protocol.ExecuteFunction; -import org.simantics.db.server.protocol.GetChangeSetContextFunction; -import org.simantics.db.server.protocol.GetChangeSetDataFunction; -import org.simantics.db.server.protocol.GetChangeSetsFunction; -import org.simantics.db.server.protocol.GetClusterChangesFunction; -import org.simantics.db.server.protocol.GetClusterNewFunction; -import org.simantics.db.server.protocol.GetRefresh2Function; -import org.simantics.db.server.protocol.GetResourceSegmentFunction; -import org.simantics.db.server.protocol.GetServerInfo2Function; -import org.simantics.db.server.protocol.ListClustersFunction; -import org.simantics.db.server.protocol.ReconnectFunction; -import org.simantics.db.server.protocol.ReserveIdsFunction; -import org.simantics.db.server.protocol.UndoFunction; -import org.simantics.db.server.protocol.UpdateClusterFunction; -import org.simantics.db.service.ClusterUID; - -public class SessionI implements Session { - - protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor(); - - private final DatabaseI db; - private Client client; - SessionI(DatabaseI db, Client client) { - this.db = db; - this.client = client; - } - DatabaseI getDb() { - return db; - } - Client replace(Client client) throws ProCoreException { - Client t = this.client; - this.client = client; - return t; - } - @Override - public Database getDatabase() { - return db; - } - @Override - public void close() throws ProCoreException { - client.close(); - } - @Override - public boolean isClosed() throws ProCoreException { - return client.isClosed(); - } - @Override - public void open() throws ProCoreException { - try { - client.open(); - } catch (InterruptedException e) { - throw new ProCoreException("Client.open was interrupted.", e); - } - } - public void callAAA() throws ProCoreException { - AAAFunction f = new AAAFunction(); - client.call(new Method(f, null, null)); - } - @Override - public Transaction askReadTransaction() throws ProCoreException { - AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId); - AskTransactionMethod m = new AskTransactionMethod(f); - client.call(m); - return m; - } - @Override - public Transaction askWriteTransaction(long transactionId) throws ProCoreException { - AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId); - AskTransactionMethod m = new AskTransactionMethod(f); - client.call(m); - return m; - } - @Override - public long endTransaction(long transactionId) throws ProCoreException { - EndTransactionFunction f = new EndTransactionFunction(transactionId); - Method m = new Method(f, null, null); - client.call(m); - return f.headChangeSetId; - } - @Override - public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { - AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata); - client.call(new Method(f, null, null)); - } - public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { - CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata); - client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); - return f.headChangeSetId; -} -// { -// OpenClientSessionFunction f = new OpenClientSessionFunction(); -// s.call(f); -// Util.show("OpenClientSession sid=" + f.sessionId); -// } -// { -// CloseClientSessionFunction f = new CloseClientSessionFunction(1); -// s.call(f); -// Util.show("CloseClientSession sid=" + f.sessionId); -// } -// { -// EchoFunction f = new EchoFunction(new byte[1]); -// s.call(f); -// Util.show("Echo bytes.len=" + f.bytes.length); -// } - @Override - public String execute(String command) throws ProCoreException { - ExecuteFunction f = new ExecuteFunction(command); - Method m = new Method(f, null, null); - client.call(m); - return f.out; - } - @Override - public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException { - GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId); - client.call(new Method(f, null, null)); - return f.changeSetContext; - } - public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException { - GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId); - GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on)); - client.call(m); - return m; - } - public ChangeSetIds getChangeSetIds() throws ProCoreException { - GetChangeSetsFunction f = new GetChangeSetsFunction(); - GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f); - client.call(m); - return m; - } - public Cluster getCluster(byte[] clusterId) throws ProCoreException { - GetClusterNewFunction f = new GetClusterNewFunction(clusterId); - f.deflated = ByteBuffer.allocate(8192); - GetClusterMethod m = new GetClusterMethod(f); - client.call(m); - return m; - } - public ClusterIds getClusterIds() throws ProCoreException { - int all = 0; - ListClustersFunction f = new ListClustersFunction(all); - GetClusterIdsMethod m = new GetClusterIdsMethod(f); - client.call(m); - return m; - } - public Refresh getRefresh(long changeSetId) throws ProCoreException { - GetRefresh2Function f = new GetRefresh2Function(0); - GetRefreshMethod m = new GetRefreshMethod(f); - client.call(m); - return m; - } - public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException { - GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size); - GetResourceSegmentMethod m = new GetResourceSegmentMethod(f); - client.call(m); - return m; - } - public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException { - GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId); - GetClusterChangesMethod m = new GetClusterChangesMethod(f); - client.call(m); - return m; - } - public Information getInformation() throws ProCoreException { - GetServerInfo2Function f = new GetServerInfo2Function(); - GetInformationMethod m = new GetInformationMethod(f); - client.call(m); - return m; - } - public void reconnect(int sessionId) throws ProCoreException { - ReconnectFunction f = new ReconnectFunction(sessionId); - client.call(new Method(f, null, null)); - } - public long reserveIds(int count) throws ProCoreException { - ReserveIdsFunction f = new ReserveIdsFunction(count); - client.call(new Method(f, null, null)); - return f.firstId; - } - public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { - UndoFunction f = new UndoFunction(changeSetIds); - client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); - return f.potentialConflicts; - } - public void updateCluster(byte[] operations) throws ProCoreException { - UpdateClusterFunction f = new UpdateClusterFunction(operations); - client.call(new Method(f, null, null)); - } - @Override - public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { - Cluster t = getCluster(uid.asBytes()); - - ByteBuffer deflated = t.getDeflated(); - Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); - - //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); - long[] longs = (long[]) arrays[0]; - int[] ints = (int[]) arrays[1]; - byte[] bytes = (byte[]) arrays[2]; - return creator.create(uid, bytes, ints, longs); - - } - @Override - public boolean refreshEnabled() { - return true; - } - @Override - public boolean rolledback() { - // TODO Auto-generated method stub - return false; - } -} -class SessionManager { - private ConcurrentHashMap sessionMap = new ConcurrentHashMap(); - SessionManager() { - } - public List disconnect(DatabaseI db) throws ProCoreException { - ArrayList clients = new ArrayList(); - for (Entry i : sessionMap.entrySet()) { - SessionI si = i.getValue(); - if (!db.equals(si.getDb())) - continue; - Client client = i.getKey(); - if (client.isOpen()) { - client.close(); - clients.add(client); - } - } - return clients; - } - public void connect(DatabaseI db, List clients) throws ProCoreException, InterruptedException { - for (Client client : clients) { - SessionI si = sessionMap.get(client); - if (null == si || !db.equals(si.getDb())) - continue; - client.open(); - } -// for (Entry i : sessionMap.entrySet()) { -// SessionI si = i.getValue(); -// if (!db.equals(si.getDb())) -// continue; -// Client client = i.getKey(); -// client.open(); -// } - } - public SessionI newSession(DatabaseI db) throws ProCoreException { - Client client = db.newClient(); - SessionI si = new SessionI(db, client); - sessionMap.put(client, si); - return si; - } -} -class SessionEvent { - private final Session session; - private final Throwable cause; - public SessionEvent(Session session, Throwable cause) { - this.session = session; - this.cause = cause; - } - public Session getSession() { - return session; - } - public Throwable getCause() { - return cause; - } -} -class AskTransactionMethod extends Method implements Transaction { - private final AskTransactionFunction function; - AskTransactionMethod(AskTransactionFunction function) { - super(function, null, null); - this.function = function; - } - @Override - public long getHeadChangeSetId() { - return function.headChangeSetId; - } - @Override - public long getTransactionId() { - return function.transactionId; - } -} -class GetChangeSetDataMethod extends Method implements ChangeSetData { - private final GetChangeSetDataFunction function; - GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) { - super(function, evh, null); - this.function = function; - } - @Override - public boolean isOk() { - return !function.failed; - } -} -class GetChangeSetIdsMethod extends Method implements ChangeSetIds { - private final GetChangeSetsFunction function; - GetChangeSetIdsMethod(GetChangeSetsFunction function) { - super(function, null, null); - this.function = function; - } - @Override - public long getFirstChangeSetId() { - if (function.changeSetIds.length > 0) - return function.changeSetIds[0]; - else - return Constants.NullChangeSetId; - } - @Override - public int getCount() { - return function.changeSetIds.length; - } -} -class GetClusterMethod extends Method implements Cluster { - private final GetClusterNewFunction function; - GetClusterMethod(GetClusterNewFunction function) { - super(function, null, new GetClusterHandler(function)); - this.function = function; - } - @Override - public int getInflateSize() { - return function.inflateSize; - } - @Override - public ByteBuffer getDeflated() { - return function.deflated; - } -} -class GetClusterChangesMethod extends Method implements ClusterChanges { - private final GetClusterChangesFunction function; - GetClusterChangesMethod(GetClusterChangesFunction function) { - super(function, null, null); - this.function = function; - } - @Override - public long getHeadChangeSetId() { - return function.headChangeSetId; - } - @Override - public int[] getResourceIndex() { - return function.resourceIndex; - } - @Override - public int[] getPredicateIndex() { - return function.predicateIndex; - } - @Override - public long[] getPredicateFirst() { - return function.predicateFirst; - } - @Override - public long[] getPredicateSecond() { - return function.predicateSecond; - } - @Override - public int[] getValueIndex() { - return function.valueIndex; - } -} -class GetClusterIdsMethod extends Method implements ClusterIds { - private final ListClustersFunction function; - GetClusterIdsMethod(ListClustersFunction function) { - super(function, null, null); - this.function = function; - } - @Override - public int getStatus() { - return function.status; - } - @Override - public long[] getFirst() { - return function.first; - } - @Override - public long[] getSecond() { - return function.second; - } -} -class GetInformationMethod extends Method implements Information { - private final GetServerInfo2Function function; - GetInformationMethod(GetServerInfo2Function function) { - super(function, null, null); - this.function = function; - } - @Override - public String getServerId() { - return function.serverId; - } - @Override - public String getProtocolId() { - return function.protocolId; - } - @Override - public String getDatabaseId() { - return function.databaseId; - } - @Override - public long getFirstChangeSetId() { - return function.firstChangeSetId; - } -} -class GetRefreshMethod extends Method implements Refresh { - private final GetRefresh2Function function; - GetRefreshMethod(GetRefresh2Function function) { - super(function, null, null); - this.function = function; - } - @Override - public long getHeadChangeSetId() { - return function.headChangeSetId; - } - @Override - public long[] getFirst() { - return function.first; - } - @Override - public long[] getSecond() { - return function.second; - } -} -class GetResourceSegmentMethod extends Method implements ResourceSegment { - private final GetResourceSegmentFunction function; - GetResourceSegmentMethod(GetResourceSegmentFunction function) { - super(function, null, null); - this.function = function; - } - @Override - public byte[] getClusterId() { - return function.clusterUID; - } - @Override - public int getResourceIndex() { - return function.resourceIndex; - } - @Override - public long getValueSize() { - return function.valueSize; - } - @Override - public byte[] getSegment() { - return function.segment; - } - @Override - public long getOffset() { - return function.segmentOffset; - } -} -class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate { - private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent(); - private final OnChangeSetUpdate on; - private ProCoreException exception; - ChangeSetEventHandler(OnChangeSetUpdate on) { - this.on = on; - } - @Override - void on(Event event) { - event.deserialize(csuEvent); - Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId); - try { - if (null != on) - on.onChangeSetUpdate(this); - } catch (SDBException e) { - if (null == exception) - exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e); - else - Util.logError("Execution of ChangeSetUpdate callback failed(2).", e); - } - } - @Override - public long getChangeSetId() { - return csuEvent.changeSetId; - } - @Override - public int getChangeSetIndex() { - return csuEvent.changeSetIndex; - } - @Override - public int getNumberOfClusterChangeSets() { - return csuEvent.numberOfClusterChangeSets; - } - @Override - public int getIndexOfClusterChangeSet() { - return csuEvent.indexOfClusterChangeSet; - } - @Override - public byte[] getClusterId() { - return csuEvent.clusterUID; - } - @Override - public boolean getNewCluster() { - return csuEvent.newCluster; - } - @Override - public byte[] getData() { - return csuEvent.data; - } -} +package org.simantics.db.server.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.simantics.db.ClusterCreator; +import org.simantics.db.Database; +import org.simantics.db.Database.Session; +import org.simantics.db.Database.Session.ChangeSetData; +import org.simantics.db.Database.Session.ChangeSetIds; +import org.simantics.db.Database.Session.ChangeSetUpdate; +import org.simantics.db.Database.Session.Cluster; +import org.simantics.db.Database.Session.ClusterChanges; +import org.simantics.db.Database.Session.ClusterIds; +import org.simantics.db.Database.Session.Information; +import org.simantics.db.Database.Session.OnChangeSetUpdate; +import org.simantics.db.Database.Session.Refresh; +import org.simantics.db.Database.Session.ResourceSegment; +import org.simantics.db.Database.Session.Transaction; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.SDBException; +import org.simantics.db.server.ProCoreException; +import org.simantics.db.server.protocol.AAAFunction; +import org.simantics.db.server.protocol.AcceptCommitFunction; +import org.simantics.db.server.protocol.AskTransactionFunction; +import org.simantics.db.server.protocol.CancelCommitFunction; +import org.simantics.db.server.protocol.Constants; +import org.simantics.db.server.protocol.EndTransactionFunction; +import org.simantics.db.server.protocol.ExecuteFunction; +import org.simantics.db.server.protocol.GetChangeSetContextFunction; +import org.simantics.db.server.protocol.GetChangeSetDataFunction; +import org.simantics.db.server.protocol.GetChangeSetsFunction; +import org.simantics.db.server.protocol.GetClusterChangesFunction; +import org.simantics.db.server.protocol.GetClusterNewFunction; +import org.simantics.db.server.protocol.GetRefresh2Function; +import org.simantics.db.server.protocol.GetResourceSegmentFunction; +import org.simantics.db.server.protocol.GetServerInfo2Function; +import org.simantics.db.server.protocol.ListClustersFunction; +import org.simantics.db.server.protocol.ReconnectFunction; +import org.simantics.db.server.protocol.ReserveIdsFunction; +import org.simantics.db.server.protocol.UndoFunction; +import org.simantics.db.server.protocol.UpdateClusterFunction; +import org.simantics.db.service.ClusterUID; + +public class SessionI implements Session { + + protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor(); + + private final DatabaseI db; + private Client client; + SessionI(DatabaseI db, Client client) { + this.db = db; + this.client = client; + } + DatabaseI getDb() { + return db; + } + Client replace(Client client) throws ProCoreException { + Client t = this.client; + this.client = client; + return t; + } + @Override + public Database getDatabase() { + return db; + } + @Override + public void close() throws ProCoreException { + client.close(); + } + @Override + public boolean isClosed() throws ProCoreException { + return client.isClosed(); + } + @Override + public void open() throws ProCoreException { + try { + client.open(); + } catch (InterruptedException e) { + throw new ProCoreException("Client.open was interrupted.", e); + } + } + public void callAAA() throws ProCoreException { + AAAFunction f = new AAAFunction(); + client.call(new Method(f, null, null)); + } + @Override + public Transaction askReadTransaction() throws ProCoreException { + AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId); + AskTransactionMethod m = new AskTransactionMethod(f); + client.call(m); + return m; + } + @Override + public Transaction askWriteTransaction(long transactionId) throws ProCoreException { + AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId); + AskTransactionMethod m = new AskTransactionMethod(f); + client.call(m); + return m; + } + @Override + public long endTransaction(long transactionId) throws ProCoreException { + EndTransactionFunction f = new EndTransactionFunction(transactionId); + Method m = new Method(f, null, null); + client.call(m); + return f.headChangeSetId; + } + @Override + public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { + AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata); + client.call(new Method(f, null, null)); + } + public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { + CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata); + client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); + return f.headChangeSetId; +} +// { +// OpenClientSessionFunction f = new OpenClientSessionFunction(); +// s.call(f); +// Util.show("OpenClientSession sid=" + f.sessionId); +// } +// { +// CloseClientSessionFunction f = new CloseClientSessionFunction(1); +// s.call(f); +// Util.show("CloseClientSession sid=" + f.sessionId); +// } +// { +// EchoFunction f = new EchoFunction(new byte[1]); +// s.call(f); +// Util.show("Echo bytes.len=" + f.bytes.length); +// } + @Override + public String execute(String command) throws ProCoreException { + ExecuteFunction f = new ExecuteFunction(command); + Method m = new Method(f, null, null); + client.call(m); + return f.out; + } + @Override + public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException { + GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId); + client.call(new Method(f, null, null)); + return f.changeSetContext; + } + public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException { + GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId); + GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on)); + client.call(m); + return m; + } + public ChangeSetIds getChangeSetIds() throws ProCoreException { + GetChangeSetsFunction f = new GetChangeSetsFunction(); + GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f); + client.call(m); + return m; + } + public Cluster getCluster(byte[] clusterId) throws ProCoreException { + GetClusterNewFunction f = new GetClusterNewFunction(clusterId); + f.deflated = ByteBuffer.allocate(8192); + GetClusterMethod m = new GetClusterMethod(f); + client.call(m); + return m; + } + public ClusterIds getClusterIds() throws ProCoreException { + int all = 0; + ListClustersFunction f = new ListClustersFunction(all); + GetClusterIdsMethod m = new GetClusterIdsMethod(f); + client.call(m); + return m; + } + public Refresh getRefresh(long changeSetId) throws ProCoreException { + GetRefresh2Function f = new GetRefresh2Function(0); + GetRefreshMethod m = new GetRefreshMethod(f); + client.call(m); + return m; + } + public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException { + GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size); + GetResourceSegmentMethod m = new GetResourceSegmentMethod(f); + client.call(m); + return m; + } + public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException { + GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId); + GetClusterChangesMethod m = new GetClusterChangesMethod(f); + client.call(m); + return m; + } + public Information getInformation() throws ProCoreException { + GetServerInfo2Function f = new GetServerInfo2Function(); + GetInformationMethod m = new GetInformationMethod(f); + client.call(m); + return m; + } + public void reconnect(int sessionId) throws ProCoreException { + ReconnectFunction f = new ReconnectFunction(sessionId); + client.call(new Method(f, null, null)); + } + public long reserveIds(int count) throws ProCoreException { + ReserveIdsFunction f = new ReserveIdsFunction(count); + client.call(new Method(f, null, null)); + return f.firstId; + } + public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { + UndoFunction f = new UndoFunction(changeSetIds); + client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); + return f.potentialConflicts; + } + public void updateCluster(byte[] operations) throws ProCoreException { + UpdateClusterFunction f = new UpdateClusterFunction(operations); + client.call(new Method(f, null, null)); + } + @Override + public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { + Cluster t = getCluster(uid.asBytes()); + + ByteBuffer deflated = t.getDeflated(); + Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); + + //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); + long[] longs = (long[]) arrays[0]; + int[] ints = (int[]) arrays[1]; + byte[] bytes = (byte[]) arrays[2]; + return creator.create(uid, bytes, ints, longs); + + } + @Override + public boolean refreshEnabled() { + return true; + } + @Override + public boolean rolledback() { + return false; + } +} +class SessionManager { + private ConcurrentHashMap sessionMap = new ConcurrentHashMap(); + SessionManager() { + } + public List disconnect(DatabaseI db) throws ProCoreException { + ArrayList clients = new ArrayList(); + for (Entry i : sessionMap.entrySet()) { + SessionI si = i.getValue(); + if (!db.equals(si.getDb())) + continue; + Client client = i.getKey(); + if (client.isOpen()) { + client.close(); + clients.add(client); + } + } + return clients; + } + public void connect(DatabaseI db, List clients) throws ProCoreException, InterruptedException { + for (Client client : clients) { + SessionI si = sessionMap.get(client); + if (null == si || !db.equals(si.getDb())) + continue; + client.open(); + } +// for (Entry i : sessionMap.entrySet()) { +// SessionI si = i.getValue(); +// if (!db.equals(si.getDb())) +// continue; +// Client client = i.getKey(); +// client.open(); +// } + } + public SessionI newSession(DatabaseI db) throws ProCoreException { + Client client = db.newClient(); + SessionI si = new SessionI(db, client); + sessionMap.put(client, si); + return si; + } +} +class SessionEvent { + private final Session session; + private final Throwable cause; + public SessionEvent(Session session, Throwable cause) { + this.session = session; + this.cause = cause; + } + public Session getSession() { + return session; + } + public Throwable getCause() { + return cause; + } +} +class AskTransactionMethod extends Method implements Transaction { + private final AskTransactionFunction function; + AskTransactionMethod(AskTransactionFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public long getTransactionId() { + return function.transactionId; + } +} +class GetChangeSetDataMethod extends Method implements ChangeSetData { + private final GetChangeSetDataFunction function; + GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) { + super(function, evh, null); + this.function = function; + } + @Override + public boolean isOk() { + return !function.failed; + } +} +class GetChangeSetIdsMethod extends Method implements ChangeSetIds { + private final GetChangeSetsFunction function; + GetChangeSetIdsMethod(GetChangeSetsFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getFirstChangeSetId() { + if (function.changeSetIds.length > 0) + return function.changeSetIds[0]; + else + return Constants.NullChangeSetId; + } + @Override + public int getCount() { + return function.changeSetIds.length; + } +} +class GetClusterMethod extends Method implements Cluster { + private final GetClusterNewFunction function; + GetClusterMethod(GetClusterNewFunction function) { + super(function, null, new GetClusterHandler(function)); + this.function = function; + } + @Override + public int getInflateSize() { + return function.inflateSize; + } + @Override + public ByteBuffer getDeflated() { + return function.deflated; + } +} +class GetClusterChangesMethod extends Method implements ClusterChanges { + private final GetClusterChangesFunction function; + GetClusterChangesMethod(GetClusterChangesFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public int[] getResourceIndex() { + return function.resourceIndex; + } + @Override + public int[] getPredicateIndex() { + return function.predicateIndex; + } + @Override + public long[] getPredicateFirst() { + return function.predicateFirst; + } + @Override + public long[] getPredicateSecond() { + return function.predicateSecond; + } + @Override + public int[] getValueIndex() { + return function.valueIndex; + } +} +class GetClusterIdsMethod extends Method implements ClusterIds { + private final ListClustersFunction function; + GetClusterIdsMethod(ListClustersFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public int getStatus() { + return function.status; + } + @Override + public long[] getFirst() { + return function.first; + } + @Override + public long[] getSecond() { + return function.second; + } +} +class GetInformationMethod extends Method implements Information { + private final GetServerInfo2Function function; + GetInformationMethod(GetServerInfo2Function function) { + super(function, null, null); + this.function = function; + } + @Override + public String getServerId() { + return function.serverId; + } + @Override + public String getProtocolId() { + return function.protocolId; + } + @Override + public String getDatabaseId() { + return function.databaseId; + } + @Override + public long getFirstChangeSetId() { + return function.firstChangeSetId; + } +} +class GetRefreshMethod extends Method implements Refresh { + private final GetRefresh2Function function; + GetRefreshMethod(GetRefresh2Function function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public long[] getFirst() { + return function.first; + } + @Override + public long[] getSecond() { + return function.second; + } +} +class GetResourceSegmentMethod extends Method implements ResourceSegment { + private final GetResourceSegmentFunction function; + GetResourceSegmentMethod(GetResourceSegmentFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public byte[] getClusterId() { + return function.clusterUID; + } + @Override + public int getResourceIndex() { + return function.resourceIndex; + } + @Override + public long getValueSize() { + return function.valueSize; + } + @Override + public byte[] getSegment() { + return function.segment; + } + @Override + public long getOffset() { + return function.segmentOffset; + } +} +class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate { + private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent(); + private final OnChangeSetUpdate on; + private ProCoreException exception; + ChangeSetEventHandler(OnChangeSetUpdate on) { + this.on = on; + } + @Override + void on(Event event) { + event.deserialize(csuEvent); + Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId); + try { + if (null != on) + on.onChangeSetUpdate(this); + } catch (SDBException e) { + if (null == exception) + exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e); + else + Util.logError("Execution of ChangeSetUpdate callback failed(2).", e); + } + } + @Override + public long getChangeSetId() { + return csuEvent.changeSetId; + } + @Override + public int getChangeSetIndex() { + return csuEvent.changeSetIndex; + } + @Override + public int getNumberOfClusterChangeSets() { + return csuEvent.numberOfClusterChangeSets; + } + @Override + public int getIndexOfClusterChangeSet() { + return csuEvent.indexOfClusterChangeSet; + } + @Override + public byte[] getClusterId() { + return csuEvent.clusterUID; + } + @Override + public boolean getNewCluster() { + return csuEvent.newCluster; + } + @Override + public byte[] getData() { + return csuEvent.data; + } +}