gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/TransactionToken.java
Fixed ClusterTable cluster hash/importance map book-keeping problem
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / TransactionToken.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.IOException;
15 import java.util.List;
16 import java.util.TreeMap;
17 import java.util.concurrent.atomic.AtomicLong;
18
19 import org.simantics.databoard.Bindings;
20 import org.simantics.databoard.binding.impl.TreeMapBinding;
21 import org.simantics.databoard.serialization.SerializationException;
22 import org.simantics.databoard.serialization.Serializer;
23 import org.simantics.db.Operation;
24 import org.simantics.db.Session;
25 import org.simantics.db.common.CommentMetadata;
26 import org.simantics.db.common.CommitMetadata;
27 import org.simantics.db.common.MetadataUtils;
28 import org.simantics.db.common.utils.Logger;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.exception.ValidationException;
31 import org.simantics.db.impl.graph.WriteSupport;
32 import org.simantics.db.procore.protocol.Constants;
33 import org.simantics.db.request.WriteTraits;
34 import org.simantics.db.service.ClusterUID;
35 import org.simantics.db.service.ExternalOperation;
36 import org.simantics.db.service.TransactionPolicySupport;
37
38 public class TransactionToken implements GraphSession.Listener {
39     private boolean DEBUG = false;
40     private Session session;
41     private GraphSession graphSession;
42     private AtomicLong id;
43     // This is the last change set that we have send commit for.
44     // This does not mean that those commits have been accepted in server.
45     private long lastChangeSetId;
46     private OperationImpl lastOperation = null;
47     enum Type { Null, Read, Write }
48     private Type type;
49     // Count of pending transactions.
50     private int pending = 0;
51     TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) {
52         this.session = session;
53         this.graphSession = graphSession;
54         this.type = Type.Null;
55         this.id = new AtomicLong();
56         this.id.set(Constants.NullTransactionId);
57         long firstChangeSetId = graphSession.getFirstChangeSetId();
58         if (null != old && old.lastChangeSetId > firstChangeSetId)
59             this.lastChangeSetId = old.lastChangeSetId;
60         else
61             this.lastChangeSetId = firstChangeSetId;
62         graphSession.setListener(this);
63     }
64     void startReadTransaction(int thread)
65     throws DatabaseException {
66         if (DEBUG)
67             System.out.println("DEBUG: Start read transaction " + graphSession);
68         if (Type.Null == type) {
69             id.set(graphSession.askReadTransaction(thread));
70             if (DEBUG)
71                 if (++pending != 1)
72                     System.out.println("kraa: read beg pending=" + pending);
73             type = Type.Read;
74         }
75     }
76     void stopReadTransaction()
77     throws DatabaseException {
78         if (DEBUG)
79             System.out.println("DEBUG: Stop read transaction " + graphSession);
80         if (Type.Null == type)
81             return;
82         endTransaction(false);
83         if (DEBUG)
84             if (--pending != 0)
85                 System.out.println("kraa: read end pending=" + pending);
86     }
87     void startWriteTransaction(int thread)
88     throws DatabaseException {
89         if (DEBUG)
90             System.out.println("DEBUG: Start write transaction " + graphSession);
91         if (Type.Null == type) {
92             id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId));
93             if (DEBUG)
94                 if (++pending != 1)
95                     System.out.println("kraa: write beg pending=" + pending);
96         } else if (Type.Read == type) {
97             id.set(graphSession.askWriteTransaction(thread, id.get()));
98             if (DEBUG)
99                 if (++pending != 1)
100                     System.out.println("kraa: write beg pending=" + pending);
101         }
102         type = Type.Write;
103     }
104     void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream)
105     throws DatabaseException {
106         if (DEBUG)
107             System.out.println("DEBUG: CancelBegin " + graphSession);
108         if (Type.Null == type)
109             return;
110         else if (Type.Read == type)
111             throw new ValidationException("Illegal token type.");
112         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
113         boolean empty = clusterStream.reallyFlush();
114         if (empty) {
115             boolean noMetadata = !hasMetadata(metadata);
116             if (noMetadata)
117                 return;
118         }
119         byte[] data = makeContext(null, metadata);
120         graphSession.cancelCommit(id.get(), lastChangeSetId, data, context);
121         ++lastChangeSetId;
122         if (DEBUG)
123             System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId);
124         clusterStream.accept();
125         writeSupport.commitDone(null, lastChangeSetId);
126     }
127     void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream)
128     throws DatabaseException {
129         if (clusterStream.reallyFlush())
130             return;
131         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
132         byte[] data = makeContext(null, metadata);
133         graphSession.acceptCommit(id.get(), lastChangeSetId, data);
134         ++lastChangeSetId;
135         if (DEBUG)
136             System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId);
137         clusterStream.accept();
138         writeSupport.commitDone(writeTraits, lastChangeSetId);
139     }
140     void setCombine(boolean a) {
141         combine = a;
142     }
143     private boolean hasMetadata(TreeMap<String, byte[]> metadata) {
144         
145         if(metadata == null)
146                 return false;
147         if(metadata.size() == 0)
148                 return false;
149         if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class))
150                 return false;
151         
152         return true;
153         
154     }
155     private boolean combine = false;
156     void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy)
157     throws DatabaseException {
158         if (DEBUG)
159             System.out.println("DEBUG: Commit write transaction " + graphSession);
160         if (Type.Null == type)
161             return;
162         else if (Type.Read == type)
163             throw new ValidationException("Illegal transaction type.");
164         else if (id.get() == Constants.NullTransactionId)
165             throw new ValidationException("Illegal transaction id.");
166         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
167         boolean empty = clusterStream.reallyFlush();
168         if (empty) {
169             boolean noMetadata = !hasMetadata(metadata);
170             if (noMetadata) {
171                 List<ExternalOperation> ext = graphSession.undoContext.getPendingExternals();
172                 if(!ext.isEmpty()) {
173                                 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
174                                 writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s)."));
175                                 metadata = writeSupport.getMetadata();
176                                 Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s).");
177                 } else {
178                         graphSession.undoContext.cancelCommit();
179                         return;
180                 }
181             }
182         }
183
184                 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
185                 if(cm.size() == 0)
186                         writeSupport.addMetadata(cm.add(writeTraits.toString()));
187
188                 metadata = writeSupport.getMetadata();
189
190         Operation op = combine ? lastOperation : null;
191         combine = true;
192         byte[] data = makeContext(op, metadata);
193 //        new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace();
194         graphSession.acceptCommit(id.get(), lastChangeSetId, data);
195         ++lastChangeSetId;
196         if (DEBUG)
197             System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId);
198         clusterStream.accept();
199         writeSupport.commitDone(writeTraits, lastChangeSetId);
200         long opid = (null == op) ? lastChangeSetId : op.getId();
201         lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals());
202         graphSession.undoContext.commitOk(lastOperation);
203         if (DEBUG)
204             System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + ".");
205     }
206     void stopWriteTransaction()
207     throws DatabaseException {
208         if (DEBUG)
209             System.out.println("DEBUG: Stop write transaction begin " + graphSession);
210         if (Type.Null == type)
211             return;
212         else if (Type.Read == type)
213             throw new ValidationException("Illegal transaction type.");
214         // Note that even if we do cancel the cluster sets are not restored.
215         graphSession.undoContext.cancelCommit();
216         endTransaction(true);
217         if (DEBUG)
218             if (--pending != 0)
219                 System.out.println("kraa: write end pending=" + pending);
220         if (DEBUG)
221             System.out.println("DEBUG: Stop write transaction end " + graphSession);
222     }
223     void closing()
224     throws DatabaseException {
225         if (Type.Null == type || id.get() == Constants.NullTransactionId)
226             return;
227         endTransaction(true);
228         if (DEBUG)
229             if (--pending != 0)
230                 System.out.println("kraa: closing pending=" + pending);
231     }
232     void close() {
233         if (id.get() != Constants.NullTransactionId)
234             try {
235                 endTransaction(true);
236                 if (DEBUG)
237                     if (--pending != 0)
238                         System.out.println("kraa: closing pending=" + pending);
239             } catch (DatabaseException e) {
240                 if (DEBUG) {
241                     System.out.println("Close did not finish cleanly.");
242                     e.printStackTrace();
243                 }
244             }
245     }
246     public Operation getLastOperation() {
247         return lastOperation;
248     }
249     public long getHeadRevisionId() {
250         return lastChangeSetId;
251     }
252
253     static Serializer METADATA_SERIALIZER =
254         Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
255             Bindings.BYTE_ARRAY) );
256
257     private byte[] makeContext(Operation op, TreeMap<String, byte[]> properties) {
258         if (properties == null)
259             properties = new TreeMap<String, byte[]>();
260         long csid = (null != op) ? op.getId() : 0;
261         properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session));
262         properties.put("opid", Long.toString(csid).getBytes());
263
264         try {
265             return METADATA_SERIALIZER.serialize(properties);
266         } catch (SerializationException e) {
267             e.printStackTrace();
268             return new byte[0];
269         } catch (IOException e) {
270             e.printStackTrace();
271             return new byte[0];
272         }
273     }
274     private void endTransaction(boolean write)
275     throws DatabaseException {
276         if (DEBUG)
277             System.out.println("DEBUG: TransactionToken.end begin " + graphSession);
278         try {
279             graphSession.endTransaction(id.get(), write);
280         } finally {
281             type = Type.Null;
282             id.set(Constants.NullTransactionId);
283             synchronized (this) {
284                 this.notify();
285             }
286         }
287         if (DEBUG)
288             System.out.println("DEBUG: TransactionToken.end end " + graphSession);
289     }
290     private void updateLastChangeSetId(long csid) {
291         if (DEBUG)
292             System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " "
293                     + graphSession + " " + this);
294         if (lastChangeSetId > csid)
295             return; // throw new
296 // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId
297 // + " new=" + csid);
298         lastChangeSetId = csid;
299     }
300     public static void testEmpty() {
301         final int REPEAT_COUNT = 20;
302         final int REQUEST_COUNT = 500; // 1000;
303         double totalTime = 0;
304         for (int j = 0; j < REPEAT_COUNT; ++j) {
305             long start = System.nanoTime();
306             for (int i = 0; i < REQUEST_COUNT; ++i) {
307             }
308             long end = System.nanoTime();
309             double time = (end - start) * (1e-9);
310             totalTime += time;
311         }
312         double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime;
313         String t = "Speed was " + speed + " ops per second.";
314         System.out.println(t);
315     }
316     @Override
317     public void onChangeSetId(int thread, long csid, boolean refresh) {
318         long ocsid = lastChangeSetId;
319         updateLastChangeSetId(csid);
320         if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId)
321             refresh(thread, ocsid);
322     }
323     private void refresh(int thread, long csid) {
324         try {
325             if (session instanceof SessionImplSocket) {
326                 ClusterUID[] clusterUID = graphSession.getRefresh2(csid);
327                 ((SessionImplSocket)session).refresh(thread, clusterUID, csid);
328             }
329         } catch (DatabaseException e) {
330             Logger.defaultLogError(e);
331         }
332     }
333 }