1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.io.IOException;
15 import java.util.List;
16 import java.util.TreeMap;
17 import java.util.concurrent.atomic.AtomicLong;
19 import org.simantics.databoard.Bindings;
20 import org.simantics.databoard.binding.impl.TreeMapBinding;
21 import org.simantics.databoard.serialization.SerializationException;
22 import org.simantics.databoard.serialization.Serializer;
23 import org.simantics.db.Operation;
24 import org.simantics.db.Session;
25 import org.simantics.db.common.CommentMetadata;
26 import org.simantics.db.common.CommitMetadata;
27 import org.simantics.db.common.MetadataUtils;
28 import org.simantics.db.common.utils.Logger;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.exception.ValidationException;
31 import org.simantics.db.impl.graph.WriteSupport;
32 import org.simantics.db.procore.protocol.Constants;
33 import org.simantics.db.request.WriteTraits;
34 import org.simantics.db.service.ClusterSetsSupport;
35 import org.simantics.db.service.ClusterUID;
36 import org.simantics.db.service.ExternalOperation;
37 import org.simantics.db.service.TransactionPolicySupport;
39 public class TransactionToken implements GraphSession.Listener {
// Initialised to false. NOTE(review): presumably gates the "DEBUG: ..." console output of the
// methods below; the guarding lines are not visible in this view - confirm.
40 private boolean DEBUG = false;
// Session passed to the constructor; used by makeContext and refresh.
41 private Session session;
// Graph session through which transactions are asked for, committed, cancelled and ended.
42 private GraphSession graphSession;
// Id of the transaction currently held, or Constants.NullTransactionId when none is held.
43 private AtomicLong id;
44 // This is the last change set that we have sent a commit for.
45 // This does not mean that those commits have been accepted by the server.
46 private long lastChangeSetId;
// Operation created by the most recent commitWriteTransaction; null until the first commit.
47 private OperationImpl lastOperation = null;
// Kind of transaction held by the token. NOTE(review): the constructor assigns a field named
// "type" of this enum, but its declaration is not visible in this view.
48 enum Type { Null, Read, Write }
50 // Count of pending transactions.
// In the visible code this counter is only read by the "kraa: ..." trace output.
51 private int pending = 0;
// Creates a token that holds no transaction: the type is Type.Null and the id is
// Constants.NullTransactionId. The token registers itself as the listener of graphSession.
// Parameters:
//   tps          - not referenced in the visible part of the constructor
//   session      - stored for later use by makeContext and refresh
//   graphSession - supplies the first change set id and receives this token as its listener
//   old          - previous token, may be null; its lastChangeSetId is compared below
52 TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) {
53 this.session = session;
54 this.graphSession = graphSession;
55 this.type = Type.Null;
56 this.id = new AtomicLong();
57 this.id.set(Constants.NullTransactionId);
58 long firstChangeSetId = graphSession.getFirstChangeSetId();
// Keep the old token's change set id when it is ahead of the session's first change set id.
59 if (null != old && old.lastChangeSetId > firstChangeSetId)
60 this.lastChangeSetId = old.lastChangeSetId;
// NOTE(review): presumably the alternative (else) branch of the check above; the line in
// between is not visible in this view - confirm.
62 this.lastChangeSetId = firstChangeSetId;
63 graphSession.setListener(this);
// Starts a read transaction. When the token holds no transaction (Type.Null), a read
// transaction is requested from graphSession for the given thread and the returned id is stored.
// Throws DatabaseException (declared; the visible call that may raise it is askReadTransaction).
// NOTE(review): the update of "type" and the guards around the trace output are not visible here.
65 void startReadTransaction(int thread)
66 throws DatabaseException {
68 System.out.println("DEBUG: Start read transaction " + graphSession);
69 if (Type.Null == type) {
70 id.set(graphSession.askReadTransaction(thread));
73 System.out.println("kraa: read beg pending=" + pending);
// Stops the read transaction by calling endTransaction(false).
// Throws DatabaseException (declared; propagated from endTransaction).
77 void stopReadTransaction()
78 throws DatabaseException {
80 System.out.println("DEBUG: Stop read transaction " + graphSession);
// NOTE(review): the statement controlled by this check is not visible in this view;
// presumably nothing is ended when no transaction is held - confirm.
81 if (Type.Null == type)
83 endTransaction(false);
86 System.out.println("kraa: read end pending=" + pending);
// Starts a write transaction for the given thread and stores the id returned by graphSession.
// Two cases are visible:
//   Type.Null - askWriteTransaction is called with Constants.NullTransactionId;
//   Type.Read - askWriteTransaction is called with the id of the transaction currently held.
// Throws DatabaseException (declared; the visible calls that may raise it are askWriteTransaction).
// NOTE(review): the update of "type" and any handling of Type.Write are not visible here.
88 void startWriteTransaction(int thread)
89 throws DatabaseException {
91 System.out.println("DEBUG: Start write transaction " + graphSession);
92 if (Type.Null == type) {
93 id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId));
96 System.out.println("kraa: write beg pending=" + pending);
97 } else if (Type.Read == type) {
98 id.set(graphSession.askWriteTransaction(thread, id.get()));
101 System.out.println("kraa: write beg pending=" + pending);
// Begins cancelling the current write: flushes clusterStream, serialises the metadata of
// writeSupport with makeContext(null, ...) and sends graphSession.cancelCommit for the current
// transaction id and lastChangeSetId. Afterwards clusterStream.accept() and
// writeSupport.commitDone(null, lastChangeSetId) are called.
// Throws ValidationException("Illegal token type.") when the token holds a read transaction.
105 void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream)
106 throws DatabaseException {
108 System.out.println("DEBUG: CancelBegin " + graphSession);
// NOTE(review): the statement controlled by the Type.Null check is not visible in this view.
109 if (Type.Null == type)
111 else if (Type.Read == type)
112 throw new ValidationException("Illegal token type.");
113 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
114 boolean empty = clusterStream.reallyFlush();
116 boolean noMetadata = !hasMetadata(metadata);
// NOTE(review): "empty" and "noMetadata" are not used in the visible lines; presumably they
// guard the cancel below on lines that are not visible here - confirm.
120 byte[] data = makeContext(null, metadata);
121 graphSession.cancelCommit(id.get(), lastChangeSetId, data, context);
124 System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId);
125 clusterStream.accept();
126 writeSupport.commitDone(null, lastChangeSetId);
// Finishes a cancel: serialises the metadata of writeSupport with makeContext(null, ...),
// sends graphSession.acceptCommit for the current transaction id and lastChangeSetId, then
// calls clusterStream.accept() and writeSupport.commitDone(writeTraits, lastChangeSetId).
// Throws DatabaseException (declared).
128 void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream)
129 throws DatabaseException {
// NOTE(review): the statement controlled by the flush result is not visible in this view.
130 if (clusterStream.reallyFlush())
132 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
133 byte[] data = makeContext(null, metadata);
134 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
137 System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId);
138 clusterStream.accept();
139 writeSupport.commitDone(writeTraits, lastChangeSetId);
// NOTE(review): only the header is visible in this view; presumably stores "a" in the
// "combine" flag that commitWriteTransaction reads - confirm against the full file.
141 void setCombine(boolean a) {
// Tells whether the given metadata map counts as carrying metadata. Two special cases are
// examined: an empty map, and a map whose single entry is CommentMetadata.
// NOTE(review): the return statements are not visible in this view; presumably both special
// cases yield false and everything else yields true - confirm.
144 private boolean hasMetadata(TreeMap<String, byte[]> metadata) {
148 if(metadata.size() == 0)
150 if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class))
// When true, commitWriteTransaction passes lastOperation (instead of null) to makeContext and
// reuses that operation's id for the operation it creates.
156 private boolean combine = false;
// Commits the current write transaction.
// Visible steps: validate the token state, flush clusterStream, make sure the commit carries
// comment metadata, serialise the metadata with makeContext, send graphSession.acceptCommit,
// then record the commit as lastOperation and report it to graphSession.undoContext.
// Parameters:
//   writeSupport  - source of the commit metadata; notified through commitDone
//   writeTraits   - passed to commitDone; its toString() is added as a comment
//   clusterStream - flushed before and accepted after the commit
//   dummy         - not referenced in the visible lines
// Throws ValidationException for a read transaction ("Illegal transaction type.") or when the
// transaction id is Constants.NullTransactionId ("Illegal transaction id.").
// NOTE(review): several guard lines of this method are not visible in this view, so the
// conditions under which the individual steps run cannot be stated from here.
157 void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy)
158 throws DatabaseException {
160 System.out.println("DEBUG: Commit write transaction " + graphSession);
// NOTE(review): the statement controlled by the Type.Null check is not visible in this view.
161 if (Type.Null == type)
163 else if (Type.Read == type)
164 throw new ValidationException("Illegal transaction type.");
165 else if (id.get() == Constants.NullTransactionId)
166 throw new ValidationException("Illegal transaction id.");
167 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
168 boolean empty = clusterStream.reallyFlush();
170 boolean noMetadata = !hasMetadata(metadata);
172 List<ExternalOperation> ext = graphSession.undoContext.getPendingExternals();
// A generated comment is attached and the situation is logged as an error; per the message
// texts this concerns an empty commit that only has external undo operation(s).
174 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
175 writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s)."));
176 metadata = writeSupport.getMetadata();
177 Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s).");
179 graphSession.undoContext.cancelCommit();
// The string form of writeTraits is added as a comment and the metadata map is re-read.
185 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
187 writeSupport.addMetadata(cm.add(writeTraits.toString()));
189 metadata = writeSupport.getMetadata();
// With "combine" set, the previous operation is handed to makeContext; otherwise null is.
191 Operation op = combine ? lastOperation : null;
193 byte[] data = makeContext(op, metadata);
194 // new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace();
195 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
198 System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId);
199 clusterStream.accept();
200 writeSupport.commitDone(writeTraits, lastChangeSetId);
// The new operation takes the id of "op" when one was passed, else the change set id.
201 long opid = (null == op) ? lastChangeSetId : op.getId();
202 lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals());
203 graphSession.undoContext.commitOk(lastOperation);
205 System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + ".");
// Stops the write transaction: calls graphSession.undoContext.cancelCommit() and then
// endTransaction(true).
// Throws ValidationException("Illegal transaction type.") when the token holds a read transaction.
207 void stopWriteTransaction()
208 throws DatabaseException {
210 System.out.println("DEBUG: Stop write transaction begin " + graphSession);
// NOTE(review): the statement controlled by the Type.Null check is not visible in this view.
211 if (Type.Null == type)
213 else if (Type.Read == type)
214 throw new ValidationException("Illegal transaction type.");
215 // Note that even if we do cancel the cluster sets are not restored.
216 graphSession.undoContext.cancelCommit();
217 endTransaction(true);
220 System.out.println("kraa: write end pending=" + pending);
222 System.out.println("DEBUG: Stop write transaction end " + graphSession);
// NOTE(review): the header of this method is not visible in this view, and the lines below may
// belong to two separate methods (a second header may be missing before the second id check).
// What is visible: endTransaction(true) is called in connection with a check of the token
// type / transaction id, and a DatabaseException is caught with the message
// "Close did not finish cleanly." printed to the console.
225 throws DatabaseException {
// NOTE(review): the statement controlled by this check is not visible in this view.
226 if (Type.Null == type || id.get() == Constants.NullTransactionId)
228 endTransaction(true);
231 System.out.println("kraa: closing pending=" + pending);
234 if (id.get() != Constants.NullTransactionId)
236 endTransaction(true);
239 System.out.println("kraa: closing pending=" + pending);
240 } catch (DatabaseException e) {
242 System.out.println("Close did not finish cleanly.");
// Returns the operation recorded by the most recent commitWriteTransaction, or null when no
// commit has been recorded by this token.
247 public Operation getLastOperation() {
248 return lastOperation;
// Returns the last change set id known to this token (the lastChangeSetId field).
250 public long getHeadRevisionId() {
251 return lastChangeSetId;
// Serializer for the commit metadata map, built from a TreeMapBinding with string keys
// (Bindings.STRING) and byte array values (Bindings.BYTE_ARRAY). Used by makeContext.
254 static Serializer METADATA_SERIALIZER =
255 Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
256 Bindings.BYTE_ARRAY) );
// Builds the serialised commit context sent along with a commit or cancel.
// Parameters:
//   op         - operation whose id is recorded, or null, in which case 0 is recorded
//   properties - metadata map, or null, in which case a new empty map is used
// Two entries are written into the map before it is serialised with METADATA_SERIALIZER:
// one keyed by the CommitMetadata class name and one keyed "opid".
// Note that a non-null "properties" argument is modified in place.
258 private byte[] makeContext(Operation op, TreeMap<String, byte[]> properties) {
259 if (properties == null)
260 properties = new TreeMap<String, byte[]>();
261 long csid = (null != op) ? op.getId() : 0;
262 properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session));
// The id is stored as its decimal string; getBytes() without an argument encodes with the
// platform default charset.
263 properties.put("opid", Long.toString(csid).getBytes());
266 return METADATA_SERIALIZER.serialize(properties);
// NOTE(review): the bodies of both catch clauses are not visible in this view, so what is
// returned or thrown on a serialisation failure cannot be stated from here.
267 } catch (SerializationException e) {
270 } catch (IOException e) {
// Ends the current transaction on graphSession and resets the stored id to
// Constants.NullTransactionId.
// Parameter "write" is forwarded unchanged to graphSession.endTransaction.
// Throws DatabaseException (declared).
275 private void endTransaction(boolean write)
276 throws DatabaseException {
278 System.out.println("DEBUG: TransactionToken.end begin " + graphSession);
280 graphSession.endTransaction(id.get(), write);
283 id.set(Constants.NullTransactionId);
// NOTE(review): the body of this synchronized block is not visible in this view.
284 synchronized (this) {
289 System.out.println("DEBUG: TransactionToken.end end " + graphSession);
// Stores csid as the last change set id.
291 private void updateLastChangeSetId(long csid) {
293 System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " "
294 + graphSession + " " + this);
// NOTE(review): the statement controlled by this check is not visible in this view. Judging
// by the commented-out InternalError below, a smaller incoming id is presumably ignored
// rather than treated as an error - confirm.
295 if (lastChangeSetId > csid)
297 // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId
298 // + " new=" + csid);
299 lastChangeSetId = csid;
// Timing harness: runs REPEAT_COUNT rounds of REQUEST_COUNT iterations each, measures every
// round with System.nanoTime() and prints the resulting rate to the console.
// NOTE(review): no statement is visible inside the inner loop, and the line that accumulates
// "time" into "totalTime" is not visible in this view either - confirm against the full file.
301 public static void testEmpty() {
302 final int REPEAT_COUNT = 20;
303 final int REQUEST_COUNT = 500; // 1000;
304 double totalTime = 0;
305 for (int j = 0; j < REPEAT_COUNT; ++j) {
306 long start = System.nanoTime();
307 for (int i = 0; i < REQUEST_COUNT; ++i) {
309 long end = System.nanoTime();
// Elapsed nanoseconds converted to seconds.
310 double time = (end - start) * (1e-9);
313 double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime;
314 String t = "Speed was " + speed + " ops per second.";
315 System.out.println(t);
// Receives a new change set id. NOTE(review): presumably the GraphSession.Listener callback
// this class implements; no @Override is visible here - confirm.
// The id is passed to updateLastChangeSetId. A refresh from the previous id is then triggered
// only when all of the following hold: "refresh" is true, csid is greater than the previous
// id, and no transaction is held (id equals Constants.NullTransactionId).
318 public void onChangeSetId(int thread, long csid, boolean refresh) {
// Remember the previous id: it is both the comparison baseline and the refresh start point.
319 long ocsid = lastChangeSetId;
320 updateLastChangeSetId(csid);
321 if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId)
322 refresh(thread, ocsid);
// Refreshes the session from the given change set id. Only acts when the session is a
// SessionImplSocket: the cluster UIDs returned by graphSession.getRefresh2(csid) are passed
// to SessionImplSocket.refresh together with the thread and csid.
// A DatabaseException is not propagated; it is reported through Logger.defaultLogError.
324 private void refresh(int thread, long csid) {
326 if (session instanceof SessionImplSocket) {
327 ClusterUID[] clusterUID = graphSession.getRefresh2(csid);
328 ((SessionImplSocket)session).refresh(thread, clusterUID, csid);
330 } catch (DatabaseException e) {
331 Logger.defaultLogError(e);