1 package org.simantics.graph.db;
3 import java.io.DataInput;
4 import java.io.DataInputStream;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.ByteBuffer;
9 import java.nio.channels.ReadableByteChannel;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.TreeMap;
15 import org.simantics.databoard.Bindings;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.binding.mutable.Variant;
18 import org.simantics.databoard.container.DataContainer;
19 import org.simantics.databoard.container.DataContainers;
20 import org.simantics.databoard.serialization.Serializer;
21 import org.simantics.databoard.type.Datatype;
22 import org.simantics.db.ReadGraph;
23 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
24 import org.simantics.graph.representation.ByteFileReader;
25 import org.simantics.graph.representation.Extensions;
26 import org.simantics.graph.representation.External;
27 import org.simantics.graph.representation.Identity;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
35 final static class InputChannel implements ReadableByteChannel {
37 final private InputStream stream;
39 public InputChannel(InputStream stream) {
44 public boolean isOpen() {
49 public void close() throws IOException {
53 public int read(ByteBuffer dst) throws IOException {
54 int pos = dst.position();
55 int limit = dst.limit();
56 int i=stream.read(dst.array(), pos, limit-pos);
62 private static boolean init = true;
64 final private static int SIZE = 1<<18;
66 public StreamingTransferableGraphFileReader(File file) throws Exception {
70 @SuppressWarnings("resource")
71 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
72 for(int i=0;i<40000;i++) r.readTG();
76 public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
77 super(null, new InputChannel(stream), SIZE);
80 @SuppressWarnings("resource")
81 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
82 for(int i=0;i<40000;i++) r.readTG();
86 public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
87 super(null, channel, SIZE);
90 @SuppressWarnings("resource")
91 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
92 for(int i=0;i<40000;i++) r.readTG();
96 public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
97 super(null, channel, SIZE);
100 public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
101 super(null, new InputChannel(stream), size);
104 public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
108 class FileTransferableGraphSource implements TransferableGraphSource {
110 InputStream in = new InputStream() {
113 public int read() throws IOException {
118 public int read(byte[] b) throws IOException {
119 // FIXME not correctly implemented
120 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
125 public int read(byte[] b, int off, int len) throws IOException {
126 for(int i=0;i<len;i++)
127 b[off++] = (byte)getByte();
132 DataInputStream dis = new DataInputStream(in);
134 DataContainer header;
135 Extensions extensions;
138 private int identities;
139 private int stmLength;
140 private int valueLength;
142 public FileTransferableGraphSource() throws Exception {
146 private void init() throws Exception {
149 header = DataContainers.readHeader(dis);
151 // Content variant data type
152 Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
154 resourceCount = safeInt();
156 List<Object> idcontext = new ArrayList<Object>();
157 extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
162 public void reset() throws Exception {
163 StreamingTransferableGraphFileReader.this.reset();
164 throw new UnsupportedOperationException();
168 public DataContainer getHeader() throws Exception {
173 public int getResourceCount() throws Exception {
174 return resourceCount;
178 public int getIdentityCount() throws Exception {
179 identities = safeInt();
184 public int getStatementCount() throws Exception {
185 stmLength = safeInt();
190 public int getValueCount() throws Exception {
191 valueLength = safeInt();
196 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
198 int[] value = new int[4];
200 for(int stmIndex=0;stmIndex<stmLength;) {
202 value[stmIndex & 3] = safeInt();
204 if((stmIndex & 3) == 0) procedure.execute(value);
207 int avail = (SIZE-byteIndex) >> 2;
208 int allowed = Math.min(stmLength-stmIndex, avail);
209 for(int index = byteIndex, i=0;i<allowed;i++) {
210 value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
212 if((stmIndex & 3) == 0) procedure.execute(value);
213 // statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
215 byteIndex += allowed<<2;
222 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
224 for(int i=0;i<identities;i++) {
227 byte type = bytes[byteIndex++];
231 int parent = safeInt();
232 int nameLen = bytes[byteIndex++]&0xff;
234 if(byteIndex+nameLen < SIZE) {
235 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
236 byteIndex += nameLen;
238 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
245 int parent = safeInt();
246 int nameLen = bytes[byteIndex++]&0xff;
247 if(byteIndex+nameLen < SIZE) {
248 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
249 byteIndex += nameLen;
251 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
258 int nameLen = bytes[byteIndex++]&0xff;
259 String name = utf(safeBytes(nameLen), 0, nameLen);
260 int nameLen2 = bytes[byteIndex++]&0xff;
261 String rType = utf(safeBytes(nameLen2), 0, nameLen2);
262 procedure.execute(new Identity(rid, new Root(name, rType)));
264 } else if(type == 2) {
265 throw new UnsupportedOperationException();
273 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
275 Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
277 List<Object> idcontext = new ArrayList<>();
279 for(int i=0;i<valueLength;i++) {
280 int resource = safeInt();
281 Variant value = (Variant)variantSerializer
282 .deserialize((DataInput)dis, idcontext);
283 procedure.execute(new Value(resource, value));
290 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
292 Binding datatypeBinding = Bindings.getBinding(Datatype.class);
293 Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
295 List<Object> idContext = new ArrayList<>();
297 for(int i=0;i<valueLength;i++) {
298 int resource = safeInt();
300 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
301 procedure.execute(resource, type, dis);
307 public TreeMap<String, Variant> getExtensions() {
308 return extensions.map;
312 public void close() {
316 public TransferableGraphSource readTG() throws Exception {
318 if(getSize() == 0) return null;
320 return new FileTransferableGraphSource();
324 public static void main(String[] args) {
328 File file = new File("c:/work/Model.apros");
329 StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
330 reader = new StreamingTransferableGraphFileReader(file);
331 long s = System.nanoTime();
332 TransferableGraphSource tgs = reader.readTG();
333 int ids = tgs.getIdentityCount();
334 System.out.println("identity count " + ids);
335 // tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
336 tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
337 int stats = tgs.getStatementCount();
338 System.out.println("statement count " + stats/4 + " (" + stats + ")");
339 // tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
340 tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
341 int values = tgs.getValueCount();
342 System.out.println("value count " + values);
344 tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
346 public void rawCopy(int resource, int length, DataInput input) throws Exception {
347 System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
348 for (int i = 0; i < length; ++i)
352 public void execute(int resource, Datatype type, DataInput input) throws Exception {
353 Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
354 System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
357 long d = System.nanoTime() - s;
358 System.err.println("Duration=" + 1e-9*d + "s.");
359 } catch (Throwable t) {