Sync git svn branch with SVN repository r33249.
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SynchronizeContext.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.util.Arrays;\r
15 import java.util.Collection;\r
16 import java.util.Vector;\r
17 \r
18 import org.simantics.db.Database.Session.ChangeSetUpdate;\r
19 import org.simantics.db.SessionManager;\r
20 import org.simantics.db.exception.DatabaseException;\r
21 import org.simantics.db.exception.InternalException;\r
22 import org.simantics.db.impl.ClusterI;\r
23 import org.simantics.db.impl.ClusterTraitsBase;\r
24 import org.simantics.db.procore.cluster.ClusterChangeSet;\r
25 import org.simantics.db.procore.cluster.ClusterChangeSetI;\r
26 import org.simantics.db.procore.cluster.ClusterChangeSetI.Operation;\r
27 import org.simantics.db.procore.cluster.ClusterChangeSetI.OperationEnum;\r
28 import org.simantics.db.procore.cluster.ClusterTraits;\r
29 import org.simantics.db.server.ProCoreException;\r
30 import org.simantics.db.service.ClusterUID;\r
31 import org.simantics.db.service.LifecycleSupport;\r
32 \r
33 public class SynchronizeContext implements SynchronizeContextI {\r
34     class ChangeSet {\r
35         ChangeSet(long id, int size) {\r
36             this.id = id;\r
37             this.count = 0;\r
38             ccss = new Vector<byte[]>(size);\r
39             ccss.setSize(size);\r
40             ccss3 = new boolean[size];\r
41             skip = new boolean[size];\r
42         }\r
43         long id;\r
44         int count;\r
45         Vector<byte[]> ccss;\r
46         Vector<ClusterChangeSetI> ccss2;\r
47         boolean[] ccss3; // this is getting a bit ridiculous, but...\r
48         boolean[] skip;\r
49     }\r
50     private boolean DEBUG = false;\r
51     private int changeSetsCount;\r
52     private ChangeSet[] changeSets;\r
53     private ClientChangesImpl clientChangesImpl;\r
54     private SessionImplSocket session;\r
55     private boolean endOfUpdate = false;\r
56     private final boolean fetchOnly; // True if change set(s) are only fetched.\r
57     private Vector<ClientChangesImpl> changes = new Vector<ClientChangesImpl>();\r
58         private String compressionCodec;\r
59         \r
60     SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N) {\r
61         this(s, clientChangesImpl, N, false);\r
62     }\r
63     SynchronizeContext(SessionImplSocket s, ClientChangesImpl clientChangesImpl, int N, boolean fetchOnly) {\r
64         this.session = s;\r
65         this.clientChangesImpl = clientChangesImpl;\r
66         changeSetsCount = 0;\r
67         changeSets = new ChangeSet[N];\r
68         this.fetchOnly = fetchOnly;\r
69         \r
70         // TODO: find maybe a better way to do this\r
71         LifecycleSupport support = session.getService(LifecycleSupport.class);\r
72         SessionManager manager = support.getSessionManager();\r
73         compressionCodec = manager.getDatabase().getCompression();\r
74     }\r
75     Collection<org.simantics.db.ChangeSet> getChangeSets() {\r
76         Vector<org.simantics.db.ChangeSet> changeSets = new Vector<org.simantics.db.ChangeSet>(changes.size());\r
77         for (ClientChangesImpl c : changes)\r
78             changeSets.add(c);\r
79         return changeSets;\r
80     }\r
81     public synchronized boolean isOk(boolean undo)\r
82     throws DatabaseException {\r
83         while (!endOfUpdate && changeSetsCount < changeSets.length) {\r
84             try {\r
85                 this.wait(10000); // msec\r
86                 // We are either notified, timed out or spuriously waked up,\r
87                 // and I'd like to know which. I could time the wait but...\r
88                 boolean timeout = changeSetsCount < changeSets.length;\r
89                 if (DEBUG)\r
90                     if (timeout)\r
91                         System.out.println("DEBUG: Synchronize context timeout.");\r
92             } catch (InterruptedException e) {\r
93                 // I assume that someone wants me to quit waiting.\r
94             }\r
95             break;\r
96         }\r
97         if (!endOfUpdate && changeSetsCount != changeSets.length)\r
98             return false; // not ok\r
99         commitAndUpdate(undo);\r
100         return true;\r
101     }\r
102     @Override\r
103     public synchronized void onChangeSetUpdate(ChangeSetUpdate event)\r
104     throws ProCoreException {\r
105         ClusterUID clusterUID = ClusterUID.make(event.getClusterId(), 0);\r
106         if (DEBUG) {\r
107             System.out.println("cs id " + event.getChangeSetId());\r
108             System.out.println("cs index " + event.getChangeSetIndex());\r
109             System.out.println("ccs N " + event.getNumberOfClusterChangeSets());\r
110             System.out.println("ccs index " + event.getIndexOfClusterChangeSet());\r
111             System.out.println("cluster=" + clusterUID);\r
112             if (event.getNewCluster())\r
113                 System.out.println("cluster is new");\r
114             System.out.println("data length " + event.getData().length);\r
115             System.out.println(Arrays.toString(event.getData()));\r
116         }\r
117         if (0 == event.getChangeSetId() &&\r
118             (0 == event.getChangeSetIndex() || changeSets.length == event.getChangeSetIndex())&&\r
119             0 == event.getNumberOfClusterChangeSets() &&\r
120             0 == event.getIndexOfClusterChangeSet() &&\r
121             ClusterUID.Null.equals(clusterUID)) {\r
122             endOfUpdate = true;\r
123             this.notify();\r
124             return;\r
125         }\r
126 \r
127         assert (changeSetsCount < changeSets.length);\r
128         assert (event.getChangeSetIndex() < changeSets.length);\r
129         assert (event.getIndexOfClusterChangeSet() < event.getNumberOfClusterChangeSets());\r
130         ChangeSet cs = changeSets[event.getChangeSetIndex()];\r
131         if (null == cs) { // first, create\r
132             cs = new ChangeSet(event.getChangeSetId(), event.getNumberOfClusterChangeSets());\r
133             changeSets[event.getChangeSetIndex()] = cs;\r
134         } else {\r
135             assert (event.getChangeSetId() == cs.id);\r
136             assert (event.getNumberOfClusterChangeSets() == cs.ccss.size());\r
137             assert (cs.count < cs.ccss.size());\r
138         }\r
139         assert (event.getData().length > 0);\r
140         if (DEBUG)\r
141             System.out.println("ccs count=" + cs.count);\r
142         cs.ccss3[event.getIndexOfClusterChangeSet()] = event.getNewCluster();\r
143         cs.ccss.set(event.getIndexOfClusterChangeSet(), event.getData());\r
144         ++cs.count;\r
145         if (cs.count < cs.ccss.size())\r
146             return;\r
147         ++changeSetsCount;\r
148         if (changeSetsCount >= changeSets.length)\r
149             this.notify();\r
150     }\r
151     private void commitAndUpdate(boolean undo) throws DatabaseException {\r
152         final int SIZE = changeSetsCount; // changeSets.length;\r
153         if (DEBUG)\r
154             System.out.println("commitAndUpdate: cs count=" + SIZE);\r
155         for (int i = 0; i < SIZE; ++i) {\r
156             ChangeSet cs = changeSets[i];\r
157             final int JSIZE = cs.ccss.size();\r
158             cs.ccss2 = new Vector<ClusterChangeSetI>(JSIZE);\r
159             ClusterTable clusterTable = session.getClusterTable();\r
160             cs.ccss2.setSize(JSIZE);\r
161             for (int j = 0; j < JSIZE; ++j) {\r
162                 ClusterChangeSetI ccs = ClusterChangeSet.create(cs.ccss.get(j), compressionCodec);\r
163                 cs.ccss2.set(j, ccs);\r
164                 if (fetchOnly)\r
165                     continue;\r
166                 if (cs.ccss3[j])\r
167                     getClusterOrCreate(ccs.getClusterUID(), clusterTable);\r
168                 ClusterUID clusterUID = ccs.getClusterUID();\r
169                 ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
170                 if (null == cluster)\r
171                     cs.skip[j] = true;\r
172             }\r
173         }\r
174         if (fetchOnly) {\r
175             changes.setSize(SIZE);\r
176             for (int i = 0; i < SIZE; ++i) {\r
177                 ChangeSet cs = changeSets[i];\r
178                 changes.set(i, new ClientChangesImpl(session));\r
179                 final int JSIZE = cs.ccss.size();\r
180                 for (int j = 0; j < JSIZE; ++j) {\r
181                     ClusterChangeSetI ccs = cs.ccss2.get(j);\r
182                     if (DEBUG)\r
183                         System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " index=" + j);\r
184                     this.updateSecondPass(ccs, changes.get(i));\r
185                 }\r
186             }\r
187             return;\r
188         }\r
189         for (int i = 0; i < SIZE; ++i) {\r
190             ChangeSet cs = changeSets[i];\r
191             final int JSIZE = cs.ccss.size();\r
192             for (int j = 0; j < JSIZE; ++j) {\r
193                 ClusterChangeSetI ccs = cs.ccss2.get(j);\r
194                 if (cs.skip[j]) {\r
195                     if (DEBUG)\r
196                         System.out.println("skipping change set " + ccs.getClusterUID());\r
197                     continue;\r
198                 }\r
199                 if (DEBUG)\r
200                     System.out.println("Update first pass. Cluster=" + ccs.getClusterUID() + " ccs=" + j);\r
201                 this.updateFirstPass(ccs, undo);\r
202             }\r
203         }\r
204         for (int i = 0; i < SIZE; ++i) {\r
205             ChangeSet cs = changeSets[i];\r
206             final int JSIZE = cs.ccss2.size();\r
207             for (int j = 0; j < JSIZE; ++j) {\r
208                 ClusterChangeSetI ccs = cs.ccss2.get(j);\r
209                 if (cs.skip[j]) {\r
210                     if (DEBUG)\r
211                         System.out.println("skipping change set " + ccs.getClusterUID());\r
212                     continue;\r
213                 }\r
214                 this.updateSecondPass(ccs, clientChangesImpl);\r
215             }\r
216         }\r
217     }\r
218 \r
219     private ClusterI getClusterOrCreate(ClusterUID clusterUID, ClusterTable clusterTable) {\r
220         // New clusters are created here because resource keys require that referred resources exist.\r
221         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
222         if (null == cluster)\r
223             cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
224         if (null != cluster);\r
225         return cluster;\r
226     }\r
227     private int getKey(long ri, ClusterUID clusterUID, ClusterTable clusterTable)\r
228     throws DatabaseException {\r
229         if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())\r
230             throw new InternalException("Illegal resource index. index=" + ri + " cluster=" + clusterUID);\r
231         if (!ClusterUID.isLegal(clusterUID))\r
232             throw new InternalException("Illegal resource cluster. index=" + ri + " cluster=" + clusterUID);\r
233         ClusterI cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
234         int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), (int) ri);\r
235         if (0 == key)\r
236             throw new InternalException("Illegal resource index=" + ri + " cluster=" + clusterUID);\r
237         return key;\r
238     }\r
239     private void updateFirstPass(ClusterChangeSetI ccs, boolean undo)\r
240     throws DatabaseException {\r
241         ClusterTable clusterTable = session.getClusterTable();\r
242         ClusterUID clusterUID = ccs.getClusterUID();\r
243         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
244         if (null == cluster)\r
245             throw new DatabaseException("Missing cluster id=" + clusterUID);\r
246         if (!cluster.isLoaded())\r
247             if (!undo)\r
248                 return;\r
249             else {\r
250                 if (DEBUG)\r
251                     System.out.println("Cluster is not loaded. cluster=" + clusterUID);\r
252                 cluster.load(session.clusterTranslator, new Runnable() {\r
253                     @Override public void run() {\r
254                         if (DEBUG)\r
255                             System.out.println("Cluster loaded for undo.");\r
256                     }\r
257                 });\r
258                 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
259             }\r
260         Operation op = new Operation();\r
261         for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
262             switch (op.type) {\r
263                 default:\r
264                     throw new InternalException("Unknown operation " + op);\r
265                 case CreateResource:\r
266                     if (op.count != 1)\r
267                         throw new InternalException("Illegal argument(s) to create resource.");\r
268                     if (DEBUG)\r
269                         System.out.println("Create " + op.resourceIndex);\r
270                     int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
271                     if (!cluster.hasResource(s, session.clusterTranslator)) {\r
272                         int ss = cluster.createResource(session.clusterTranslator);\r
273                         short ri = ClusterTraitsBase.getResourceIndexFromResourceKey(ss);\r
274                         if (ri != op.resourceIndex)\r
275                             throw new InternalException("Created resource key=" + ss +\r
276                                 " does not match expected resource index=" + op.resourceIndex);\r
277                     }\r
278                     break;\r
279                 case AddRelation:\r
280                     if (op.count != 5)\r
281                         throw new InternalException("Illegal argument(s) to add relation");\r
282                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
283                     int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
284                     int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
285                     if (DEBUG)\r
286                         System.out.println("Add " + s + "-" + p + "-" + o);\r
287                     ClusterI c = cluster.addRelation(s, p, o, session.clusterTranslator);\r
288                     if (null != c && c != cluster)\r
289                         cluster = c;\r
290                     break;\r
291                 case RemoveRelation:\r
292                     if (op.count != 5)\r
293                         throw new InternalException("Illegal argument(s) to remove relation");\r
294                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
295                     p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
296                     o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
297                     if (DEBUG)\r
298                         System.out.println("Remove " + s + "-" + p + "-" + o);\r
299                     cluster.removeRelation(s, p, o, session.clusterTranslator);\r
300                     break;\r
301                 case SetValue:\r
302                     if (op.count != 4)\r
303                         throw new InternalException("Illegal argument(s) to set value");\r
304                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
305                     if (DEBUG)\r
306                         System.out.println("Set value " + s + " l=" + op.valueSize);\r
307                     // TODO: remove this unnecessary copy.\r
308                     byte[] value = new byte[op.valueSize];\r
309                     System.arraycopy(op.valueData, op.valueStart, value, 0, op.valueSize);\r
310                     cluster = cluster.setValue(s, value, value.length, session.clusterTranslator);\r
311                     break;\r
312                 case DeleteValue:\r
313                     if (op.count != 1)\r
314                         throw new InternalException("Illegal argument(s) to set value");\r
315                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
316                     if (DEBUG)\r
317                         System.out.println("Delete " + s);\r
318                     cluster.removeValue(s, session.clusterTranslator);\r
319                     break;\r
320                 case ModifyValue:\r
321                     if (op.count != 5) // index, offset, size, value, valueStart\r
322                         throw new InternalException("Illegal argument(s) to modify resource file");\r
323                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
324                     if (DEBUG)\r
325                         System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
326                                 " siz=" + op.valueSize + " start=" + op.valueStart);\r
327                     cluster.setValueEx(s);\r
328                     if (undo)\r
329                         session.clusterTranslator.undoValueEx(cluster, op.resourceIndex);\r
330                     break;\r
331             }\r
332         }\r
333     }\r
334     private void updateSecondPass(ClusterChangeSetI ccs, ClientChangesImpl changes)\r
335     throws DatabaseException {\r
336         if (DEBUG)\r
337             System.out.println("DEBUG: second pass cid=" + ccs.getClusterUID());\r
338         ClusterTable clusterTable = session.getClusterTable();\r
339         ClusterUID clusterUID = ccs.getClusterUID();\r
340         ClusterI cluster = clusterTable.getClusterByClusterUID(clusterUID);\r
341         if (null == cluster) {\r
342             if (fetchOnly) // This is needed because resource implementation is based on cluster keys.\r
343                 cluster = clusterTable.getClusterByClusterUIDOrMakeProxy(clusterUID);\r
344             else\r
345                 throw new DatabaseException("Missing cluster id=" + clusterUID);\r
346         }\r
347         if (!fetchOnly && !cluster.isLoaded())\r
348             return;\r
349         Operation op = new Operation();\r
350         for (ccs.getNextOperation(op); OperationEnum.EndOf != op.type; ccs.getNextOperation(op)) {\r
351             switch (op.type) {\r
352                 default:\r
353                     throw new InternalException("Unknown operation " + op);\r
354                 case CreateResource:\r
355                     if (op.count != 1)\r
356                         throw new InternalException("Illegal argument(s) to create resource.");\r
357                     break;\r
358                 case AddRelation:\r
359                     if (op.count != 5)\r
360                         throw new InternalException("Illegal argument(s) to add relation.");\r
361                     int s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
362                     int p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
363                     int o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
364                     if (DEBUG)\r
365                         System.out.println("Add " + s + "-" + p + "-" + o);\r
366                     if (!fetchOnly)\r
367                         session.getQueryProvider2().updateStatements(s, p);\r
368                     changes.claim(s, p, o);\r
369                     break;\r
370                 case RemoveRelation:\r
371                     if (op.count != 5)\r
372                         throw new InternalException("Illegal argument(s) to remove relation.");\r
373                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
374                     p = getKey(op.predicateIndex, op.predicateCluster, clusterTable);\r
375                     o = getKey(op.objectIndex, op.objectCluster, clusterTable);\r
376                     if (DEBUG)\r
377                         System.out.println("Remove " + s + "-" + p + "-" + o);\r
378                     if (!fetchOnly)\r
379                         session.getQueryProvider2().updateStatements(s, p);\r
380                     changes.deny(s, p, o);\r
381                     break;\r
382                 case SetValue:\r
383                     if (op.count != 4)\r
384                         throw new InternalException("Illegal argument(s) to set value.");\r
385                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
386                     if (DEBUG)\r
387                         System.out.println("Set value " + s + " l=" + op.valueSize);\r
388                     if (!fetchOnly)\r
389                         session.getQueryProvider2().updateValue(s);\r
390                     changes.claimValue(s);\r
391                     break;\r
392                 case DeleteValue:\r
393                     if (op.count != 1)\r
394                         throw new InternalException("Illegal argument(s) to remove value.");\r
395                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
396                     if (DEBUG)\r
397                         System.out.println("Delete value " + s);\r
398                     if (!fetchOnly)\r
399                         session.getQueryProvider2().updateValue(s);\r
400                     changes.claimValue(s);\r
401                     break;\r
402                 case ModifyValue:\r
403                     if (op.count != 5)\r
404                         throw new InternalException("Illegal argument(s) to modify resource file");\r
405                     s = getKey(op.resourceIndex, clusterUID, clusterTable);\r
406                     if (DEBUG)\r
407                         System.out.println("ModifyValue " + s + " off=" + op.valueOffset +\r
408                                 " siz=" + op.valueSize + " start=" + op.valueStart);\r
409                     if (!fetchOnly)\r
410                         session.getQueryProvider2().updateValue(s);\r
411                     else\r
412                         changes.claimValue(s);\r
413                     break;\r
414             }\r
415         }\r
416     }\r
417 }