1 package org.simantics.graph.db;
3 import java.io.DataInput;
4 import java.io.DataInputStream;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.ByteBuffer;
9 import java.nio.channels.ReadableByteChannel;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.TreeMap;
15 import org.simantics.databoard.Bindings;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.binding.mutable.Variant;
18 import org.simantics.databoard.container.DataContainer;
19 import org.simantics.databoard.container.DataContainers;
20 import org.simantics.databoard.serialization.Serializer;
21 import org.simantics.databoard.type.Datatype;
22 import org.simantics.db.ReadGraph;
23 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
24 import org.simantics.graph.representation.ByteFileReader;
25 import org.simantics.graph.representation.Extensions;
26 import org.simantics.graph.representation.External;
27 import org.simantics.graph.representation.Identity;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
/**
 * Adapts a plain {@link InputStream} to the {@link ReadableByteChannel}
 * interface so that it can be handed to the superclass constructor.
 */
35 final static class InputChannel implements ReadableByteChannel {
// Stream that read(ByteBuffer) pulls its bytes from.
37 final private InputStream stream;
39 public InputChannel(InputStream stream) {
44 public boolean isOpen() {
49 public void close() throws IOException {
/**
 * Reads from the wrapped stream straight into the backing array of
 * {@code dst}, starting at the buffer's current position and asking for at
 * most {@code limit - position} bytes.
 *
 * {@code dst.array()} only works for array-backed, writable buffers; a
 * direct or read-only buffer makes it throw.
 * NOTE(review): the array offset of {@code dst} is not taken into account,
 * and no update of the buffer position is visible in this view - confirm
 * how callers learn how many bytes were stored.
 */
53 public int read(ByteBuffer dst) throws IOException {
54 int pos = dst.position();
55 int limit = dst.limit();
56 int i=stream.read(dst.array(), pos, limit-pos);
// Class-wide flag, initially true.
// NOTE(review): presumably guards the one-time readTG() loop in the
// constructors; the code that reads or clears it is not visible here - confirm.
62 private static boolean init = true;
// 1<<18 = 262144. Passed as the third superclass constructor argument by the
// stream/channel constructors and used as the assumed buffer length in the
// direct-buffer paths of forStatements() and forIdentities().
64 final private static int SIZE = 1<<18;
/**
 * Creates a reader for {@code file}.
 *
 * The visible part of the body builds a second, throwaway reader over the
 * same file (size argument 0) and calls {@code readTG()} on it 40000 times.
 * NOTE(review): this looks like a warm-up loop, presumably guarded by the
 * static {@code init} flag; neither the guard nor the superclass call is
 * visible in this view - confirm.
 *
 * @param file the file to read
 */
66 public StreamingTransferableGraphFileReader(File file) throws Exception {
// The throwaway reader is never closed, hence the suppression.
70 @SuppressWarnings("resource")
71 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
72 for(int i=0;i<40000;i++) r.readTG();
/**
 * Creates a reader over {@code stream}, wrapping it in an
 * {@code InputChannel} and passing {@code SIZE} as the third superclass
 * constructor argument.
 *
 * The visible part of the body then builds a second, throwaway reader over
 * the same stream (size argument 0) and calls {@code readTG()} on it 40000
 * times.
 * NOTE(review): the throwaway reader shares the caller's stream - confirm
 * that {@code readTG()} cannot consume data from it in this situation, and
 * that the loop is guarded (the guard is not visible in this view).
 *
 * @param stream the stream to read from
 */
76 public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
77 super(null, new InputChannel(stream), SIZE);
// The throwaway reader is never closed, hence the suppression.
80 @SuppressWarnings("resource")
81 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
82 for(int i=0;i<40000;i++) r.readTG();
/**
 * Creates a reader over {@code channel}, passing {@code SIZE} as the third
 * superclass constructor argument.
 *
 * The visible part of the body then builds a second, throwaway reader over
 * the same channel (size argument 0) and calls {@code readTG()} on it 40000
 * times.
 * NOTE(review): the throwaway reader shares the caller's channel - confirm
 * that {@code readTG()} cannot consume data from it in this situation, and
 * that the loop is guarded (the guard is not visible in this view).
 *
 * @param channel the channel to read from
 */
86 public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
87 super(null, channel, SIZE);
// The throwaway reader is never closed, hence the suppression.
90 @SuppressWarnings("resource")
91 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
92 for(int i=0;i<40000;i++) r.readTG();
/**
 * Creates a reader over {@code channel}.
 *
 * NOTE(review): the {@code size} parameter is not used; the constant
 * {@code SIZE} is passed to the superclass instead, unlike the
 * {@code (InputStream, int)} overload which forwards {@code size}. Before
 * changing this, note that forStatements() and forIdentities() compute their
 * buffer bounds from {@code SIZE}, so the two must stay in agreement.
 *
 * @param channel the channel to read from
 * @param size currently ignored
 */
96 public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
97 super(null, channel, SIZE);
/**
 * Creates a reader over {@code stream}, wrapping it in an
 * {@code InputChannel} and forwarding {@code size} as the third superclass
 * constructor argument.
 *
 * NOTE(review): forStatements() and forIdentities() compute their buffer
 * bounds from the constant {@code SIZE}, not from this argument - confirm
 * that passing a different value here is safe.
 *
 * @param stream the stream to read from
 * @param size value forwarded to the superclass constructor
 */
100 public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
101 super(null, new InputChannel(stream), size);
/**
 * Constructor taking the source file and a size argument. Its body is not
 * part of this view.
 * NOTE(review): presumably forwards both to the superclass like the other
 * sized overloads - confirm.
 */
104 public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
/**
 * {@code TransferableGraphSource} backed by the enclosing reader. It is a
 * non-static inner class: data is pulled through the enclosing reader's
 * helpers ({@code safeInt}, {@code safeBytes}, {@code getByte},
 * {@code getDynamicUInt32}) and, on the direct paths, straight from its
 * {@code bytes}/{@code byteIndex} buffer state.
 *
 * The counts are read from wherever the stream currently stands when first
 * requested, so the getters and the iteration methods only make sense when
 * called in the order the data is laid out.
 * NOTE(review): main() in this file uses identities, then statements, then
 * values - confirm that this is the required order.
 */
108 class FileTransferableGraphSource implements TransferableGraphSource {
// Stream facade over the enclosing reader, so that DataInputStream (and
// through it the serializers) can consume the same data.
110 InputStream in = new InputStream() {
113 public int read() throws IOException {
// Copies exactly b.length bytes obtained from safeBytes(b.length) into b.
// The original FIXME below still applies.
// NOTE(review): no partial-read or end-of-stream handling is visible here.
118 public int read(byte[] b) throws IOException {
119 // FIXME not correctly implemented
120 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
// Fills b[off .. off+len) one byte at a time through getByte().
125 public int read(byte[] b, int off, int len) throws IOException {
126 for(int i=0;i<len;i++)
127 b[off++] = (byte)getByte();
// DataInput view of 'in'; this is what gets passed to the serializers below.
132 DataInputStream dis = new DataInputStream(in);
// Both are assigned in init().
134 DataContainer header;
135 Extensions extensions;
// Lazily read counts; -1 means "not read from the stream yet".
138 private int identities = -1;
139 private int stmLength = -1;
140 private int valueLength = -1;
142 public FileTransferableGraphSource() throws Exception {
/**
 * Consumes the leading part of the data in this order: the container
 * header, a serialized {@code Datatype} (read and discarded), the resource
 * count, and the {@code Extensions} record.
 */
146 private void init() throws Exception {
149 header = DataContainers.readHeader(dis);
151 // Content variant data type
// The result is not stored; the call only moves the stream past the datatype.
152 Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
154 resourceCount = safeInt();
156 List<Object> idcontext = new ArrayList<Object>();
157 extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
/**
 * As visible here: calls the enclosing reader's {@code reset()} and then
 * unconditionally throws {@link UnsupportedOperationException}.
 * NOTE(review): confirm the throw is intentional and not a leftover.
 */
162 public void reset() throws Exception {
163 StreamingTransferableGraphFileReader.this.reset();
164 throw new UnsupportedOperationException();
168 public DataContainer getHeader() throws Exception {
/** Returns the resource count that init() read from the stream. */
173 public int getResourceCount() throws Exception {
174 return resourceCount;
/**
 * Lazily obtains the identity count: while the field is still -1 it is
 * read from the current stream position with {@code safeInt()}.
 */
178 public int getIdentityCount() throws Exception {
179 if(identities == -1) {
180 identities = safeInt();
/**
 * Lazily obtains the statement data length: while the field is still -1 it
 * is read from the current stream position with {@code safeInt()}.
 * forStatements() treats the value as a number of ints.
 */
186 public int getStatementCount() throws Exception {
187 if(stmLength == -1) {
188 stmLength = safeInt();
/**
 * Lazily obtains the value count: while the field is still -1 it is read
 * from the current stream position with {@code safeInt()}.
 */
194 public int getValueCount() throws Exception {
195 if(valueLength == -1) {
196 valueLength = safeInt();
/**
 * Streams the statement data to {@code procedure}.
 * {@code getStatementCount()} is used as a number of ints rather than of
 * statements (main() prints count/4 as the statement count). Ints are
 * collected into a 4-element array and the procedure is invoked whenever
 * {@code (stmIndex & 3) == 0}; the index increments themselves are outside
 * this view.
 *
 * The same {@code int[4]} instance is passed on every call, so a procedure
 * that keeps the data must copy it.
 *
 * @param graph not referenced in the visible lines
 * @param procedure receiver of each group of four ints
 */
202 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
// Reused for every callback.
204 int[] value = new int[4];
206 int stmLength = getStatementCount();
208 for(int stmIndex=0;stmIndex<stmLength;) {
// One int fetched through safeInt().
210 value[stmIndex & 3] = safeInt();
212 if((stmIndex & 3) == 0) procedure.execute(value);
// Direct path: decode as many whole ints as remain in the buffer straight
// from 'bytes'.
// NOTE(review): SIZE - byteIndex assumes the buffer holds SIZE bytes;
// confirm for readers created with a different size argument.
215 int avail = (SIZE-byteIndex) >> 2;
216 int allowed = Math.min(stmLength-stmIndex, avail);
217 for(int index = byteIndex, i=0;i<allowed;i++) {
// Four consecutive bytes are combined most-significant byte first.
218 value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
220 if((stmIndex & 3) == 0) procedure.execute(value);
221 // statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
// Skip the bytes consumed by the direct path (4 per int).
223 byteIndex += allowed<<2;
/**
 * Reads {@code getIdentityCount()} identity records and passes each to
 * {@code procedure} as an {@code Identity} wrapping an {@code External},
 * {@code Internal} or {@code Root} definition. Optional identities raise
 * {@link UnsupportedOperationException}; any other type byte raises
 * {@link IllegalStateException}.
 *
 * @param graph not referenced in the visible lines
 * @param procedure receiver of each decoded identity
 */
230 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
232 int identities = getIdentityCount();
234 for(int i=0;i<identities;i++) {
// The type tag is taken directly from the buffer.
// NOTE(review): no bounds or refill check is visible on this line - confirm
// that the preceding (omitted) read guarantees a byte is available.
237 byte type = bytes[byteIndex++];
// External: parent reference followed by a length-prefixed name.
241 int parent = safeInt();
242 int nameLen = getDynamicUInt32();
// Name lies inside the buffer (taking its length to be SIZE): decode it in
// place and step over it. A name ending exactly at SIZE takes the other path.
243 if(byteIndex+nameLen < SIZE) {
244 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
245 byteIndex += nameLen;
// Otherwise the name bytes are fetched through safeBytes().
247 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
// Internal: same layout and same two decoding paths as External.
253 int parent = safeInt();
254 int nameLen = getDynamicUInt32();
255 if(byteIndex+nameLen < SIZE) {
256 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
257 byteIndex += nameLen;
259 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
// Root: two length-prefixed strings (name, then type), both via safeBytes().
266 int nameLen = getDynamicUInt32();
267 String name = utf(safeBytes(nameLen), 0, nameLen);
268 int nameLen2 = getDynamicUInt32();
269 String rType = utf(safeBytes(nameLen2), 0, nameLen2);
270 procedure.execute(new Identity(rid, new Root(name, rType)));
275 throw new UnsupportedOperationException("Optional identities not supported");
277 throw new IllegalStateException("Unsupported identity type " + type);
/**
 * Reads {@code getValueCount()} entries. Each entry is a resource int
 * followed by a serialized {@code Variant}, delivered to {@code procedure}
 * as a {@code Value}.
 *
 * @param graph not referenced in the visible lines
 * @param procedure receiver of each decoded value
 */
285 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
287 int valueLength = getValueCount();
289 Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
291 List<Object> idcontext = new ArrayList<>();
293 for(int i=0;i<valueLength;i++) {
294 int resource = safeInt();
296 Variant value = (Variant)variantSerializer
297 .deserialize((DataInput)dis, idcontext);
298 procedure.execute(new Value(resource, value));
/**
 * For each value entry reads the resource int and the value's
 * {@code Datatype}, then passes both together with {@code dis} to
 * {@code procedure}, which can read the value itself (as main() does).
 *
 * NOTE(review): the loop bound is the {@code valueLength} field and no call
 * to {@code getValueCount()} is visible in this method. While the field is
 * still -1 the loop does not run; main() calls getValueCount() beforehand.
 * Confirm that every caller does.
 *
 * @param graph not referenced in the visible lines
 * @param procedure receiver of resource, datatype and the input to read from
 */
304 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
306 Binding datatypeBinding = Bindings.getBinding(Datatype.class);
307 Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
309 List<Object> idContext = new ArrayList<>();
311 for(int i=0;i<valueLength;i++) {
312 int resource = safeInt();
314 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
315 procedure.execute(resource, type, dis);
/** Returns the map held by the {@code Extensions} record read in init(). */
321 public TreeMap<String, Variant> getExtensions() {
322 return extensions.map;
326 public void close() {
/**
 * Creates a graph source over this reader's data.
 *
 * @return {@code null} when {@code getSize()} is 0, otherwise a new
 *         {@code FileTransferableGraphSource}
 * @throws Exception propagated from the {@code FileTransferableGraphSource} constructor
 */
330 public TransferableGraphSource readTG() throws Exception {
332 if(getSize() == 0) return null;
334 return new FileTransferableGraphSource();
338 public static void main(String[] args) {
342 File file = new File("c:/work/Model.apros");
343 StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
344 reader = new StreamingTransferableGraphFileReader(file);
345 long s = System.nanoTime();
346 TransferableGraphSource tgs = reader.readTG();
347 int ids = tgs.getIdentityCount();
348 System.out.println("identity count " + ids);
349 // tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
350 tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
351 int stats = tgs.getStatementCount();
352 System.out.println("statement count " + stats/4 + " (" + stats + ")");
353 // tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
354 tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
355 int values = tgs.getValueCount();
356 System.out.println("value count " + values);
358 tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
360 public void rawCopy(int resource, int length, DataInput input) throws Exception {
361 System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
362 for (int i = 0; i < length; ++i)
366 public void execute(int resource, Datatype type, DataInput input) throws Exception {
367 Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
368 System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
371 long d = System.nanoTime() - s;
372 System.err.println("Duration=" + 1e-9*d + "s.");
373 } catch (Throwable t) {