package org.simantics.db.server.internal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import org.simantics.db.ClusterCreator; import org.simantics.db.Database; import org.simantics.db.Database.Session; import org.simantics.db.Database.Session.ChangeSetData; import org.simantics.db.Database.Session.ChangeSetIds; import org.simantics.db.Database.Session.ChangeSetUpdate; import org.simantics.db.Database.Session.Cluster; import org.simantics.db.Database.Session.ClusterChanges; import org.simantics.db.Database.Session.ClusterIds; import org.simantics.db.Database.Session.Information; import org.simantics.db.Database.Session.OnChangeSetUpdate; import org.simantics.db.Database.Session.Refresh; import org.simantics.db.Database.Session.ResourceSegment; import org.simantics.db.Database.Session.Transaction; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.SDBException; import org.simantics.db.server.ProCoreException; import org.simantics.db.server.protocol.AAAFunction; import org.simantics.db.server.protocol.AcceptCommitFunction; import org.simantics.db.server.protocol.AskTransactionFunction; import org.simantics.db.server.protocol.CancelCommitFunction; import org.simantics.db.server.protocol.Constants; import org.simantics.db.server.protocol.EndTransactionFunction; import org.simantics.db.server.protocol.ExecuteFunction; import org.simantics.db.server.protocol.GetChangeSetContextFunction; import org.simantics.db.server.protocol.GetChangeSetDataFunction; import org.simantics.db.server.protocol.GetChangeSetsFunction; import org.simantics.db.server.protocol.GetClusterChangesFunction; import org.simantics.db.server.protocol.GetClusterNewFunction; import org.simantics.db.server.protocol.GetRefresh2Function; import org.simantics.db.server.protocol.GetResourceSegmentFunction; import org.simantics.db.server.protocol.GetServerInfo2Function; import org.simantics.db.server.protocol.ListClustersFunction; import org.simantics.db.server.protocol.ReconnectFunction; import org.simantics.db.server.protocol.ReserveIdsFunction; import org.simantics.db.server.protocol.UndoFunction; import org.simantics.db.server.protocol.UpdateClusterFunction; import org.simantics.db.service.ClusterUID; public class SessionI implements Session { protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor(); private final DatabaseI db; private Client client; SessionI(DatabaseI db, Client client) { this.db = db; this.client = client; } DatabaseI getDb() { return db; } Client replace(Client client) throws ProCoreException { Client t = this.client; this.client = client; return t; } @Override public Database getDatabase() { return db; } @Override public void close() throws ProCoreException { client.close(); } @Override public boolean isClosed() throws ProCoreException { return client.isClosed(); } @Override public void open() throws ProCoreException { try { client.open(); } catch (InterruptedException e) { throw new ProCoreException("Client.open was interrupted.", e); } } public void callAAA() throws ProCoreException { AAAFunction f = new AAAFunction(); client.call(new Method(f, null, null)); } @Override public Transaction askReadTransaction() throws ProCoreException { AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId); AskTransactionMethod m = new AskTransactionMethod(f); client.call(m); return m; } @Override public Transaction askWriteTransaction(long transactionId) throws ProCoreException { AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId); AskTransactionMethod m = new AskTransactionMethod(f); client.call(m); return m; } @Override public long endTransaction(long transactionId) throws ProCoreException { EndTransactionFunction f = new EndTransactionFunction(transactionId); Method m = new Method(f, null, null); client.call(m); return f.headChangeSetId; } @Override public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException { AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata); client.call(new Method(f, null, null)); } public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata); client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); return f.headChangeSetId; } // { // OpenClientSessionFunction f = new OpenClientSessionFunction(); // s.call(f); // Util.show("OpenClientSession sid=" + f.sessionId); // } // { // CloseClientSessionFunction f = new CloseClientSessionFunction(1); // s.call(f); // Util.show("CloseClientSession sid=" + f.sessionId); // } // { // EchoFunction f = new EchoFunction(new byte[1]); // s.call(f); // Util.show("Echo bytes.len=" + f.bytes.length); // } @Override public String execute(String command) throws ProCoreException { ExecuteFunction f = new ExecuteFunction(command); Method m = new Method(f, null, null); client.call(m); return f.out; } @Override public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException { GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId); client.call(new Method(f, null, null)); return f.changeSetContext; } public ChangeSetData getChangeSetData(long minChangeSetId, long maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException { GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId); GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on)); client.call(m); return m; } public ChangeSetIds getChangeSetIds() throws ProCoreException { GetChangeSetsFunction f = new GetChangeSetsFunction(); GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f); client.call(m); return m; } public Cluster getCluster(byte[] clusterId) throws ProCoreException { GetClusterNewFunction f = new GetClusterNewFunction(clusterId); f.deflated = ByteBuffer.allocate(8192); GetClusterMethod m = new GetClusterMethod(f); client.call(m); return m; } public ClusterIds getClusterIds() throws ProCoreException { int all = 0; ListClustersFunction f = new ListClustersFunction(all); GetClusterIdsMethod m = new GetClusterIdsMethod(f); client.call(m); return m; } public Refresh getRefresh(long changeSetId) throws ProCoreException { GetRefresh2Function f = new GetRefresh2Function(0); GetRefreshMethod m = new GetRefreshMethod(f); client.call(m); return m; } public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException { GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size); GetResourceSegmentMethod m = new GetResourceSegmentMethod(f); client.call(m); return m; } public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId) throws ProCoreException { GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId); GetClusterChangesMethod m = new GetClusterChangesMethod(f); client.call(m); return m; } public Information getInformation() throws ProCoreException { GetServerInfo2Function f = new GetServerInfo2Function(); GetInformationMethod m = new GetInformationMethod(f); client.call(m); return m; } public void reconnect(int sessionId) throws ProCoreException { ReconnectFunction f = new ReconnectFunction(sessionId); client.call(new Method(f, null, null)); } public long reserveIds(int count) throws ProCoreException { ReserveIdsFunction f = new ReserveIdsFunction(count); client.call(new Method(f, null, null)); return f.firstId; } public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException { UndoFunction f = new UndoFunction(changeSetIds); client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null)); return f.potentialConflicts; } public void updateCluster(byte[] operations) throws ProCoreException { UpdateClusterFunction f = new UpdateClusterFunction(operations); client.call(new Method(f, null, null)); } @Override public T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException { Cluster t = getCluster(uid.asBytes()); ByteBuffer deflated = t.getDeflated(); Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); long[] longs = (long[]) arrays[0]; int[] ints = (int[]) arrays[1]; byte[] bytes = (byte[]) arrays[2]; return creator.create(uid, bytes, ints, longs); } @Override public boolean refreshEnabled() { return true; } @Override public boolean rolledback() { // TODO Auto-generated method stub return false; } } class SessionManager { private ConcurrentHashMap sessionMap = new ConcurrentHashMap(); SessionManager() { } public List disconnect(DatabaseI db) throws ProCoreException { ArrayList clients = new ArrayList(); for (Entry i : sessionMap.entrySet()) { SessionI si = i.getValue(); if (!db.equals(si.getDb())) continue; Client client = i.getKey(); if (client.isOpen()) { client.close(); clients.add(client); } } return clients; } public void connect(DatabaseI db, List clients) throws ProCoreException, InterruptedException { for (Client client : clients) { SessionI si = sessionMap.get(client); if (null == si || !db.equals(si.getDb())) continue; client.open(); } // for (Entry i : sessionMap.entrySet()) { // SessionI si = i.getValue(); // if (!db.equals(si.getDb())) // continue; // Client client = i.getKey(); // client.open(); // } } public SessionI newSession(DatabaseI db) throws ProCoreException { Client client = db.newClient(); SessionI si = new SessionI(db, client); sessionMap.put(client, si); return si; } } class SessionEvent { private final Session session; private final Throwable cause; public SessionEvent(Session session, Throwable cause) { this.session = session; this.cause = cause; } public Session getSession() { return session; } public Throwable getCause() { return cause; } } class AskTransactionMethod extends Method implements Transaction { private final AskTransactionFunction function; AskTransactionMethod(AskTransactionFunction function) { super(function, null, null); this.function = function; } @Override public long getHeadChangeSetId() { return function.headChangeSetId; } @Override public long getTransactionId() { return function.transactionId; } } class GetChangeSetDataMethod extends Method implements ChangeSetData { private final GetChangeSetDataFunction function; GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) { super(function, evh, null); this.function = function; } @Override public boolean isOk() { return !function.failed; } } class GetChangeSetIdsMethod extends Method implements ChangeSetIds { private final GetChangeSetsFunction function; GetChangeSetIdsMethod(GetChangeSetsFunction function) { super(function, null, null); this.function = function; } @Override public long getFirstChangeSetId() { if (function.changeSetIds.length > 0) return function.changeSetIds[0]; else return Constants.NullChangeSetId; } @Override public int getCount() { return function.changeSetIds.length; } } class GetClusterMethod extends Method implements Cluster { private final GetClusterNewFunction function; GetClusterMethod(GetClusterNewFunction function) { super(function, null, new GetClusterHandler(function)); this.function = function; } @Override public int getInflateSize() { return function.inflateSize; } @Override public ByteBuffer getDeflated() { return function.deflated; } } class GetClusterChangesMethod extends Method implements ClusterChanges { private final GetClusterChangesFunction function; GetClusterChangesMethod(GetClusterChangesFunction function) { super(function, null, null); this.function = function; } @Override public long getHeadChangeSetId() { return function.headChangeSetId; } @Override public int[] getResourceIndex() { return function.resourceIndex; } @Override public int[] getPredicateIndex() { return function.predicateIndex; } @Override public long[] getPredicateFirst() { return function.predicateFirst; } @Override public long[] getPredicateSecond() { return function.predicateSecond; } @Override public int[] getValueIndex() { return function.valueIndex; } } class GetClusterIdsMethod extends Method implements ClusterIds { private final ListClustersFunction function; GetClusterIdsMethod(ListClustersFunction function) { super(function, null, null); this.function = function; } @Override public int getStatus() { return function.status; } @Override public long[] getFirst() { return function.first; } @Override public long[] getSecond() { return function.second; } } class GetInformationMethod extends Method implements Information { private final GetServerInfo2Function function; GetInformationMethod(GetServerInfo2Function function) { super(function, null, null); this.function = function; } @Override public String getServerId() { return function.serverId; } @Override public String getProtocolId() { return function.protocolId; } @Override public String getDatabaseId() { return function.databaseId; } @Override public long getFirstChangeSetId() { return function.firstChangeSetId; } } class GetRefreshMethod extends Method implements Refresh { private final GetRefresh2Function function; GetRefreshMethod(GetRefresh2Function function) { super(function, null, null); this.function = function; } @Override public long getHeadChangeSetId() { return function.headChangeSetId; } @Override public long[] getFirst() { return function.first; } @Override public long[] getSecond() { return function.second; } } class GetResourceSegmentMethod extends Method implements ResourceSegment { private final GetResourceSegmentFunction function; GetResourceSegmentMethod(GetResourceSegmentFunction function) { super(function, null, null); this.function = function; } @Override public byte[] getClusterId() { return function.clusterUID; } @Override public int getResourceIndex() { return function.resourceIndex; } @Override public long getValueSize() { return function.valueSize; } @Override public byte[] getSegment() { return function.segment; } @Override public long getOffset() { return function.segmentOffset; } } class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate { private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent(); private final OnChangeSetUpdate on; private ProCoreException exception; ChangeSetEventHandler(OnChangeSetUpdate on) { this.on = on; } @Override void on(Event event) { event.deserialize(csuEvent); Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId); try { if (null != on) on.onChangeSetUpdate(this); } catch (SDBException e) { if (null == exception) exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e); else Util.logError("Execution of ChangeSetUpdate callback failed(2).", e); } } @Override public long getChangeSetId() { return csuEvent.changeSetId; } @Override public int getChangeSetIndex() { return csuEvent.changeSetIndex; } @Override public int getNumberOfClusterChangeSets() { return csuEvent.numberOfClusterChangeSets; } @Override public int getIndexOfClusterChangeSet() { return csuEvent.indexOfClusterChangeSet; } @Override public byte[] getClusterId() { return csuEvent.clusterUID; } @Override public boolean getNewCluster() { return csuEvent.newCluster; } @Override public byte[] getData() { return csuEvent.data; } }