/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.io.IOException; import java.util.List; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import org.simantics.databoard.Bindings; import org.simantics.databoard.binding.impl.TreeMapBinding; import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.db.Operation; import org.simantics.db.Session; import org.simantics.db.common.CommentMetadata; import org.simantics.db.common.CommitMetadata; import org.simantics.db.common.MetadataUtils; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ValidationException; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.procore.protocol.Constants; import org.simantics.db.request.WriteTraits; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.ExternalOperation; import org.simantics.db.service.TransactionPolicySupport; public class TransactionToken implements GraphSession.Listener { private boolean DEBUG = false; private Session session; private GraphSession graphSession; private AtomicLong id; // This is the last change set that we have send commit for. // This does not mean that those commits have been accepted in server. private long lastChangeSetId; private OperationImpl lastOperation = null; enum Type { Null, Read, Write } private Type type; // Count of pending transactions. private int pending = 0; TransactionToken(TransactionPolicySupport tps, Session session, GraphSession graphSession, TransactionToken old) { this.session = session; this.graphSession = graphSession; this.type = Type.Null; this.id = new AtomicLong(); this.id.set(Constants.NullTransactionId); long firstChangeSetId = graphSession.getFirstChangeSetId(); if (null != old && old.lastChangeSetId > firstChangeSetId) this.lastChangeSetId = old.lastChangeSetId; else this.lastChangeSetId = firstChangeSetId; graphSession.setListener(this); } void startReadTransaction(int thread) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Start read transaction " + graphSession); if (Type.Null == type) { id.set(graphSession.askReadTransaction(thread)); if (DEBUG) if (++pending != 1) System.out.println("kraa: read beg pending=" + pending); type = Type.Read; } } void stopReadTransaction() throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Stop read transaction " + graphSession); if (Type.Null == type) return; endTransaction(false); if (DEBUG) if (--pending != 0) System.out.println("kraa: read end pending=" + pending); } void startWriteTransaction(int thread) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Start write transaction " + graphSession); if (Type.Null == type) { id.set(graphSession.askWriteTransaction(thread, Constants.NullTransactionId)); if (DEBUG) if (++pending != 1) System.out.println("kraa: write beg pending=" + pending); } else if (Type.Read == type) { id.set(graphSession.askWriteTransaction(thread, id.get())); if (DEBUG) if (++pending != 1) System.out.println("kraa: write beg pending=" + pending); } type = Type.Write; } void cancelBegin(WriteSupport writeSupport, SynchronizeContextI context, ClusterStream clusterStream) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: CancelBegin " + graphSession); if (Type.Null == type) return; else if (Type.Read == type) throw new ValidationException("Illegal token type."); TreeMap metadata = writeSupport.getMetadata(); boolean empty = clusterStream.reallyFlush(); if (empty) { boolean noMetadata = !hasMetadata(metadata); if (noMetadata) return; } byte[] data = makeContext(null, metadata); graphSession.cancelCommit(id.get(), lastChangeSetId, data, context); ++lastChangeSetId; if (DEBUG) System.out.println("DEBUG: CancelBegin cancelled cs=" + lastChangeSetId); clusterStream.accept(); writeSupport.commitDone(null, lastChangeSetId); } void cancelEnd(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream) throws DatabaseException { if (clusterStream.reallyFlush()) return; TreeMap metadata = writeSupport.getMetadata(); byte[] data = makeContext(null, metadata); graphSession.acceptCommit(id.get(), lastChangeSetId, data); ++lastChangeSetId; if (DEBUG) System.out.println("DEBUG CancelEnd accepted commit cs=" + lastChangeSetId); clusterStream.accept(); writeSupport.commitDone(writeTraits, lastChangeSetId); } void setCombine(boolean a) { combine = a; } private boolean hasMetadata(TreeMap metadata) { if(metadata == null) return false; if(metadata.size() == 0) return false; if(metadata.size() == 1 && MetadataUtils.hasMetadata(metadata, CommentMetadata.class)) return false; return true; } private boolean combine = false; void commitWriteTransaction(WriteSupport writeSupport, WriteTraits writeTraits, ClusterStream clusterStream, Operation dummy) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Commit write transaction " + graphSession); if (Type.Null == type) return; else if (Type.Read == type) throw new ValidationException("Illegal transaction type."); else if (id.get() == Constants.NullTransactionId) throw new ValidationException("Illegal transaction id."); TreeMap metadata = writeSupport.getMetadata(); boolean empty = clusterStream.reallyFlush(); if (empty) { boolean noMetadata = !hasMetadata(metadata); if (noMetadata) { List ext = graphSession.undoContext.getPendingExternals(); if(!ext.isEmpty()) { CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class); writeSupport.addMetadata(cm.add("Automatically generated comment for empty commit with external undo operation(s).")); metadata = writeSupport.getMetadata(); Logger.defaultLogError("A generated comment was added into a commit with only some external undo operation(s)."); } else { graphSession.undoContext.cancelCommit(); return; } } } CommentMetadata cm = writeSupport.getMetadata(CommentMetadata.class); if(cm.size() == 0) writeSupport.addMetadata(cm.add(writeTraits.toString())); metadata = writeSupport.getMetadata(); Operation op = combine ? lastOperation : null; combine = true; byte[] data = makeContext(op, metadata); // new Exception("committing cs + " + lastChangeSetId + " with metadata " + data).printStackTrace(); graphSession.acceptCommit(id.get(), lastChangeSetId, data); ++lastChangeSetId; if (DEBUG) System.out.println("DEBUG Accpeted commit cs=" + lastChangeSetId); clusterStream.accept(); writeSupport.commitDone(writeTraits, lastChangeSetId); long opid = (null == op) ? lastChangeSetId : op.getId(); lastOperation = new OperationImpl(opid, lastChangeSetId, graphSession.undoContext.getPendingExternals()); graphSession.undoContext.commitOk(lastOperation); if (DEBUG) System.out.println("DEBUG: Accepted operation id=" + lastOperation.getId() + " cs=" + lastOperation.getCSId() + "."); } void stopWriteTransaction() throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Stop write transaction begin " + graphSession); if (Type.Null == type) return; else if (Type.Read == type) throw new ValidationException("Illegal transaction type."); // Note that even if we do cancel the cluster sets are not restored. try { session.getService(ClusterSetsSupport.class).save(); } catch (Exception e) { e.printStackTrace(); Logger.defaultLogError("Failed to save cluster sets.", e); } graphSession.undoContext.cancelCommit(); endTransaction(true); if (DEBUG) if (--pending != 0) System.out.println("kraa: write end pending=" + pending); if (DEBUG) System.out.println("DEBUG: Stop write transaction end " + graphSession); } void closing() throws DatabaseException { if (Type.Null == type || id.get() == Constants.NullTransactionId) return; endTransaction(true); if (DEBUG) if (--pending != 0) System.out.println("kraa: closing pending=" + pending); } void close() { if (id.get() != Constants.NullTransactionId) try { endTransaction(true); if (DEBUG) if (--pending != 0) System.out.println("kraa: closing pending=" + pending); } catch (DatabaseException e) { if (DEBUG) { System.out.println("Close did not finish cleanly."); e.printStackTrace(); } } } public Operation getLastOperation() { return lastOperation; } public long getHeadRevisionId() { return lastChangeSetId; } static Serializer METADATA_SERIALIZER = Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING, Bindings.BYTE_ARRAY) ); private byte[] makeContext(Operation op, TreeMap properties) { if (properties == null) properties = new TreeMap(); long csid = (null != op) ? op.getId() : 0; properties.put(CommitMetadata.class.getName(), new CommitMetadata(csid).serialise(session)); properties.put("opid", Long.toString(csid).getBytes()); try { return METADATA_SERIALIZER.serialize(properties); } catch (SerializationException e) { e.printStackTrace(); return new byte[0]; } catch (IOException e) { e.printStackTrace(); return new byte[0]; } } private void endTransaction(boolean write) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: TransactionToken.end begin " + graphSession); try { graphSession.endTransaction(id.get(), write); } finally { type = Type.Null; id.set(Constants.NullTransactionId); synchronized (this) { this.notify(); } } if (DEBUG) System.out.println("DEBUG: TransactionToken.end end " + graphSession); } private void updateLastChangeSetId(long csid) { if (DEBUG) System.out.println("DEBUG: TransactionToken: Update last cs=" + lastChangeSetId + " new=" + csid + " " + graphSession + " " + this); if (lastChangeSetId > csid) return; // throw new // InternalError("Change set id has been corrupted. Last cs=" + lastChangeSetId // + " new=" + csid); lastChangeSetId = csid; } public static void testEmpty() { final int REPEAT_COUNT = 20; final int REQUEST_COUNT = 500; // 1000; double totalTime = 0; for (int j = 0; j < REPEAT_COUNT; ++j) { long start = System.nanoTime(); for (int i = 0; i < REQUEST_COUNT; ++i) { } long end = System.nanoTime(); double time = (end - start) * (1e-9); totalTime += time; } double speed = REPEAT_COUNT * REQUEST_COUNT / totalTime; String t = "Speed was " + speed + " ops per second."; System.out.println(t); } @Override public void onChangeSetId(int thread, long csid, boolean refresh) { long ocsid = lastChangeSetId; updateLastChangeSetId(csid); if (refresh && csid > ocsid && id.get() == Constants.NullTransactionId) refresh(thread, ocsid); } private void refresh(int thread, long csid) { try { if (session instanceof SessionImplSocket) { ClusterUID[] clusterUID = graphSession.getRefresh2(csid); ((SessionImplSocket)session).refresh(thread, clusterUID, csid); } } catch (DatabaseException e) { Logger.defaultLogError(e); } } }