]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java
Cluster loading problem fix made by Antti
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SynchronizeContext.java
index 5f8f4a5f810f2e8fce8dd10a7f8c155801ebc217..2d31febc23bf84176d32816c09b9ebe8a662d0ae 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.Vector;\r
-\r
-import org.simantics.db.Database.Session.ChangeSetUpdate;\r
-import org.simantics.db.SessionManager;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.InternalException;\r
-import org.simantics.db.impl.ClusterI;\r
-import org.simantics.db.impl.ClusterTraitsBase;\r
-import org.simantics.db.procore.cluster.ClusterChangeSet;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;\r
-import org.simantics.db.procore.cluster.ClusterTraits;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.service.ClusterUID;\r
-import org.simantics.db.service.LifecycleSupport;\r
-\r
-public class SynchronizeContext implements SynchronizeContextI {\r
-    class ChangeSet {\r
-        ChangeSet(long id, int size) {\r
-            this.id = id;\r
-            this.count = 0;\r
-            ccss = new Vector<byte[]>(size);\r
-            ccss.setSize(size);\r
-            ccss3 = new boolean[size];\r
-            skip = new boolean[size];\r
-        }\r
-        long id;\r
-        int count;\r
-        Vector<byte[]> ccss;\r
-        Vector<ClusterChangeSetI> ccss2;\r
-        boolean[] ccss3; // this is getting a bit ridiculous, but...\r
-        boolean[] skip;\r
-    }\r
-    private boolean DEBUG = false;\r
-    private int changeSetsCount;\r
-    private ChangeSet[] changeSets;\r
-    private ClientChangesImpl clientChangesImpl;\r
-    private SessionImplSocket session;\r
-    private boolean endOfUpdate = false;\r
-    private final boolean fetchOnly; // True if change set(s) are only fetched.\r
-    private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();\r
-       private String compressionCodec;\r
-       \r
-    SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {\r
-       this(s, clientChangesImpl, N, false);\r
-    }\r
-    SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {\r
-        this.session = s;\r
-        this.clientChangesImpl = clientChangesImpl;\r
-        changeSetsCount = 0;\r
-        changeSets = new ChangeSet[N];\r
-        this.fetchOnly = fetchOnly;\r
-        \r
-        // TODO: find maybe a better way to do this\r
-        LifecycleSupport support = session.getService(LifecycleSupport.class);\r
-        SessionManager manager = support.getSessionManager();\r
-        compressionCodec = manager.getDatabase().getCompression();\r
-    }\r
-    Collection<org.simantics.db.ChangeSet> getChangeSets() {\r
-        Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());\r
-        for (ClientChangesImpl c : changes)\r
-            changeSets.add(c);\r
-        return changeSets;\r
-    }\r
-    public synchronized boolean isOk(boolean undo)\r
-    throws DatabaseException {\r
-        while (!endOfUpdate && changeSetsCount < changeSets.length) {\r
-            try {\r
-                this.wait(10000); // msec\r
-                // We are either notified, timed out or spuriously waked up,\r
-                // and I'd like to know which. I could time the wait but...\r
-                boolean timeout = changeSetsCount < changeSets.length;\r
-                if (DEBUG)\r
-                    if (timeout)\r
-                        System.out.println("DEBUG: Synchronize context timeout.");\r
-            } catch (InterruptedException e) {\r
-                // I assume that someone wants me to quit waiting.\r
-            }\r
-            break;\r
-        }\r
-        if (!endOfUpdate && changeSetsCount != changeSets.length)\r
-            return false; // not ok\r
-        commitAndUpdate(undo);\r
-        return true;\r
-    }\r
-    @Override\r
-    public synchronized void onChangeSetUpdate(ChangeSetUpdate event)\r
-    throws ProCoreException {\r
-        ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);\r
-        if (DEBUG) {\r
-            System.out.println("cs id " + event.getChangeSetId());\r
-            System.out.println("cs index " + event.getChangeSetIndex());\r
-            System.out.println("ccs N " + event.getNumberOfClusterChangeSets());\r
-            System.out.println("ccs index " + event.getIndexOfClusterChangeSet());\r
-            System.out.println("cluster=" + clusterUID);\r
-            if (event.getNewCluster())\r
-                System.out.println("cluster is new");\r
-            System.out.println("data length " + event.getData().length);\r
-            System.out.println(Arrays.toString(event.getData()));\r
-        }\r
-        if (0 == event.getChangeSetId() &&\r
-            (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&\r
-            0 == event.getNumberOfClusterChangeSets() &&\r
-            0 == event.getIndexOfClusterChangeSet() &&\r
-            ClusterUID.Null.equals(clusterUID)) {\r
-            endOfUpdate = true;\r
-            this.notify();\r
-            return;\r
-        }\r
-\r
-        assert (changeSetsCount < changeSets.length);\r
-        assert (event.getChangeSetIndex() < changeSets.length);\r
-        assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());\r
-        ChangeSet cs = changeSets[event.getChangeSetIndex()];\r
-        if (null == cs) { // first, create\r
-            cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());\r
-            changeSets[event.getChangeSetIndex()] = cs;\r
-        } else {\r
-            assert (event.getChangeSetId() == cs.id);\r
-            assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());\r
-            assert (cs.count < cs.ccss.size());\r
-        }\r
-        assert (event.getData().length > 0);\r
-        if (DEBUG)\r
-            System.out.println("ccs count=" + cs.count);\r
-        cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();\r
-        cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());\r
-        ++cs.count;\r
-        if (cs.count < cs.ccss.size())\r
-            return;\r
-        ++changeSetsCount;\r
-        if (changeSetsCount >= changeSets.length)\r
-            this.notify();\r
-    }\r
-    private void commitAndUpdate(boolean undo) throws DatabaseException {\r
-        final int SIZE = changeSetsCount; // changeSets.length;\r
-        if (DEBUG)\r
-            System.out.println("commitAndUpdate: cs count=" + SIZE);\r
-        for (int i = 0; i < SIZE; ++i) {\r
-            ChangeSet cs = changeSets[i];\r
-            final int JSIZE = cs.ccss.size();\r
-            cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);\r
-            ClusterTable clusterTable = session.getClusterTable();\r
-            cs.ccss2.setSize(JSIZE);\r
-            for (int j = 0; j < JSIZE; ++j) {\r
-                ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);\r
-                cs.ccss2.set(j, ccs);\r
-                if (fetchOnly)\r
-                    continue;\r
-                if (cs.ccss3[j])\r
-                    getClusterOrCreate(ccs.getClusterUID(), clusterTable);\r
-                ClusterUID clusterUID = ccs.getClusterUID();\r
-                ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
-                if (null == cluster)\r
-                    cs.skip[j] = true;\r
-            }\r
-        }\r
-        if (fetchOnly) {\r
-            changes.setSize(SIZE);\r
-            for (int i = 0; i < SIZE; ++i) {\r
-                ChangeSet cs = changeSets[i];\r
-                changes.set(i, new ClientChangesImpl(session));\r
-                final int JSIZE = cs.ccss.size();\r
-                for (int j = 0; j < JSIZE; ++j) {\r
-                    ClusterChangeSetI ccs = cs.ccss2.get(j);\r
-                    if (DEBUG)\r
-                        System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);\r
-                    this.updateSecondPass(ccs, changes.get(i));\r
-                }\r
-            }\r
-            return;\r
-        }\r
-        for (int i = 0; i < SIZE; ++i) {\r
-            ChangeSet cs = changeSets[i];\r
-            final int JSIZE = cs.ccss.size();\r
-            for (int j = 0; j < JSIZE; ++j) {\r
-                ClusterChangeSetI ccs = cs.ccss2.get(j);\r
-                if (cs.skip[j]) {\r
-                    if (DEBUG)\r
-                        System.out.println("skipping change set " + ccs.getClusterUID());\r
-                    continue;\r
-                }\r
-                if (DEBUG)\r
-                    System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);\r
-                this.updateFirstPass(ccs, undo);\r
-            }\r
-        }\r
-        for (int i = 0; i < SIZE; ++i) {\r
-            ChangeSet cs = changeSets[i];\r
-            final int JSIZE = cs.ccss2.size();\r
-            for (int j = 0; j < JSIZE; ++j) {\r
-                ClusterChangeSetI ccs = cs.ccss2.get(j);\r
-                if (cs.skip[j]) {\r
-                    if (DEBUG)\r
-                        System.out.println("skipping change set " + ccs.getClusterUID());\r
-                    continue;\r
-                }\r
-                this.updateSecondPass(ccs, clientChangesImpl);\r
-            }\r
-        }\r
-    }\r
-\r
-    private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {\r
-        // New clusters are created here because resource keys require that referred resources exist.\r
-        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
-        if (null == cluster)\r
-            cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
-        if (null != cluster);\r
-        return cluster;\r
-    }\r
-    private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)\r
-    throws DatabaseException {\r
-        if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())\r
-            throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);\r
-        if (!ClusterUID.isLegal(clusterUID))\r
-            throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);\r
-        ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
-        int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);\r
-        if (0 == key)\r
-            throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);\r
-        return key;\r
-    }\r
-    private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)\r
-    throws DatabaseException {\r
-        ClusterTable clusterTable = session.getClusterTable();\r
-        ClusterUID clusterUID = ccs.getClusterUID();\r
-        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
-        if (null == cluster)\r
-            throw new DatabaseException("Missing cluster id=" + clusterUID);\r
-        if (!cluster.isLoaded())\r
-            if (!undo)\r
-                return;\r
-            else {\r
-                if (DEBUG)\r
-                    System.out.println("Cluster is not loaded. cluster=" + clusterUID);\r
-                cluster.load(session.clusterTranslator, new Runnable() {\r
-                    @Override public void run() {\r
-                        if (DEBUG)\r
-                            System.out.println("Cluster loaded for undo.");\r
-                    }\r
-                });\r
-                cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
-            }\r
-        Operation op = new Operation();\r
-        for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
-            switch (op.type) {\r
-                default:\r
-                    throw new InternalException("Unknown operation " + op);\r
-                case CreateResource:\r
-                    if (op.count != 1)\r
-                        throw new InternalException("Illegal argument(s) to create resource.");\r
-                    if (DEBUG)\r
-                        System.out.println("Create " + op.resourceIndex);\r
-                    int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (!cluster.hasResource(s, session.clusterTranslator)) {\r
-                        int ss = cluster.createResource(session.clusterTranslator);\r
-                        short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);\r
-                        if (ri != op.resourceIndex)\r
-                            throw new InternalException("Created resource key=" + ss +\r
-                                " does not match expected resource index=" + op.resourceIndex);\r
-                    }\r
-                    break;\r
-                case AddRelation:\r
-                    if (op.count != 5)\r
-                        throw new InternalException("Illegal argument(s) to add relation");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
-                    int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Add " + s + "-" + p + "-" + o);\r
-                    ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);\r
-                    if (null != c && c != cluster)\r
-                        cluster = c;\r
-                    break;\r
-                case RemoveRelation:\r
-                    if (op.count != 5)\r
-                        throw new InternalException("Illegal argument(s) to remove relation");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
-                    o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Remove " + s + "-" + p + "-" + o);\r
-                    cluster.removeRelation(s, p, o, session.clusterTranslator);\r
-                    break;\r
-                case SetValue:\r
-                    if (op.count != 4)\r
-                        throw new InternalException("Illegal argument(s) to set value");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Set value " + s + " l=" + op.valueSize);\r
-                    // TODO: remove this unnecessary copy.\r
-                    byte[] value = new byte[op.valueSize];\r
-                    System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);\r
-                    cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);\r
-                    break;\r
-                case DeleteValue:\r
-                    if (op.count != 1)\r
-                        throw new InternalException("Illegal argument(s) to set value");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Delete " + s);\r
-                    cluster.removeValue(s, session.clusterTranslator);\r
-                    break;\r
-                case ModifyValue:\r
-                    if (op.count != 5) // index, offset, size, value, valueStart\r
-                        throw new InternalException("Illegal argument(s) to modify resource file");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
-                                " siz=" + op.valueSize + " start=" + op.valueStart);\r
-                    cluster.setValueEx(s);\r
-                    if (undo)\r
-                        session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);\r
-                    break;\r
-            }\r
-        }\r
-    }\r
-    private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)\r
-    throws DatabaseException {\r
-        if (DEBUG)\r
-            System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());\r
-        ClusterTable clusterTable = session.getClusterTable();\r
-        ClusterUID clusterUID = ccs.getClusterUID();\r
-        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
-        if (null == cluster) {\r
-            if (fetchOnly) // This is needed because resource implementation is based on cluster keys.\r
-                cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
-            else\r
-                throw new DatabaseException("Missing cluster id=" + clusterUID);\r
-        }\r
-        if (!fetchOnly && !cluster.isLoaded())\r
-            return;\r
-        Operation op = new Operation();\r
-        for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
-            switch (op.type) {\r
-                default:\r
-                    throw new InternalException("Unknown operation " + op);\r
-                case CreateResource:\r
-                    if (op.count != 1)\r
-                        throw new InternalException("Illegal argument(s) to create resource.");\r
-                    break;\r
-                case AddRelation:\r
-                    if (op.count != 5)\r
-                        throw new InternalException("Illegal argument(s) to add relation.");\r
-                    int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
-                    int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Add " + s + "-" + p + "-" + o);\r
-                    if (!fetchOnly)\r
-                        session.getQueryProvider2().updateStatements(s, p);\r
-                    changes.claim(s, p, o);\r
-                    break;\r
-                case RemoveRelation:\r
-                    if (op.count != 5)\r
-                        throw new InternalException("Illegal argument(s) to remove relation.");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
-                    o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Remove " + s + "-" + p + "-" + o);\r
-                    if (!fetchOnly)\r
-                        session.getQueryProvider2().updateStatements(s, p);\r
-                    changes.deny(s, p, o);\r
-                    break;\r
-                case SetValue:\r
-                    if (op.count != 4)\r
-                        throw new InternalException("Illegal argument(s) to set value.");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Set value " + s + " l=" + op.valueSize);\r
-                    if (!fetchOnly)\r
-                        session.getQueryProvider2().updateValue(s);\r
-                    changes.claimValue(s);\r
-                    break;\r
-                case DeleteValue:\r
-                    if (op.count != 1)\r
-                        throw new InternalException("Illegal argument(s) to remove value.");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("Delete value " + s);\r
-                    if (!fetchOnly)\r
-                        session.getQueryProvider2().updateValue(s);\r
-                    changes.claimValue(s);\r
-                    break;\r
-                case ModifyValue:\r
-                    if (op.count != 5)\r
-                        throw new InternalException("Illegal argument(s) to modify resource file");\r
-                    s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
-                    if (DEBUG)\r
-                        System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
-                                " siz=" + op.valueSize + " start=" + op.valueStart);\r
-                    if (!fetchOnly)\r
-                        session.getQueryProvider2().updateValue(s);\r
-                    else\r
-                        changes.claimValue(s);\r
-                    break;\r
-            }\r
-        }\r
-    }\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Vector;
+
+import org.simantics.db.Database.Session.ChangeSetUpdate;
+import org.simantics.db.SessionManager;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.procore.cluster.ClusterChangeSet;
+import org.simantics.db.procore.cluster.ClusterChangeSetI;
+import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;
+import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.LifecycleSupport;
+
+public class SynchronizeContext implements SynchronizeContextI {
+    class ChangeSet {
+        ChangeSet(long id, int size) {
+            this.id = id;
+            this.count = 0;
+            ccss = new Vector<byte[]>(size);
+            ccss.setSize(size);
+            ccss3 = new boolean[size];
+            skip = new boolean[size];
+        }
+        long id;
+        int count;
+        Vector<byte[]> ccss;
+        Vector<ClusterChangeSetI> ccss2;
+        boolean[] ccss3; // this is getting a bit ridiculous, but...
+        boolean[] skip;
+    }
+    private boolean DEBUG = false;
+    private int changeSetsCount;
+    private ChangeSet[] changeSets;
+    private ClientChangesImpl clientChangesImpl;
+    private SessionImplSocket session;
+    private boolean endOfUpdate = false;
+    private final boolean fetchOnly; // True if change set(s) are only fetched.
+    private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();
+       private String compressionCodec;
+       
+    SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {
+       this(s, clientChangesImpl, N, false);
+    }
+    SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {
+        this.session = s;
+        this.clientChangesImpl = clientChangesImpl;
+        changeSetsCount = 0;
+        changeSets = new ChangeSet[N];
+        this.fetchOnly = fetchOnly;
+        
+        // TODO: find maybe a better way to do this
+        LifecycleSupport support = session.getService(LifecycleSupport.class);
+        SessionManager manager = support.getSessionManager();
+        compressionCodec = manager.getDatabase().getCompression();
+    }
+    Collection<org.simantics.db.ChangeSet> getChangeSets() {
+        Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());
+        for (ClientChangesImpl c : changes)
+            changeSets.add(c);
+        return changeSets;
+    }
+    public synchronized boolean isOk(boolean undo)
+    throws DatabaseException {
+        while (!endOfUpdate && changeSetsCount < changeSets.length) {
+            try {
+                this.wait(10000); // msec
+                // We are either notified, timed out or spuriously waked up,
+                // and I'd like to know which. I could time the wait but...
+                boolean timeout = changeSetsCount < changeSets.length;
+                if (DEBUG)
+                    if (timeout)
+                        System.out.println("DEBUG: Synchronize context timeout.");
+            } catch (InterruptedException e) {
+                // I assume that someone wants me to quit waiting.
+            }
+            break;
+        }
+        if (!endOfUpdate && changeSetsCount != changeSets.length)
+            return false; // not ok
+        commitAndUpdate(undo);
+        return true;
+    }
+    @Override
+    public synchronized void onChangeSetUpdate(ChangeSetUpdate event)
+    throws ProCoreException {
+        ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);
+        if (DEBUG) {
+            System.out.println("cs id " + event.getChangeSetId());
+            System.out.println("cs index " + event.getChangeSetIndex());
+            System.out.println("ccs N " + event.getNumberOfClusterChangeSets());
+            System.out.println("ccs index " + event.getIndexOfClusterChangeSet());
+            System.out.println("cluster=" + clusterUID);
+            if (event.getNewCluster())
+                System.out.println("cluster is new");
+            System.out.println("data length " + event.getData().length);
+            System.out.println(Arrays.toString(event.getData()));
+        }
+        if (0 == event.getChangeSetId() &&
+            (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&
+            0 == event.getNumberOfClusterChangeSets() &&
+            0 == event.getIndexOfClusterChangeSet() &&
+            ClusterUID.Null.equals(clusterUID)) {
+            endOfUpdate = true;
+            this.notify();
+            return;
+        }
+
+        assert (changeSetsCount < changeSets.length);
+        assert (event.getChangeSetIndex() < changeSets.length);
+        assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());
+        ChangeSet cs = changeSets[event.getChangeSetIndex()];
+        if (null == cs) { // first, create
+            cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());
+            changeSets[event.getChangeSetIndex()] = cs;
+        } else {
+            assert (event.getChangeSetId() == cs.id);
+            assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());
+            assert (cs.count < cs.ccss.size());
+        }
+        assert (event.getData().length > 0);
+        if (DEBUG)
+            System.out.println("ccs count=" + cs.count);
+        cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();
+        cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());
+        ++cs.count;
+        if (cs.count < cs.ccss.size())
+            return;
+        ++changeSetsCount;
+        if (changeSetsCount >= changeSets.length)
+            this.notify();
+    }
+    private void commitAndUpdate(boolean undo) throws DatabaseException {
+        final int SIZE = changeSetsCount; // changeSets.length;
+        if (DEBUG)
+            System.out.println("commitAndUpdate: cs count=" + SIZE);
+        for (int i = 0; i < SIZE; ++i) {
+            ChangeSet cs = changeSets[i];
+            final int JSIZE = cs.ccss.size();
+            cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);
+            ClusterTable clusterTable = session.getClusterTable();
+            cs.ccss2.setSize(JSIZE);
+            for (int j = 0; j < JSIZE; ++j) {
+                ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);
+                cs.ccss2.set(j, ccs);
+                if (fetchOnly)
+                    continue;
+                if (cs.ccss3[j])
+                    getClusterOrCreate(ccs.getClusterUID(), clusterTable);
+                ClusterUID clusterUID = ccs.getClusterUID();
+                ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+                if (null == cluster)
+                    cs.skip[j] = true;
+            }
+        }
+        if (fetchOnly) {
+            changes.setSize(SIZE);
+            for (int i = 0; i < SIZE; ++i) {
+                ChangeSet cs = changeSets[i];
+                changes.set(i, new ClientChangesImpl(session));
+                final int JSIZE = cs.ccss.size();
+                for (int j = 0; j < JSIZE; ++j) {
+                    ClusterChangeSetI ccs = cs.ccss2.get(j);
+                    if (DEBUG)
+                        System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);
+                    this.updateSecondPass(ccs, changes.get(i));
+                }
+            }
+            return;
+        }
+        for (int i = 0; i < SIZE; ++i) {
+            ChangeSet cs = changeSets[i];
+            final int JSIZE = cs.ccss.size();
+            for (int j = 0; j < JSIZE; ++j) {
+                ClusterChangeSetI ccs = cs.ccss2.get(j);
+                if (cs.skip[j]) {
+                    if (DEBUG)
+                        System.out.println("skipping change set " + ccs.getClusterUID());
+                    continue;
+                }
+                if (DEBUG)
+                    System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);
+                this.updateFirstPass(ccs, undo);
+            }
+        }
+        for (int i = 0; i < SIZE; ++i) {
+            ChangeSet cs = changeSets[i];
+            final int JSIZE = cs.ccss2.size();
+            for (int j = 0; j < JSIZE; ++j) {
+                ClusterChangeSetI ccs = cs.ccss2.get(j);
+                if (cs.skip[j]) {
+                    if (DEBUG)
+                        System.out.println("skipping change set " + ccs.getClusterUID());
+                    continue;
+                }
+                this.updateSecondPass(ccs, clientChangesImpl);
+            }
+        }
+    }
+
+    private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {
+        // New clusters are created here because resource keys require that referred resources exist.
+        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+        if (null == cluster)
+            cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+        if (null != cluster);
+        return cluster;
+    }
+    private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)
+    throws DatabaseException {
+        if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
+            throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);
+        if (!ClusterUID.isLegal(clusterUID))
+            throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);
+        ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+        int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);
+        if (0 == key)
+            throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);
+        return key;
+    }
+    private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)
+    throws DatabaseException {
+        ClusterTable clusterTable = session.getClusterTable();
+        ClusterUID clusterUID = ccs.getClusterUID();
+        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+        if (null == cluster)
+            throw new DatabaseException("Missing cluster id=" + clusterUID);
+        if (!cluster.isLoaded())
+            if (!undo)
+                return;
+            else {
+                if (DEBUG)
+                    System.out.println("Cluster is not loaded. cluster=" + clusterUID);
+                cluster.load(session.clusterTranslator, new Runnable() {
+                    @Override public void run() {
+                        if (DEBUG)
+                            System.out.println("Cluster loaded for undo.");
+                    }
+                });
+                cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+            }
+        Operation op = new Operation();
+        for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
+            switch (op.type) {
+                default:
+                    throw new InternalException("Unknown operation " + op);
+                case CreateResource:
+                    if (op.count != 1)
+                        throw new InternalException("Illegal argument(s) to create resource.");
+                    if (DEBUG)
+                        System.out.println("Create " + op.resourceIndex);
+                    int s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (!cluster.hasResource(s, session.clusterTranslator)) {
+                        int ss = cluster.createResource(session.clusterTranslator);
+                        short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);
+                        if (ri != op.resourceIndex)
+                            throw new InternalException("Created resource key=" + ss +
+                                " does not match expected resource index=" + op.resourceIndex);
+                    }
+                    break;
+                case AddRelation:
+                    if (op.count != 5)
+                        throw new InternalException("Illegal argument(s) to add relation");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+                    int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Add " + s + "-" + p + "-" + o);
+                    ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);
+                    if (null != c && c != cluster)
+                        cluster = c;
+                    break;
+                case RemoveRelation:
+                    if (op.count != 5)
+                        throw new InternalException("Illegal argument(s) to remove relation");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+                    o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Remove " + s + "-" + p + "-" + o);
+                    cluster.removeRelation(s, p, o, session.clusterTranslator);
+                    break;
+                case SetValue:
+                    if (op.count != 4)
+                        throw new InternalException("Illegal argument(s) to set value");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Set value " + s + " l=" + op.valueSize);
+                    // TODO: remove this unnecessary copy.
+                    byte[] value = new byte[op.valueSize];
+                    System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);
+                    cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);
+                    break;
+                case DeleteValue:
+                    if (op.count != 1)
+                        throw new InternalException("Illegal argument(s) to set value");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Delete " + s);
+                    cluster.removeValue(s, session.clusterTranslator);
+                    break;
+                case ModifyValue:
+                    if (op.count != 5) // index, offset, size, value, valueStart
+                        throw new InternalException("Illegal argument(s) to modify resource file");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
+                                " siz=" + op.valueSize + " start=" + op.valueStart);
+                    cluster.setValueEx(s);
+                    if (undo)
+                        session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);
+                    break;
+            }
+        }
+    }
+    private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)
+    throws DatabaseException {
+        if (DEBUG)
+            System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());
+        ClusterTable clusterTable = session.getClusterTable();
+        ClusterUID clusterUID = ccs.getClusterUID();
+        ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+        if (null == cluster) {
+            if (fetchOnly) // This is needed because resource implementation is based on cluster keys.
+                cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+            else
+                throw new DatabaseException("Missing cluster id=" + clusterUID);
+        }
+        if (!fetchOnly && !cluster.isLoaded())
+            return;
+        Operation op = new Operation();
+        for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
+            switch (op.type) {
+                default:
+                    throw new InternalException("Unknown operation " + op);
+                case CreateResource:
+                    if (op.count != 1)
+                        throw new InternalException("Illegal argument(s) to create resource.");
+                    break;
+                case AddRelation:
+                    if (op.count != 5)
+                        throw new InternalException("Illegal argument(s) to add relation.");
+                    int s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+                    int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Add " + s + "-" + p + "-" + o);
+                    if (!fetchOnly)
+                        session.getQueryProvider2().updateStatements(s, p);
+                    changes.claim(s, p, o);
+                    break;
+                case RemoveRelation:
+                    if (op.count != 5)
+                        throw new InternalException("Illegal argument(s) to remove relation.");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+                    o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Remove " + s + "-" + p + "-" + o);
+                    if (!fetchOnly)
+                        session.getQueryProvider2().updateStatements(s, p);
+                    changes.deny(s, p, o);
+                    break;
+                case SetValue:
+                    if (op.count != 4)
+                        throw new InternalException("Illegal argument(s) to set value.");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Set value " + s + " l=" + op.valueSize);
+                    if (!fetchOnly)
+                        session.getQueryProvider2().updateValue(s);
+                    changes.claimValue(s);
+                    break;
+                case DeleteValue:
+                    if (op.count != 1)
+                        throw new InternalException("Illegal argument(s) to remove value.");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("Delete value " + s);
+                    if (!fetchOnly)
+                        session.getQueryProvider2().updateValue(s);
+                    changes.claimValue(s);
+                    break;
+                case ModifyValue:
+                    if (op.count != 5)
+                        throw new InternalException("Illegal argument(s) to modify resource file");
+                    s = getKey(op.resourceIndex, clusterUID, clusterTable);
+                    if (DEBUG)
+                        System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
+                                " siz=" + op.valueSize + " start=" + op.valueStart);
+                    if (!fetchOnly)
+                        session.getQueryProvider2().updateValue(s);
+                    else
+                        changes.claimValue(s);
+                    break;
+            }
+        }
+    }
 }
\ No newline at end of file