1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.util.Arrays;
\r
15 import java.util.Collection;
\r
16 import java.util.Vector;
\r
18 import org.simantics.db.Database.Session.ChangeSetUpdate;
\r
19 import org.simantics.db.SessionManager;
\r
20 import org.simantics.db.exception.DatabaseException;
\r
21 import org.simantics.db.exception.InternalException;
\r
22 import org.simantics.db.impl.ClusterI;
\r
23 import org.simantics.db.impl.ClusterTraitsBase;
\r
24 import org.simantics.db.procore.cluster.ClusterChangeSet;
\r
25 import org.simantics.db.procore.cluster.ClusterChangeSetI;
\r
26 import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;
\r
27 import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;
\r
28 import org.simantics.db.procore.cluster.ClusterTraits;
\r
29 import org.simantics.db.server.ProCoreException;
\r
30 import org.simantics.db.service.ClusterUID;
\r
31 import org.simantics.db.service.LifecycleSupport;
\r
33 public class SynchronizeContext implements SynchronizeContextI {
\r
35 ChangeSet(long id, int size) {
\r
38 ccss = new Vector<byte[]>(size);
\r
40 ccss3 = new boolean[size];
\r
41 skip = new boolean[size];
\r
45 Vector<byte[]> ccss;
\r
46 Vector<ClusterChangeSetI> ccss2;
\r
47 boolean[] ccss3; // this is getting a bit ridiculous, but...
\r
50 private boolean DEBUG = false;
\r
51 private int changeSetsCount;
\r
52 private ChangeSet[] changeSets;
\r
53 private ClientChangesImpl clientChangesImpl;
\r
54 private SessionImplSocket session;
\r
55 private boolean endOfUpdate = false;
\r
56 private final boolean fetchOnly; // True if change set(s) are only fetched.
\r
57 private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();
\r
58 private String compressionCodec;
\r
60 SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {
\r
61 this(s, clientChangesImpl, N, false);
\r
63 SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {
\r
65 this.clientChangesImpl = clientChangesImpl;
\r
66 changeSetsCount = 0;
\r
67 changeSets = new ChangeSet[N];
\r
68 this.fetchOnly = fetchOnly;
\r
70 // TODO: find maybe a better way to do this
\r
71 LifecycleSupport support = session.getService(LifecycleSupport.class);
\r
72 SessionManager manager = support.getSessionManager();
\r
73 compressionCodec = manager.getDatabase().getCompression();
\r
75 Collection<org.simantics.db.ChangeSet> getChangeSets() {
\r
76 Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());
\r
77 for (ClientChangesImpl c : changes)
\r
81 public synchronized boolean isOk(boolean undo)
\r
82 throws DatabaseException {
\r
83 while (!endOfUpdate && changeSetsCount < changeSets.length) {
\r
85 this.wait(10000); // msec
\r
86 // We are either notified, timed out or spuriously waked up,
\r
87 // and I'd like to know which. I could time the wait but...
\r
88 boolean timeout = changeSetsCount < changeSets.length;
\r
91 System.out.println("DEBUG: Synchronize context timeout.");
\r
92 } catch (InterruptedException e) {
\r
93 // I assume that someone wants me to quit waiting.
\r
97 if (!endOfUpdate && changeSetsCount != changeSets.length)
\r
98 return false; // not ok
\r
99 commitAndUpdate(undo);
\r
103 public synchronized void onChangeSetUpdate(ChangeSetUpdate event)
\r
104 throws ProCoreException {
\r
105 ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);
\r
107 System.out.println("cs id " + event.getChangeSetId());
\r
108 System.out.println("cs index " + event.getChangeSetIndex());
\r
109 System.out.println("ccs N " + event.getNumberOfClusterChangeSets());
\r
110 System.out.println("ccs index " + event.getIndexOfClusterChangeSet());
\r
111 System.out.println("cluster=" + clusterUID);
\r
112 if (event.getNewCluster())
\r
113 System.out.println("cluster is new");
\r
114 System.out.println("data length " + event.getData().length);
\r
115 System.out.println(Arrays.toString(event.getData()));
\r
117 if (0 == event.getChangeSetId() &&
\r
118 (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&
\r
119 0 == event.getNumberOfClusterChangeSets() &&
\r
120 0 == event.getIndexOfClusterChangeSet() &&
\r
121 ClusterUID.Null.equals(clusterUID)) {
\r
122 endOfUpdate = true;
\r
127 assert (changeSetsCount < changeSets.length);
\r
128 assert (event.getChangeSetIndex() < changeSets.length);
\r
129 assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());
\r
130 ChangeSet cs = changeSets[event.getChangeSetIndex()];
\r
131 if (null == cs) { // first, create
\r
132 cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());
\r
133 changeSets[event.getChangeSetIndex()] = cs;
\r
135 assert (event.getChangeSetId() == cs.id);
\r
136 assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());
\r
137 assert (cs.count < cs.ccss.size());
\r
139 assert (event.getData().length > 0);
\r
141 System.out.println("ccs count=" + cs.count);
\r
142 cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();
\r
143 cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());
\r
145 if (cs.count < cs.ccss.size())
\r
148 if (changeSetsCount >= changeSets.length)
\r
151 private void commitAndUpdate(boolean undo) throws DatabaseException {
\r
152 final int SIZE = changeSetsCount; // changeSets.length;
\r
154 System.out.println("commitAndUpdate: cs count=" + SIZE);
\r
155 for (int i = 0; i < SIZE; ++i) {
\r
156 ChangeSet cs = changeSets[i];
\r
157 final int JSIZE = cs.ccss.size();
\r
158 cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);
\r
159 ClusterTable clusterTable = session.getClusterTable();
\r
160 cs.ccss2.setSize(JSIZE);
\r
161 for (int j = 0; j < JSIZE; ++j) {
\r
162 ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);
\r
163 cs.ccss2.set(j, ccs);
\r
167 getClusterOrCreate(ccs.getClusterUID(), clusterTable);
\r
168 ClusterUID clusterUID = ccs.getClusterUID();
\r
169 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
\r
170 if (null == cluster)
\r
175 changes.setSize(SIZE);
\r
176 for (int i = 0; i < SIZE; ++i) {
\r
177 ChangeSet cs = changeSets[i];
\r
178 changes.set(i, new ClientChangesImpl(session));
\r
179 final int JSIZE = cs.ccss.size();
\r
180 for (int j = 0; j < JSIZE; ++j) {
\r
181 ClusterChangeSetI ccs = cs.ccss2.get(j);
\r
183 System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);
\r
184 this.updateSecondPass(ccs, changes.get(i));
\r
189 for (int i = 0; i < SIZE; ++i) {
\r
190 ChangeSet cs = changeSets[i];
\r
191 final int JSIZE = cs.ccss.size();
\r
192 for (int j = 0; j < JSIZE; ++j) {
\r
193 ClusterChangeSetI ccs = cs.ccss2.get(j);
\r
196 System.out.println("skipping change set " + ccs.getClusterUID());
\r
200 System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);
\r
201 this.updateFirstPass(ccs, undo);
\r
204 for (int i = 0; i < SIZE; ++i) {
\r
205 ChangeSet cs = changeSets[i];
\r
206 final int JSIZE = cs.ccss2.size();
\r
207 for (int j = 0; j < JSIZE; ++j) {
\r
208 ClusterChangeSetI ccs = cs.ccss2.get(j);
\r
211 System.out.println("skipping change set " + ccs.getClusterUID());
\r
214 this.updateSecondPass(ccs, clientChangesImpl);
\r
219 private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {
\r
220 // New clusters are created here because resource keys require that referred resources exist.
\r
221 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
\r
222 if (null == cluster)
\r
223 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
\r
224 if (null != cluster);
\r
227 private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)
\r
228 throws DatabaseException {
\r
229 if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
\r
230 throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);
\r
231 if (!ClusterUID.isLegal(clusterUID))
\r
232 throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);
\r
233 ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
\r
234 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);
\r
236 throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);
\r
239 private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)
\r
240 throws DatabaseException {
\r
241 ClusterTable clusterTable = session.getClusterTable();
\r
242 ClusterUID clusterUID = ccs.getClusterUID();
\r
243 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
\r
244 if (null == cluster)
\r
245 throw new DatabaseException("Missing cluster id=" + clusterUID);
\r
246 if (!cluster.isLoaded())
\r
251 System.out.println("Cluster is not loaded. cluster=" + clusterUID);
\r
252 cluster.load(session.clusterTranslator, new Runnable() {
\r
253 @Override public void run() {
\r
255 System.out.println("Cluster loaded for undo.");
\r
258 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
\r
260 Operation op = new Operation();
\r
261 for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
\r
264 throw new InternalException("Unknown operation " + op);
\r
265 case CreateResource:
\r
267 throw new InternalException("Illegal argument(s) to create resource.");
\r
269 System.out.println("Create " + op.resourceIndex);
\r
270 int s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
271 if (!cluster.hasResource(s, session.clusterTranslator)) {
\r
272 int ss = cluster.createResource(session.clusterTranslator);
\r
273 short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);
\r
274 if (ri != op.resourceIndex)
\r
275 throw new InternalException("Created resource key=" + ss +
\r
276 " does not match expected resource index=" + op.resourceIndex);
\r
281 throw new InternalException("Illegal argument(s) to add relation");
\r
282 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
283 int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
\r
284 int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
\r
286 System.out.println("Add " + s + "-" + p + "-" + o);
\r
287 ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);
\r
288 if (null != c && c != cluster)
\r
291 case RemoveRelation:
\r
293 throw new InternalException("Illegal argument(s) to remove relation");
\r
294 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
295 p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
\r
296 o = getKey(op.objectIndex, op.objectCluster, clusterTable);
\r
298 System.out.println("Remove " + s + "-" + p + "-" + o);
\r
299 cluster.removeRelation(s, p, o, session.clusterTranslator);
\r
303 throw new InternalException("Illegal argument(s) to set value");
\r
304 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
306 System.out.println("Set value " + s + " l=" + op.valueSize);
\r
307 // TODO: remove this unnecessary copy.
\r
308 byte[] value = new byte[op.valueSize];
\r
309 System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);
\r
310 cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);
\r
314 throw new InternalException("Illegal argument(s) to set value");
\r
315 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
317 System.out.println("Delete " + s);
\r
318 cluster.removeValue(s, session.clusterTranslator);
\r
321 if (op.count != 5) // index, offset, size, value, valueStart
\r
322 throw new InternalException("Illegal argument(s) to modify resource file");
\r
323 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
325 System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
\r
326 " siz=" + op.valueSize + " start=" + op.valueStart);
\r
327 cluster.setValueEx(s);
\r
329 session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);
\r
334 private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)
\r
335 throws DatabaseException {
\r
337 System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());
\r
338 ClusterTable clusterTable = session.getClusterTable();
\r
339 ClusterUID clusterUID = ccs.getClusterUID();
\r
340 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
\r
341 if (null == cluster) {
\r
342 if (fetchOnly) // This is needed because resource implementation is based on cluster keys.
\r
343 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
\r
345 throw new DatabaseException("Missing cluster id=" + clusterUID);
\r
347 if (!fetchOnly && !cluster.isLoaded())
\r
349 Operation op = new Operation();
\r
350 for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
\r
353 throw new InternalException("Unknown operation " + op);
\r
354 case CreateResource:
\r
356 throw new InternalException("Illegal argument(s) to create resource.");
\r
360 throw new InternalException("Illegal argument(s) to add relation.");
\r
361 int s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
362 int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
\r
363 int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
\r
365 System.out.println("Add " + s + "-" + p + "-" + o);
\r
367 session.getQueryProvider2().updateStatements(s, p);
\r
368 changes.claim(s, p, o);
\r
370 case RemoveRelation:
\r
372 throw new InternalException("Illegal argument(s) to remove relation.");
\r
373 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
374 p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
\r
375 o = getKey(op.objectIndex, op.objectCluster, clusterTable);
\r
377 System.out.println("Remove " + s + "-" + p + "-" + o);
\r
379 session.getQueryProvider2().updateStatements(s, p);
\r
380 changes.deny(s, p, o);
\r
384 throw new InternalException("Illegal argument(s) to set value.");
\r
385 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
387 System.out.println("Set value " + s + " l=" + op.valueSize);
\r
389 session.getQueryProvider2().updateValue(s);
\r
390 changes.claimValue(s);
\r
394 throw new InternalException("Illegal argument(s) to remove value.");
\r
395 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
397 System.out.println("Delete value " + s);
\r
399 session.getQueryProvider2().updateValue(s);
\r
400 changes.claimValue(s);
\r
404 throw new InternalException("Illegal argument(s) to modify resource file");
\r
405 s = getKey(op.resourceIndex, clusterUID, clusterTable);
\r
407 System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
\r
408 " siz=" + op.valueSize + " start=" + op.valueStart);
\r
410 session.getQueryProvider2().updateValue(s);
\r
412 changes.claimValue(s);
\r