+package org.simantics.graph.db;\r
+\r
+import java.io.DataInput;\r
+import java.io.DataInputStream;\r
+import java.io.File;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.nio.ByteBuffer;\r
+import java.nio.channels.ReadableByteChannel;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.List;\r
+import java.util.TreeMap;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.binding.Binding;\r
+import org.simantics.databoard.binding.mutable.Variant;\r
+import org.simantics.databoard.container.DataContainer;\r
+import org.simantics.databoard.container.DataContainers;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.databoard.type.Datatype;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;\r
+import org.simantics.graph.representation.ByteFileReader;\r
+import org.simantics.graph.representation.Extensions;\r
+import org.simantics.graph.representation.External;\r
+import org.simantics.graph.representation.Identity;\r
+import org.simantics.graph.representation.Internal;\r
+import org.simantics.graph.representation.Root;\r
+import org.simantics.graph.representation.Value;\r
+\r
+\r
+final public class StreamingTransferableGraphFileReader extends ByteFileReader {\r
+\r
+ final static class InputChannel implements ReadableByteChannel {\r
+\r
+ final private InputStream stream;\r
+ \r
+ public InputChannel(InputStream stream) {\r
+ this.stream = stream;\r
+ }\r
+ \r
+ @Override\r
+ public boolean isOpen() {\r
+ return true;\r
+ }\r
+\r
+ @Override\r
+ public void close() throws IOException {\r
+ }\r
+\r
+ @Override\r
+ public int read(ByteBuffer dst) throws IOException {\r
+ int pos = dst.position();\r
+ int limit = dst.limit();\r
+ int i=stream.read(dst.array(), pos, limit-pos);\r
+ return i;\r
+ }\r
+ \r
+ }\r
+ \r
+ private static boolean init = true;\r
+ \r
+ final private static int SIZE = 1<<18;\r
+ \r
+ public StreamingTransferableGraphFileReader(File file) throws Exception {\r
+ super(file, SIZE);\r
+ if(init) {\r
+ init=false;\r
+ @SuppressWarnings("resource")\r
+ StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);\r
+ for(int i=0;i<40000;i++) r.readTG();\r
+ }\r
+ }\r
+ \r
+ public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {\r
+ super(null, new InputChannel(stream), SIZE);\r
+ if(init) {\r
+ init=false;\r
+ @SuppressWarnings("resource")\r
+ StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);\r
+ for(int i=0;i<40000;i++) r.readTG();\r
+ }\r
+ }\r
+\r
+ public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {\r
+ super(null, channel, SIZE);\r
+ if(init) {\r
+ init=false;\r
+ @SuppressWarnings("resource")\r
+ StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);\r
+ for(int i=0;i<40000;i++) r.readTG();\r
+ }\r
+ }\r
+\r
+ public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {\r
+ super(null, channel, SIZE);\r
+ }\r
+\r
+ public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {\r
+ super(null, new InputChannel(stream), size);\r
+ }\r
+\r
+ public StreamingTransferableGraphFileReader(File file, int size) throws IOException {\r
+ super(file, size);\r
+ }\r
+\r
+ class FileTransferableGraphSource implements TransferableGraphSource {\r
+\r
+ InputStream in = new InputStream() {\r
+\r
+ @Override\r
+ public int read() throws IOException {\r
+ return getByte();\r
+ }\r
+ \r
+ @Override\r
+ public int read(byte[] b) throws IOException {\r
+ // FIXME not correctly implemented \r
+ System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);\r
+ return b.length;\r
+ }\r
+ \r
+ @Override\r
+ public int read(byte[] b, int off, int len) throws IOException {\r
+ for(int i=0;i<len;i++)\r
+ b[off++] = (byte)getByte();\r
+ return len;\r
+ }\r
+ \r
+ };\r
+ DataInputStream dis = new DataInputStream(in);\r
+ \r
+ DataContainer header;\r
+ Extensions extensions;\r
+ int resourceCount;\r
+ \r
+ private int identities;\r
+ private int stmLength;\r
+ private int valueLength;\r
+ \r
+ public FileTransferableGraphSource() throws Exception {\r
+ init();\r
+ }\r
+ \r
+ private void init() throws Exception {\r
+\r
+ // Header\r
+ header = DataContainers.readHeader(dis);\r
+ \r
+ // Content variant data type\r
+ Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);\r
+\r
+ resourceCount = safeInt();\r
+ \r
+ List<Object> idcontext = new ArrayList<Object>(); \r
+ extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);\r
+ \r
+ }\r
+ \r
+ @Override\r
+ public void reset() throws Exception {\r
+ StreamingTransferableGraphFileReader.this.reset();\r
+ throw new UnsupportedOperationException();\r
+ }\r
+ \r
+ @Override\r
+ public DataContainer getHeader() throws Exception {\r
+ return header;\r
+ }\r
+ \r
+ @Override\r
+ public int getResourceCount() throws Exception {\r
+ return resourceCount;\r
+ }\r
+\r
+ @Override\r
+ public int getIdentityCount() throws Exception {\r
+ identities = safeInt();\r
+ return identities;\r
+ }\r
+\r
+ @Override\r
+ public int getStatementCount() throws Exception {\r
+ stmLength = safeInt();\r
+ return stmLength;\r
+ }\r
+\r
+ @Override\r
+ public int getValueCount() throws Exception {\r
+ valueLength = safeInt();\r
+ return valueLength;\r
+ }\r
+\r
+ @Override\r
+ public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {\r
+ \r
+ int[] value = new int[4];\r
+\r
+ for(int stmIndex=0;stmIndex<stmLength;) {\r
+ \r
+ value[stmIndex & 3] = safeInt();\r
+ stmIndex++;\r
+ if((stmIndex & 3) == 0) procedure.execute(value);\r
+ \r
+ // Cached bytes \r
+ int avail = (SIZE-byteIndex) >> 2;\r
+ int allowed = Math.min(stmLength-stmIndex, avail);\r
+ for(int index = byteIndex, i=0;i<allowed;i++) {\r
+ value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));\r
+ stmIndex++;\r
+ if((stmIndex & 3) == 0) procedure.execute(value);\r
+// statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff)); \r
+ }\r
+ byteIndex += allowed<<2;\r
+ \r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {\r
+ \r
+ for(int i=0;i<identities;i++) {\r
+ \r
+ int rid = safeInt();\r
+ byte type = bytes[byteIndex++];\r
+ // External\r
+ if(type == 1) {\r
+ \r
+ int parent = safeInt();\r
+ int nameLen = bytes[byteIndex++]&0xff;\r
+ \r
+ if(byteIndex+nameLen < SIZE) {\r
+ procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));\r
+ byteIndex += nameLen;\r
+ } else {\r
+ procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));\r
+ }\r
+ \r
+ } \r
+ // Internal\r
+ else if(type == 3) {\r
+ \r
+ int parent = safeInt();\r
+ int nameLen = bytes[byteIndex++]&0xff;\r
+ if(byteIndex+nameLen < SIZE) {\r
+ procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));\r
+ byteIndex += nameLen;\r
+ } else {\r
+ procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));\r
+ }\r
+ \r
+ }\r
+ // Root\r
+ else if(type == 0) {\r
+ \r
+ int nameLen = bytes[byteIndex++]&0xff;\r
+ String name = utf(safeBytes(nameLen), 0, nameLen);\r
+ int nameLen2 = bytes[byteIndex++]&0xff;\r
+ String rType = utf(safeBytes(nameLen2), 0, nameLen2);\r
+ procedure.execute(new Identity(rid, new Root(name, rType)));\r
+\r
+ } else if(type == 2) {\r
+ throw new UnsupportedOperationException();\r
+ }\r
+\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {\r
+ \r
+ Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);\r
+ \r
+ List<Object> idcontext = new ArrayList<>(); \r
+ \r
+ for(int i=0;i<valueLength;i++) {\r
+ int resource = safeInt();\r
+ Variant value = (Variant)variantSerializer\r
+ .deserialize((DataInput)dis, idcontext);\r
+ procedure.execute(new Value(resource, value));\r
+ }\r
+ \r
+ \r
+ }\r
+ \r
+ @Override\r
+ public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {\r
+\r
+ Binding datatypeBinding = Bindings.getBinding(Datatype.class);\r
+ Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);\r
+ \r
+ List<Object> idContext = new ArrayList<>(); \r
+ \r
+ for(int i=0;i<valueLength;i++) {\r
+ int resource = safeInt();\r
+ idContext.clear();\r
+ Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);\r
+ procedure.execute(resource, type, dis);\r
+ }\r
+ \r
+ }\r
+\r
+ @Override\r
+ public TreeMap<String, Variant> getExtensions() {\r
+ return extensions.map;\r
+ }\r
+\r
+ @Override\r
+ public void close() {\r
+ }\r
+ }\r
+ \r
+ public TransferableGraphSource readTG() throws Exception {\r
+\r
+ if(getSize() == 0) return null;\r
+ \r
+ return new FileTransferableGraphSource();\r
+\r
+ }\r
+ \r
+ public static void main(String[] args) {\r
+\r
+ try {\r
+\r
+ File file = new File("c:/work/Model.apros");\r
+ StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);\r
+ reader = new StreamingTransferableGraphFileReader(file);\r
+ long s = System.nanoTime();\r
+ TransferableGraphSource tgs = reader.readTG();\r
+ int ids = tgs.getIdentityCount();\r
+ System.out.println("identity count " + ids);\r
+// tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });\r
+ tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });\r
+ int stats = tgs.getStatementCount();\r
+ System.out.println("statement count " + stats/4 + " (" + stats + ")");\r
+// tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });\r
+ tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });\r
+ int values = tgs.getValueCount();\r
+ System.out.println("value count " + values);\r
+ int[] valueNo = {0};\r
+ tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {\r
+ @Override\r
+ public void rawCopy(int resource, int length, DataInput input) throws Exception {\r
+ System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");\r
+ for (int i = 0; i < length; ++i)\r
+ input.readByte();\r
+ }\r
+ @Override\r
+ public void execute(int resource, Datatype type, DataInput input) throws Exception {\r
+ Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);\r
+ System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);\r
+ }\r
+ });\r
+ long d = System.nanoTime() - s;\r
+ System.err.println("Duration=" + 1e-9*d + "s.");\r
+ } catch (Throwable t) {\r
+ t.printStackTrace();\r
+ }\r
+ }\r
+\r
+}\r