/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.util.Arrays; import java.util.Collection; import java.util.Vector; import org.simantics.db.Database.Session.ChangeSetUpdate; import org.simantics.db.SessionManager; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.InternalException; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; import org.simantics.db.procore.cluster.ClusterChangeSet; import org.simantics.db.procore.cluster.ClusterChangeSetI; import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation; import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum; import org.simantics.db.procore.cluster.ClusterTraits; import org.simantics.db.server.ProCoreException; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.LifecycleSupport; public class SynchronizeContext implements SynchronizeContextI { class ChangeSet { ChangeSet(long id, int size) { this.id = id; this.count = 0; ccss = new Vector(size); ccss.setSize(size); ccss3 = new boolean[size]; skip = new boolean[size]; } long id; int count; Vector ccss; Vector ccss2; boolean[] ccss3; // this is getting a bit ridiculous, but... boolean[] skip; } private boolean DEBUG = false; private int changeSetsCount; private ChangeSet[] changeSets; private ClientChangesImpl clientChangesImpl; private SessionImplSocket session; private boolean endOfUpdate = false; private final boolean fetchOnly; // True if change set(s) are only fetched. private Vector changes = new Vector(); private String compressionCodec; SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) { this(s, clientChangesImpl, N, false); } SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) { this.session = s; this.clientChangesImpl = clientChangesImpl; changeSetsCount = 0; changeSets = new ChangeSet[N]; this.fetchOnly = fetchOnly; // TODO: find maybe a better way to do this LifecycleSupport support = session.getService(LifecycleSupport.class); SessionManager manager = support.getSessionManager(); compressionCodec = manager.getDatabase().getCompression(); } Collection getChangeSets() { Vector changeSets = new Vector(changes.size()); for (ClientChangesImpl c : changes) changeSets.add(c); return changeSets; } public synchronized boolean isOk(boolean undo) throws DatabaseException { while (!endOfUpdate && changeSetsCount < changeSets.length) { try { this.wait(10000); // msec // We are either notified, timed out or spuriously waked up, // and I'd like to know which. I could time the wait but... boolean timeout = changeSetsCount < changeSets.length; if (DEBUG) if (timeout) System.out.println("DEBUG: Synchronize context timeout."); } catch (InterruptedException e) { // I assume that someone wants me to quit waiting. } break; } if (!endOfUpdate && changeSetsCount != changeSets.length) return false; // not ok commitAndUpdate(undo); return true; } @Override public synchronized void onChangeSetUpdate(ChangeSetUpdate event) throws ProCoreException { ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0); if (DEBUG) { System.out.println("cs id " + event.getChangeSetId()); System.out.println("cs index " + event.getChangeSetIndex()); System.out.println("ccs N " + event.getNumberOfClusterChangeSets()); System.out.println("ccs index " + event.getIndexOfClusterChangeSet()); System.out.println("cluster=" + clusterUID); if (event.getNewCluster()) System.out.println("cluster is new"); System.out.println("data length " + event.getData().length); System.out.println(Arrays.toString(event.getData())); } if (0 == event.getChangeSetId() && (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&& 0 == event.getNumberOfClusterChangeSets() && 0 == event.getIndexOfClusterChangeSet() && ClusterUID.Null.equals(clusterUID)) { endOfUpdate = true; this.notify(); return; } assert (changeSetsCount < changeSets.length); assert (event.getChangeSetIndex() < changeSets.length); assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets()); ChangeSet cs = changeSets[event.getChangeSetIndex()]; if (null == cs) { // first, create cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets()); changeSets[event.getChangeSetIndex()] = cs; } else { assert (event.getChangeSetId() == cs.id); assert (event.getNumberOfClusterChangeSets() == cs.ccss.size()); assert (cs.count < cs.ccss.size()); } assert (event.getData().length > 0); if (DEBUG) System.out.println("ccs count=" + cs.count); cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster(); cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData()); ++cs.count; if (cs.count < cs.ccss.size()) return; ++changeSetsCount; if (changeSetsCount >= changeSets.length) this.notify(); } private void commitAndUpdate(boolean undo) throws DatabaseException { final int SIZE = changeSetsCount; // changeSets.length; if (DEBUG) System.out.println("commitAndUpdate: cs count=" + SIZE); for (int i = 0; i < SIZE; ++i) { ChangeSet cs = changeSets[i]; final int JSIZE = cs.ccss.size(); cs.ccss2 = new Vector(JSIZE); ClusterTable clusterTable = session.getClusterTable(); cs.ccss2.setSize(JSIZE); for (int j = 0; j < JSIZE; ++j) { ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec); cs.ccss2.set(j, ccs); if (fetchOnly) continue; if (cs.ccss3[j]) getClusterOrCreate(ccs.getClusterUID(), clusterTable); ClusterUID clusterUID = ccs.getClusterUID(); ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); if (null == cluster) cs.skip[j] = true; } } if (fetchOnly) { changes.setSize(SIZE); for (int i = 0; i < SIZE; ++i) { ChangeSet cs = changeSets[i]; changes.set(i, new ClientChangesImpl(session)); final int JSIZE = cs.ccss.size(); for (int j = 0; j < JSIZE; ++j) { ClusterChangeSetI ccs = cs.ccss2.get(j); if (DEBUG) System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j); this.updateSecondPass(ccs, changes.get(i)); } } return; } for (int i = 0; i < SIZE; ++i) { ChangeSet cs = changeSets[i]; final int JSIZE = cs.ccss.size(); for (int j = 0; j < JSIZE; ++j) { ClusterChangeSetI ccs = cs.ccss2.get(j); if (cs.skip[j]) { if (DEBUG) System.out.println("skipping change set " + ccs.getClusterUID()); continue; } if (DEBUG) System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j); this.updateFirstPass(ccs, undo); } } for (int i = 0; i < SIZE; ++i) { ChangeSet cs = changeSets[i]; final int JSIZE = cs.ccss2.size(); for (int j = 0; j < JSIZE; ++j) { ClusterChangeSetI ccs = cs.ccss2.get(j); if (cs.skip[j]) { if (DEBUG) System.out.println("skipping change set " + ccs.getClusterUID()); continue; } this.updateSecondPass(ccs, clientChangesImpl); } } } private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) { // New clusters are created here because resource keys require that referred resources exist. ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); if (null == cluster) cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); if (null != cluster); return cluster; } private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable) throws DatabaseException { if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources()) throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID); if (!ClusterUID.isLegal(clusterUID)) throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID); ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri); if (0 == key) throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID); return key; } private void updateFirstPass(ClusterChangeSetI ccs, boolean undo) throws DatabaseException { ClusterTable clusterTable = session.getClusterTable(); ClusterUID clusterUID = ccs.getClusterUID(); ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); if (null == cluster) throw new DatabaseException("Missing cluster id=" + clusterUID); if (!cluster.isLoaded()) if (!undo) return; else { if (DEBUG) System.out.println("Cluster is not loaded. cluster=" + clusterUID); cluster.load(session.clusterTranslator, new Runnable() { @Override public void run() { if (DEBUG) System.out.println("Cluster loaded for undo."); } }); cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); } Operation op = new Operation(); for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { switch (op.type) { default: throw new InternalException("Unknown operation " + op); case CreateResource: if (op.count != 1) throw new InternalException("Illegal argument(s) to create resource."); if (DEBUG) System.out.println("Create " + op.resourceIndex); int s = getKey(op.resourceIndex, clusterUID, clusterTable); if (!cluster.hasResource(s, session.clusterTranslator)) { int ss = cluster.createResource(session.clusterTranslator); short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss); if (ri != op.resourceIndex) throw new InternalException("Created resource key=" + ss + " does not match expected resource index=" + op.resourceIndex); } break; case AddRelation: if (op.count != 5) throw new InternalException("Illegal argument(s) to add relation"); s = getKey(op.resourceIndex, clusterUID, clusterTable); int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); int o = getKey(op.objectIndex, op.objectCluster, clusterTable); if (DEBUG) System.out.println("Add " + s + "-" + p + "-" + o); ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator); if (null != c && c != cluster) cluster = c; break; case RemoveRelation: if (op.count != 5) throw new InternalException("Illegal argument(s) to remove relation"); s = getKey(op.resourceIndex, clusterUID, clusterTable); p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); o = getKey(op.objectIndex, op.objectCluster, clusterTable); if (DEBUG) System.out.println("Remove " + s + "-" + p + "-" + o); cluster.removeRelation(s, p, o, session.clusterTranslator); break; case SetValue: if (op.count != 4) throw new InternalException("Illegal argument(s) to set value"); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("Set value " + s + " l=" + op.valueSize); // TODO: remove this unnecessary copy. byte[] value = new byte[op.valueSize]; System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize); cluster = cluster.setValue(s, value, value.length, session.clusterTranslator); break; case DeleteValue: if (op.count != 1) throw new InternalException("Illegal argument(s) to set value"); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("Delete " + s); cluster.removeValue(s, session.clusterTranslator); break; case ModifyValue: if (op.count != 5) // index, offset, size, value, valueStart throw new InternalException("Illegal argument(s) to modify resource file"); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("ModifyValue " + s + " off=" + op.valueOffset + " siz=" + op.valueSize + " start=" + op.valueStart); cluster.setValueEx(s); if (undo) session.clusterTranslator.undoValueEx(cluster, op.resourceIndex); break; } } } private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID()); ClusterTable clusterTable = session.getClusterTable(); ClusterUID clusterUID = ccs.getClusterUID(); ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID); if (null == cluster) { if (fetchOnly) // This is needed because resource implementation is based on cluster keys. cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID); else throw new DatabaseException("Missing cluster id=" + clusterUID); } if (!fetchOnly && !cluster.isLoaded()) return; Operation op = new Operation(); for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) { switch (op.type) { default: throw new InternalException("Unknown operation " + op); case CreateResource: if (op.count != 1) throw new InternalException("Illegal argument(s) to create resource."); break; case AddRelation: if (op.count != 5) throw new InternalException("Illegal argument(s) to add relation."); int s = getKey(op.resourceIndex, clusterUID, clusterTable); int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); int o = getKey(op.objectIndex, op.objectCluster, clusterTable); if (DEBUG) System.out.println("Add " + s + "-" + p + "-" + o); if (!fetchOnly) session.getQueryProvider2().updateStatements(s, p); changes.claim(s, p, o); break; case RemoveRelation: if (op.count != 5) throw new InternalException("Illegal argument(s) to remove relation."); s = getKey(op.resourceIndex, clusterUID, clusterTable); p = getKey(op.predicateIndex, op.predicateCluster, clusterTable); o = getKey(op.objectIndex, op.objectCluster, clusterTable); if (DEBUG) System.out.println("Remove " + s + "-" + p + "-" + o); if (!fetchOnly) session.getQueryProvider2().updateStatements(s, p); changes.deny(s, p, o); break; case SetValue: if (op.count != 4) throw new InternalException("Illegal argument(s) to set value."); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("Set value " + s + " l=" + op.valueSize); if (!fetchOnly) session.getQueryProvider2().updateValue(s); changes.claimValue(s); break; case DeleteValue: if (op.count != 1) throw new InternalException("Illegal argument(s) to remove value."); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("Delete value " + s); if (!fetchOnly) session.getQueryProvider2().updateValue(s); changes.claimValue(s); break; case ModifyValue: if (op.count != 5) throw new InternalException("Illegal argument(s) to modify resource file"); s = getKey(op.resourceIndex, clusterUID, clusterTable); if (DEBUG) System.out.println("ModifyValue " + s + " off=" + op.valueOffset + " siz=" + op.valueSize + " start=" + op.valueStart); if (!fetchOnly) session.getQueryProvider2().updateValue(s); else changes.claimValue(s); break; } } } }