/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.util.ArrayList; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.service.ClusterUID; final public class ClusterStream { // public static long duration2 = 0; public static final boolean DEBUG = false; public static final byte NULL_OPERATION = 0; public static final byte CREATE_OPERATION = 1; public static final byte SET_OPERATION = 4; public static final byte MODI_OPERATION = 6; public static final byte KILL_OPERATION = 7; boolean off = false; public GraphSession graphSession; final SessionImplSocket session; // private int flushCount = 0; final private boolean alwaysOff; private int stamp; private int acceptedStamp; private boolean dirty = false; // final private ArrayList clusterChanges = new ArrayList(); final ClusterChangeManager changes = new ClusterChangeManager(); // final TLongObjectHashMap clusterChanges = new TLongObjectHashMap(); // private final Change lastChange = new Change(); ClusterStream(SessionImplSocket session, GraphSession graphSession, boolean alwaysOff) { this.session = session; this.graphSession = graphSession; this.alwaysOff = alwaysOff; } boolean isDirty() { return dirty; } void markDirty() { dirty = true; } void setOff(boolean value) { if (alwaysOff) { off = true; } else { off = value; } } boolean getOff() { return off; } void createResource(ClusterChange cc, short operationIndex, ClusterUID clusterUID) { if (off) return; assert (null != cc); assert (0 != operationIndex); assert (!ClusterUID.Null.equals(clusterUID)); if (DEBUG) System.out.println("DEBUG: Created resource index=" + operationIndex + " cluster=" + clusterUID); cc.createResource(operationIndex); } final void addStatementIndex(Change change, int key, ClusterUID clusterUID, byte op) { if (off) return; assert (key > 0); assert (null != change); assert (!ClusterUID.Null.equals(clusterUID)); change.addStatementIndex(key, clusterUID, op); } void addStatement(ClusterChange cc, Change change) { if (off) return; assert (null != cc); assert (null != change); cc.addChange(change); } void cancelStatement(Change change) { if (off) return; assert (null != change); change.init(); } void removeStatement(ClusterChange cc, Change change) { if (off) return; assert (null != cc); assert (null != change); cc.addChange(change); } void cancelValue(Change change) { if (off) return; assert (null != change); change.init(); } void removeValue(ClusterChange cc, Change change) { if (off) return; assert (null != cc); assert (null != change); cc.addChange(change); } void setValue(ClusterChange cc, Change change, long clusterId, byte[] bytes, int length) { if (off) return; assert (null != cc); assert (null != change); // ClusterChange cc = getClusterChange(clusterId); cc.setValue(change, bytes, length); } void modiValue(ClusterChange cc, Change change, long clusterId, long voffset, int length, byte[] bytes, int offset) { assert (null != cc); assert (null != change); cc.modiValue(change, voffset, length, bytes, offset); } void undoValueEx(ClusterChange cc, Change change, int resourceIndex) { cc.undoValueEx(resourceIndex); } void setImmutable(ClusterChange cc, Change change, boolean immutable) { if (off) return; cc.setImmutable(immutable); } void setDeleted(ClusterChange cc, Change change, boolean deleted) { if (off) return; cc.setDeleted(deleted); } public void corruptCluster(ClusterChange cc, long clusterId) throws DatabaseException { if (off) return; if (DEBUG) System.out.println("ClusterStream.corrupt cid=" + clusterId + "."); assert (null != cc); cc.corrupt(); } int getStamp() { return stamp; } void flush() { if (off) return; // flushCount++; return; } void flush(ClusterUID clusterUID) { if (off) return; ArrayList ccs = new ArrayList(); for(ClusterChange cc : changes.get()) { if(cc.clusterUID.equals(clusterUID)) { if (cc.flush(graphSession, cc.clusterUID)) { ccs.add(cc); if (stamp == acceptedStamp) ++stamp; } else { Logger.defaultLogError("Failed to flush ClusterStream. cluster=" + clusterUID); } } } changes.remove(ccs); } /** * @return true if the stream has accepted all changes */ public boolean reallyFlush() { // Last possibility to mark clusters immutable before write only clusters are gone session.handleCreatedClusters(); // These shall be requested from server session.clusterTable.removeWriteOnlyClusters(); if (!off && changes.size() > 0) { for(ClusterChange cc : changes.get()) { if (cc.flush(graphSession, cc.clusterUID)) if (stamp == acceptedStamp) ++stamp; } changes.clear(); } dirty = false; return hasAcceptedAllChanges(); } /** * Clear all changes and set stream status to empty. */ public void clear() { changes.clear(); acceptedStamp = stamp; dirty = false; } private boolean hasAcceptedAllChanges() { return stamp == acceptedStamp; } void accept() { acceptedStamp = stamp; } static class DebugInfo { long nStms; long nLocal; long nPartly; long nForeign; long nValues; long sValues; long sForeign; long tot; void clear() { nStms = 0; nLocal = 0; nPartly = 0; nForeign = 0; sForeign = 0; nValues = 0; sValues = 0; tot = 0; } void add(DebugInfo di) { nStms += di.nStms; nLocal += di.nLocal; nPartly += di.nPartly; nForeign += di.nForeign; sForeign += di.sForeign; nValues += di.nValues; sValues += di.sValues; tot += di.tot; } @Override public String toString() { return "val=" + nValues + " stm=" + nStms + " loc=" + nLocal + " par=" + nPartly + " ful=" + nForeign + " for=" + sForeign + " vat=" + sValues + " tot=" + tot; } } enum StmEnum { Add(0, (byte) 0), Remove(1, (byte) 0x20); StmEnum(int ordinal, byte mask) { this.ordinal = ordinal; this.mask = mask; } public int ordinal; private byte mask; byte getOrMask() { return mask; } } final static class Data { final byte mask; // or mask for operation code (don't care bits are zero) final short bits; // how many bits are reserved for resource index (0,2,4,6) final int bytes; Data(int mask, int bits, ClusterEnum a, ClusterEnum b) { this.mask = (byte) (mask << bits); this.bits = (short) bits; this.bytes = bytes(bits, a, b); } private static int bytes(int bits, ClusterEnum a, ClusterEnum b) { int left = 6 - bits; if (a != ClusterEnum.ForeignShort) { left += 6; } if (b != ClusterEnum.ForeignShort) { left += 6; } int bytes = left >>> 3; if ((left & 7) != 0) bytes++; return bytes; } } enum ClusterEnum { Local(0), ForeignShort(1), ForeignLong(2); public int ordinal; ClusterEnum(int ordinal) { this.ordinal = ordinal; } static Data[][][] maps = new Data[2][3][3]; static { maps[StmEnum.Add.ordinal][Local.ordinal][Local.ordinal] = new Data( 0, 2, Local, Local); maps[StmEnum.Add.ordinal][Local.ordinal][ForeignShort.ordinal] = new Data( 12, 4, Local, ForeignShort); maps[StmEnum.Add.ordinal][Local.ordinal][ForeignLong.ordinal] = new Data( 2, 2, Local, ForeignLong); maps[StmEnum.Add.ordinal][ForeignShort.ordinal][Local.ordinal] = new Data( 13, 4, ForeignShort, Local); maps[StmEnum.Add.ordinal][ForeignShort.ordinal][ForeignShort.ordinal] = new Data( 1, 6, ForeignShort, ForeignShort); maps[StmEnum.Add.ordinal][ForeignShort.ordinal][ForeignLong.ordinal] = new Data( 14, 4, ForeignShort, ForeignLong); maps[StmEnum.Add.ordinal][ForeignLong.ordinal][Local.ordinal] = new Data( 4, 2, ForeignLong, Local); maps[StmEnum.Add.ordinal][ForeignLong.ordinal][ForeignShort.ordinal] = new Data( 15, 4, ForeignLong, ForeignShort); maps[StmEnum.Add.ordinal][ForeignLong.ordinal][ForeignLong.ordinal] = new Data( 6, 2, ForeignLong, ForeignLong); maps[StmEnum.Remove.ordinal][Local.ordinal][Local.ordinal] = new Data( 1, 2, Local, Local); maps[StmEnum.Remove.ordinal][Local.ordinal][ForeignShort.ordinal] = new Data( 49, 0, Local, ForeignShort); maps[StmEnum.Remove.ordinal][Local.ordinal][ForeignLong.ordinal] = new Data( 3, 2, Local, ForeignLong); maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][Local.ordinal] = new Data( 2, 4, ForeignShort, Local); maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][ForeignShort.ordinal] = new Data( 2, 6, ForeignShort, ForeignShort); maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][ForeignLong.ordinal] = new Data( 50, 0, ForeignShort, ForeignLong); maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][Local.ordinal] = new Data( 5, 2, ForeignLong, Local); maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][ForeignShort.ordinal] = new Data( 51, 0, ForeignLong, ForeignShort); maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][ForeignLong.ordinal] = new Data( 7, 2, ForeignLong, ForeignLong); } static Data getData(StmEnum s, ClusterEnum a, ClusterEnum b) { return maps[s.ordinal][a.ordinal][b.ordinal]; // return maps.get(s).get(a).get(b); } } enum OpEnum { Create((byte) 52), Set((byte) 53), SetShort((byte) 56), Delete( (byte) 54), Modify((byte) 55); OpEnum(byte mask) { this.mask = mask; } public byte getOrMask() { return mask; } private byte mask; } }