X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSynchronizeContext.java;h=2d31febc23bf84176d32816c09b9ebe8a662d0ae;hp=5f8f4a5f810f2e8fce8dd10a7f8c155801ebc217;hb=cec265e1216a0a211e6e6dc4f91d5fda4c5747db;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java index 5f8f4a5f8..2d31febc2 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java @@ -1,417 +1,417 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package fi.vtt.simantics.procore.internal; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Vector; - -import org.simantics.db.Database.Session.ChangeSetUpdate; -import org.simantics.db.SessionManager; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.InternalException; -import org.simantics.db.impl.ClusterI; -import org.simantics.db.impl.ClusterTraitsBase; -import org.simantics.db.procore.cluster.ClusterChangeSet; -import org.simantics.db.procore.cluster.ClusterChangeSetI; -import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation; -import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum; -import org.simantics.db.procore.cluster.ClusterTraits; -import org.simantics.db.server.ProCoreException; -import org.simantics.db.service.ClusterUID; -import org.simantics.db.service.LifecycleSupport; - -public class SynchronizeContext implements SynchronizeContextI { - class ChangeSet { - ChangeSet(long id, int size) { - this.id = id; - this.count = 0; - ccss = new Vector(size); - ccss.setSize(size); - ccss3 = new boolean[size]; - skip = new boolean[size]; - } - long id; - int count; - Vector ccss; - Vector ccss2; - boolean[] ccss3; // this is getting a bit ridiculous, but... - boolean[] skip; - } - private boolean DEBUG = false; - private int changeSetsCount; - private ChangeSet[] changeSets; - private ClientChangesImpl clientChangesImpl; - private SessionImplSocket session; - private boolean endOfUpdate = false; - private final boolean fetchOnly; // True if change set(s) are only fetched. - private Vector changes = new Vector(); - private String compressionCodec; - - SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) { - this(s, clientChangesImpl, N, false); - } - SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) { - this.session = s; - this.clientChangesImpl = clientChangesImpl; - changeSetsCount = 0; - changeSets = new ChangeSet[N]; - this.fetchOnly = fetchOnly; - - // TODO: find maybe a better way to do this - LifecycleSupport support = session.getService(LifecycleSupport.class); - SessionManager manager = support.getSessionManager(); - compressionCodec = manager.getDatabase().getCompression(); - } - Collection getChangeSets() { - Vector changeSets = new Vector(changes.size()); - for (ClientChangesImpl c : changes) - changeSets.add(c); - return changeSets; - } - public synchronized boolean isOk(boolean undo) - throws DatabaseException { - while (!endOfUpdate && changeSetsCount < changeSets.length) { - try { - this.wait(10000); // msec - // We are either notified, timed out or spuriously waked up, - // and I'd like to know which. I could time the wait but... - boolean timeout = changeSetsCount < changeSets.length; - if (DEBUG) - if (timeout) - System.out.println("DEBUG: Synchronize context timeout."); - } catch (InterruptedException e) { - // I assume that someone wants me to quit waiting. - } - break; - } - if (!endOfUpdate && changeSetsCount != changeSets.length) - return false; // not ok - commitAndUpdate(undo); - return true; - } - @Override - public synchronized void onChangeSetUpdate(ChangeSetUpdate event) - throws ProCoreException { - ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0); - if (DEBUG) { - System.out.println("cs id " + event.getChangeSetId()); - System.out.println("cs index " + event.getChangeSetIndex()); - System.out.println("ccs N " + event.getNumberOfClusterChangeSets()); - System.out.println("ccs index " + event.getIndexOfClusterChangeSet()); - System.out.println("cluster=" + clusterUID); - if (event.getNewCluster()) - System.out.println("cluster is new"); - System.out.println("data length " + event.getData().length); - System.out.println(Arrays.toString(event.getData())); - } - if (0 == event.getChangeSetId() && - (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&& - 0 == event.getNumberOfClusterChangeSets() && - 0 == event.getIndexOfClusterChangeSet() && - ClusterUID.Null.equals(clusterUID)) { - endOfUpdate = true; - this.notify(); - return; - } - - assert (changeSetsCount < changeSets.length); - assert (event.getChangeSetIndex() < changeSets.length); - assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets()); - ChangeSet cs = changeSets[event.getChangeSetIndex()]; - if (null == cs) { // first, create - cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets()); - changeSets[event.getChangeSetIndex()] = cs; - } else { - assert (event.getChangeSetId() == cs.id); - assert (event.getNumberOfClusterChangeSets() == cs.ccss.size()); - assert (cs.count < cs.ccss.size()); - } - assert (event.getData().length > 0); - if (DEBUG) - System.out.println("ccs count=" + cs.count); - cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster(); - cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData()); - ++cs.count; - if (cs.count < cs.ccss.size()) - return; - ++changeSetsCount; - if (changeSetsCount >= changeSets.length) - this.notify(); - } - private void commitAndUpdate(boolean undo) throws DatabaseException { - final int SIZE = changeSetsCount; // changeSets.length; - if (DEBUG) - System.out.println("commitAndUpdate: cs count=" + SIZE); - for (int i = 0; i < SIZE; ++i) { - ChangeSet cs = changeSets[i]; - final int JSIZE = cs.ccss.size(); - cs.ccss2 = new Vector(JSIZE); - ClusterTable clusterTable = session.getClusterTable(); - cs.ccss2.setSize(JSIZE); - for (int j = 0; j < JSIZE; ++j) { - ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec); - cs.ccss2.set(j, ccs); - if (fetchOnly) - continue; - if (cs.ccss3[j]) - getClusterOrCreate(ccs.getClusterUID(), clusterTable); - ClusterUID clusterUID = ccs.getClusterUID(); - ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); - if (null == cluster) - cs.skip[j] = true; - } - } - if (fetchOnly) { - changes.setSize(SIZE); - for (int i = 0; i < SIZE; ++i) { - ChangeSet cs = changeSets[i]; - changes.set(i, new ClientChangesImpl(session)); - final int JSIZE = cs.ccss.size(); - for (int j = 0; j < JSIZE; ++j) { - ClusterChangeSetI ccs = cs.ccss2.get(j); - if (DEBUG) - System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j); - this.updateSecondPass(ccs, changes.get(i)); - } - } - return; - } - for (int i = 0; i < SIZE; ++i) { - ChangeSet cs = changeSets[i]; - final int JSIZE = cs.ccss.size(); - for (int j = 0; j < JSIZE; ++j) { - ClusterChangeSetI ccs = cs.ccss2.get(j); - if (cs.skip[j]) { - if (DEBUG) - System.out.println("skipping change set " + ccs.getClusterUID()); - continue; - } - if (DEBUG) - System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j); - this.updateFirstPass(ccs, undo); - } - } - for (int i = 0; i < SIZE; ++i) { - ChangeSet cs = changeSets[i]; - final int JSIZE = cs.ccss2.size(); - for (int j = 0; j < JSIZE; ++j) { - ClusterChangeSetI ccs = cs.ccss2.get(j); - if (cs.skip[j]) { - if (DEBUG) - System.out.println("skipping change set " + ccs.getClusterUID()); - continue; - } - this.updateSecondPass(ccs, clientChangesImpl); - } - } - } - - private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) { - // New clusters are created here because resource keys require that referred resources exist. - ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); - if (null == cluster) - cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); - if (null != cluster); - return cluster; - } - private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable) - throws DatabaseException { - if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources()) - throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID); - if (!ClusterUID.isLegal(clusterUID)) - throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID); - ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); - int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri); - if (0 == key) - throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID); - return key; - } - private void updateFirstPass(ClusterChangeSetI ccs, boolean undo) - throws DatabaseException { - ClusterTable clusterTable = session.getClusterTable(); - ClusterUID clusterUID = ccs.getClusterUID(); - ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); - if (null == cluster) - throw new DatabaseException("Missing cluster id=" + clusterUID); - if (!cluster.isLoaded()) - if (!undo) - return; - else { - if (DEBUG) - System.out.println("Cluster is not loaded. cluster=" + clusterUID); - cluster.load(session.clusterTranslator, new Runnable() { - @Override public void run() { - if (DEBUG) - System.out.println("Cluster loaded for undo."); - } - }); - cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); - } - Operation op = new Operation(); - for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { - switch (op.type) { - default: - throw new InternalException("Unknown operation " + op); - case CreateResource: - if (op.count != 1) - throw new InternalException("Illegal argument(s) to create resource."); - if (DEBUG) - System.out.println("Create " + op.resourceIndex); - int s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (!cluster.hasResource(s, session.clusterTranslator)) { - int ss = cluster.createResource(session.clusterTranslator); - short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss); - if (ri != op.resourceIndex) - throw new InternalException("Created resource key=" + ss + - " does not match expected resource index=" + op.resourceIndex); - } - break; - case AddRelation: - if (op.count != 5) - throw new InternalException("Illegal argument(s) to add relation"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); - int o = getKey(op.objectIndex, op.objectCluster, clusterTable); - if (DEBUG) - System.out.println("Add " + s + "-" + p + "-" + o); - ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator); - if (null != c && c != cluster) - cluster = c; - break; - case RemoveRelation: - if (op.count != 5) - throw new InternalException("Illegal argument(s) to remove relation"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); - o = getKey(op.objectIndex, op.objectCluster, clusterTable); - if (DEBUG) - System.out.println("Remove " + s + "-" + p + "-" + o); - cluster.removeRelation(s, p, o, session.clusterTranslator); - break; - case SetValue: - if (op.count != 4) - throw new InternalException("Illegal argument(s) to set value"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("Set value " + s + " l=" + op.valueSize); - // TODO: remove this unnecessary copy. - byte[] value = new byte[op.valueSize]; - System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize); - cluster = cluster.setValue(s, value, value.length, session.clusterTranslator); - break; - case DeleteValue: - if (op.count != 1) - throw new InternalException("Illegal argument(s) to set value"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("Delete " + s); - cluster.removeValue(s, session.clusterTranslator); - break; - case ModifyValue: - if (op.count != 5) // index, offset, size, value, valueStart - throw new InternalException("Illegal argument(s) to modify resource file"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("ModifyValue " + s + " off=" + op.valueOffset + - " siz=" + op.valueSize + " start=" + op.valueStart); - cluster.setValueEx(s); - if (undo) - session.clusterTranslator.undoValueEx(cluster, op.resourceIndex); - break; - } - } - } - private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes) - throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID()); - ClusterTable clusterTable = session.getClusterTable(); - ClusterUID clusterUID = ccs.getClusterUID(); - ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); - if (null == cluster) { - if (fetchOnly) // This is needed because resource implementation is based on cluster keys. - cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); - else - throw new DatabaseException("Missing cluster id=" + clusterUID); - } - if (!fetchOnly && !cluster.isLoaded()) - return; - Operation op = new Operation(); - for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { - switch (op.type) { - default: - throw new InternalException("Unknown operation " + op); - case CreateResource: - if (op.count != 1) - throw new InternalException("Illegal argument(s) to create resource."); - break; - case AddRelation: - if (op.count != 5) - throw new InternalException("Illegal argument(s) to add relation."); - int s = getKey(op.resourceIndex, clusterUID, clusterTable); - int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); - int o = getKey(op.objectIndex, op.objectCluster, clusterTable); - if (DEBUG) - System.out.println("Add " + s + "-" + p + "-" + o); - if (!fetchOnly) - session.getQueryProvider2().updateStatements(s, p); - changes.claim(s, p, o); - break; - case RemoveRelation: - if (op.count != 5) - throw new InternalException("Illegal argument(s) to remove relation."); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); - o = getKey(op.objectIndex, op.objectCluster, clusterTable); - if (DEBUG) - System.out.println("Remove " + s + "-" + p + "-" + o); - if (!fetchOnly) - session.getQueryProvider2().updateStatements(s, p); - changes.deny(s, p, o); - break; - case SetValue: - if (op.count != 4) - throw new InternalException("Illegal argument(s) to set value."); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("Set value " + s + " l=" + op.valueSize); - if (!fetchOnly) - session.getQueryProvider2().updateValue(s); - changes.claimValue(s); - break; - case DeleteValue: - if (op.count != 1) - throw new InternalException("Illegal argument(s) to remove value."); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("Delete value " + s); - if (!fetchOnly) - session.getQueryProvider2().updateValue(s); - changes.claimValue(s); - break; - case ModifyValue: - if (op.count != 5) - throw new InternalException("Illegal argument(s) to modify resource file"); - s = getKey(op.resourceIndex, clusterUID, clusterTable); - if (DEBUG) - System.out.println("ModifyValue " + s + " off=" + op.valueOffset + - " siz=" + op.valueSize + " start=" + op.valueStart); - if (!fetchOnly) - session.getQueryProvider2().updateValue(s); - else - changes.claimValue(s); - break; - } - } - } +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package fi.vtt.simantics.procore.internal; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Vector; + +import org.simantics.db.Database.Session.ChangeSetUpdate; +import org.simantics.db.SessionManager; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.InternalException; +import org.simantics.db.impl.ClusterI; +import org.simantics.db.impl.ClusterTraitsBase; +import org.simantics.db.procore.cluster.ClusterChangeSet; +import org.simantics.db.procore.cluster.ClusterChangeSetI; +import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation; +import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum; +import org.simantics.db.procore.cluster.ClusterTraits; +import org.simantics.db.server.ProCoreException; +import org.simantics.db.service.ClusterUID; +import org.simantics.db.service.LifecycleSupport; + +public class SynchronizeContext implements SynchronizeContextI { + class ChangeSet { + ChangeSet(long id, int size) { + this.id = id; + this.count = 0; + ccss = new Vector(size); + ccss.setSize(size); + ccss3 = new boolean[size]; + skip = new boolean[size]; + } + long id; + int count; + Vector ccss; + Vector ccss2; + boolean[] ccss3; // this is getting a bit ridiculous, but... + boolean[] skip; + } + private boolean DEBUG = false; + private int changeSetsCount; + private ChangeSet[] changeSets; + private ClientChangesImpl clientChangesImpl; + private SessionImplSocket session; + private boolean endOfUpdate = false; + private final boolean fetchOnly; // True if change set(s) are only fetched. + private Vector changes = new Vector(); + private String compressionCodec; + + SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) { + this(s, clientChangesImpl, N, false); + } + SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) { + this.session = s; + this.clientChangesImpl = clientChangesImpl; + changeSetsCount = 0; + changeSets = new ChangeSet[N]; + this.fetchOnly = fetchOnly; + + // TODO: find maybe a better way to do this + LifecycleSupport support = session.getService(LifecycleSupport.class); + SessionManager manager = support.getSessionManager(); + compressionCodec = manager.getDatabase().getCompression(); + } + Collection getChangeSets() { + Vector changeSets = new Vector(changes.size()); + for (ClientChangesImpl c : changes) + changeSets.add(c); + return changeSets; + } + public synchronized boolean isOk(boolean undo) + throws DatabaseException { + while (!endOfUpdate && changeSetsCount < changeSets.length) { + try { + this.wait(10000); // msec + // We are either notified, timed out or spuriously waked up, + // and I'd like to know which. I could time the wait but... + boolean timeout = changeSetsCount < changeSets.length; + if (DEBUG) + if (timeout) + System.out.println("DEBUG: Synchronize context timeout."); + } catch (InterruptedException e) { + // I assume that someone wants me to quit waiting. + } + break; + } + if (!endOfUpdate && changeSetsCount != changeSets.length) + return false; // not ok + commitAndUpdate(undo); + return true; + } + @Override + public synchronized void onChangeSetUpdate(ChangeSetUpdate event) + throws ProCoreException { + ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0); + if (DEBUG) { + System.out.println("cs id " + event.getChangeSetId()); + System.out.println("cs index " + event.getChangeSetIndex()); + System.out.println("ccs N " + event.getNumberOfClusterChangeSets()); + System.out.println("ccs index " + event.getIndexOfClusterChangeSet()); + System.out.println("cluster=" + clusterUID); + if (event.getNewCluster()) + System.out.println("cluster is new"); + System.out.println("data length " + event.getData().length); + System.out.println(Arrays.toString(event.getData())); + } + if (0 == event.getChangeSetId() && + (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&& + 0 == event.getNumberOfClusterChangeSets() && + 0 == event.getIndexOfClusterChangeSet() && + ClusterUID.Null.equals(clusterUID)) { + endOfUpdate = true; + this.notify(); + return; + } + + assert (changeSetsCount < changeSets.length); + assert (event.getChangeSetIndex() < changeSets.length); + assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets()); + ChangeSet cs = changeSets[event.getChangeSetIndex()]; + if (null == cs) { // first, create + cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets()); + changeSets[event.getChangeSetIndex()] = cs; + } else { + assert (event.getChangeSetId() == cs.id); + assert (event.getNumberOfClusterChangeSets() == cs.ccss.size()); + assert (cs.count < cs.ccss.size()); + } + assert (event.getData().length > 0); + if (DEBUG) + System.out.println("ccs count=" + cs.count); + cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster(); + cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData()); + ++cs.count; + if (cs.count < cs.ccss.size()) + return; + ++changeSetsCount; + if (changeSetsCount >= changeSets.length) + this.notify(); + } + private void commitAndUpdate(boolean undo) throws DatabaseException { + final int SIZE = changeSetsCount; // changeSets.length; + if (DEBUG) + System.out.println("commitAndUpdate: cs count=" + SIZE); + for (int i = 0; i < SIZE; ++i) { + ChangeSet cs = changeSets[i]; + final int JSIZE = cs.ccss.size(); + cs.ccss2 = new Vector(JSIZE); + ClusterTable clusterTable = session.getClusterTable(); + cs.ccss2.setSize(JSIZE); + for (int j = 0; j < JSIZE; ++j) { + ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec); + cs.ccss2.set(j, ccs); + if (fetchOnly) + continue; + if (cs.ccss3[j]) + getClusterOrCreate(ccs.getClusterUID(), clusterTable); + ClusterUID clusterUID = ccs.getClusterUID(); + ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); + if (null == cluster) + cs.skip[j] = true; + } + } + if (fetchOnly) { + changes.setSize(SIZE); + for (int i = 0; i < SIZE; ++i) { + ChangeSet cs = changeSets[i]; + changes.set(i, new ClientChangesImpl(session)); + final int JSIZE = cs.ccss.size(); + for (int j = 0; j < JSIZE; ++j) { + ClusterChangeSetI ccs = cs.ccss2.get(j); + if (DEBUG) + System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j); + this.updateSecondPass(ccs, changes.get(i)); + } + } + return; + } + for (int i = 0; i < SIZE; ++i) { + ChangeSet cs = changeSets[i]; + final int JSIZE = cs.ccss.size(); + for (int j = 0; j < JSIZE; ++j) { + ClusterChangeSetI ccs = cs.ccss2.get(j); + if (cs.skip[j]) { + if (DEBUG) + System.out.println("skipping change set " + ccs.getClusterUID()); + continue; + } + if (DEBUG) + System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j); + this.updateFirstPass(ccs, undo); + } + } + for (int i = 0; i < SIZE; ++i) { + ChangeSet cs = changeSets[i]; + final int JSIZE = cs.ccss2.size(); + for (int j = 0; j < JSIZE; ++j) { + ClusterChangeSetI ccs = cs.ccss2.get(j); + if (cs.skip[j]) { + if (DEBUG) + System.out.println("skipping change set " + ccs.getClusterUID()); + continue; + } + this.updateSecondPass(ccs, clientChangesImpl); + } + } + } + + private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) { + // New clusters are created here because resource keys require that referred resources exist. + ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); + if (null == cluster) + cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); + if (null != cluster); + return cluster; + } + private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable) + throws DatabaseException { + if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources()) + throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID); + if (!ClusterUID.isLegal(clusterUID)) + throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID); + ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); + int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri); + if (0 == key) + throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID); + return key; + } + private void updateFirstPass(ClusterChangeSetI ccs, boolean undo) + throws DatabaseException { + ClusterTable clusterTable = session.getClusterTable(); + ClusterUID clusterUID = ccs.getClusterUID(); + ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); + if (null == cluster) + throw new DatabaseException("Missing cluster id=" + clusterUID); + if (!cluster.isLoaded()) + if (!undo) + return; + else { + if (DEBUG) + System.out.println("Cluster is not loaded. cluster=" + clusterUID); + cluster.load(session.clusterTranslator, new Runnable() { + @Override public void run() { + if (DEBUG) + System.out.println("Cluster loaded for undo."); + } + }); + cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); + } + Operation op = new Operation(); + for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { + switch (op.type) { + default: + throw new InternalException("Unknown operation " + op); + case CreateResource: + if (op.count != 1) + throw new InternalException("Illegal argument(s) to create resource."); + if (DEBUG) + System.out.println("Create " + op.resourceIndex); + int s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (!cluster.hasResource(s, session.clusterTranslator)) { + int ss = cluster.createResource(session.clusterTranslator); + short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss); + if (ri != op.resourceIndex) + throw new InternalException("Created resource key=" + ss + + " does not match expected resource index=" + op.resourceIndex); + } + break; + case AddRelation: + if (op.count != 5) + throw new InternalException("Illegal argument(s) to add relation"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); + int o = getKey(op.objectIndex, op.objectCluster, clusterTable); + if (DEBUG) + System.out.println("Add " + s + "-" + p + "-" + o); + ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator); + if (null != c && c != cluster) + cluster = c; + break; + case RemoveRelation: + if (op.count != 5) + throw new InternalException("Illegal argument(s) to remove relation"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); + o = getKey(op.objectIndex, op.objectCluster, clusterTable); + if (DEBUG) + System.out.println("Remove " + s + "-" + p + "-" + o); + cluster.removeRelation(s, p, o, session.clusterTranslator); + break; + case SetValue: + if (op.count != 4) + throw new InternalException("Illegal argument(s) to set value"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("Set value " + s + " l=" + op.valueSize); + // TODO: remove this unnecessary copy. + byte[] value = new byte[op.valueSize]; + System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize); + cluster = cluster.setValue(s, value, value.length, session.clusterTranslator); + break; + case DeleteValue: + if (op.count != 1) + throw new InternalException("Illegal argument(s) to set value"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("Delete " + s); + cluster.removeValue(s, session.clusterTranslator); + break; + case ModifyValue: + if (op.count != 5) // index, offset, size, value, valueStart + throw new InternalException("Illegal argument(s) to modify resource file"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("ModifyValue " + s + " off=" + op.valueOffset + + " siz=" + op.valueSize + " start=" + op.valueStart); + cluster.setValueEx(s); + if (undo) + session.clusterTranslator.undoValueEx(cluster, op.resourceIndex); + break; + } + } + } + private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes) + throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID()); + ClusterTable clusterTable = session.getClusterTable(); + ClusterUID clusterUID = ccs.getClusterUID(); + ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); + if (null == cluster) { + if (fetchOnly) // This is needed because resource implementation is based on cluster keys. + cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); + else + throw new DatabaseException("Missing cluster id=" + clusterUID); + } + if (!fetchOnly && !cluster.isLoaded()) + return; + Operation op = new Operation(); + for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { + switch (op.type) { + default: + throw new InternalException("Unknown operation " + op); + case CreateResource: + if (op.count != 1) + throw new InternalException("Illegal argument(s) to create resource."); + break; + case AddRelation: + if (op.count != 5) + throw new InternalException("Illegal argument(s) to add relation."); + int s = getKey(op.resourceIndex, clusterUID, clusterTable); + int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); + int o = getKey(op.objectIndex, op.objectCluster, clusterTable); + if (DEBUG) + System.out.println("Add " + s + "-" + p + "-" + o); + if (!fetchOnly) + session.getQueryProvider2().updateStatements(s, p); + changes.claim(s, p, o); + break; + case RemoveRelation: + if (op.count != 5) + throw new InternalException("Illegal argument(s) to remove relation."); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); + o = getKey(op.objectIndex, op.objectCluster, clusterTable); + if (DEBUG) + System.out.println("Remove " + s + "-" + p + "-" + o); + if (!fetchOnly) + session.getQueryProvider2().updateStatements(s, p); + changes.deny(s, p, o); + break; + case SetValue: + if (op.count != 4) + throw new InternalException("Illegal argument(s) to set value."); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("Set value " + s + " l=" + op.valueSize); + if (!fetchOnly) + session.getQueryProvider2().updateValue(s); + changes.claimValue(s); + break; + case DeleteValue: + if (op.count != 1) + throw new InternalException("Illegal argument(s) to remove value."); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("Delete value " + s); + if (!fetchOnly) + session.getQueryProvider2().updateValue(s); + changes.claimValue(s); + break; + case ModifyValue: + if (op.count != 5) + throw new InternalException("Illegal argument(s) to modify resource file"); + s = getKey(op.resourceIndex, clusterUID, clusterTable); + if (DEBUG) + System.out.println("ModifyValue " + s + " off=" + op.valueOffset + + " siz=" + op.valueSize + " start=" + op.valueStart); + if (!fetchOnly) + session.getQueryProvider2().updateValue(s); + else + changes.claimValue(s); + break; + } + } + } } \ No newline at end of file