--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.util.ArrayList;\r
+\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.service.ClusterUID;\r
+\r
+final public class ClusterStream {\r
+\r
+ // public static long duration2 = 0;\r
+\r
+ public static final boolean DEBUG = false;\r
+ public static final byte NULL_OPERATION = 0;\r
+ public static final byte CREATE_OPERATION = 1;\r
+ public static final byte SET_OPERATION = 4;\r
+ public static final byte MODI_OPERATION = 6;\r
+ public static final byte KILL_OPERATION = 7;\r
+ boolean off = false;\r
+ public GraphSession graphSession;\r
+ final SessionImplSocket session;\r
+// private int flushCount = 0;\r
+ final private boolean alwaysOff;\r
+ private int stamp;\r
+ private int acceptedStamp;\r
+ private boolean dirty = false;\r
+// final private ArrayList<ClusterChange> clusterChanges = new ArrayList<ClusterChange>();\r
+\r
+ final ClusterChangeManager changes = new ClusterChangeManager();\r
+\r
+// final TLongObjectHashMap<ClusterChange> clusterChanges = new TLongObjectHashMap<ClusterChange>();\r
+\r
+ // private final Change lastChange = new Change();\r
+ ClusterStream(SessionImplSocket session, GraphSession graphSession,\r
+ boolean alwaysOff) {\r
+ this.session = session;\r
+ this.graphSession = graphSession;\r
+ this.alwaysOff = alwaysOff;\r
+ }\r
+\r
+\r
+ boolean isDirty() {\r
+ return dirty;\r
+ }\r
+\r
+ void markDirty() {\r
+ dirty = true;\r
+ }\r
+\r
+ void setOff(boolean value) {\r
+ if (alwaysOff) {\r
+ off = true;\r
+ } else {\r
+ off = value;\r
+ }\r
+ }\r
+\r
+ boolean getOff() {\r
+ return off;\r
+ }\r
+\r
+ void createResource(ClusterChange cc, short operationIndex, ClusterUID clusterUID) {\r
+ if (off)\r
+ return;\r
+ assert (null != cc);\r
+ assert (0 != operationIndex);\r
+ assert (!ClusterUID.Null.equals(clusterUID));\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: Created resource index=" + operationIndex + " cluster=" + clusterUID);\r
+ cc.createResource(operationIndex);\r
+ }\r
+\r
+ final void addStatementIndex(Change change, int key, ClusterUID clusterUID, byte op) {\r
+ if (off)\r
+ return;\r
+ assert (key > 0);\r
+ assert (null != change);\r
+ assert (!ClusterUID.Null.equals(clusterUID));\r
+ change.addStatementIndex(key, clusterUID, op);\r
+ }\r
+\r
+ void addStatement(ClusterChange cc, Change change) {\r
+ if (off)\r
+ return;\r
+ assert (null != cc);\r
+ assert (null != change);\r
+ cc.addChange(change);\r
+ }\r
+\r
+ void cancelStatement(Change change) {\r
+ if (off)\r
+ return;\r
+ assert (null != change);\r
+ change.init();\r
+ }\r
+\r
+ void removeStatement(ClusterChange cc, Change change) {\r
+ if (off)\r
+ return;\r
+ assert (null != cc);\r
+ assert (null != change);\r
+ cc.addChange(change);\r
+ }\r
+\r
+ void cancelValue(Change change) {\r
+ if (off)\r
+ return;\r
+ assert (null != change);\r
+ change.init();\r
+ }\r
+\r
+ void removeValue(ClusterChange cc, Change change) {\r
+ if (off)\r
+ return;\r
+ assert (null != cc);\r
+ assert (null != change);\r
+ cc.addChange(change);\r
+ }\r
+\r
+ void setValue(ClusterChange cc, Change change, long clusterId, byte[] bytes, int length) {\r
+ if (off)\r
+ return;\r
+ assert (null != cc);\r
+ assert (null != change);\r
+ // ClusterChange cc = getClusterChange(clusterId);\r
+ cc.setValue(change, bytes, length);\r
+ }\r
+\r
+ void modiValue(ClusterChange cc, Change change, long clusterId,\r
+ long voffset, int length, byte[] bytes, int offset) {\r
+ assert (null != cc);\r
+ assert (null != change);\r
+ cc.modiValue(change, voffset, length, bytes, offset);\r
+ }\r
+\r
+ void undoValueEx(ClusterChange cc, Change change, int resourceIndex) {\r
+ cc.undoValueEx(resourceIndex);\r
+ }\r
+ void setImmutable(ClusterChange cc, Change change, boolean immutable) {\r
+ if (off)\r
+ return;\r
+ cc.setImmutable(immutable);\r
+ }\r
+ void setDeleted(ClusterChange cc, Change change, boolean deleted) {\r
+ if (off)\r
+ return;\r
+ cc.setDeleted(deleted);\r
+ }\r
+ public void corruptCluster(ClusterChange cc, long clusterId)\r
+ throws DatabaseException {\r
+ if (off)\r
+ return;\r
+ if (DEBUG)\r
+ System.out.println("ClusterStream.corrupt cid=" + clusterId + ".");\r
+ assert (null != cc);\r
+ cc.corrupt();\r
+ }\r
+\r
+ int getStamp() {\r
+ return stamp;\r
+ }\r
+\r
+ void flush() {\r
+ if (off)\r
+ return;\r
+// flushCount++;\r
+ return;\r
+ }\r
+\r
+ void flush(ClusterUID clusterUID) {\r
+ if (off)\r
+ return;\r
+ ArrayList<ClusterChange> ccs = new ArrayList<ClusterChange>();\r
+ for(ClusterChange cc : changes.get()) {\r
+ if(cc.clusterUID.equals(clusterUID)) {\r
+ if (cc.flush(graphSession, cc.clusterUID)) {\r
+ ccs.add(cc);\r
+ if (stamp == acceptedStamp)\r
+ ++stamp;\r
+ } else {\r
+ Logger.defaultLogError("Failed to flush ClusterStream. cluster=" + clusterUID);\r
+ }\r
+ }\r
+ }\r
+ changes.remove(ccs);\r
+ }\r
+\r
+ /**\r
+ * @return true if the stream has accepted all changes\r
+ */\r
+ public boolean reallyFlush() {\r
+ // Last possibility to mark clusters immutable before write only clusters are gone\r
+ session.handleCreatedClusters();\r
+ // These shall be requested from server\r
+ session.clusterTable.removeWriteOnlyClusters();\r
+ if (!off && changes.size() > 0) {\r
+ for(ClusterChange cc : changes.get()) {\r
+ if (cc.flush(graphSession, cc.clusterUID))\r
+ if (stamp == acceptedStamp)\r
+ ++stamp;\r
+ }\r
+ changes.clear();\r
+ }\r
+ dirty = false;\r
+ return hasAcceptedAllChanges();\r
+ }\r
+\r
+ /**\r
+ * Clear all changes and set stream status to empty.\r
+ */\r
+ public void clear() {\r
+ changes.clear();\r
+ acceptedStamp = stamp;\r
+ dirty = false;\r
+ }\r
+\r
+ private boolean hasAcceptedAllChanges() {\r
+ return stamp == acceptedStamp;\r
+ }\r
+\r
+ void accept() {\r
+ acceptedStamp = stamp;\r
+ }\r
+\r
+\r
+\r
+ static class DebugInfo {\r
+ long nStms;\r
+ long nLocal;\r
+ long nPartly;\r
+ long nForeign;\r
+ long nValues;\r
+ long sValues;\r
+ long sForeign;\r
+ long tot;\r
+\r
+ void clear() {\r
+ nStms = 0;\r
+ nLocal = 0;\r
+ nPartly = 0;\r
+ nForeign = 0;\r
+ sForeign = 0;\r
+ nValues = 0;\r
+ sValues = 0;\r
+ tot = 0;\r
+ }\r
+\r
+ void add(DebugInfo di) {\r
+ nStms += di.nStms;\r
+ nLocal += di.nLocal;\r
+ nPartly += di.nPartly;\r
+ nForeign += di.nForeign;\r
+ sForeign += di.sForeign;\r
+ nValues += di.nValues;\r
+ sValues += di.sValues;\r
+ tot += di.tot;\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return "val=" + nValues + " stm=" + nStms + " loc=" + nLocal\r
+ + " par=" + nPartly + " ful=" + nForeign + " for="\r
+ + sForeign + " vat=" + sValues + " tot=" + tot;\r
+ }\r
+ }\r
+\r
+ enum StmEnum {\r
+ Add(0, (byte) 0), Remove(1, (byte) 0x20);\r
+ StmEnum(int ordinal, byte mask) {\r
+ this.ordinal = ordinal;\r
+ this.mask = mask;\r
+ }\r
+\r
+ public int ordinal;\r
+ private byte mask;\r
+\r
+ byte getOrMask() {\r
+ return mask;\r
+ }\r
+ }\r
+\r
+ final static class Data {\r
+\r
+ final byte mask; // or mask for operation code (don't care bits are zero)\r
+ final short bits; // how many bits are reserved for resource index (0,2,4,6)\r
+ final int bytes;\r
+\r
+ Data(int mask, int bits, ClusterEnum a, ClusterEnum b) {\r
+ this.mask = (byte) (mask << bits);\r
+ this.bits = (short) bits;\r
+ this.bytes = bytes(bits, a, b);\r
+ }\r
+\r
+ private static int bytes(int bits, ClusterEnum a, ClusterEnum b) {\r
+ int left = 6 - bits;\r
+ if (a != ClusterEnum.ForeignShort) {\r
+ left += 6;\r
+ }\r
+ if (b != ClusterEnum.ForeignShort) {\r
+ left += 6;\r
+ }\r
+ int bytes = left >>> 3;\r
+ if ((left & 7) != 0)\r
+ bytes++;\r
+ return bytes;\r
+ }\r
+\r
+\r
+ }\r
+\r
+ enum ClusterEnum {\r
+ Local(0), ForeignShort(1), ForeignLong(2);\r
+ public int ordinal;\r
+\r
+ ClusterEnum(int ordinal) {\r
+ this.ordinal = ordinal;\r
+ }\r
+\r
+ static Data[][][] maps = new Data[2][3][3];\r
+ static {\r
+ maps[StmEnum.Add.ordinal][Local.ordinal][Local.ordinal] = new Data(\r
+ 0, 2, Local, Local);\r
+ maps[StmEnum.Add.ordinal][Local.ordinal][ForeignShort.ordinal] = new Data(\r
+ 12, 4, Local, ForeignShort);\r
+ maps[StmEnum.Add.ordinal][Local.ordinal][ForeignLong.ordinal] = new Data(\r
+ 2, 2, Local, ForeignLong);\r
+ maps[StmEnum.Add.ordinal][ForeignShort.ordinal][Local.ordinal] = new Data(\r
+ 13, 4, ForeignShort, Local);\r
+ maps[StmEnum.Add.ordinal][ForeignShort.ordinal][ForeignShort.ordinal] = new Data(\r
+ 1, 6, ForeignShort, ForeignShort);\r
+ maps[StmEnum.Add.ordinal][ForeignShort.ordinal][ForeignLong.ordinal] = new Data(\r
+ 14, 4, ForeignShort, ForeignLong);\r
+ maps[StmEnum.Add.ordinal][ForeignLong.ordinal][Local.ordinal] = new Data(\r
+ 4, 2, ForeignLong, Local);\r
+ maps[StmEnum.Add.ordinal][ForeignLong.ordinal][ForeignShort.ordinal] = new Data(\r
+ 15, 4, ForeignLong, ForeignShort);\r
+ maps[StmEnum.Add.ordinal][ForeignLong.ordinal][ForeignLong.ordinal] = new Data(\r
+ 6, 2, ForeignLong, ForeignLong);\r
+\r
+ maps[StmEnum.Remove.ordinal][Local.ordinal][Local.ordinal] = new Data(\r
+ 1, 2, Local, Local);\r
+ maps[StmEnum.Remove.ordinal][Local.ordinal][ForeignShort.ordinal] = new Data(\r
+ 49, 0, Local, ForeignShort);\r
+ maps[StmEnum.Remove.ordinal][Local.ordinal][ForeignLong.ordinal] = new Data(\r
+ 3, 2, Local, ForeignLong);\r
+ maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][Local.ordinal] = new Data(\r
+ 2, 4, ForeignShort, Local);\r
+ maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][ForeignShort.ordinal] = new Data(\r
+ 2, 6, ForeignShort, ForeignShort);\r
+ maps[StmEnum.Remove.ordinal][ForeignShort.ordinal][ForeignLong.ordinal] = new Data(\r
+ 50, 0, ForeignShort, ForeignLong);\r
+ maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][Local.ordinal] = new Data(\r
+ 5, 2, ForeignLong, Local);\r
+ maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][ForeignShort.ordinal] = new Data(\r
+ 51, 0, ForeignLong, ForeignShort);\r
+ maps[StmEnum.Remove.ordinal][ForeignLong.ordinal][ForeignLong.ordinal] = new Data(\r
+ 7, 2, ForeignLong, ForeignLong);\r
+ }\r
+\r
+ static Data getData(StmEnum s, ClusterEnum a, ClusterEnum b) {\r
+ return maps[s.ordinal][a.ordinal][b.ordinal];\r
+ // return maps.get(s).get(a).get(b);\r
+ }\r
+ }\r
+\r
+ enum OpEnum {\r
+ Create((byte) 52), Set((byte) 53), SetShort((byte) 56), Delete(\r
+ (byte) 54), Modify((byte) 55);\r
+ OpEnum(byte mask) {\r
+ this.mask = mask;\r
+ }\r
+\r
+ public byte getOrMask() {\r
+ return mask;\r
+ }\r
+\r
+ private byte mask;\r
+ }\r
+}\r