1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.io.IOException;
\r
15 import java.util.List;
\r
16 import java.util.TreeMap;
\r
17 import java.util.concurrent.atomic.AtomicLong;
\r
19 import org.simantics.databoard.Bindings;
\r
20 import org.simantics.databoard.binding.impl.TreeMapBinding;
\r
21 import org.simantics.databoard.serialization.SerializationException;
\r
22 import org.simantics.databoard.serialization.Serializer;
\r
23 import org.simantics.db.Operation;
\r
24 import org.simantics.db.Session;
\r
25 import org.simantics.db.common.CommentMetadata;
\r
26 import org.simantics.db.common.CommitMetadata;
\r
27 import org.simantics.db.common.MetadataUtils;
\r
28 import org.simantics.db.common.utils.Logger;
\r
29 import org.simantics.db.exception.DatabaseException;
\r
30 import org.simantics.db.exception.ValidationException;
\r
31 import org.simantics.db.impl.graph.WriteSupport;
\r
32 import org.simantics.db.procore.protocol.Constants;
\r
33 import org.simantics.db.request.WriteTraits;
\r
34 import org.simantics.db.service.ClusterSetsSupport;
\r
35 import org.simantics.db.service.ClusterUID;
\r
36 import org.simantics.db.service.ExternalOperation;
\r
37 import org.simantics.db.service.TransactionPolicySupport;
\r
39 public class TransactionToken implements GraphSession.Listener {
\r
40 private boolean DEBUG = false;
\r
41 private Session session;
\r
42 private GraphSession graphSession;
\r
43 private AtomicLong id;
\r
44 // This is the last change set that we have send commit for.
\r
45 // This does not mean that those commits have been accepted in server.
\r
46 private long lastChangeSetId;
\r
47 private OperationImpl lastOperation = null;
\r
48 enum Type { Null, Read, Write }
\r
50 // Count of pending transactions.
\r
51 private int pending = 0;
\r
52 TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) {
\r
53 this.session = session;
\r
54 this.graphSession = graphSession;
\r
55 this.type = Type.Null;
\r
56 this.id = new AtomicLong();
\r
57 this.id.set(Constants.NullTransactionId);
\r
58 long firstChangeSetId = graphSession.getFirstChangeSetId();
\r
59 if (null != old && old.lastChangeSetId > firstChangeSetId)
\r
60 this.lastChangeSetId = old.lastChangeSetId;
\r
62 this.lastChangeSetId = firstChangeSetId;
\r
63 graphSession.setListener(this);
\r
65 void startReadTransaction(int thread)
\r
66 throws DatabaseException {
\r
68 System.out.println("DEBUG: Start read transaction " + graphSession);
\r
69 if (Type.Null == type) {
\r
70 id.set(graphSession.askReadTransaction(thread));
\r
73 System.out.println("kraa: read beg pending=" + pending);
\r
77 void stopReadTransaction()
\r
78 throws DatabaseException {
\r
80 System.out.println("DEBUG: Stop read transaction " + graphSession);
\r
81 if (Type.Null == type)
\r
83 endTransaction(false);
\r
86 System.out.println("kraa: read end pending=" + pending);
\r
88 void startWriteTransaction(int thread)
\r
89 throws DatabaseException {
\r
91 System.out.println("DEBUG: Start write transaction " + graphSession);
\r
92 if (Type.Null == type) {
\r
93 id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId));
\r
96 System.out.println("kraa: write beg pending=" + pending);
\r
97 } else if (Type.Read == type) {
\r
98 id.set(graphSession.askWriteTransaction(thread, id.get()));
\r
100 if (++pending != 1)
\r
101 System.out.println("kraa: write beg pending=" + pending);
\r
105 void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream)
\r
106 throws DatabaseException {
\r
108 System.out.println("DEBUG: CancelBegin " + graphSession);
\r
109 if (Type.Null == type)
\r
111 else if (Type.Read == type)
\r
112 throw new ValidationException("Illegal token type.");
\r
113 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
\r
114 boolean empty = clusterStream.reallyFlush();
\r
116 boolean noMetadata = !hasMetadata(metadata);
\r
120 byte[] data = makeContext(null, metadata);
\r
121 graphSession.cancelCommit(id.get(), lastChangeSetId, data, context);
\r
124 System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId);
\r
125 clusterStream.accept();
\r
126 writeSupport.commitDone(null, lastChangeSetId);
\r
128 void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream)
\r
129 throws DatabaseException {
\r
130 if (clusterStream.reallyFlush())
\r
132 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
\r
133 byte[] data = makeContext(null, metadata);
\r
134 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
\r
137 System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId);
\r
138 clusterStream.accept();
\r
139 writeSupport.commitDone(writeTraits, lastChangeSetId);
\r
141 void setCombine(boolean a) {
\r
144 private boolean hasMetadata(TreeMap<String, byte[]> metadata) {
\r
146 if(metadata == null)
\r
148 if(metadata.size() == 0)
\r
150 if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class))
\r
156 private boolean combine = false;
\r
157 void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy)
\r
158 throws DatabaseException {
\r
160 System.out.println("DEBUG: Commit write transaction " + graphSession);
\r
161 if (Type.Null == type)
\r
163 else if (Type.Read == type)
\r
164 throw new ValidationException("Illegal transaction type.");
\r
165 else if (id.get() == Constants.NullTransactionId)
\r
166 throw new ValidationException("Illegal transaction id.");
\r
167 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
\r
168 boolean empty = clusterStream.reallyFlush();
\r
170 boolean noMetadata = !hasMetadata(metadata);
\r
172 List<ExternalOperation> ext = graphSession.undoContext.getPendingExternals();
\r
173 if(!ext.isEmpty()) {
\r
174 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
\r
175 writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s)."));
\r
176 metadata = writeSupport.getMetadata();
\r
177 Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s).");
\r
179 graphSession.undoContext.cancelCommit();
\r
185 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
\r
187 writeSupport.addMetadata(cm.add(writeTraits.toString()));
\r
189 metadata = writeSupport.getMetadata();
\r
191 Operation op = combine ? lastOperation : null;
\r
193 byte[] data = makeContext(op, metadata);
\r
194 // new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace();
\r
195 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
\r
198 System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId);
\r
199 clusterStream.accept();
\r
200 writeSupport.commitDone(writeTraits, lastChangeSetId);
\r
201 long opid = (null == op) ? lastChangeSetId : op.getId();
\r
202 lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals());
\r
203 graphSession.undoContext.commitOk(lastOperation);
\r
205 System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + ".");
\r
207 void stopWriteTransaction()
\r
208 throws DatabaseException {
\r
210 System.out.println("DEBUG: Stop write transaction begin " + graphSession);
\r
211 if (Type.Null == type)
\r
213 else if (Type.Read == type)
\r
214 throw new ValidationException("Illegal transaction type.");
\r
215 // Note that even if we do cancel the cluster sets are not restored.
\r
217 session.getService(ClusterSetsSupport.class).save();
\r
218 } catch (Exception e) {
\r
219 e.printStackTrace();
\r
220 Logger.defaultLogError("Failed to save cluster sets.", e);
\r
222 graphSession.undoContext.cancelCommit();
\r
223 endTransaction(true);
\r
225 if (--pending != 0)
\r
226 System.out.println("kraa: write end pending=" + pending);
\r
228 System.out.println("DEBUG: Stop write transaction end " + graphSession);
\r
231 throws DatabaseException {
\r
232 if (Type.Null == type || id.get() == Constants.NullTransactionId)
\r
234 endTransaction(true);
\r
236 if (--pending != 0)
\r
237 System.out.println("kraa: closing pending=" + pending);
\r
240 if (id.get() != Constants.NullTransactionId)
\r
242 endTransaction(true);
\r
244 if (--pending != 0)
\r
245 System.out.println("kraa: closing pending=" + pending);
\r
246 } catch (DatabaseException e) {
\r
248 System.out.println("Close did not finish cleanly.");
\r
249 e.printStackTrace();
\r
253 public Operation getLastOperation() {
\r
254 return lastOperation;
\r
256 public long getHeadRevisionId() {
\r
257 return lastChangeSetId;
\r
260 static Serializer METADATA_SERIALIZER =
\r
261 Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
\r
262 Bindings.BYTE_ARRAY) );
\r
264 private byte[] makeContext(Operation op, TreeMap<String, byte[]> properties) {
\r
265 if (properties == null)
\r
266 properties = new TreeMap<String, byte[]>();
\r
267 long csid = (null != op) ? op.getId() : 0;
\r
268 properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session));
\r
269 properties.put("opid", Long.toString(csid).getBytes());
\r
272 return METADATA_SERIALIZER.serialize(properties);
\r
273 } catch (SerializationException e) {
\r
274 e.printStackTrace();
\r
275 return new byte[0];
\r
276 } catch (IOException e) {
\r
277 e.printStackTrace();
\r
278 return new byte[0];
\r
281 private void endTransaction(boolean write)
\r
282 throws DatabaseException {
\r
284 System.out.println("DEBUG: TransactionToken.end begin " + graphSession);
\r
286 graphSession.endTransaction(id.get(), write);
\r
289 id.set(Constants.NullTransactionId);
\r
290 synchronized (this) {
\r
295 System.out.println("DEBUG: TransactionToken.end end " + graphSession);
\r
297 private void updateLastChangeSetId(long csid) {
\r
299 System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " "
\r
300 + graphSession + " " + this);
\r
301 if (lastChangeSetId > csid)
\r
302 return; // throw new
\r
303 // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId
\r
304 // + " new=" + csid);
\r
305 lastChangeSetId = csid;
\r
307 public static void testEmpty() {
\r
308 final int REPEAT_COUNT = 20;
\r
309 final int REQUEST_COUNT = 500; // 1000;
\r
310 double totalTime = 0;
\r
311 for (int j = 0; j < REPEAT_COUNT; ++j) {
\r
312 long start = System.nanoTime();
\r
313 for (int i = 0; i < REQUEST_COUNT; ++i) {
\r
315 long end = System.nanoTime();
\r
316 double time = (end - start) * (1e-9);
\r
319 double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime;
\r
320 String t = "Speed was " + speed + " ops per second.";
\r
321 System.out.println(t);
\r
324 public void onChangeSetId(int thread, long csid, boolean refresh) {
\r
325 long ocsid = lastChangeSetId;
\r
326 updateLastChangeSetId(csid);
\r
327 if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId)
\r
328 refresh(thread, ocsid);
\r
330 private void refresh(int thread, long csid) {
\r
332 if (session instanceof SessionImplSocket) {
\r
333 ClusterUID[] clusterUID = graphSession.getRefresh2(csid);
\r
334 ((SessionImplSocket)session).refresh(thread, clusterUID, csid);
\r
336 } catch (DatabaseException e) {
\r
337 Logger.defaultLogError(e);
\r