-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.Vector;\r
-\r
-import org.simantics.db.Database.Session.ChangeSetUpdate;\r
-import org.simantics.db.SessionManager;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.InternalException;\r
-import org.simantics.db.impl.ClusterI;\r
-import org.simantics.db.impl.ClusterTraitsBase;\r
-import org.simantics.db.procore.cluster.ClusterChangeSet;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;\r
-import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;\r
-import org.simantics.db.procore.cluster.ClusterTraits;\r
-import org.simantics.db.server.ProCoreException;\r
-import org.simantics.db.service.ClusterUID;\r
-import org.simantics.db.service.LifecycleSupport;\r
-\r
-public class SynchronizeContext implements SynchronizeContextI {\r
- class ChangeSet {\r
- ChangeSet(long id, int size) {\r
- this.id = id;\r
- this.count = 0;\r
- ccss = new Vector<byte[]>(size);\r
- ccss.setSize(size);\r
- ccss3 = new boolean[size];\r
- skip = new boolean[size];\r
- }\r
- long id;\r
- int count;\r
- Vector<byte[]> ccss;\r
- Vector<ClusterChangeSetI> ccss2;\r
- boolean[] ccss3; // this is getting a bit ridiculous, but...\r
- boolean[] skip;\r
- }\r
- private boolean DEBUG = false;\r
- private int changeSetsCount;\r
- private ChangeSet[] changeSets;\r
- private ClientChangesImpl clientChangesImpl;\r
- private SessionImplSocket session;\r
- private boolean endOfUpdate = false;\r
- private final boolean fetchOnly; // True if change set(s) are only fetched.\r
- private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();\r
- private String compressionCodec;\r
- \r
- SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {\r
- this(s, clientChangesImpl, N, false);\r
- }\r
- SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {\r
- this.session = s;\r
- this.clientChangesImpl = clientChangesImpl;\r
- changeSetsCount = 0;\r
- changeSets = new ChangeSet[N];\r
- this.fetchOnly = fetchOnly;\r
- \r
- // TODO: find maybe a better way to do this\r
- LifecycleSupport support = session.getService(LifecycleSupport.class);\r
- SessionManager manager = support.getSessionManager();\r
- compressionCodec = manager.getDatabase().getCompression();\r
- }\r
- Collection<org.simantics.db.ChangeSet> getChangeSets() {\r
- Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());\r
- for (ClientChangesImpl c : changes)\r
- changeSets.add(c);\r
- return changeSets;\r
- }\r
- public synchronized boolean isOk(boolean undo)\r
- throws DatabaseException {\r
- while (!endOfUpdate && changeSetsCount < changeSets.length) {\r
- try {\r
- this.wait(10000); // msec\r
- // We are either notified, timed out or spuriously waked up,\r
- // and I'd like to know which. I could time the wait but...\r
- boolean timeout = changeSetsCount < changeSets.length;\r
- if (DEBUG)\r
- if (timeout)\r
- System.out.println("DEBUG: Synchronize context timeout.");\r
- } catch (InterruptedException e) {\r
- // I assume that someone wants me to quit waiting.\r
- }\r
- break;\r
- }\r
- if (!endOfUpdate && changeSetsCount != changeSets.length)\r
- return false; // not ok\r
- commitAndUpdate(undo);\r
- return true;\r
- }\r
- @Override\r
- public synchronized void onChangeSetUpdate(ChangeSetUpdate event)\r
- throws ProCoreException {\r
- ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);\r
- if (DEBUG) {\r
- System.out.println("cs id " + event.getChangeSetId());\r
- System.out.println("cs index " + event.getChangeSetIndex());\r
- System.out.println("ccs N " + event.getNumberOfClusterChangeSets());\r
- System.out.println("ccs index " + event.getIndexOfClusterChangeSet());\r
- System.out.println("cluster=" + clusterUID);\r
- if (event.getNewCluster())\r
- System.out.println("cluster is new");\r
- System.out.println("data length " + event.getData().length);\r
- System.out.println(Arrays.toString(event.getData()));\r
- }\r
- if (0 == event.getChangeSetId() &&\r
- (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&\r
- 0 == event.getNumberOfClusterChangeSets() &&\r
- 0 == event.getIndexOfClusterChangeSet() &&\r
- ClusterUID.Null.equals(clusterUID)) {\r
- endOfUpdate = true;\r
- this.notify();\r
- return;\r
- }\r
-\r
- assert (changeSetsCount < changeSets.length);\r
- assert (event.getChangeSetIndex() < changeSets.length);\r
- assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());\r
- ChangeSet cs = changeSets[event.getChangeSetIndex()];\r
- if (null == cs) { // first, create\r
- cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());\r
- changeSets[event.getChangeSetIndex()] = cs;\r
- } else {\r
- assert (event.getChangeSetId() == cs.id);\r
- assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());\r
- assert (cs.count < cs.ccss.size());\r
- }\r
- assert (event.getData().length > 0);\r
- if (DEBUG)\r
- System.out.println("ccs count=" + cs.count);\r
- cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();\r
- cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());\r
- ++cs.count;\r
- if (cs.count < cs.ccss.size())\r
- return;\r
- ++changeSetsCount;\r
- if (changeSetsCount >= changeSets.length)\r
- this.notify();\r
- }\r
- private void commitAndUpdate(boolean undo) throws DatabaseException {\r
- final int SIZE = changeSetsCount; // changeSets.length;\r
- if (DEBUG)\r
- System.out.println("commitAndUpdate: cs count=" + SIZE);\r
- for (int i = 0; i < SIZE; ++i) {\r
- ChangeSet cs = changeSets[i];\r
- final int JSIZE = cs.ccss.size();\r
- cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);\r
- ClusterTable clusterTable = session.getClusterTable();\r
- cs.ccss2.setSize(JSIZE);\r
- for (int j = 0; j < JSIZE; ++j) {\r
- ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);\r
- cs.ccss2.set(j, ccs);\r
- if (fetchOnly)\r
- continue;\r
- if (cs.ccss3[j])\r
- getClusterOrCreate(ccs.getClusterUID(), clusterTable);\r
- ClusterUID clusterUID = ccs.getClusterUID();\r
- ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
- if (null == cluster)\r
- cs.skip[j] = true;\r
- }\r
- }\r
- if (fetchOnly) {\r
- changes.setSize(SIZE);\r
- for (int i = 0; i < SIZE; ++i) {\r
- ChangeSet cs = changeSets[i];\r
- changes.set(i, new ClientChangesImpl(session));\r
- final int JSIZE = cs.ccss.size();\r
- for (int j = 0; j < JSIZE; ++j) {\r
- ClusterChangeSetI ccs = cs.ccss2.get(j);\r
- if (DEBUG)\r
- System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);\r
- this.updateSecondPass(ccs, changes.get(i));\r
- }\r
- }\r
- return;\r
- }\r
- for (int i = 0; i < SIZE; ++i) {\r
- ChangeSet cs = changeSets[i];\r
- final int JSIZE = cs.ccss.size();\r
- for (int j = 0; j < JSIZE; ++j) {\r
- ClusterChangeSetI ccs = cs.ccss2.get(j);\r
- if (cs.skip[j]) {\r
- if (DEBUG)\r
- System.out.println("skipping change set " + ccs.getClusterUID());\r
- continue;\r
- }\r
- if (DEBUG)\r
- System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);\r
- this.updateFirstPass(ccs, undo);\r
- }\r
- }\r
- for (int i = 0; i < SIZE; ++i) {\r
- ChangeSet cs = changeSets[i];\r
- final int JSIZE = cs.ccss2.size();\r
- for (int j = 0; j < JSIZE; ++j) {\r
- ClusterChangeSetI ccs = cs.ccss2.get(j);\r
- if (cs.skip[j]) {\r
- if (DEBUG)\r
- System.out.println("skipping change set " + ccs.getClusterUID());\r
- continue;\r
- }\r
- this.updateSecondPass(ccs, clientChangesImpl);\r
- }\r
- }\r
- }\r
-\r
- private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {\r
- // New clusters are created here because resource keys require that referred resources exist.\r
- ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
- if (null == cluster)\r
- cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
- if (null != cluster);\r
- return cluster;\r
- }\r
- private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)\r
- throws DatabaseException {\r
- if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())\r
- throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);\r
- if (!ClusterUID.isLegal(clusterUID))\r
- throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);\r
- ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
- int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);\r
- if (0 == key)\r
- throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);\r
- return key;\r
- }\r
- private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)\r
- throws DatabaseException {\r
- ClusterTable clusterTable = session.getClusterTable();\r
- ClusterUID clusterUID = ccs.getClusterUID();\r
- ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
- if (null == cluster)\r
- throw new DatabaseException("Missing cluster id=" + clusterUID);\r
- if (!cluster.isLoaded())\r
- if (!undo)\r
- return;\r
- else {\r
- if (DEBUG)\r
- System.out.println("Cluster is not loaded. cluster=" + clusterUID);\r
- cluster.load(session.clusterTranslator, new Runnable() {\r
- @Override public void run() {\r
- if (DEBUG)\r
- System.out.println("Cluster loaded for undo.");\r
- }\r
- });\r
- cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
- }\r
- Operation op = new Operation();\r
- for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
- switch (op.type) {\r
- default:\r
- throw new InternalException("Unknown operation " + op);\r
- case CreateResource:\r
- if (op.count != 1)\r
- throw new InternalException("Illegal argument(s) to create resource.");\r
- if (DEBUG)\r
- System.out.println("Create " + op.resourceIndex);\r
- int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (!cluster.hasResource(s, session.clusterTranslator)) {\r
- int ss = cluster.createResource(session.clusterTranslator);\r
- short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);\r
- if (ri != op.resourceIndex)\r
- throw new InternalException("Created resource key=" + ss +\r
- " does not match expected resource index=" + op.resourceIndex);\r
- }\r
- break;\r
- case AddRelation:\r
- if (op.count != 5)\r
- throw new InternalException("Illegal argument(s) to add relation");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
- int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Add " + s + "-" + p + "-" + o);\r
- ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);\r
- if (null != c && c != cluster)\r
- cluster = c;\r
- break;\r
- case RemoveRelation:\r
- if (op.count != 5)\r
- throw new InternalException("Illegal argument(s) to remove relation");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
- o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Remove " + s + "-" + p + "-" + o);\r
- cluster.removeRelation(s, p, o, session.clusterTranslator);\r
- break;\r
- case SetValue:\r
- if (op.count != 4)\r
- throw new InternalException("Illegal argument(s) to set value");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Set value " + s + " l=" + op.valueSize);\r
- // TODO: remove this unnecessary copy.\r
- byte[] value = new byte[op.valueSize];\r
- System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);\r
- cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);\r
- break;\r
- case DeleteValue:\r
- if (op.count != 1)\r
- throw new InternalException("Illegal argument(s) to set value");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Delete " + s);\r
- cluster.removeValue(s, session.clusterTranslator);\r
- break;\r
- case ModifyValue:\r
- if (op.count != 5) // index, offset, size, value, valueStart\r
- throw new InternalException("Illegal argument(s) to modify resource file");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
- " siz=" + op.valueSize + " start=" + op.valueStart);\r
- cluster.setValueEx(s);\r
- if (undo)\r
- session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);\r
- break;\r
- }\r
- }\r
- }\r
- private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());\r
- ClusterTable clusterTable = session.getClusterTable();\r
- ClusterUID clusterUID = ccs.getClusterUID();\r
- ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
- if (null == cluster) {\r
- if (fetchOnly) // This is needed because resource implementation is based on cluster keys.\r
- cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
- else\r
- throw new DatabaseException("Missing cluster id=" + clusterUID);\r
- }\r
- if (!fetchOnly && !cluster.isLoaded())\r
- return;\r
- Operation op = new Operation();\r
- for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
- switch (op.type) {\r
- default:\r
- throw new InternalException("Unknown operation " + op);\r
- case CreateResource:\r
- if (op.count != 1)\r
- throw new InternalException("Illegal argument(s) to create resource.");\r
- break;\r
- case AddRelation:\r
- if (op.count != 5)\r
- throw new InternalException("Illegal argument(s) to add relation.");\r
- int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
- int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Add " + s + "-" + p + "-" + o);\r
- if (!fetchOnly)\r
- session.getQueryProvider2().updateStatements(s, p);\r
- changes.claim(s, p, o);\r
- break;\r
- case RemoveRelation:\r
- if (op.count != 5)\r
- throw new InternalException("Illegal argument(s) to remove relation.");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
- o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Remove " + s + "-" + p + "-" + o);\r
- if (!fetchOnly)\r
- session.getQueryProvider2().updateStatements(s, p);\r
- changes.deny(s, p, o);\r
- break;\r
- case SetValue:\r
- if (op.count != 4)\r
- throw new InternalException("Illegal argument(s) to set value.");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Set value " + s + " l=" + op.valueSize);\r
- if (!fetchOnly)\r
- session.getQueryProvider2().updateValue(s);\r
- changes.claimValue(s);\r
- break;\r
- case DeleteValue:\r
- if (op.count != 1)\r
- throw new InternalException("Illegal argument(s) to remove value.");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("Delete value " + s);\r
- if (!fetchOnly)\r
- session.getQueryProvider2().updateValue(s);\r
- changes.claimValue(s);\r
- break;\r
- case ModifyValue:\r
- if (op.count != 5)\r
- throw new InternalException("Illegal argument(s) to modify resource file");\r
- s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
- if (DEBUG)\r
- System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
- " siz=" + op.valueSize + " start=" + op.valueStart);\r
- if (!fetchOnly)\r
- session.getQueryProvider2().updateValue(s);\r
- else\r
- changes.claimValue(s);\r
- break;\r
- }\r
- }\r
- }\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Vector;
+
+import org.simantics.db.Database.Session.ChangeSetUpdate;
+import org.simantics.db.SessionManager;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.procore.cluster.ClusterChangeSet;
+import org.simantics.db.procore.cluster.ClusterChangeSetI;
+import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;
+import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.LifecycleSupport;
+
+public class SynchronizeContext implements SynchronizeContextI {
+ class ChangeSet {
+ ChangeSet(long id, int size) {
+ this.id = id;
+ this.count = 0;
+ ccss = new Vector<byte[]>(size);
+ ccss.setSize(size);
+ ccss3 = new boolean[size];
+ skip = new boolean[size];
+ }
+ long id;
+ int count;
+ Vector<byte[]> ccss;
+ Vector<ClusterChangeSetI> ccss2;
+ boolean[] ccss3; // this is getting a bit ridiculous, but...
+ boolean[] skip;
+ }
+ private boolean DEBUG = false;
+ private int changeSetsCount;
+ private ChangeSet[] changeSets;
+ private ClientChangesImpl clientChangesImpl;
+ private SessionImplSocket session;
+ private boolean endOfUpdate = false;
+ private final boolean fetchOnly; // True if change set(s) are only fetched.
+ private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();
+ private String compressionCodec;
+
+ SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {
+ this(s, clientChangesImpl, N, false);
+ }
+ SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {
+ this.session = s;
+ this.clientChangesImpl = clientChangesImpl;
+ changeSetsCount = 0;
+ changeSets = new ChangeSet[N];
+ this.fetchOnly = fetchOnly;
+
+ // TODO: find maybe a better way to do this
+ LifecycleSupport support = session.getService(LifecycleSupport.class);
+ SessionManager manager = support.getSessionManager();
+ compressionCodec = manager.getDatabase().getCompression();
+ }
+ Collection<org.simantics.db.ChangeSet> getChangeSets() {
+ Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());
+ for (ClientChangesImpl c : changes)
+ changeSets.add(c);
+ return changeSets;
+ }
+ public synchronized boolean isOk(boolean undo)
+ throws DatabaseException {
+ while (!endOfUpdate && changeSetsCount < changeSets.length) {
+ try {
+ this.wait(10000); // msec
+ // We are either notified, timed out or spuriously waked up,
+ // and I'd like to know which. I could time the wait but...
+ boolean timeout = changeSetsCount < changeSets.length;
+ if (DEBUG)
+ if (timeout)
+ System.out.println("DEBUG: Synchronize context timeout.");
+ } catch (InterruptedException e) {
+ // I assume that someone wants me to quit waiting.
+ }
+ break;
+ }
+ if (!endOfUpdate && changeSetsCount != changeSets.length)
+ return false; // not ok
+ commitAndUpdate(undo);
+ return true;
+ }
+ @Override
+ public synchronized void onChangeSetUpdate(ChangeSetUpdate event)
+ throws ProCoreException {
+ ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);
+ if (DEBUG) {
+ System.out.println("cs id " + event.getChangeSetId());
+ System.out.println("cs index " + event.getChangeSetIndex());
+ System.out.println("ccs N " + event.getNumberOfClusterChangeSets());
+ System.out.println("ccs index " + event.getIndexOfClusterChangeSet());
+ System.out.println("cluster=" + clusterUID);
+ if (event.getNewCluster())
+ System.out.println("cluster is new");
+ System.out.println("data length " + event.getData().length);
+ System.out.println(Arrays.toString(event.getData()));
+ }
+ if (0 == event.getChangeSetId() &&
+ (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&
+ 0 == event.getNumberOfClusterChangeSets() &&
+ 0 == event.getIndexOfClusterChangeSet() &&
+ ClusterUID.Null.equals(clusterUID)) {
+ endOfUpdate = true;
+ this.notify();
+ return;
+ }
+
+ assert (changeSetsCount < changeSets.length);
+ assert (event.getChangeSetIndex() < changeSets.length);
+ assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());
+ ChangeSet cs = changeSets[event.getChangeSetIndex()];
+ if (null == cs) { // first, create
+ cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());
+ changeSets[event.getChangeSetIndex()] = cs;
+ } else {
+ assert (event.getChangeSetId() == cs.id);
+ assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());
+ assert (cs.count < cs.ccss.size());
+ }
+ assert (event.getData().length > 0);
+ if (DEBUG)
+ System.out.println("ccs count=" + cs.count);
+ cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();
+ cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());
+ ++cs.count;
+ if (cs.count < cs.ccss.size())
+ return;
+ ++changeSetsCount;
+ if (changeSetsCount >= changeSets.length)
+ this.notify();
+ }
+ private void commitAndUpdate(boolean undo) throws DatabaseException {
+ final int SIZE = changeSetsCount; // changeSets.length;
+ if (DEBUG)
+ System.out.println("commitAndUpdate: cs count=" + SIZE);
+ for (int i = 0; i < SIZE; ++i) {
+ ChangeSet cs = changeSets[i];
+ final int JSIZE = cs.ccss.size();
+ cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);
+ ClusterTable clusterTable = session.getClusterTable();
+ cs.ccss2.setSize(JSIZE);
+ for (int j = 0; j < JSIZE; ++j) {
+ ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);
+ cs.ccss2.set(j, ccs);
+ if (fetchOnly)
+ continue;
+ if (cs.ccss3[j])
+ getClusterOrCreate(ccs.getClusterUID(), clusterTable);
+ ClusterUID clusterUID = ccs.getClusterUID();
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+ if (null == cluster)
+ cs.skip[j] = true;
+ }
+ }
+ if (fetchOnly) {
+ changes.setSize(SIZE);
+ for (int i = 0; i < SIZE; ++i) {
+ ChangeSet cs = changeSets[i];
+ changes.set(i, new ClientChangesImpl(session));
+ final int JSIZE = cs.ccss.size();
+ for (int j = 0; j < JSIZE; ++j) {
+ ClusterChangeSetI ccs = cs.ccss2.get(j);
+ if (DEBUG)
+ System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);
+ this.updateSecondPass(ccs, changes.get(i));
+ }
+ }
+ return;
+ }
+ for (int i = 0; i < SIZE; ++i) {
+ ChangeSet cs = changeSets[i];
+ final int JSIZE = cs.ccss.size();
+ for (int j = 0; j < JSIZE; ++j) {
+ ClusterChangeSetI ccs = cs.ccss2.get(j);
+ if (cs.skip[j]) {
+ if (DEBUG)
+ System.out.println("skipping change set " + ccs.getClusterUID());
+ continue;
+ }
+ if (DEBUG)
+ System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);
+ this.updateFirstPass(ccs, undo);
+ }
+ }
+ for (int i = 0; i < SIZE; ++i) {
+ ChangeSet cs = changeSets[i];
+ final int JSIZE = cs.ccss2.size();
+ for (int j = 0; j < JSIZE; ++j) {
+ ClusterChangeSetI ccs = cs.ccss2.get(j);
+ if (cs.skip[j]) {
+ if (DEBUG)
+ System.out.println("skipping change set " + ccs.getClusterUID());
+ continue;
+ }
+ this.updateSecondPass(ccs, clientChangesImpl);
+ }
+ }
+ }
+
+ private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {
+ // New clusters are created here because resource keys require that referred resources exist.
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+ if (null == cluster)
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ if (null != cluster);
+ return cluster;
+ }
+ private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)
+ throws DatabaseException {
+ if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
+ throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);
+ if (!ClusterUID.isLegal(clusterUID))
+ throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);
+ ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);
+ if (0 == key)
+ throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);
+ return key;
+ }
+ private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)
+ throws DatabaseException {
+ ClusterTable clusterTable = session.getClusterTable();
+ ClusterUID clusterUID = ccs.getClusterUID();
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+ if (null == cluster)
+ throw new DatabaseException("Missing cluster id=" + clusterUID);
+ if (!cluster.isLoaded())
+ if (!undo)
+ return;
+ else {
+ if (DEBUG)
+ System.out.println("Cluster is not loaded. cluster=" + clusterUID);
+ cluster.load(session.clusterTranslator, new Runnable() {
+ @Override public void run() {
+ if (DEBUG)
+ System.out.println("Cluster loaded for undo.");
+ }
+ });
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ }
+ Operation op = new Operation();
+ for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
+ switch (op.type) {
+ default:
+ throw new InternalException("Unknown operation " + op);
+ case CreateResource:
+ if (op.count != 1)
+ throw new InternalException("Illegal argument(s) to create resource.");
+ if (DEBUG)
+ System.out.println("Create " + op.resourceIndex);
+ int s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (!cluster.hasResource(s, session.clusterTranslator)) {
+ int ss = cluster.createResource(session.clusterTranslator);
+ short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);
+ if (ri != op.resourceIndex)
+ throw new InternalException("Created resource key=" + ss +
+ " does not match expected resource index=" + op.resourceIndex);
+ }
+ break;
+ case AddRelation:
+ if (op.count != 5)
+ throw new InternalException("Illegal argument(s) to add relation");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+ int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+ if (DEBUG)
+ System.out.println("Add " + s + "-" + p + "-" + o);
+ ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);
+ if (null != c && c != cluster)
+ cluster = c;
+ break;
+ case RemoveRelation:
+ if (op.count != 5)
+ throw new InternalException("Illegal argument(s) to remove relation");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+ o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+ if (DEBUG)
+ System.out.println("Remove " + s + "-" + p + "-" + o);
+ cluster.removeRelation(s, p, o, session.clusterTranslator);
+ break;
+ case SetValue:
+ if (op.count != 4)
+ throw new InternalException("Illegal argument(s) to set value");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("Set value " + s + " l=" + op.valueSize);
+ // TODO: remove this unnecessary copy.
+ byte[] value = new byte[op.valueSize];
+ System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);
+ cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);
+ break;
+ case DeleteValue:
+ if (op.count != 1)
+ throw new InternalException("Illegal argument(s) to set value");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("Delete " + s);
+ cluster.removeValue(s, session.clusterTranslator);
+ break;
+ case ModifyValue:
+ if (op.count != 5) // index, offset, size, value, valueStart
+ throw new InternalException("Illegal argument(s) to modify resource file");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
+ " siz=" + op.valueSize + " start=" + op.valueStart);
+ cluster.setValueEx(s);
+ if (undo)
+ session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);
+ break;
+ }
+ }
+ }
+ private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());
+ ClusterTable clusterTable = session.getClusterTable();
+ ClusterUID clusterUID = ccs.getClusterUID();
+ ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
+ if (null == cluster) {
+ if (fetchOnly) // This is needed because resource implementation is based on cluster keys.
+ cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ else
+ throw new DatabaseException("Missing cluster id=" + clusterUID);
+ }
+ if (!fetchOnly && !cluster.isLoaded())
+ return;
+ Operation op = new Operation();
+ for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
+ switch (op.type) {
+ default:
+ throw new InternalException("Unknown operation " + op);
+ case CreateResource:
+ if (op.count != 1)
+ throw new InternalException("Illegal argument(s) to create resource.");
+ break;
+ case AddRelation:
+ if (op.count != 5)
+ throw new InternalException("Illegal argument(s) to add relation.");
+ int s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+ int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+ if (DEBUG)
+ System.out.println("Add " + s + "-" + p + "-" + o);
+ if (!fetchOnly)
+ session.getQueryProvider2().updateStatements(s, p);
+ changes.claim(s, p, o);
+ break;
+ case RemoveRelation:
+ if (op.count != 5)
+ throw new InternalException("Illegal argument(s) to remove relation.");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
+ o = getKey(op.objectIndex, op.objectCluster, clusterTable);
+ if (DEBUG)
+ System.out.println("Remove " + s + "-" + p + "-" + o);
+ if (!fetchOnly)
+ session.getQueryProvider2().updateStatements(s, p);
+ changes.deny(s, p, o);
+ break;
+ case SetValue:
+ if (op.count != 4)
+ throw new InternalException("Illegal argument(s) to set value.");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("Set value " + s + " l=" + op.valueSize);
+ if (!fetchOnly)
+ session.getQueryProvider2().updateValue(s);
+ changes.claimValue(s);
+ break;
+ case DeleteValue:
+ if (op.count != 1)
+ throw new InternalException("Illegal argument(s) to remove value.");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("Delete value " + s);
+ if (!fetchOnly)
+ session.getQueryProvider2().updateValue(s);
+ changes.claimValue(s);
+ break;
+ case ModifyValue:
+ if (op.count != 5)
+ throw new InternalException("Illegal argument(s) to modify resource file");
+ s = getKey(op.resourceIndex, clusterUID, clusterTable);
+ if (DEBUG)
+ System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
+ " siz=" + op.valueSize + " start=" + op.valueStart);
+ if (!fetchOnly)
+ session.getQueryProvider2().updateValue(s);
+ else
+ changes.claimValue(s);
+ break;
+ }
+ }
+ }
}
\ No newline at end of file