gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/TransactionToken.java
isImmutable can NPE
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / TransactionToken.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.IOException;
15 import java.util.List;
16 import java.util.TreeMap;
17 import java.util.concurrent.atomic.AtomicLong;
18
19 import org.simantics.databoard.Bindings;
20 import org.simantics.databoard.binding.impl.TreeMapBinding;
21 import org.simantics.databoard.serialization.SerializationException;
22 import org.simantics.databoard.serialization.Serializer;
23 import org.simantics.db.Operation;
24 import org.simantics.db.Session;
25 import org.simantics.db.common.CommentMetadata;
26 import org.simantics.db.common.CommitMetadata;
27 import org.simantics.db.common.MetadataUtils;
28 import org.simantics.db.common.utils.Logger;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.exception.ValidationException;
31 import org.simantics.db.impl.graph.WriteSupport;
32 import org.simantics.db.procore.protocol.Constants;
33 import org.simantics.db.request.WriteTraits;
34 import org.simantics.db.service.ClusterSetsSupport;
35 import org.simantics.db.service.ClusterUID;
36 import org.simantics.db.service.ExternalOperation;
37 import org.simantics.db.service.TransactionPolicySupport;
38
39 public class TransactionToken implements GraphSession.Listener {
40     private boolean DEBUG = false;
41     private Session session;
42     private GraphSession graphSession;
43     private AtomicLong id;
44     // This is the last change set that we have send commit for.
45     // This does not mean that those commits have been accepted in server.
46     private long lastChangeSetId;
47     private OperationImpl lastOperation = null;
48     enum Type { Null, Read, Write }
49     private Type type;
50     // Count of pending transactions.
51     private int pending = 0;
52     TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) {
53         this.session = session;
54         this.graphSession = graphSession;
55         this.type = Type.Null;
56         this.id = new AtomicLong();
57         this.id.set(Constants.NullTransactionId);
58         long firstChangeSetId = graphSession.getFirstChangeSetId();
59         if (null != old && old.lastChangeSetId > firstChangeSetId)
60             this.lastChangeSetId = old.lastChangeSetId;
61         else
62             this.lastChangeSetId = firstChangeSetId;
63         graphSession.setListener(this);
64     }
65     void startReadTransaction(int thread)
66     throws DatabaseException {
67         if (DEBUG)
68             System.out.println("DEBUG: Start read transaction " + graphSession);
69         if (Type.Null == type) {
70             id.set(graphSession.askReadTransaction(thread));
71             if (DEBUG)
72                 if (++pending != 1)
73                     System.out.println("kraa: read beg pending=" + pending);
74             type = Type.Read;
75         }
76     }
77     void stopReadTransaction()
78     throws DatabaseException {
79         if (DEBUG)
80             System.out.println("DEBUG: Stop read transaction " + graphSession);
81         if (Type.Null == type)
82             return;
83         endTransaction(false);
84         if (DEBUG)
85             if (--pending != 0)
86                 System.out.println("kraa: read end pending=" + pending);
87     }
88     void startWriteTransaction(int thread)
89     throws DatabaseException {
90         if (DEBUG)
91             System.out.println("DEBUG: Start write transaction " + graphSession);
92         if (Type.Null == type) {
93             id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId));
94             if (DEBUG)
95                 if (++pending != 1)
96                     System.out.println("kraa: write beg pending=" + pending);
97         } else if (Type.Read == type) {
98             id.set(graphSession.askWriteTransaction(thread, id.get()));
99             if (DEBUG)
100                 if (++pending != 1)
101                     System.out.println("kraa: write beg pending=" + pending);
102         }
103         type = Type.Write;
104     }
105     void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream)
106     throws DatabaseException {
107         if (DEBUG)
108             System.out.println("DEBUG: CancelBegin " + graphSession);
109         if (Type.Null == type)
110             return;
111         else if (Type.Read == type)
112             throw new ValidationException("Illegal token type.");
113         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
114         boolean empty = clusterStream.reallyFlush();
115         if (empty) {
116             boolean noMetadata = !hasMetadata(metadata);
117             if (noMetadata)
118                 return;
119         }
120         byte[] data = makeContext(null, metadata);
121         graphSession.cancelCommit(id.get(), lastChangeSetId, data, context);
122         ++lastChangeSetId;
123         if (DEBUG)
124             System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId);
125         clusterStream.accept();
126         writeSupport.commitDone(null, lastChangeSetId);
127     }
128     void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream)
129     throws DatabaseException {
130         if (clusterStream.reallyFlush())
131             return;
132         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
133         byte[] data = makeContext(null, metadata);
134         graphSession.acceptCommit(id.get(), lastChangeSetId, data);
135         ++lastChangeSetId;
136         if (DEBUG)
137             System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId);
138         clusterStream.accept();
139         writeSupport.commitDone(writeTraits, lastChangeSetId);
140     }
141     void setCombine(boolean a) {
142         combine = a;
143     }
144     private boolean hasMetadata(TreeMap<String, byte[]> metadata) {
145         
146         if(metadata == null)
147                 return false;
148         if(metadata.size() == 0)
149                 return false;
150         if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class))
151                 return false;
152         
153         return true;
154         
155     }
156     private boolean combine = false;
157     void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy)
158     throws DatabaseException {
159         if (DEBUG)
160             System.out.println("DEBUG: Commit write transaction " + graphSession);
161         if (Type.Null == type)
162             return;
163         else if (Type.Read == type)
164             throw new ValidationException("Illegal transaction type.");
165         else if (id.get() == Constants.NullTransactionId)
166             throw new ValidationException("Illegal transaction id.");
167         TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
168         boolean empty = clusterStream.reallyFlush();
169         if (empty) {
170             boolean noMetadata = !hasMetadata(metadata);
171             if (noMetadata) {
172                 List<ExternalOperation> ext = graphSession.undoContext.getPendingExternals();
173                 if(!ext.isEmpty()) {
174                                 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
175                                 writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s)."));
176                                 metadata = writeSupport.getMetadata();
177                                 Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s).");
178                 } else {
179                         graphSession.undoContext.cancelCommit();
180                         return;
181                 }
182             }
183         }
184
185                 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
186                 if(cm.size() == 0)
187                         writeSupport.addMetadata(cm.add(writeTraits.toString()));
188
189                 metadata = writeSupport.getMetadata();
190
191         Operation op = combine ? lastOperation : null;
192         combine = true;
193         byte[] data = makeContext(op, metadata);
194 //        new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace();
195         graphSession.acceptCommit(id.get(), lastChangeSetId, data);
196         ++lastChangeSetId;
197         if (DEBUG)
198             System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId);
199         clusterStream.accept();
200         writeSupport.commitDone(writeTraits, lastChangeSetId);
201         long opid = (null == op) ? lastChangeSetId : op.getId();
202         lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals());
203         graphSession.undoContext.commitOk(lastOperation);
204         if (DEBUG)
205             System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + ".");
206     }
207     void stopWriteTransaction()
208     throws DatabaseException {
209         if (DEBUG)
210             System.out.println("DEBUG: Stop write transaction begin " + graphSession);
211         if (Type.Null == type)
212             return;
213         else if (Type.Read == type)
214             throw new ValidationException("Illegal transaction type.");
215         // Note that even if we do cancel the cluster sets are not restored.
216         graphSession.undoContext.cancelCommit();
217         endTransaction(true);
218         if (DEBUG)
219             if (--pending != 0)
220                 System.out.println("kraa: write end pending=" + pending);
221         if (DEBUG)
222             System.out.println("DEBUG: Stop write transaction end " + graphSession);
223     }
224     void closing()
225     throws DatabaseException {
226         if (Type.Null == type || id.get() == Constants.NullTransactionId)
227             return;
228         endTransaction(true);
229         if (DEBUG)
230             if (--pending != 0)
231                 System.out.println("kraa: closing pending=" + pending);
232     }
233     void close() {
234         if (id.get() != Constants.NullTransactionId)
235             try {
236                 endTransaction(true);
237                 if (DEBUG)
238                     if (--pending != 0)
239                         System.out.println("kraa: closing pending=" + pending);
240             } catch (DatabaseException e) {
241                 if (DEBUG) {
242                     System.out.println("Close did not finish cleanly.");
243                     e.printStackTrace();
244                 }
245             }
246     }
247     public Operation getLastOperation() {
248         return lastOperation;
249     }
250     public long getHeadRevisionId() {
251         return lastChangeSetId;
252     }
253
254     static Serializer METADATA_SERIALIZER =
255         Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
256             Bindings.BYTE_ARRAY) );
257
258     private byte[] makeContext(Operation op, TreeMap<String, byte[]> properties) {
259         if (properties == null)
260             properties = new TreeMap<String, byte[]>();
261         long csid = (null != op) ? op.getId() : 0;
262         properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session));
263         properties.put("opid", Long.toString(csid).getBytes());
264
265         try {
266             return METADATA_SERIALIZER.serialize(properties);
267         } catch (SerializationException e) {
268             e.printStackTrace();
269             return new byte[0];
270         } catch (IOException e) {
271             e.printStackTrace();
272             return new byte[0];
273         }
274     }
275     private void endTransaction(boolean write)
276     throws DatabaseException {
277         if (DEBUG)
278             System.out.println("DEBUG: TransactionToken.end begin " + graphSession);
279         try {
280             graphSession.endTransaction(id.get(), write);
281         } finally {
282             type = Type.Null;
283             id.set(Constants.NullTransactionId);
284             synchronized (this) {
285                 this.notify();
286             }
287         }
288         if (DEBUG)
289             System.out.println("DEBUG: TransactionToken.end end " + graphSession);
290     }
291     private void updateLastChangeSetId(long csid) {
292         if (DEBUG)
293             System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " "
294                     + graphSession + " " + this);
295         if (lastChangeSetId > csid)
296             return; // throw new
297 // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId
298 // + " new=" + csid);
299         lastChangeSetId = csid;
300     }
301     public static void testEmpty() {
302         final int REPEAT_COUNT = 20;
303         final int REQUEST_COUNT = 500; // 1000;
304         double totalTime = 0;
305         for (int j = 0; j < REPEAT_COUNT; ++j) {
306             long start = System.nanoTime();
307             for (int i = 0; i < REQUEST_COUNT; ++i) {
308             }
309             long end = System.nanoTime();
310             double time = (end - start) * (1e-9);
311             totalTime += time;
312         }
313         double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime;
314         String t = "Speed was " + speed + " ops per second.";
315         System.out.println(t);
316     }
317     @Override
318     public void onChangeSetId(int thread, long csid, boolean refresh) {
319         long ocsid = lastChangeSetId;
320         updateLastChangeSetId(csid);
321         if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId)
322             refresh(thread, ocsid);
323     }
324     private void refresh(int thread, long csid) {
325         try {
326             if (session instanceof SessionImplSocket) {
327                 ClusterUID[] clusterUID = graphSession.getRefresh2(csid);
328                 ((SessionImplSocket)session).refresh(thread, clusterUID, csid);
329             }
330         } catch (DatabaseException e) {
331             Logger.defaultLogError(e);
332         }
333     }
334 }