X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.server%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fserver%2Finternal%2FSessionI.java;fp=bundles%2Forg.simantics.db.server%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fserver%2Finternal%2FSessionI.java;h=bb2cdb04059fc21a94c311c01b8d6051f496fe95;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java new file mode 100644 index 000000000..bb2cdb040 --- /dev/null +++ b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java @@ -0,0 +1,517 @@ +package org.simantics.db.server.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.simantics.db.ClusterCreator; +import org.simantics.db.Database; +import org.simantics.db.Database.Session; +import org.simantics.db.Database.Session.ChangeSetData; +import org.simantics.db.Database.Session.ChangeSetIds; +import org.simantics.db.Database.Session.ChangeSetUpdate; +import org.simantics.db.Database.Session.Cluster; +import org.simantics.db.Database.Session.ClusterChanges; +import org.simantics.db.Database.Session.ClusterIds; +import org.simantics.db.Database.Session.Information; +import org.simantics.db.Database.Session.OnChangeSetUpdate; +import org.simantics.db.Database.Session.Refresh; +import org.simantics.db.Database.Session.ResourceSegment; +import org.simantics.db.Database.Session.Transaction; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.SDBException; +import org.simantics.db.server.ProCoreException; +import org.simantics.db.server.protocol.AAAFunction; +import org.simantics.db.server.protocol.AcceptCommitFunction; +import org.simantics.db.server.protocol.AskTransactionFunction; +import org.simantics.db.server.protocol.CancelCommitFunction; +import org.simantics.db.server.protocol.Constants; +import org.simantics.db.server.protocol.EndTransactionFunction; +import org.simantics.db.server.protocol.ExecuteFunction; +import org.simantics.db.server.protocol.GetChangeSetContextFunction; +import org.simantics.db.server.protocol.GetChangeSetDataFunction; +import org.simantics.db.server.protocol.GetChangeSetsFunction; +import org.simantics.db.server.protocol.GetClusterChangesFunction; +import org.simantics.db.server.protocol.GetClusterNewFunction; +import org.simantics.db.server.protocol.GetRefresh2Function; +import org.simantics.db.server.protocol.GetResourceSegmentFunction; +import org.simantics.db.server.protocol.GetServerInfo2Function; +import org.simantics.db.server.protocol.ListClustersFunction; +import org.simantics.db.server.protocol.ReconnectFunction; +import org.simantics.db.server.protocol.ReserveIdsFunction; +import org.simantics.db.server.protocol.UndoFunction; +import org.simantics.db.server.protocol.UpdateClusterFunction; +import org.simantics.db.service.ClusterUID; + +public class SessionI implements Session { + + protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor(); + + private final DatabaseI db; + private Client client; + SessionI(DatabaseI db, Client client) { + this.db = db; + this.client = client; + } + DatabaseI getDb() { + return db; + } + Client replace(Client client) throws ProCoreException { + Client t = this.client; + this.client = client; + return t; + } + @Override + public Database getDatabase() { + return db; + } + @Override + public void close() throws ProCoreException { + client.close(); + } + @Override + public boolean isClosed() throws ProCoreException { + return client.isClosed(); + } + @Override + public void open() throws ProCoreException { + try { + client.open(); + } catch (InterruptedException e) { + throw new ProCoreException("Client.open was interrupted.", e); + } + } + public void callAAA() throws ProCoreException { + AAAFunction f = new AAAFunction(); + client.call(new Method(f, null, null)); + } + @Override + public Transaction askReadTransaction() throws ProCoreException { + AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId); + AskTransactionMethod m = new AskTransactionMethod(f); + client.call(m); + return m; + } + @Override + public Transaction askWriteTransaction(long transactionId) throws ProCoreException { + AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId); + AskTransactionMethod m = new AskTransactionMethod(f); + client.call(m); + return m; + } + @Override + public long endTransaction(long transactionId) throws ProCoreException { + EndTransactionFunction f = new EndTransactionFunction(transactionId); + Method m = new Method(f, null, null); + client.call(m); + return f.headChangeSetId; + } + @Override + public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { + AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata); + client.call(new Method(f, null, null)); + } + public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { + CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata); + client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); + return f.headChangeSetId; +} +// { +// OpenClientSessionFunction f = new OpenClientSessionFunction(); +// s.call(f); +// Util.show("OpenClientSession sid=" + f.sessionId); +// } +// { +// CloseClientSessionFunction f = new CloseClientSessionFunction(1); +// s.call(f); +// Util.show("CloseClientSession sid=" + f.sessionId); +// } +// { +// EchoFunction f = new EchoFunction(new byte[1]); +// s.call(f); +// Util.show("Echo bytes.len=" + f.bytes.length); +// } + @Override + public String execute(String command) throws ProCoreException { + ExecuteFunction f = new ExecuteFunction(command); + Method m = new Method(f, null, null); + client.call(m); + return f.out; + } + @Override + public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException { + GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId); + client.call(new Method(f, null, null)); + return f.changeSetContext; + } + public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException { + GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId); + GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on)); + client.call(m); + return m; + } + public ChangeSetIds getChangeSetIds() throws ProCoreException { + GetChangeSetsFunction f = new GetChangeSetsFunction(); + GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f); + client.call(m); + return m; + } + public Cluster getCluster(byte[] clusterId) throws ProCoreException { + GetClusterNewFunction f = new GetClusterNewFunction(clusterId); + f.deflated = ByteBuffer.allocate(8192); + GetClusterMethod m = new GetClusterMethod(f); + client.call(m); + return m; + } + public ClusterIds getClusterIds() throws ProCoreException { + int all = 0; + ListClustersFunction f = new ListClustersFunction(all); + GetClusterIdsMethod m = new GetClusterIdsMethod(f); + client.call(m); + return m; + } + public Refresh getRefresh(long changeSetId) throws ProCoreException { + GetRefresh2Function f = new GetRefresh2Function(0); + GetRefreshMethod m = new GetRefreshMethod(f); + client.call(m); + return m; + } + public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException { + GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size); + GetResourceSegmentMethod m = new GetResourceSegmentMethod(f); + client.call(m); + return m; + } + public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException { + GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId); + GetClusterChangesMethod m = new GetClusterChangesMethod(f); + client.call(m); + return m; + } + public Information getInformation() throws ProCoreException { + GetServerInfo2Function f = new GetServerInfo2Function(); + GetInformationMethod m = new GetInformationMethod(f); + client.call(m); + return m; + } + public void reconnect(int sessionId) throws ProCoreException { + ReconnectFunction f = new ReconnectFunction(sessionId); + client.call(new Method(f, null, null)); + } + public long reserveIds(int count) throws ProCoreException { + ReserveIdsFunction f = new ReserveIdsFunction(count); + client.call(new Method(f, null, null)); + return f.firstId; + } + public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { + UndoFunction f = new UndoFunction(changeSetIds); + client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); + return f.potentialConflicts; + } + public void updateCluster(byte[] operations) throws ProCoreException { + UpdateClusterFunction f = new UpdateClusterFunction(operations); + client.call(new Method(f, null, null)); + } + @Override + public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { + Cluster t = getCluster(uid.asBytes()); + + ByteBuffer deflated = t.getDeflated(); + Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); + + //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); + long[] longs = (long[]) arrays[0]; + int[] ints = (int[]) arrays[1]; + byte[] bytes = (byte[]) arrays[2]; + return creator.create(uid, bytes, ints, longs); + + } + @Override + public boolean refreshEnabled() { + return true; + } +} +class SessionManager { + private ConcurrentHashMap sessionMap = new ConcurrentHashMap(); + SessionManager() { + } + public List disconnect(DatabaseI db) throws ProCoreException { + ArrayList clients = new ArrayList(); + for (Entry i : sessionMap.entrySet()) { + SessionI si = i.getValue(); + if (!db.equals(si.getDb())) + continue; + Client client = i.getKey(); + if (client.isOpen()) { + client.close(); + clients.add(client); + } + } + return clients; + } + public void connect(DatabaseI db, List clients) throws ProCoreException, InterruptedException { + for (Client client : clients) { + SessionI si = sessionMap.get(client); + if (null == si || !db.equals(si.getDb())) + continue; + client.open(); + } +// for (Entry i : sessionMap.entrySet()) { +// SessionI si = i.getValue(); +// if (!db.equals(si.getDb())) +// continue; +// Client client = i.getKey(); +// client.open(); +// } + } + public SessionI newSession(DatabaseI db) throws ProCoreException { + Client client = db.newClient(); + SessionI si = new SessionI(db, client); + sessionMap.put(client, si); + return si; + } +} +class SessionEvent { + private final Session session; + private final Throwable cause; + public SessionEvent(Session session, Throwable cause) { + this.session = session; + this.cause = cause; + } + public Session getSession() { + return session; + } + public Throwable getCause() { + return cause; + } +} +class AskTransactionMethod extends Method implements Transaction { + private final AskTransactionFunction function; + AskTransactionMethod(AskTransactionFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public long getTransactionId() { + return function.transactionId; + } +} +class GetChangeSetDataMethod extends Method implements ChangeSetData { + private final GetChangeSetDataFunction function; + GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) { + super(function, evh, null); + this.function = function; + } + @Override + public boolean isOk() { + return !function.failed; + } +} +class GetChangeSetIdsMethod extends Method implements ChangeSetIds { + private final GetChangeSetsFunction function; + GetChangeSetIdsMethod(GetChangeSetsFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getFirstChangeSetId() { + if (function.changeSetIds.length > 0) + return function.changeSetIds[0]; + else + return Constants.NullChangeSetId; + } + @Override + public int getCount() { + return function.changeSetIds.length; + } +} +class GetClusterMethod extends Method implements Cluster { + private final GetClusterNewFunction function; + GetClusterMethod(GetClusterNewFunction function) { + super(function, null, new GetClusterHandler(function)); + this.function = function; + } + @Override + public int getInflateSize() { + return function.inflateSize; + } + @Override + public ByteBuffer getDeflated() { + return function.deflated; + } +} +class GetClusterChangesMethod extends Method implements ClusterChanges { + private final GetClusterChangesFunction function; + GetClusterChangesMethod(GetClusterChangesFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public int[] getResourceIndex() { + return function.resourceIndex; + } + @Override + public int[] getPredicateIndex() { + return function.predicateIndex; + } + @Override + public long[] getPredicateFirst() { + return function.predicateFirst; + } + @Override + public long[] getPredicateSecond() { + return function.predicateSecond; + } + @Override + public int[] getValueIndex() { + return function.valueIndex; + } +} +class GetClusterIdsMethod extends Method implements ClusterIds { + private final ListClustersFunction function; + GetClusterIdsMethod(ListClustersFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public int getStatus() { + return function.status; + } + @Override + public long[] getFirst() { + return function.first; + } + @Override + public long[] getSecond() { + return function.second; + } +} +class GetInformationMethod extends Method implements Information { + private final GetServerInfo2Function function; + GetInformationMethod(GetServerInfo2Function function) { + super(function, null, null); + this.function = function; + } + @Override + public String getServerId() { + return function.serverId; + } + @Override + public String getProtocolId() { + return function.protocolId; + } + @Override + public String getDatabaseId() { + return function.databaseId; + } + @Override + public long getFirstChangeSetId() { + return function.firstChangeSetId; + } +} +class GetRefreshMethod extends Method implements Refresh { + private final GetRefresh2Function function; + GetRefreshMethod(GetRefresh2Function function) { + super(function, null, null); + this.function = function; + } + @Override + public long getHeadChangeSetId() { + return function.headChangeSetId; + } + @Override + public long[] getFirst() { + return function.first; + } + @Override + public long[] getSecond() { + return function.second; + } +} +class GetResourceSegmentMethod extends Method implements ResourceSegment { + private final GetResourceSegmentFunction function; + GetResourceSegmentMethod(GetResourceSegmentFunction function) { + super(function, null, null); + this.function = function; + } + @Override + public byte[] getClusterId() { + return function.clusterUID; + } + @Override + public int getResourceIndex() { + return function.resourceIndex; + } + @Override + public long getValueSize() { + return function.valueSize; + } + @Override + public byte[] getSegment() { + return function.segment; + } + @Override + public long getOffset() { + return function.segmentOffset; + } +} +class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate { + private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent(); + private final OnChangeSetUpdate on; + private ProCoreException exception; + ChangeSetEventHandler(OnChangeSetUpdate on) { + this.on = on; + } + @Override + void on(Event event) { + event.deserialize(csuEvent); + Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId); + try { + if (null != on) + on.onChangeSetUpdate(this); + } catch (SDBException e) { + if (null == exception) + exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e); + else + Util.logError("Execution of ChangeSetUpdate callback failed(2).", e); + } + } + @Override + public long getChangeSetId() { + return csuEvent.changeSetId; + } + @Override + public int getChangeSetIndex() { + return csuEvent.changeSetIndex; + } + @Override + public int getNumberOfClusterChangeSets() { + return csuEvent.numberOfClusterChangeSets; + } + @Override + public int getIndexOfClusterChangeSet() { + return csuEvent.indexOfClusterChangeSet; + } + @Override + public byte[] getClusterId() { + return csuEvent.clusterUID; + } + @Override + public boolean getNewCluster() { + return csuEvent.newCluster; + } + @Override + public byte[] getData() { + return csuEvent.data; + } +}