]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SynchronizeContext.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SynchronizeContext.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Vector;
17
18 import org.simantics.db.Database.Session.ChangeSetUpdate;
19 import org.simantics.db.SessionManager;
20 import org.simantics.db.exception.DatabaseException;
21 import org.simantics.db.exception.InternalException;
22 import org.simantics.db.impl.ClusterI;
23 import org.simantics.db.impl.ClusterTraitsBase;
24 import org.simantics.db.procore.cluster.ClusterChangeSet;
25 import org.simantics.db.procore.cluster.ClusterChangeSetI;
26 import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;
27 import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;
28 import org.simantics.db.procore.cluster.ClusterTraits;
29 import org.simantics.db.server.ProCoreException;
30 import org.simantics.db.service.ClusterUID;
31 import org.simantics.db.service.LifecycleSupport;
32
33 public class SynchronizeContext implements SynchronizeContextI {
34     class ChangeSet {
35         ChangeSet(long id, int size) {
36             this.id = id;
37             this.count = 0;
38             ccss = new Vector<byte[]>(size);
39             ccss.setSize(size);
40             ccss3 = new boolean[size];
41             skip = new boolean[size];
42         }
43         long id;
44         int count;
45         Vector<byte[]> ccss;
46         Vector<ClusterChangeSetI> ccss2;
47         boolean[] ccss3; // this is getting a bit ridiculous, but...
48         boolean[] skip;
49     }
50     private boolean DEBUG = false;
51     private int changeSetsCount;
52     private ChangeSet[] changeSets;
53     private ClientChangesImpl clientChangesImpl;
54     private SessionImplSocket session;
55     private boolean endOfUpdate = false;
56     private final boolean fetchOnly; // True if change set(s) are only fetched.
57     private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();
58         private String compressionCodec;
59         
60     SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {
61         this(s, clientChangesImpl, N, false);
62     }
63     SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {
64         this.session = s;
65         this.clientChangesImpl = clientChangesImpl;
66         changeSetsCount = 0;
67         changeSets = new ChangeSet[N];
68         this.fetchOnly = fetchOnly;
69         
70         // TODO: find maybe a better way to do this
71         LifecycleSupport support = session.getService(LifecycleSupport.class);
72         SessionManager manager = support.getSessionManager();
73         compressionCodec = manager.getDatabase().getCompression();
74     }
75     Collection<org.simantics.db.ChangeSet> getChangeSets() {
76         Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());
77         for (ClientChangesImpl c : changes)
78             changeSets.add(c);
79         return changeSets;
80     }
81     public synchronized boolean isOk(boolean undo)
82     throws DatabaseException {
83         while (!endOfUpdate && changeSetsCount < changeSets.length) {
84             try {
85                 this.wait(10000); // msec
86                 // We are either notified, timed out or spuriously waked up,
87                 // and I'd like to know which. I could time the wait but...
88                 boolean timeout = changeSetsCount < changeSets.length;
89                 if (DEBUG)
90                     if (timeout)
91                         System.out.println("DEBUG: Synchronize context timeout.");
92             } catch (InterruptedException e) {
93                 // I assume that someone wants me to quit waiting.
94             }
95             break;
96         }
97         if (!endOfUpdate && changeSetsCount != changeSets.length)
98             return false; // not ok
99         commitAndUpdate(undo);
100         return true;
101     }
102     @Override
103     public synchronized void onChangeSetUpdate(ChangeSetUpdate event)
104     throws ProCoreException {
105         ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);
106         if (DEBUG) {
107             System.out.println("cs id " + event.getChangeSetId());
108             System.out.println("cs index " + event.getChangeSetIndex());
109             System.out.println("ccs N " + event.getNumberOfClusterChangeSets());
110             System.out.println("ccs index " + event.getIndexOfClusterChangeSet());
111             System.out.println("cluster=" + clusterUID);
112             if (event.getNewCluster())
113                 System.out.println("cluster is new");
114             System.out.println("data length " + event.getData().length);
115             System.out.println(Arrays.toString(event.getData()));
116         }
117         if (0 == event.getChangeSetId() &&
118             (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&
119             0 == event.getNumberOfClusterChangeSets() &&
120             0 == event.getIndexOfClusterChangeSet() &&
121             ClusterUID.Null.equals(clusterUID)) {
122             endOfUpdate = true;
123             this.notify();
124             return;
125         }
126
127         assert (changeSetsCount < changeSets.length);
128         assert (event.getChangeSetIndex() < changeSets.length);
129         assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());
130         ChangeSet cs = changeSets[event.getChangeSetIndex()];
131         if (null == cs) { // first, create
132             cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());
133             changeSets[event.getChangeSetIndex()] = cs;
134         } else {
135             assert (event.getChangeSetId() == cs.id);
136             assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());
137             assert (cs.count < cs.ccss.size());
138         }
139         assert (event.getData().length > 0);
140         if (DEBUG)
141             System.out.println("ccs count=" + cs.count);
142         cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();
143         cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());
144         ++cs.count;
145         if (cs.count < cs.ccss.size())
146             return;
147         ++changeSetsCount;
148         if (changeSetsCount >= changeSets.length)
149             this.notify();
150     }
151     private void commitAndUpdate(boolean undo) throws DatabaseException {
152         final int SIZE = changeSetsCount; // changeSets.length;
153         if (DEBUG)
154             System.out.println("commitAndUpdate: cs count=" + SIZE);
155         for (int i = 0; i < SIZE; ++i) {
156             ChangeSet cs = changeSets[i];
157             final int JSIZE = cs.ccss.size();
158             cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);
159             ClusterTable clusterTable = session.getClusterTable();
160             cs.ccss2.setSize(JSIZE);
161             for (int j = 0; j < JSIZE; ++j) {
162                 ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);
163                 cs.ccss2.set(j, ccs);
164                 if (fetchOnly)
165                     continue;
166                 if (cs.ccss3[j])
167                     getClusterOrCreate(ccs.getClusterUID(), clusterTable);
168                 ClusterUID clusterUID = ccs.getClusterUID();
169                 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
170                 if (null == cluster)
171                     cs.skip[j] = true;
172             }
173         }
174         if (fetchOnly) {
175             changes.setSize(SIZE);
176             for (int i = 0; i < SIZE; ++i) {
177                 ChangeSet cs = changeSets[i];
178                 changes.set(i, new ClientChangesImpl(session));
179                 final int JSIZE = cs.ccss.size();
180                 for (int j = 0; j < JSIZE; ++j) {
181                     ClusterChangeSetI ccs = cs.ccss2.get(j);
182                     if (DEBUG)
183                         System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);
184                     this.updateSecondPass(ccs, changes.get(i));
185                 }
186             }
187             return;
188         }
189         for (int i = 0; i < SIZE; ++i) {
190             ChangeSet cs = changeSets[i];
191             final int JSIZE = cs.ccss.size();
192             for (int j = 0; j < JSIZE; ++j) {
193                 ClusterChangeSetI ccs = cs.ccss2.get(j);
194                 if (cs.skip[j]) {
195                     if (DEBUG)
196                         System.out.println("skipping change set " + ccs.getClusterUID());
197                     continue;
198                 }
199                 if (DEBUG)
200                     System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);
201                 this.updateFirstPass(ccs, undo);
202             }
203         }
204         for (int i = 0; i < SIZE; ++i) {
205             ChangeSet cs = changeSets[i];
206             final int JSIZE = cs.ccss2.size();
207             for (int j = 0; j < JSIZE; ++j) {
208                 ClusterChangeSetI ccs = cs.ccss2.get(j);
209                 if (cs.skip[j]) {
210                     if (DEBUG)
211                         System.out.println("skipping change set " + ccs.getClusterUID());
212                     continue;
213                 }
214                 this.updateSecondPass(ccs, clientChangesImpl);
215             }
216         }
217     }
218
219     private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {
220         // New clusters are created here because resource keys require that referred resources exist.
221         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
222         if (null == cluster)
223             cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
224         if (null != cluster);
225         return cluster;
226     }
227     private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)
228     throws DatabaseException {
229         if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
230             throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);
231         if (!ClusterUID.isLegal(clusterUID))
232             throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);
233         ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
234         int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);
235         if (0 == key)
236             throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);
237         return key;
238     }
239     private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)
240     throws DatabaseException {
241         ClusterTable clusterTable = session.getClusterTable();
242         ClusterUID clusterUID = ccs.getClusterUID();
243         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
244         if (null == cluster)
245             throw new DatabaseException("Missing cluster id=" + clusterUID);
246         if (!cluster.isLoaded())
247             if (!undo)
248                 return;
249             else {
250                 if (DEBUG)
251                     System.out.println("Cluster is not loaded. cluster=" + clusterUID);
252                 cluster.load(session.clusterTranslator, new Runnable() {
253                     @Override public void run() {
254                         if (DEBUG)
255                             System.out.println("Cluster loaded for undo.");
256                     }
257                 });
258                 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
259             }
260         Operation op = new Operation();
261         for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
262             switch (op.type) {
263                 default:
264                     throw new InternalException("Unknown operation " + op);
265                 case CreateResource:
266                     if (op.count != 1)
267                         throw new InternalException("Illegal argument(s) to create resource.");
268                     if (DEBUG)
269                         System.out.println("Create " + op.resourceIndex);
270                     int s = getKey(op.resourceIndex, clusterUID, clusterTable);
271                     if (!cluster.hasResource(s, session.clusterTranslator)) {
272                         int ss = cluster.createResource(session.clusterTranslator);
273                         short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);
274                         if (ri != op.resourceIndex)
275                             throw new InternalException("Created resource key=" + ss +
276                                 " does not match expected resource index=" + op.resourceIndex);
277                     }
278                     break;
279                 case AddRelation:
280                     if (op.count != 5)
281                         throw new InternalException("Illegal argument(s) to add relation");
282                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
283                     int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
284                     int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
285                     if (DEBUG)
286                         System.out.println("Add " + s + "-" + p + "-" + o);
287                     ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);
288                     if (null != c && c != cluster)
289                         cluster = c;
290                     break;
291                 case RemoveRelation:
292                     if (op.count != 5)
293                         throw new InternalException("Illegal argument(s) to remove relation");
294                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
295                     p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
296                     o = getKey(op.objectIndex, op.objectCluster, clusterTable);
297                     if (DEBUG)
298                         System.out.println("Remove " + s + "-" + p + "-" + o);
299                     cluster.removeRelation(s, p, o, session.clusterTranslator);
300                     break;
301                 case SetValue:
302                     if (op.count != 4)
303                         throw new InternalException("Illegal argument(s) to set value");
304                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
305                     if (DEBUG)
306                         System.out.println("Set value " + s + " l=" + op.valueSize);
307                     // TODO: remove this unnecessary copy.
308                     byte[] value = new byte[op.valueSize];
309                     System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);
310                     cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);
311                     break;
312                 case DeleteValue:
313                     if (op.count != 1)
314                         throw new InternalException("Illegal argument(s) to set value");
315                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
316                     if (DEBUG)
317                         System.out.println("Delete " + s);
318                     cluster.removeValue(s, session.clusterTranslator);
319                     break;
320                 case ModifyValue:
321                     if (op.count != 5) // index, offset, size, value, valueStart
322                         throw new InternalException("Illegal argument(s) to modify resource file");
323                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
324                     if (DEBUG)
325                         System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
326                                 " siz=" + op.valueSize + " start=" + op.valueStart);
327                     cluster.setValueEx(s);
328                     if (undo)
329                         session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);
330                     break;
331             }
332         }
333     }
334     private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)
335     throws DatabaseException {
336         if (DEBUG)
337             System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());
338         ClusterTable clusterTable = session.getClusterTable();
339         ClusterUID clusterUID = ccs.getClusterUID();
340         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);
341         if (null == cluster) {
342             if (fetchOnly) // This is needed because resource implementation is based on cluster keys.
343                 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);
344             else
345                 throw new DatabaseException("Missing cluster id=" + clusterUID);
346         }
347         if (!fetchOnly && !cluster.isLoaded())
348             return;
349         Operation op = new Operation();
350         for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {
351             switch (op.type) {
352                 default:
353                     throw new InternalException("Unknown operation " + op);
354                 case CreateResource:
355                     if (op.count != 1)
356                         throw new InternalException("Illegal argument(s) to create resource.");
357                     break;
358                 case AddRelation:
359                     if (op.count != 5)
360                         throw new InternalException("Illegal argument(s) to add relation.");
361                     int s = getKey(op.resourceIndex, clusterUID, clusterTable);
362                     int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
363                     int o = getKey(op.objectIndex, op.objectCluster, clusterTable);
364                     if (DEBUG)
365                         System.out.println("Add " + s + "-" + p + "-" + o);
366                     if (!fetchOnly)
367                         session.getQueryProvider2().updateStatements(s, p);
368                     changes.claim(s, p, o);
369                     break;
370                 case RemoveRelation:
371                     if (op.count != 5)
372                         throw new InternalException("Illegal argument(s) to remove relation.");
373                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
374                     p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);
375                     o = getKey(op.objectIndex, op.objectCluster, clusterTable);
376                     if (DEBUG)
377                         System.out.println("Remove " + s + "-" + p + "-" + o);
378                     if (!fetchOnly)
379                         session.getQueryProvider2().updateStatements(s, p);
380                     changes.deny(s, p, o);
381                     break;
382                 case SetValue:
383                     if (op.count != 4)
384                         throw new InternalException("Illegal argument(s) to set value.");
385                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
386                     if (DEBUG)
387                         System.out.println("Set value " + s + " l=" + op.valueSize);
388                     if (!fetchOnly)
389                         session.getQueryProvider2().updateValue(s);
390                     changes.claimValue(s);
391                     break;
392                 case DeleteValue:
393                     if (op.count != 1)
394                         throw new InternalException("Illegal argument(s) to remove value.");
395                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
396                     if (DEBUG)
397                         System.out.println("Delete value " + s);
398                     if (!fetchOnly)
399                         session.getQueryProvider2().updateValue(s);
400                     changes.claimValue(s);
401                     break;
402                 case ModifyValue:
403                     if (op.count != 5)
404                         throw new InternalException("Illegal argument(s) to modify resource file");
405                     s = getKey(op.resourceIndex, clusterUID, clusterTable);
406                     if (DEBUG)
407                         System.out.println("ModifyValue " + s + " off=" + op.valueOffset +
408                                 " siz=" + op.valueSize + " start=" + op.valueStart);
409                     if (!fetchOnly)
410                         session.getQueryProvider2().updateValue(s);
411                     else
412                         changes.claimValue(s);
413                     break;
414             }
415         }
416     }
417 }