1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Vector;
18 import org.simantics.db.Database.Session.ChangeSetUpdate;
19 import org.simantics.db.SessionManager;
20 import org.simantics.db.exception.DatabaseException;
21 import org.simantics.db.exception.InternalException;
22 import org.simantics.db.impl.ClusterI;
23 import org.simantics.db.impl.ClusterTraitsBase;
24 import org.simantics.db.procore.cluster.ClusterChangeSet;
25 import org.simantics.db.procore.cluster.ClusterChangeSetI;
26 import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;
27 import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;
28 import org.simantics.db.procore.cluster.ClusterTraits;
29 import org.simantics.db.server.ProCoreException;
30 import org.simantics.db.service.ClusterUID;
31 import org.simantics.db.service.LifecycleSupport;
33 public class SynchronizeContext implements SynchronizeContextI {
35 ChangeSet(long id, int size) {
38 ccss = new Vector<byte[]>(size);
40 ccss3 = new boolean[size];
41 skip = new boolean[size];
46 Vector<ClusterChangeSetI> ccss2;
47 boolean[] ccss3; // this is getting a bit ridiculous, but...
50 private boolean DEBUG = false;
51 private int changeSetsCount;
52 private ChangeSet[] changeSets;
53 private ClientChangesImpl clientChangesImpl;
54 private SessionImplSocket session;
55 private boolean endOfUpdate = false;
56 private final boolean fetchOnly; // True if change set(s) are only fetched.
57 private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();
58 private String compressionCodec;
60 SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {
61 this(s, clientChangesImpl, N, false);
63 SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {
65 this.clientChangesImpl = clientChangesImpl;
67 changeSets = new ChangeSet[N];
68 this.fetchOnly = fetchOnly;
70 // TODO: find maybe a better way to do this
71 LifecycleSupport support = session.getService(LifecycleSupport.class);
72 SessionManager manager = support.getSessionManager();
73 compressionCodec = manager.getDatabase().getCompression();
75 Collection<org.simantics.db.ChangeSet> getChangeSets() {
76 Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());
77 for (ClientChangesImpl c : changes)
81 public synchronized boolean isOk(boolean undo)
82 throws DatabaseException {
83 while (!endOfUpdate && changeSetsCount < changeSets.length) {
85 this.wait(10000); // msec
86 // We are either notified, timed out or spuriously waked up,
87 // and I'd like to know which. I could time the wait but...
88 boolean timeout = changeSetsCount < changeSets.length;
91 System.out.println("DEBUG: Synchronize context timeout.");
92 } catch (InterruptedException e) {
93 // I assume that someone wants me to quit waiting.
97 if (!endOfUpdate && changeSetsCount != changeSets.length)
98 return false; // not ok
99 commitAndUpdate(undo);
103 public synchronized void onChangeSetUpdate(ChangeSetUpdate event)
104 throws ProCoreException {
105 ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);
107 System.out.println("cs id " + event.getChangeSetId());
108 System.out.println("cs index " + event.getChangeSetIndex());
109 System.out.println("ccs N " + event.getNumberOfClusterChangeSets());
110 System.out.println("ccs index " + event.getIndexOfClusterChangeSet());
111 System.out.println("cluster=" + clusterUID);
112 if (event.getNewCluster())
113 System.out.println("cluster is new");
114 System.out.println("data length " + event.getData().length);
115 System.out.println(Arrays.toString(event.getData()));
117 if (0 == event.getChangeSetId() &&
118 (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&
119 0 == event.getNumberOfClusterChangeSets() &&
120 0 == event.getIndexOfClusterChangeSet() &&
121 ClusterUID.Null.equals(clusterUID)) {
127 assert (changeSetsCount < changeSets.length);
128 assert (event.getChangeSetIndex() < changeSets.length);
129 assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());
130 ChangeSet cs = changeSets[event.getChangeSetIndex()];
131 if (null == cs) { // first, create
132 cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());
133 changeSets[event.getChangeSetIndex()] = cs;
135 assert (event.getChangeSetId() == cs.id);
136 assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());
137 assert (cs.count < cs.ccss.size());
139 assert (event.getData().length > 0);
141 System.out.println("ccs count=" + cs.count);
142 cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();
143 cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());
145 if (cs.count < cs.ccss.size())
148 if (changeSetsCount >= changeSets.length)
151 private void commitAndUpdate(boolean undo) throws DatabaseException {
152 final int SIZE = changeSetsCount; // changeSets.length;
154 System.out.println("commitAndUpdate: cs count=" + SIZE);
155 for (int i = 0; i < SIZE; ++i) {
156 ChangeSet cs = changeSets[i];
157 final int JSIZE = cs.ccss.size();
158 cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);
159 ClusterTable clusterTable = session.getClusterTable();
160 cs.ccss2.setSize(JSIZE);
161 for (int j = 0; j < JSIZE; ++j) {
162 ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);
163 cs.ccss2.set(j, ccs);
167 getClusterOrCreate(ccs.getClusterUID(), clusterTable);
168 ClusterUID clusterUID = ccs.getClusterUID();
169 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
175 changes.setSize(SIZE);
176 for (int i = 0; i < SIZE; ++i) {
177 ChangeSet cs = changeSets[i];
178 changes.set(i, new ClientChangesImpl(session));
179 final int JSIZE = cs.ccss.size();
180 for (int j = 0; j < JSIZE; ++j) {
181 ClusterChangeSetI ccs = cs.ccss2.get(j);
183 System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);
184 this.updateSecondPass(ccs, changes.get(i));
189 for (int i = 0; i < SIZE; ++i) {
190 ChangeSet cs = changeSets[i];
191 final int JSIZE = cs.ccss.size();
192 for (int j = 0; j < JSIZE; ++j) {
193 ClusterChangeSetI ccs = cs.ccss2.get(j);
196 System.out.println("skipping change set " + ccs.getClusterUID());
200 System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);
201 this.updateFirstPass(ccs, undo);
204 for (int i = 0; i < SIZE; ++i) {
205 ChangeSet cs = changeSets[i];
206 final int JSIZE = cs.ccss2.size();
207 for (int j = 0; j < JSIZE; ++j) {
208 ClusterChangeSetI ccs = cs.ccss2.get(j);
211 System.out.println("skipping change set " + ccs.getClusterUID());
214 this.updateSecondPass(ccs, clientChangesImpl);
219 private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {
220 // New clusters are created here because resource keys require that referred resources exist.
221 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
223 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
224 if (null != cluster);
227 private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)
228 throws DatabaseException {
229 if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
230 throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);
231 if (!ClusterUID.isLegal(clusterUID))
232 throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);
233 ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
234 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);
236 throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);
239 private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)
240 throws DatabaseException {
241 ClusterTable clusterTable = session.getClusterTable();
242 ClusterUID clusterUID = ccs.getClusterUID();
243 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
245 throw new DatabaseException("Missing cluster id=" + clusterUID);
246 if (!cluster.isLoaded())
251 System.out.println("Cluster is not loaded. cluster=" + clusterUID);
252 cluster.load(session.clusterTranslator, new Runnable() {
253 @Override public void run() {
255 System.out.println("Cluster loaded for undo.");
258 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
260 Operation op = new Operation();
261 for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
264 throw new InternalException("Unknown operation " + op);
267 throw new InternalException("Illegal argument(s) to create resource.");
269 System.out.println("Create " + op.resourceIndex);
270 int s = getKey(op.resourceIndex, clusterUID, clusterTable);
271 if (!cluster.hasResource(s, session.clusterTranslator)) {
272 int ss = cluster.createResource(session.clusterTranslator);
273 short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);
274 if (ri != op.resourceIndex)
275 throw new InternalException("Created resource key=" + ss +
276 " does not match expected resource index=" + op.resourceIndex);
281 throw new InternalException("Illegal argument(s) to add relation");
282 s = getKey(op.resourceIndex, clusterUID, clusterTable);
283 int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
284 int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
286 System.out.println("Add " + s + "-" + p + "-" + o);
287 ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);
288 if (null != c && c != cluster)
293 throw new InternalException("Illegal argument(s) to remove relation");
294 s = getKey(op.resourceIndex, clusterUID, clusterTable);
295 p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
296 o = getKey(op.objectIndex, op.objectCluster, clusterTable);
298 System.out.println("Remove " + s + "-" + p + "-" + o);
299 cluster.removeRelation(s, p, o, session.clusterTranslator);
303 throw new InternalException("Illegal argument(s) to set value");
304 s = getKey(op.resourceIndex, clusterUID, clusterTable);
306 System.out.println("Set value " + s + " l=" + op.valueSize);
307 // TODO: remove this unnecessary copy.
308 byte[] value = new byte[op.valueSize];
309 System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);
310 cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);
314 throw new InternalException("Illegal argument(s) to set value");
315 s = getKey(op.resourceIndex, clusterUID, clusterTable);
317 System.out.println("Delete " + s);
318 cluster.removeValue(s, session.clusterTranslator);
321 if (op.count != 5) // index, offset, size, value, valueStart
322 throw new InternalException("Illegal argument(s) to modify resource file");
323 s = getKey(op.resourceIndex, clusterUID, clusterTable);
325 System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
326 " siz=" + op.valueSize + " start=" + op.valueStart);
327 cluster.setValueEx(s);
329 session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);
334 private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)
335 throws DatabaseException {
337 System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());
338 ClusterTable clusterTable = session.getClusterTable();
339 ClusterUID clusterUID = ccs.getClusterUID();
340 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
341 if (null == cluster) {
342 if (fetchOnly) // This is needed because resource implementation is based on cluster keys.
343 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
345 throw new DatabaseException("Missing cluster id=" + clusterUID);
347 if (!fetchOnly && !cluster.isLoaded())
349 Operation op = new Operation();
350 for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
353 throw new InternalException("Unknown operation " + op);
356 throw new InternalException("Illegal argument(s) to create resource.");
360 throw new InternalException("Illegal argument(s) to add relation.");
361 int s = getKey(op.resourceIndex, clusterUID, clusterTable);
362 int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
363 int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
365 System.out.println("Add " + s + "-" + p + "-" + o);
367 session.getQueryProvider2().updateStatements(s, p);
368 changes.claim(s, p, o);
372 throw new InternalException("Illegal argument(s) to remove relation.");
373 s = getKey(op.resourceIndex, clusterUID, clusterTable);
374 p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
375 o = getKey(op.objectIndex, op.objectCluster, clusterTable);
377 System.out.println("Remove " + s + "-" + p + "-" + o);
379 session.getQueryProvider2().updateStatements(s, p);
380 changes.deny(s, p, o);
384 throw new InternalException("Illegal argument(s) to set value.");
385 s = getKey(op.resourceIndex, clusterUID, clusterTable);
387 System.out.println("Set value " + s + " l=" + op.valueSize);
389 session.getQueryProvider2().updateValue(s);
390 changes.claimValue(s);
394 throw new InternalException("Illegal argument(s) to remove value.");
395 s = getKey(op.resourceIndex, clusterUID, clusterTable);
397 System.out.println("Delete value " + s);
399 session.getQueryProvider2().updateValue(s);
400 changes.claimValue(s);
404 throw new InternalException("Illegal argument(s) to modify resource file");
405 s = getKey(op.resourceIndex, clusterUID, clusterTable);
407 System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
408 " siz=" + op.valueSize + " start=" + op.valueStart);
410 session.getQueryProvider2().updateValue(s);
412 changes.claimValue(s);