--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.Vector;\r
+\r
+import org.simantics.db.Database.Session.ChangeSetUpdate;\r
+import org.simantics.db.SessionManager;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.InternalException;\r
+import org.simantics.db.impl.ClusterI;\r
+import org.simantics.db.impl.ClusterTraitsBase;\r
+import org.simantics.db.procore.cluster.ClusterChangeSet;\r
+import org.simantics.db.procore.cluster.ClusterChangeSetI;\r
+import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;\r
+import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;\r
+import org.simantics.db.procore.cluster.ClusterTraits;\r
+import org.simantics.db.server.ProCoreException;\r
+import org.simantics.db.service.ClusterUID;\r
+import org.simantics.db.service.LifecycleSupport;\r
+\r
+public class SynchronizeContext implements SynchronizeContextI {\r
+ class ChangeSet {\r
+ ChangeSet(long id, int size) {\r
+ this.id = id;\r
+ this.count = 0;\r
+ ccss = new Vector<byte[]>(size);\r
+ ccss.setSize(size);\r
+ ccss3 = new boolean[size];\r
+ skip = new boolean[size];\r
+ }\r
+ long id;\r
+ int count;\r
+ Vector<byte[]> ccss;\r
+ Vector<ClusterChangeSetI> ccss2;\r
+ boolean[] ccss3; // this is getting a bit ridiculous, but...\r
+ boolean[] skip;\r
+ }\r
+ private boolean DEBUG = false;\r
+ private int changeSetsCount;\r
+ private ChangeSet[] changeSets;\r
+ private ClientChangesImpl clientChangesImpl;\r
+ private SessionImplSocket session;\r
+ private boolean endOfUpdate = false;\r
+ private final boolean fetchOnly; // True if change set(s) are only fetched.\r
+ private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();\r
+ private String compressionCodec;\r
+ \r
+ SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {\r
+ this(s, clientChangesImpl, N, false);\r
+ }\r
+ SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {\r
+ this.session = s;\r
+ this.clientChangesImpl = clientChangesImpl;\r
+ changeSetsCount = 0;\r
+ changeSets = new ChangeSet[N];\r
+ this.fetchOnly = fetchOnly;\r
+ \r
+ // TODO: find maybe a better way to do this\r
+ LifecycleSupport support = session.getService(LifecycleSupport.class);\r
+ SessionManager manager = support.getSessionManager();\r
+ compressionCodec = manager.getDatabase().getCompression();\r
+ }\r
+ Collection<org.simantics.db.ChangeSet> getChangeSets() {\r
+ Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());\r
+ for (ClientChangesImpl c : changes)\r
+ changeSets.add(c);\r
+ return changeSets;\r
+ }\r
+ public synchronized boolean isOk(boolean undo)\r
+ throws DatabaseException {\r
+ while (!endOfUpdate && changeSetsCount < changeSets.length) {\r
+ try {\r
+ this.wait(10000); // msec\r
+ // We are either notified, timed out or spuriously waked up,\r
+ // and I'd like to know which. I could time the wait but...\r
+ boolean timeout = changeSetsCount < changeSets.length;\r
+ if (DEBUG)\r
+ if (timeout)\r
+ System.out.println("DEBUG: Synchronize context timeout.");\r
+ } catch (InterruptedException e) {\r
+ // I assume that someone wants me to quit waiting.\r
+ }\r
+ break;\r
+ }\r
+ if (!endOfUpdate && changeSetsCount != changeSets.length)\r
+ return false; // not ok\r
+ commitAndUpdate(undo);\r
+ return true;\r
+ }\r
+ @Override\r
+ public synchronized void onChangeSetUpdate(ChangeSetUpdate event)\r
+ throws ProCoreException {\r
+ ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);\r
+ if (DEBUG) {\r
+ System.out.println("cs id " + event.getChangeSetId());\r
+ System.out.println("cs index " + event.getChangeSetIndex());\r
+ System.out.println("ccs N " + event.getNumberOfClusterChangeSets());\r
+ System.out.println("ccs index " + event.getIndexOfClusterChangeSet());\r
+ System.out.println("cluster=" + clusterUID);\r
+ if (event.getNewCluster())\r
+ System.out.println("cluster is new");\r
+ System.out.println("data length " + event.getData().length);\r
+ System.out.println(Arrays.toString(event.getData()));\r
+ }\r
+ if (0 == event.getChangeSetId() &&\r
+ (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&\r
+ 0 == event.getNumberOfClusterChangeSets() &&\r
+ 0 == event.getIndexOfClusterChangeSet() &&\r
+ ClusterUID.Null.equals(clusterUID)) {\r
+ endOfUpdate = true;\r
+ this.notify();\r
+ return;\r
+ }\r
+\r
+ assert (changeSetsCount < changeSets.length);\r
+ assert (event.getChangeSetIndex() < changeSets.length);\r
+ assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());\r
+ ChangeSet cs = changeSets[event.getChangeSetIndex()];\r
+ if (null == cs) { // first, create\r
+ cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());\r
+ changeSets[event.getChangeSetIndex()] = cs;\r
+ } else {\r
+ assert (event.getChangeSetId() == cs.id);\r
+ assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());\r
+ assert (cs.count < cs.ccss.size());\r
+ }\r
+ assert (event.getData().length > 0);\r
+ if (DEBUG)\r
+ System.out.println("ccs count=" + cs.count);\r
+ cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();\r
+ cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());\r
+ ++cs.count;\r
+ if (cs.count < cs.ccss.size())\r
+ return;\r
+ ++changeSetsCount;\r
+ if (changeSetsCount >= changeSets.length)\r
+ this.notify();\r
+ }\r
+ private void commitAndUpdate(boolean undo) throws DatabaseException {\r
+ final int SIZE = changeSetsCount; // changeSets.length;\r
+ if (DEBUG)\r
+ System.out.println("commitAndUpdate: cs count=" + SIZE);\r
+ for (int i = 0; i < SIZE; ++i) {\r
+ ChangeSet cs = changeSets[i];\r
+ final int JSIZE = cs.ccss.size();\r
+ cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);\r
+ ClusterTable clusterTable = session.getClusterTable();\r
+ cs.ccss2.setSize(JSIZE);\r
+ for (int j = 0; j < JSIZE; ++j) {\r
+ ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);\r
+ cs.ccss2.set(j, ccs);\r
+ if (fetchOnly)\r
+ continue;\r
+ if (cs.ccss3[j])\r
+ getClusterOrCreate(ccs.getClusterUID(), clusterTable);\r
+ ClusterUID clusterUID = ccs.getClusterUID();\r
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
+ if (null == cluster)\r
+ cs.skip[j] = true;\r
+ }\r
+ }\r
+ if (fetchOnly) {\r
+ changes.setSize(SIZE);\r
+ for (int i = 0; i < SIZE; ++i) {\r
+ ChangeSet cs = changeSets[i];\r
+ changes.set(i, new ClientChangesImpl(session));\r
+ final int JSIZE = cs.ccss.size();\r
+ for (int j = 0; j < JSIZE; ++j) {\r
+ ClusterChangeSetI ccs = cs.ccss2.get(j);\r
+ if (DEBUG)\r
+ System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);\r
+ this.updateSecondPass(ccs, changes.get(i));\r
+ }\r
+ }\r
+ return;\r
+ }\r
+ for (int i = 0; i < SIZE; ++i) {\r
+ ChangeSet cs = changeSets[i];\r
+ final int JSIZE = cs.ccss.size();\r
+ for (int j = 0; j < JSIZE; ++j) {\r
+ ClusterChangeSetI ccs = cs.ccss2.get(j);\r
+ if (cs.skip[j]) {\r
+ if (DEBUG)\r
+ System.out.println("skipping change set " + ccs.getClusterUID());\r
+ continue;\r
+ }\r
+ if (DEBUG)\r
+ System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);\r
+ this.updateFirstPass(ccs, undo);\r
+ }\r
+ }\r
+ for (int i = 0; i < SIZE; ++i) {\r
+ ChangeSet cs = changeSets[i];\r
+ final int JSIZE = cs.ccss2.size();\r
+ for (int j = 0; j < JSIZE; ++j) {\r
+ ClusterChangeSetI ccs = cs.ccss2.get(j);\r
+ if (cs.skip[j]) {\r
+ if (DEBUG)\r
+ System.out.println("skipping change set " + ccs.getClusterUID());\r
+ continue;\r
+ }\r
+ this.updateSecondPass(ccs, clientChangesImpl);\r
+ }\r
+ }\r
+ }\r
+\r
+ private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {\r
+ // New clusters are created here because resource keys require that referred resources exist.\r
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
+ if (null == cluster)\r
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
+ if (null != cluster);\r
+ return cluster;\r
+ }\r
+ private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)\r
+ throws DatabaseException {\r
+ if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())\r
+ throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);\r
+ if (!ClusterUID.isLegal(clusterUID))\r
+ throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);\r
+ ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
+ int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);\r
+ if (0 == key)\r
+ throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);\r
+ return key;\r
+ }\r
+ private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)\r
+ throws DatabaseException {\r
+ ClusterTable clusterTable = session.getClusterTable();\r
+ ClusterUID clusterUID = ccs.getClusterUID();\r
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
+ if (null == cluster)\r
+ throw new DatabaseException("Missing cluster id=" + clusterUID);\r
+ if (!cluster.isLoaded())\r
+ if (!undo)\r
+ return;\r
+ else {\r
+ if (DEBUG)\r
+ System.out.println("Cluster is not loaded. cluster=" + clusterUID);\r
+ cluster.load(session.clusterTranslator, new Runnable() {\r
+ @Override public void run() {\r
+ if (DEBUG)\r
+ System.out.println("Cluster loaded for undo.");\r
+ }\r
+ });\r
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
+ }\r
+ Operation op = new Operation();\r
+ for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
+ switch (op.type) {\r
+ default:\r
+ throw new InternalException("Unknown operation " + op);\r
+ case CreateResource:\r
+ if (op.count != 1)\r
+ throw new InternalException("Illegal argument(s) to create resource.");\r
+ if (DEBUG)\r
+ System.out.println("Create " + op.resourceIndex);\r
+ int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (!cluster.hasResource(s, session.clusterTranslator)) {\r
+ int ss = cluster.createResource(session.clusterTranslator);\r
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);\r
+ if (ri != op.resourceIndex)\r
+ throw new InternalException("Created resource key=" + ss +\r
+ " does not match expected resource index=" + op.resourceIndex);\r
+ }\r
+ break;\r
+ case AddRelation:\r
+ if (op.count != 5)\r
+ throw new InternalException("Illegal argument(s) to add relation");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
+ int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Add " + s + "-" + p + "-" + o);\r
+ ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);\r
+ if (null != c && c != cluster)\r
+ cluster = c;\r
+ break;\r
+ case RemoveRelation:\r
+ if (op.count != 5)\r
+ throw new InternalException("Illegal argument(s) to remove relation");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
+ o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Remove " + s + "-" + p + "-" + o);\r
+ cluster.removeRelation(s, p, o, session.clusterTranslator);\r
+ break;\r
+ case SetValue:\r
+ if (op.count != 4)\r
+ throw new InternalException("Illegal argument(s) to set value");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Set value " + s + " l=" + op.valueSize);\r
+ // TODO: remove this unnecessary copy.\r
+ byte[] value = new byte[op.valueSize];\r
+ System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);\r
+ cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);\r
+ break;\r
+ case DeleteValue:\r
+ if (op.count != 1)\r
+ throw new InternalException("Illegal argument(s) to set value");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Delete " + s);\r
+ cluster.removeValue(s, session.clusterTranslator);\r
+ break;\r
+ case ModifyValue:\r
+ if (op.count != 5) // index, offset, size, value, valueStart\r
+ throw new InternalException("Illegal argument(s) to modify resource file");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
+ " siz=" + op.valueSize + " start=" + op.valueStart);\r
+ cluster.setValueEx(s);\r
+ if (undo)\r
+ session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)\r
+ throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());\r
+ ClusterTable clusterTable = session.getClusterTable();\r
+ ClusterUID clusterUID = ccs.getClusterUID();\r
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
+ if (null == cluster) {\r
+ if (fetchOnly) // This is needed because resource implementation is based on cluster keys.\r
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
+ else\r
+ throw new DatabaseException("Missing cluster id=" + clusterUID);\r
+ }\r
+ if (!fetchOnly && !cluster.isLoaded())\r
+ return;\r
+ Operation op = new Operation();\r
+ for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
+ switch (op.type) {\r
+ default:\r
+ throw new InternalException("Unknown operation " + op);\r
+ case CreateResource:\r
+ if (op.count != 1)\r
+ throw new InternalException("Illegal argument(s) to create resource.");\r
+ break;\r
+ case AddRelation:\r
+ if (op.count != 5)\r
+ throw new InternalException("Illegal argument(s) to add relation.");\r
+ int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
+ int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Add " + s + "-" + p + "-" + o);\r
+ if (!fetchOnly)\r
+ session.getQueryProvider2().updateStatements(s, p);\r
+ changes.claim(s, p, o);\r
+ break;\r
+ case RemoveRelation:\r
+ if (op.count != 5)\r
+ throw new InternalException("Illegal argument(s) to remove relation.");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
+ o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Remove " + s + "-" + p + "-" + o);\r
+ if (!fetchOnly)\r
+ session.getQueryProvider2().updateStatements(s, p);\r
+ changes.deny(s, p, o);\r
+ break;\r
+ case SetValue:\r
+ if (op.count != 4)\r
+ throw new InternalException("Illegal argument(s) to set value.");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Set value " + s + " l=" + op.valueSize);\r
+ if (!fetchOnly)\r
+ session.getQueryProvider2().updateValue(s);\r
+ changes.claimValue(s);\r
+ break;\r
+ case DeleteValue:\r
+ if (op.count != 1)\r
+ throw new InternalException("Illegal argument(s) to remove value.");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("Delete value " + s);\r
+ if (!fetchOnly)\r
+ session.getQueryProvider2().updateValue(s);\r
+ changes.claimValue(s);\r
+ break;\r
+ case ModifyValue:\r
+ if (op.count != 5)\r
+ throw new InternalException("Illegal argument(s) to modify resource file");\r
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
+ if (DEBUG)\r
+ System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
+ " siz=" + op.valueSize + " start=" + op.valueStart);\r
+ if (!fetchOnly)\r
+ session.getQueryProvider2().updateValue(s);\r
+ else\r
+ changes.claimValue(s);\r
+ break;\r
+ }\r
+ }\r
+ }\r
+}
\ No newline at end of file