1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.io.IOException;
15 import java.util.List;
16 import java.util.TreeMap;
17 import java.util.concurrent.atomic.AtomicLong;
19 import org.simantics.databoard.Bindings;
20 import org.simantics.databoard.binding.impl.TreeMapBinding;
21 import org.simantics.databoard.serialization.SerializationException;
22 import org.simantics.databoard.serialization.Serializer;
23 import org.simantics.db.Operation;
24 import org.simantics.db.Session;
25 import org.simantics.db.common.CommentMetadata;
26 import org.simantics.db.common.CommitMetadata;
27 import org.simantics.db.common.MetadataUtils;
28 import org.simantics.db.common.utils.Logger;
29 import org.simantics.db.exception.DatabaseException;
30 import org.simantics.db.exception.ValidationException;
31 import org.simantics.db.impl.graph.WriteSupport;
32 import org.simantics.db.procore.protocol.Constants;
33 import org.simantics.db.request.WriteTraits;
34 import org.simantics.db.service.ClusterUID;
35 import org.simantics.db.service.ExternalOperation;
36 import org.simantics.db.service.TransactionPolicySupport;
38 public class TransactionToken implements GraphSession.Listener {
39 private boolean DEBUG = false;
40 private Session session;
41 private GraphSession graphSession;
42 private AtomicLong id;
43 // This is the last change set that we have send commit for.
44 // This does not mean that those commits have been accepted in server.
45 private long lastChangeSetId;
46 private OperationImpl lastOperation = null;
47 enum Type { Null, Read, Write }
49 // Count of pending transactions.
50 private int pending = 0;
51 TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) {
52 this.session = session;
53 this.graphSession = graphSession;
54 this.type = Type.Null;
55 this.id = new AtomicLong();
56 this.id.set(Constants.NullTransactionId);
57 long firstChangeSetId = graphSession.getFirstChangeSetId();
58 if (null != old && old.lastChangeSetId > firstChangeSetId)
59 this.lastChangeSetId = old.lastChangeSetId;
61 this.lastChangeSetId = firstChangeSetId;
62 graphSession.setListener(this);
64 void startReadTransaction(int thread)
65 throws DatabaseException {
67 System.out.println("DEBUG: Start read transaction " + graphSession);
68 if (Type.Null == type) {
69 id.set(graphSession.askReadTransaction(thread));
72 System.out.println("kraa: read beg pending=" + pending);
76 void stopReadTransaction()
77 throws DatabaseException {
79 System.out.println("DEBUG: Stop read transaction " + graphSession);
80 if (Type.Null == type)
82 endTransaction(false);
85 System.out.println("kraa: read end pending=" + pending);
87 void startWriteTransaction(int thread)
88 throws DatabaseException {
90 System.out.println("DEBUG: Start write transaction " + graphSession);
91 if (Type.Null == type) {
92 id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId));
95 System.out.println("kraa: write beg pending=" + pending);
96 } else if (Type.Read == type) {
97 id.set(graphSession.askWriteTransaction(thread, id.get()));
100 System.out.println("kraa: write beg pending=" + pending);
104 void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream)
105 throws DatabaseException {
107 System.out.println("DEBUG: CancelBegin " + graphSession);
108 if (Type.Null == type)
110 else if (Type.Read == type)
111 throw new ValidationException("Illegal token type.");
112 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
113 boolean empty = clusterStream.reallyFlush();
115 boolean noMetadata = !hasMetadata(metadata);
119 byte[] data = makeContext(null, metadata);
120 graphSession.cancelCommit(id.get(), lastChangeSetId, data, context);
123 System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId);
124 clusterStream.accept();
125 writeSupport.commitDone(null, lastChangeSetId);
127 void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream)
128 throws DatabaseException {
129 if (clusterStream.reallyFlush())
131 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
132 byte[] data = makeContext(null, metadata);
133 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
136 System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId);
137 clusterStream.accept();
138 writeSupport.commitDone(writeTraits, lastChangeSetId);
140 void setCombine(boolean a) {
143 private boolean hasMetadata(TreeMap<String, byte[]> metadata) {
147 if(metadata.size() == 0)
149 if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class))
155 private boolean combine = false;
156 void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy)
157 throws DatabaseException {
159 System.out.println("DEBUG: Commit write transaction " + graphSession);
160 if (Type.Null == type)
162 else if (Type.Read == type)
163 throw new ValidationException("Illegal transaction type.");
164 else if (id.get() == Constants.NullTransactionId)
165 throw new ValidationException("Illegal transaction id.");
166 TreeMap<String, byte[]> metadata = writeSupport.getMetadata();
167 boolean empty = clusterStream.reallyFlush();
169 boolean noMetadata = !hasMetadata(metadata);
171 List<ExternalOperation> ext = graphSession.undoContext.getPendingExternals();
173 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
174 writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s)."));
175 metadata = writeSupport.getMetadata();
176 Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s).");
178 graphSession.undoContext.cancelCommit();
184 CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class);
186 writeSupport.addMetadata(cm.add(writeTraits.toString()));
188 metadata = writeSupport.getMetadata();
190 Operation op = combine ? lastOperation : null;
192 byte[] data = makeContext(op, metadata);
193 // new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace();
194 graphSession.acceptCommit(id.get(), lastChangeSetId, data);
197 System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId);
198 clusterStream.accept();
199 writeSupport.commitDone(writeTraits, lastChangeSetId);
200 long opid = (null == op) ? lastChangeSetId : op.getId();
201 lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals());
202 graphSession.undoContext.commitOk(lastOperation);
204 System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + ".");
206 void stopWriteTransaction()
207 throws DatabaseException {
209 System.out.println("DEBUG: Stop write transaction begin " + graphSession);
210 if (Type.Null == type)
212 else if (Type.Read == type)
213 throw new ValidationException("Illegal transaction type.");
214 // Note that even if we do cancel the cluster sets are not restored.
215 graphSession.undoContext.cancelCommit();
216 endTransaction(true);
219 System.out.println("kraa: write end pending=" + pending);
221 System.out.println("DEBUG: Stop write transaction end " + graphSession);
224 throws DatabaseException {
225 if (Type.Null == type || id.get() == Constants.NullTransactionId)
227 endTransaction(true);
230 System.out.println("kraa: closing pending=" + pending);
233 if (id.get() != Constants.NullTransactionId)
235 endTransaction(true);
238 System.out.println("kraa: closing pending=" + pending);
239 } catch (DatabaseException e) {
241 System.out.println("Close did not finish cleanly.");
246 public Operation getLastOperation() {
247 return lastOperation;
249 public long getHeadRevisionId() {
250 return lastChangeSetId;
253 static Serializer METADATA_SERIALIZER =
254 Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
255 Bindings.BYTE_ARRAY) );
257 private byte[] makeContext(Operation op, TreeMap<String, byte[]> properties) {
258 if (properties == null)
259 properties = new TreeMap<String, byte[]>();
260 long csid = (null != op) ? op.getId() : 0;
261 properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session));
262 properties.put("opid", Long.toString(csid).getBytes());
265 return METADATA_SERIALIZER.serialize(properties);
266 } catch (SerializationException e) {
269 } catch (IOException e) {
274 private void endTransaction(boolean write)
275 throws DatabaseException {
277 System.out.println("DEBUG: TransactionToken.end begin " + graphSession);
279 graphSession.endTransaction(id.get(), write);
282 id.set(Constants.NullTransactionId);
283 synchronized (this) {
288 System.out.println("DEBUG: TransactionToken.end end " + graphSession);
290 private void updateLastChangeSetId(long csid) {
292 System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " "
293 + graphSession + " " + this);
294 if (lastChangeSetId > csid)
296 // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId
297 // + " new=" + csid);
298 lastChangeSetId = csid;
300 public static void testEmpty() {
301 final int REPEAT_COUNT = 20;
302 final int REQUEST_COUNT = 500; // 1000;
303 double totalTime = 0;
304 for (int j = 0; j < REPEAT_COUNT; ++j) {
305 long start = System.nanoTime();
306 for (int i = 0; i < REQUEST_COUNT; ++i) {
308 long end = System.nanoTime();
309 double time = (end - start) * (1e-9);
312 double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime;
313 String t = "Speed was " + speed + " ops per second.";
314 System.out.println(t);
317 public void onChangeSetId(int thread, long csid, boolean refresh) {
318 long ocsid = lastChangeSetId;
319 updateLastChangeSetId(csid);
320 if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId)
321 refresh(thread, ocsid);
323 private void refresh(int thread, long csid) {
325 if (session instanceof SessionImplSocket) {
326 ClusterUID[] clusterUID = graphSession.getRefresh2(csid);
327 ((SessionImplSocket)session).refresh(thread, clusterUID, csid);
329 } catch (DatabaseException e) {
330 Logger.defaultLogError(e);