]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.db.server / src / org / simantics / db / server / internal / SessionI.java
diff --git a/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java b/bundles/org.simantics.db.server/src/org/simantics/db/server/internal/SessionI.java
new file mode 100644 (file)
index 0000000..bb2cdb0
--- /dev/null
@@ -0,0 +1,517 @@
+package org.simantics.db.server.internal;\r
+\r
+import java.nio.ByteBuffer;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.Map.Entry;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+\r
+import org.simantics.db.ClusterCreator;\r
+import org.simantics.db.Database;\r
+import org.simantics.db.Database.Session;\r
+import org.simantics.db.Database.Session.ChangeSetData;\r
+import org.simantics.db.Database.Session.ChangeSetIds;\r
+import org.simantics.db.Database.Session.ChangeSetUpdate;\r
+import org.simantics.db.Database.Session.Cluster;\r
+import org.simantics.db.Database.Session.ClusterChanges;\r
+import org.simantics.db.Database.Session.ClusterIds;\r
+import org.simantics.db.Database.Session.Information;\r
+import org.simantics.db.Database.Session.OnChangeSetUpdate;\r
+import org.simantics.db.Database.Session.Refresh;\r
+import org.simantics.db.Database.Session.ResourceSegment;\r
+import org.simantics.db.Database.Session.Transaction;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.SDBException;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.server.protocol.AAAFunction;\r
+import org.simantics.db.server.protocol.AcceptCommitFunction;\r
+import org.simantics.db.server.protocol.AskTransactionFunction;\r
+import org.simantics.db.server.protocol.CancelCommitFunction;\r
+import org.simantics.db.server.protocol.Constants;\r
+import org.simantics.db.server.protocol.EndTransactionFunction;\r
+import org.simantics.db.server.protocol.ExecuteFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetContextFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetDataFunction;\r
+import org.simantics.db.server.protocol.GetChangeSetsFunction;\r
+import org.simantics.db.server.protocol.GetClusterChangesFunction;\r
+import org.simantics.db.server.protocol.GetClusterNewFunction;\r
+import org.simantics.db.server.protocol.GetRefresh2Function;\r
+import org.simantics.db.server.protocol.GetResourceSegmentFunction;\r
+import org.simantics.db.server.protocol.GetServerInfo2Function;\r
+import org.simantics.db.server.protocol.ListClustersFunction;\r
+import org.simantics.db.server.protocol.ReconnectFunction;\r
+import org.simantics.db.server.protocol.ReserveIdsFunction;\r
+import org.simantics.db.server.protocol.UndoFunction;\r
+import org.simantics.db.server.protocol.UpdateClusterFunction;\r
+import org.simantics.db.service.ClusterUID;\r
+\r
+public class SessionI implements Session {\r
+    \r
+    protected ClusterDecompressor clusterDecompressor = new ClusterDecompressor();\r
+    \r
+    private final DatabaseI db;\r
+    private Client client;\r
+    SessionI(DatabaseI db, Client client) {\r
+        this.db = db;\r
+        this.client = client;\r
+    }\r
+    DatabaseI getDb() {\r
+        return db;\r
+    }\r
+    Client replace(Client client) throws ProCoreException {\r
+        Client t = this.client;\r
+        this.client = client;\r
+        return t;\r
+    }\r
+    @Override\r
+    public Database getDatabase() {\r
+        return db;\r
+    }\r
+    @Override\r
+    public void close() throws ProCoreException {\r
+        client.close();\r
+    }\r
+    @Override\r
+    public boolean isClosed() throws ProCoreException {\r
+        return client.isClosed();\r
+    }\r
+    @Override\r
+    public void open() throws ProCoreException {\r
+        try {\r
+            client.open();\r
+        } catch (InterruptedException e) {\r
+            throw new ProCoreException("Client.open was interrupted.", e);\r
+        }\r
+    }\r
+    public void callAAA() throws ProCoreException {\r
+        AAAFunction f = new AAAFunction();\r
+        client.call(new Method(f, null, null));\r
+    }\r
+    @Override\r
+    public Transaction askReadTransaction() throws ProCoreException {\r
+        AskTransactionFunction f = new AskTransactionFunction(Constants.readTransaction, Constants.NullTransactionId);\r
+        AskTransactionMethod m = new AskTransactionMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    @Override\r
+    public Transaction askWriteTransaction(long transactionId) throws ProCoreException {\r
+        AskTransactionFunction f = new AskTransactionFunction(Constants.writeTransaction, transactionId);\r
+        AskTransactionMethod m = new AskTransactionMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    @Override\r
+    public long endTransaction(long transactionId) throws ProCoreException {\r
+        EndTransactionFunction f = new EndTransactionFunction(transactionId);\r
+        Method m = new Method(f, null, null);\r
+        client.call(m);\r
+        return f.headChangeSetId;\r
+    }\r
+    @Override\r
+    public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {\r
+        AcceptCommitFunction f = new AcceptCommitFunction(transactionId, changeSetId, metadata);\r
+        client.call(new Method(f, null, null));\r
+    }\r
+    public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
+        CancelCommitFunction f = new CancelCommitFunction(transactionId, changeSetId, metadata);\r
+        client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
+        return f.headChangeSetId;\r
+}\r
+//    {\r
+//        OpenClientSessionFunction f = new OpenClientSessionFunction();\r
+//        s.call(f);\r
+//        Util.show("OpenClientSession sid=" + f.sessionId);\r
+//    }\r
+//    {\r
+//        CloseClientSessionFunction f = new CloseClientSessionFunction(1);\r
+//        s.call(f);\r
+//        Util.show("CloseClientSession sid=" + f.sessionId);\r
+//    }\r
+//    {\r
+//        EchoFunction f = new EchoFunction(new byte[1]);\r
+//        s.call(f);\r
+//        Util.show("Echo bytes.len=" + f.bytes.length);\r
+//    }\r
+    @Override\r
+    public String execute(String command) throws ProCoreException {\r
+        ExecuteFunction f = new ExecuteFunction(command);\r
+        Method m = new Method(f, null, null);\r
+        client.call(m);\r
+        return f.out;\r
+    }\r
+    @Override\r
+    public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {\r
+        GetChangeSetContextFunction f = new GetChangeSetContextFunction(changeSetId);\r
+        client.call(new Method(f, null, null));\r
+        return f.changeSetContext;\r
+    }\r
+    public ChangeSetData getChangeSetData(long minChangeSetId, long  maxChangeSetId, OnChangeSetUpdate on) throws ProCoreException {\r
+        GetChangeSetDataFunction f = new GetChangeSetDataFunction(minChangeSetId, maxChangeSetId);\r
+        GetChangeSetDataMethod m = new GetChangeSetDataMethod(f, new ChangeSetEventHandler(on));\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public ChangeSetIds getChangeSetIds() throws ProCoreException {\r
+        GetChangeSetsFunction f = new GetChangeSetsFunction();\r
+        GetChangeSetIdsMethod m = new GetChangeSetIdsMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public Cluster getCluster(byte[] clusterId) throws ProCoreException {\r
+        GetClusterNewFunction f = new GetClusterNewFunction(clusterId);\r
+        f.deflated = ByteBuffer.allocate(8192);\r
+        GetClusterMethod m = new GetClusterMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public ClusterIds getClusterIds() throws ProCoreException {\r
+        int all = 0;\r
+        ListClustersFunction f = new ListClustersFunction(all);\r
+        GetClusterIdsMethod m = new GetClusterIdsMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public Refresh getRefresh(long changeSetId) throws ProCoreException {\r
+        GetRefresh2Function f = new GetRefresh2Function(0);\r
+        GetRefreshMethod m = new GetRefreshMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public ResourceSegment getResourceSegment(byte[] clusterId, int resourceIndex, long offset, short size) throws ProCoreException {\r
+        GetResourceSegmentFunction f = new GetResourceSegmentFunction(clusterId, resourceIndex, offset, size);\r
+        GetResourceSegmentMethod m = new GetResourceSegmentMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)  throws ProCoreException {\r
+        GetClusterChangesFunction f = new GetClusterChangesFunction(changeSetId, clusterId);\r
+        GetClusterChangesMethod m = new GetClusterChangesMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public Information getInformation() throws ProCoreException {\r
+        GetServerInfo2Function f = new GetServerInfo2Function();\r
+        GetInformationMethod m = new GetInformationMethod(f);\r
+        client.call(m);\r
+        return m;\r
+    }\r
+    public void reconnect(int sessionId) throws ProCoreException {\r
+        ReconnectFunction f = new ReconnectFunction(sessionId);\r
+        client.call(new Method(f, null, null));\r
+    }\r
+    public long reserveIds(int count) throws ProCoreException {\r
+        ReserveIdsFunction f = new ReserveIdsFunction(count);\r
+        client.call(new Method(f, null, null));\r
+        return f.firstId;\r
+    }\r
+    public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {\r
+        UndoFunction f = new UndoFunction(changeSetIds);\r
+        client.call(new Method(f, new ChangeSetEventHandler(onChangeSetUpdate), null));\r
+        return f.potentialConflicts;\r
+    }\r
+    public void updateCluster(byte[] operations) throws ProCoreException {\r
+        UpdateClusterFunction f = new UpdateClusterFunction(operations);\r
+        client.call(new Method(f, null, null));\r
+    }\r
+    @Override\r
+    public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {\r
+        Cluster t = getCluster(uid.asBytes());\r
+\r
+        ByteBuffer deflated = t.getDeflated();\r
+        Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
+\r
+        //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
+        long[] longs = (long[]) arrays[0];\r
+        int[] ints = (int[]) arrays[1];\r
+        byte[] bytes = (byte[]) arrays[2];\r
+        return creator.create(uid, bytes, ints, longs);\r
+\r
+    }\r
+    @Override\r
+    public boolean refreshEnabled() {\r
+        return true;\r
+    }\r
+}\r
+class SessionManager {\r
+    private ConcurrentHashMap<Client, SessionI> sessionMap = new ConcurrentHashMap<Client, SessionI>();\r
+    SessionManager() {\r
+    }\r
+    public List<Client> disconnect(DatabaseI db) throws ProCoreException {\r
+        ArrayList<Client> clients = new ArrayList<Client>();\r
+        for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
+            SessionI si = i.getValue();\r
+            if (!db.equals(si.getDb()))\r
+                continue;\r
+            Client client = i.getKey();\r
+            if (client.isOpen()) {\r
+                client.close();\r
+                clients.add(client);\r
+            }\r
+        }\r
+        return clients;\r
+    }\r
+    public void connect(DatabaseI db, List<Client> clients) throws ProCoreException, InterruptedException {\r
+        for (Client client : clients) {\r
+            SessionI si = sessionMap.get(client);\r
+            if (null == si || !db.equals(si.getDb()))\r
+                continue;\r
+            client.open();\r
+        }\r
+//        for (Entry<Client, SessionI> i : sessionMap.entrySet()) {\r
+//            SessionI si = i.getValue();\r
+//            if (!db.equals(si.getDb()))\r
+//                continue;\r
+//            Client client = i.getKey();\r
+//            client.open();\r
+//        }\r
+    }\r
+    public SessionI newSession(DatabaseI db) throws ProCoreException {\r
+        Client client = db.newClient();\r
+        SessionI si = new SessionI(db, client);\r
+        sessionMap.put(client,  si);\r
+        return si;\r
+    }\r
+}\r
+class SessionEvent {\r
+    private final Session session;\r
+    private final Throwable cause;\r
+    public SessionEvent(Session session, Throwable cause) {\r
+        this.session = session;\r
+        this.cause = cause;\r
+    }\r
+    public Session getSession() {\r
+        return session;\r
+    }\r
+    public Throwable getCause() {\r
+        return cause;\r
+    }\r
+}\r
+class AskTransactionMethod extends Method implements Transaction {\r
+    private final AskTransactionFunction function;\r
+    AskTransactionMethod(AskTransactionFunction function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public long getHeadChangeSetId() {\r
+        return function.headChangeSetId;\r
+    }\r
+    @Override\r
+    public long getTransactionId() {\r
+        return function.transactionId;\r
+    }\r
+}\r
+class GetChangeSetDataMethod extends Method implements ChangeSetData {\r
+    private final GetChangeSetDataFunction function;\r
+    GetChangeSetDataMethod(GetChangeSetDataFunction function, EventHandler evh) {\r
+        super(function, evh, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public boolean isOk() {\r
+        return !function.failed;\r
+    }\r
+}\r
+class GetChangeSetIdsMethod extends Method implements ChangeSetIds {\r
+    private final GetChangeSetsFunction function;\r
+    GetChangeSetIdsMethod(GetChangeSetsFunction function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public long getFirstChangeSetId() {\r
+        if (function.changeSetIds.length > 0)\r
+            return function.changeSetIds[0];\r
+        else\r
+            return Constants.NullChangeSetId;\r
+    }\r
+    @Override\r
+    public int getCount() {\r
+        return function.changeSetIds.length;\r
+    }\r
+}\r
+class GetClusterMethod extends Method implements Cluster {\r
+    private final GetClusterNewFunction function;\r
+    GetClusterMethod(GetClusterNewFunction function) {\r
+        super(function, null, new GetClusterHandler(function));\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public int getInflateSize() {\r
+        return function.inflateSize;\r
+    }\r
+    @Override\r
+    public ByteBuffer getDeflated() {\r
+        return function.deflated;\r
+    }\r
+}\r
+class GetClusterChangesMethod extends Method implements ClusterChanges {\r
+    private final GetClusterChangesFunction function;\r
+    GetClusterChangesMethod(GetClusterChangesFunction function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public long getHeadChangeSetId() {\r
+        return function.headChangeSetId;\r
+    }\r
+    @Override\r
+    public int[] getResourceIndex() {\r
+        return function.resourceIndex;\r
+    }\r
+    @Override\r
+    public int[] getPredicateIndex() {\r
+        return function.predicateIndex;\r
+    }\r
+    @Override\r
+    public long[] getPredicateFirst() {\r
+        return function.predicateFirst;\r
+    }\r
+    @Override\r
+    public long[] getPredicateSecond() {\r
+        return function.predicateSecond;\r
+    }\r
+    @Override\r
+    public int[] getValueIndex() {\r
+        return function.valueIndex;\r
+    }\r
+}\r
+class GetClusterIdsMethod extends Method implements ClusterIds {\r
+    private final ListClustersFunction function;\r
+    GetClusterIdsMethod(ListClustersFunction function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public int getStatus() {\r
+        return function.status;\r
+    }\r
+    @Override\r
+    public long[] getFirst() {\r
+        return function.first;\r
+    }\r
+    @Override\r
+    public long[] getSecond() {\r
+        return function.second;\r
+    }\r
+}\r
+class GetInformationMethod extends Method implements Information {\r
+    private final GetServerInfo2Function function;\r
+    GetInformationMethod(GetServerInfo2Function function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public String getServerId() {\r
+        return function.serverId;\r
+    }\r
+    @Override\r
+    public String getProtocolId() {\r
+        return function.protocolId;\r
+    }\r
+    @Override\r
+    public String getDatabaseId() {\r
+        return function.databaseId;\r
+    }\r
+    @Override\r
+    public long getFirstChangeSetId() {\r
+        return function.firstChangeSetId;\r
+    }\r
+}\r
+class GetRefreshMethod extends Method implements Refresh {\r
+    private final GetRefresh2Function function;\r
+    GetRefreshMethod(GetRefresh2Function function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public long getHeadChangeSetId() {\r
+        return function.headChangeSetId;\r
+    }\r
+    @Override\r
+    public long[] getFirst() {\r
+        return function.first;\r
+    }\r
+    @Override\r
+    public long[] getSecond() {\r
+        return function.second;\r
+    }\r
+}\r
+class GetResourceSegmentMethod extends Method implements ResourceSegment {\r
+    private final GetResourceSegmentFunction function;\r
+    GetResourceSegmentMethod(GetResourceSegmentFunction function) {\r
+        super(function, null, null);\r
+        this.function = function;\r
+    }\r
+    @Override\r
+    public byte[] getClusterId() {\r
+        return function.clusterUID;\r
+    }\r
+    @Override\r
+    public int getResourceIndex() {\r
+        return function.resourceIndex;\r
+    }\r
+    @Override\r
+    public long getValueSize() {\r
+        return function.valueSize;\r
+    }\r
+    @Override\r
+    public byte[] getSegment() {\r
+        return function.segment;\r
+    }\r
+    @Override\r
+    public long getOffset() {\r
+        return function.segmentOffset;\r
+    }\r
+}\r
+class ChangeSetEventHandler extends EventHandler implements ChangeSetUpdate {\r
+    private final org.simantics.db.server.protocol.ChangeSetUpdateEvent csuEvent = new org.simantics.db.server.protocol.ChangeSetUpdateEvent();\r
+    private final OnChangeSetUpdate on;\r
+    private ProCoreException exception;\r
+    ChangeSetEventHandler(OnChangeSetUpdate on) {\r
+        this.on = on;\r
+    }\r
+    @Override\r
+    void on(Event event) {\r
+        event.deserialize(csuEvent);\r
+        Util.log("ChangeSetEvent cid=" + csuEvent.changeSetId);\r
+        try {\r
+            if (null != on)\r
+                on.onChangeSetUpdate(this);\r
+        } catch (SDBException e) {\r
+            if (null == exception)\r
+                exception = new ProCoreException("Execution of ChangeSetUpdate callback failed.", e);\r
+            else\r
+                Util.logError("Execution of ChangeSetUpdate callback failed(2).", e);\r
+        }\r
+    }\r
+    @Override\r
+    public long getChangeSetId() {\r
+        return csuEvent.changeSetId;\r
+    }\r
+    @Override\r
+    public int getChangeSetIndex() {\r
+        return csuEvent.changeSetIndex;\r
+    }\r
+    @Override\r
+    public int getNumberOfClusterChangeSets() {\r
+        return csuEvent.numberOfClusterChangeSets;\r
+    }\r
+    @Override\r
+    public int getIndexOfClusterChangeSet() {\r
+        return csuEvent.indexOfClusterChangeSet;\r
+    }\r
+    @Override\r
+    public byte[] getClusterId() {\r
+        return csuEvent.clusterUID;\r
+    }\r
+    @Override\r
+    public boolean getNewCluster() {\r
+        return csuEvent.newCluster;\r
+    }\r
+    @Override\r
+    public byte[] getData() {\r
+        return csuEvent.data;\r
+    }\r
+}\r