package org.simantics.acorn.internal; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.simantics.acorn.ClusterManager; import org.simantics.acorn.exception.IllegalAcornStateException; import org.simantics.acorn.internal.ClusterStream.ClusterEnum; import org.simantics.acorn.internal.ClusterStream.Data; import org.simantics.acorn.internal.ClusterStream.StmEnum; import org.simantics.db.exception.DatabaseException; import org.simantics.db.service.Bytes; import org.simantics.db.service.ClusterUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import fi.vtt.simantics.procore.internal.ClusterChange2; abstract public class ClusterUpdateProcessorBase { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterUpdateProcessorBase.class); public final static boolean DEBUG = false; final protected ClusterManager manager; final public byte[] bytes; private int pos = 0; final private int len; final private ClusterUID uid; final private int clusterKey; final public int version; final Map clusterKeyCache = new HashMap<>(); public int getResourceKey(ClusterUID uid, int index) throws IllegalAcornStateException { Integer match = clusterKeyCache.get(uid); if(match != null) return match+index; int key = manager.getResourceKeyWitoutMutex(uid, 0); clusterKeyCache.put(uid, key); return key+index; } public ClusterUpdateProcessorBase(ClusterManager client, byte[] operations) throws DatabaseException { this.manager = client; this.bytes = operations; this.len = Bytes.readLE4(bytes, 0)+4; // whatta? this.version = Bytes.readLE4(bytes, 4); long cuid1 = Bytes.readLE8(bytes, 8); long cuid2 = Bytes.readLE8(bytes, 16); uid = ClusterUID.make(cuid1, cuid2); pos = 24; client.clusterLRU.acquireMutex(); try { clusterKey = client.clusterLRU.getClusterKeyByUID(cuid1, cuid2) << 12; } catch (Throwable t) { throw new IllegalStateException(t); } finally { client.clusterLRU.releaseMutex(); } } public ClusterUID getClusterUID() { return uid; } private void processCreate() { int r = Bytes.readLE2(bytes, pos); pos+=2; if(DEBUG) System.err.println("DEBUG: New ri=" + r + " offset=" + (pos-3-24)); try { create(); } catch (DatabaseException e) { LOGGER.error("resource create failed", e); } } private void processDelete() { int ri = Bytes.readLE2(bytes, pos); pos += 2; if(DEBUG) System.err.println("DEBUG: Delete " + ri); try { delete(ri); } catch (DatabaseException e) { LOGGER.error("resource {} value delete failed", ri, e); } } private void processModify(int op) { int ri = Bytes.readLE2(bytes, pos); pos+=2; long offset = Bytes.readLE7(bytes, pos); pos+=7; int size = Bytes.readLE2(bytes, pos); pos+=2; offset += (ri>>14) << 56; ri = ri & 0x3FFF; if(size < 0) throw new IllegalStateException(); if(ri < 1) throw new IllegalStateException(); if(ri > 4095) throw new IllegalStateException(); if(DEBUG) System.err.println("DEBUG: Modify " + ri + " " + offset + " " + size + " offset=" + (pos-1-24) + " " + Arrays.toString(Arrays.copyOf(valueBuffer,size))); try { modify(clusterKey + ri, offset, size, bytes, pos); } catch (DatabaseException e) { LOGGER.error("resource value modify(clusterKey: {}, ri: {}, offset: {}, size: {}, pos: {}) failed", clusterKey, ri, offset, size, pos, e); } pos += size; } private void processSet(int op) { int s = Bytes.readLE4(bytes, pos); int length = (s >> 14); if(length < 1) throw new IllegalStateException(); int r = s & 0x3FFF; pos += 4; System.arraycopy(bytes, pos, valueBuffer, 0, length); pos += length; if(DEBUG) System.err.println("DEBUG: Set " + r + " " + length + " offset=" + (pos-1-24) + " " + Arrays.toString(Arrays.copyOf(valueBuffer,length))); try { set(clusterKey+r, valueBuffer, length); } catch (DatabaseException e) { LOGGER.error("resource value set(clusterKey: {}, r: {}, length: {}) failed", clusterKey, r, length, e); } } byte[] valueBuffer = new byte[65536]; private void processSetShort(int op) { int s = Bytes.readLE2(bytes, pos); int length = ((op&7)<<2) + (s >> 14); if(length < 1) throw new IllegalStateException(); if(length > 31) throw new IllegalStateException(); int r = s & 0x3FFF; if(DEBUG) System.err.println("DEBUG: SetShort " + r + " " + length + " offset=" + (pos-1-24) + " " + Arrays.toString(Arrays.copyOf(valueBuffer,length))); pos += 2; System.arraycopy(bytes, pos, valueBuffer, 0, length); pos += length; try { set(clusterKey+r, valueBuffer, length); } catch (DatabaseException e) { LOGGER.error("resource value setShort(clusterKey: {}, r: {}, length: {}) failed", clusterKey, r, length, e); } } private void processStatementResource(ClusterEnum enu, int pOrO) { if(ClusterEnum.ForeignShort == enu) { int fs = bytes[pos++]&0xff; foreignRefs[pOrO] = fs; } else if(ClusterEnum.Local == enu) { int lo = bytes[pos++]&0xff; lows[pOrO] = lo; } else { long l1 = Bytes.readLE8(bytes, pos); pos += 8; long l2 = Bytes.readLE8(bytes, pos); pos += 8; ClusterUID cuid = ClusterUID.make(l1, l2); foreignClusters[foreignPos] = cuid; int lo = bytes[pos++]&0xff; foreignIndices[foreignPos] = lo; foreignRefs[pOrO] = foreignPos; foreignPos++; lows[pOrO] = lo; } } ClusterUID[] foreignClusters = new ClusterUID[256]; int[] foreignIndices = new int[256]; int foreignPos = 0; int lows[] = new int[2]; int foreignRefs[] = new int[2]; private void processStatement(int op, StmEnum stmEnum, ClusterEnum p, ClusterEnum o) throws IllegalAcornStateException { int curPos = pos-1-24; processStatementResource(p, 0); processStatementResource(o, 1); int ri = bytes[pos++]&0xff; int pi = 0; int oi = 0; ClusterUID puid = uid; ClusterUID ouid = puid; if(ClusterEnum.ForeignShort == p && ClusterEnum.ForeignShort == o) { ri |= (op&0x3F) << 8; } else { Data data = ClusterEnum.getData(stmEnum, p, o); // data.left is the amount of bytes in last two bytes if(data.bytes == 0) { ri = ri | ((op&0x3F)<<8); } else { int extra = 0; int opBits = data.bits; int extraBits = 6-opBits; if(data.bytes == 1) { extra = bytes[pos++]&0xff; int high = extra >> extraBits; if(ClusterEnum.ForeignShort == p) { oi = lows[1] + (high<<8); } else { pi = lows[0] + (high<<8); } } else { extra = Bytes.readLE2(bytes, pos); pos += 2; int high1 = (extra >> extraBits)&((1<<6)-1); int high2 = (extra >> (extraBits+6))&((1<<6)-1); if(ClusterEnum.ForeignShort == p) { oi = lows[1] + (high1<<8); } else { pi = lows[0] + (high1<<8); oi = lows[1] + (high2<<8); } } ri = ri | ((extra&((1< 4095) throw new IllegalStateException(); if(pi > 4095) throw new IllegalStateException(); if(oi > 4095) throw new IllegalStateException(); if(StmEnum.Add == stmEnum) { if(DEBUG) System.err.println("DEBUG: ClusterChange " + uid + ": Add ri=" + ri + " pi=" + pi + " oi=" + oi + " pc=" + puid + " oc=" + ouid + " offset=" + curPos + " " + p.ordinal + " " + o.ordinal); int predicateKey = getResourceKey(puid, pi); int objectKey = getResourceKey(ouid, oi); try { claim(clusterKey+ri, predicateKey, objectKey, puid, ouid); } catch (DatabaseException e) { LOGGER.error("statement add(clusterKey: {}, ri: {}, predicateKey: {}, objectKey: {}, puid: {}, ouid: {}) failed", clusterKey, ri, predicateKey, objectKey, puid.toString(), ouid.toString(), e); } } else { if(DEBUG) System.err.println("DEBUG: ClusterChange " + uid + ": Rem ri=" + ri + " pi=" + pi + " oi=" + oi + " pc=" + puid + " oc=" + ouid + " offset=" + curPos + " " + p.ordinal + " " + o.ordinal); int predicateKey = getResourceKey(puid, pi); int objectKey = getResourceKey(ouid, oi); try { deny(clusterKey+ri, predicateKey, objectKey, puid, ouid); } catch (DatabaseException e) { LOGGER.error("statement deny(clusterKey: {}, ri: {}, predicateKey: {}, objectKey: {}, puid: {}, ouid: {}) failed", clusterKey, ri, predicateKey, objectKey, puid.toString(), ouid.toString(), e); } } } public void process() throws IllegalAcornStateException { if (version == ClusterChange.VERSION) { process1(); } else if (version == ClusterChange2.VERSION) { process2(); } } private void process1() throws IllegalAcornStateException { foreignPos = 0; if(DEBUG) System.err.println("DEBUG: process " + uid + " " + len); // op resolution for statement operation: // 2 first bits // op: 01 | r8-13 // op: 10 | r8-13 // 3 first bits (000) // op: 000000 | r12-13 // op: 000001 | r12-13 // op: 000010 | r12-13 // op: 000011 | r12-13 // op: 000100 | r12-13 // op: 000101 | r12-13 // op: 000110 | r12-13 // op: 000111 | r12-13 // 4 first bits // op: 1100 | r10-13 // op: 1101 | r10-13 // op: 1110 | r10-13 // op: 1111 | r10-13 // op: 0010 | r10-13 // 6 bits // op: 00110001 = 49 // op: 00110010 = 50 // op: 00110011 = 51 // other: 0011xxxx while(pos < len) { int op = bytes[pos++]&0xff; // common prefix: 0011 switch(op) { case 49: processStatement(op, StmEnum.Remove, ClusterEnum.Local, ClusterEnum.ForeignShort); break; case 50: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignShort, ClusterEnum.ForeignLong); break; case 51: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignLong, ClusterEnum.ForeignShort); break; // 52 = 32+16+4 = 00110100 case 52: processCreate(); break; // 53 = 32+16+4+1 = 00110101 case 53: processSet(op); break; // 54 = 32+16+4+2 = 00110110 case 54: processDelete(); break; // 55 = 32+16+4+2+1 = 00110111 case 55: processModify(op); break; default: int bits6 = ((int)op)&0xC0; switch(bits6) { case 0x40: processStatement(op, StmEnum.Add, ClusterEnum.ForeignShort, ClusterEnum.ForeignShort); break; case 0x80: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignShort, ClusterEnum.ForeignShort); break; default: int bits5 = ((int)op)&0xE0; if(bits5 == 0) { int bits2 = (((int)op)&0xFC) >> 2; // 3 top bits are 0 // 6 bits of op switch(bits2) { case 0: processStatement(op, StmEnum.Add, ClusterEnum.Local, ClusterEnum.Local); break; case 1: processStatement(op, StmEnum.Remove, ClusterEnum.Local, ClusterEnum.Local); break; case 2: processStatement(op, StmEnum.Add, ClusterEnum.Local, ClusterEnum.ForeignLong); break; case 3: processStatement(op, StmEnum.Remove, ClusterEnum.Local, ClusterEnum.ForeignLong); break; case 4: processStatement(op, StmEnum.Add, ClusterEnum.ForeignLong, ClusterEnum.Local); break; case 5: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignLong, ClusterEnum.Local); break; case 6: processStatement(op, StmEnum.Add, ClusterEnum.ForeignLong, ClusterEnum.ForeignLong); break; case 7: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignLong, ClusterEnum.ForeignLong); break; } } else { // 4 top bits of op // 4 low bits of payload int bits4 = (((int)op)&0xF0)>>4; switch(bits4) { case 0b1100: processStatement(op, StmEnum.Add, ClusterEnum.Local, ClusterEnum.ForeignShort); break; case 0b1101: processStatement(op, StmEnum.Add, ClusterEnum.ForeignShort, ClusterEnum.Local); break; case 0b1110: processStatement(op, StmEnum.Add, ClusterEnum.ForeignShort, ClusterEnum.ForeignLong); break; case 0b1111: processStatement(op, StmEnum.Add, ClusterEnum.ForeignLong, ClusterEnum.ForeignShort); break; case 0b0010: processStatement(op, StmEnum.Remove, ClusterEnum.ForeignShort, ClusterEnum.Local); break; case 0b0011: int bits3 = (((int)op)&0xF8)>>3; if(bits3 == 7) processSetShort(op); break; } } } } } } private void process2() throws IllegalAcornStateException { while(pos < len) { int op = bytes[pos++]&0xff; switch(op) { case ClusterChange2.SET_IMMUTABLE_OPERATION: processSetImmutable(op); break; case ClusterChange2.UNDO_VALUE_OPERATION: processUndoValue(op); break; case ClusterChange2.SET_DELETED_OPERATION: // TODO: implement? break; default: throw new IllegalAcornStateException("Can not process operation " + op + " for cluster " + uid); } } } private void processSetImmutable(int op) { int value = bytes[pos++]&0xff; setImmutable(value > 0); } private void processUndoValue(int op) { Bytes.readLE4(bytes, pos); pos+=4; } abstract void create() throws DatabaseException; abstract void delete(int resourceIndex) throws DatabaseException; abstract void modify(int resourceKey, long offset, int size, byte[] bytes, int pos) throws DatabaseException; abstract void set(int resourceKey, byte[] bytes, int length) throws DatabaseException; abstract void claim(int resourceKey, int predicateKey, int objectKey, ClusterUID puid, ClusterUID ouid) throws DatabaseException; abstract void deny(int resourceKey, int predicateKey, int objectKey, ClusterUID puid, ClusterUID ouid) throws DatabaseException; abstract void setImmutable(boolean value); }