1 package org.simantics.graph.db;
3 import java.io.DataInput;
4 import java.io.DataInputStream;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.channels.ReadableByteChannel;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.List;
12 import java.util.TreeMap;
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.binding.mutable.Variant;
17 import org.simantics.databoard.container.DataContainer;
18 import org.simantics.databoard.container.DataContainers;
19 import org.simantics.databoard.serialization.Serializer;
20 import org.simantics.databoard.type.Datatype;
21 import org.simantics.db.ReadGraph;
22 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
23 import org.simantics.graph.representation.ByteFileReader;
24 import org.simantics.graph.representation.Extensions;
25 import org.simantics.graph.representation.External;
26 import org.simantics.graph.representation.Identity;
27 import org.simantics.graph.representation.InputChannel;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
// Class-wide flag that starts out true.
// NOTE(review): presumably marks that the constructor warm-up loops have not run yet — confirm where it is cleared.
35 private static boolean init = true;
// 1<<18 = 262144. Passed to the superclass constructors as the size argument and
// used by the inner source as the upper bound for byteIndex into the inherited bytes buffer.
37 final private static int SIZE = 1<<18;
// Set by the (File, boolean) constructor and checked in close() together with the file's existence.
39 private boolean deleteOnClose = false;
/**
 * Creates a reader for the given file.
 * NOTE(review): presumably delegates to the (File, boolean) constructor with
 * deleteOnClose disabled — confirm.
 *
 * @param file the file to read
 * @throws Exception if construction fails
 */
42 public StreamingTransferableGraphFileReader(File file) throws Exception {
/**
 * Creates a reader for the given file and remembers whether close() should
 * act on the deleteOnClose flag.
 *
 * @param file the file to read
 * @param deleteOnClose value stored in the deleteOnClose field, which close() checks
 * @throws Exception propagated from construction or from the readTG() calls below
 */
46 public StreamingTransferableGraphFileReader(File file, boolean deleteOnClose) throws Exception {
// A second, throwaway reader is created on the same file with size 0 and readTG()
// is called on it 40000 times; the results are discarded and the reader is never
// closed here, hence the "resource" suppression.
// NOTE(review): presumably a one-time JIT warm-up guarded by the static init flag — confirm.
50 @SuppressWarnings("resource")
51 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
52 for(int i=0;i<40000;i++) r.readTG();
56 this.deleteOnClose = deleteOnClose;
/**
 * Creates a reader over an input stream. The stream is wrapped in an InputChannel
 * and handed to the superclass with no file (null) and SIZE as the size argument.
 *
 * @param stream the stream to read from
 * @throws Exception propagated from construction or from the readTG() calls below
 */
59 public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
60 super(null, new InputChannel(stream), SIZE);
// A throwaway reader is created over the SAME stream with size 0 (the
// (InputStream, int) constructor forwards that 0 to the superclass) and readTG()
// is called on it 40000 times.
// NOTE(review): presumably a one-time JIT warm-up guarded by the static init flag — confirm.
63 @SuppressWarnings("resource")
64 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
65 for(int i=0;i<40000;i++) r.readTG();
/**
 * Creates a reader over a byte channel, passing no file (null) and SIZE as the
 * size argument to the superclass.
 *
 * @param channel the channel to read from
 * @throws Exception propagated from construction or from the readTG() calls below
 */
70 public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
71 super(null, channel, SIZE);
// A throwaway reader is created over the SAME channel and readTG() is called on it
// 40000 times.
// NOTE(review): the (ReadableByteChannel, int) constructor ignores its size argument
// and passes SIZE to the superclass, so the 0 given here has no effect — unlike the
// InputStream variant. Confirm that this warm-up cannot consume data from the
// caller's channel.
74 @SuppressWarnings("resource")
75 StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
76 for(int i=0;i<40000;i++) r.readTG();
/**
 * Creates a reader over a byte channel without running any warm-up.
 *
 * @param channel the channel to read from
 * @param size currently unused: the constant SIZE is passed to the superclass instead.
 *             NOTE(review): the (InputStream, int) overload forwards size — confirm
 *             whether ignoring it here is intentional.
 * @throws IOException propagated from the superclass constructor
 */
81 public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
82 super(null, channel, SIZE);
/**
 * Creates a reader over an input stream without running any warm-up. The stream
 * is wrapped in an InputChannel; no file (null) is associated with the reader.
 *
 * @param stream the stream to read from
 * @param size forwarded unchanged to the superclass constructor
 * @throws IOException propagated from the superclass constructor
 */
85 public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
86 super(null, new InputChannel(stream), size);
/**
 * Creates a reader for the given file with an explicit size argument. This is the
 * overload the other constructors use (with size 0) for their throwaway readers.
 * NOTE(review): presumably forwards both arguments to the superclass — confirm.
 *
 * @param file the file to read
 * @param size size argument for the reader
 * @throws IOException if construction fails
 */
89 public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
/**
 * Closes this reader.
 *
 * @throws IOException if closing fails
 */
94 public void close() throws IOException {
// Taken only when deletion was requested, a backing file is known, and it still exists.
// NOTE(review): presumably deletes the file in this branch — confirm.
96 if (deleteOnClose && file != null && file.exists()) {
/**
 * Non-static inner TransferableGraphSource. All of its data comes through the
 * enclosing reader: it calls safeInt(), safeBytes(), getByte() and getDynamicUInt32()
 * and indexes the bytes buffer via byteIndex directly, so it shares the enclosing
 * reader's read position.
 */
101 class FileTransferableGraphSource implements TransferableGraphSource {
// InputStream view over the enclosing reader, so that stream-based deserializers
// can consume the same data as the safeInt()/safeBytes() calls.
103 InputStream in = new InputStream() {
// Single-byte read.
106 public int read() throws IOException {
// Fills the entire array: b.length bytes are requested from safeBytes() and copied into b.
// NOTE(review): no partial-read or end-of-stream handling is visible here — see the FIXME.
111 public int read(byte[] b) throws IOException {
112 // FIXME not correctly implemented
113 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
// Copies exactly len bytes into b starting at off, one getByte() call per byte.
118 public int read(byte[] b, int off, int len) throws IOException {
119 for(int i=0;i<len;i++)
120 b[off++] = (byte)getByte();
// DataInput wrapper around the adapter stream above; handed to the serializers.
125 DataInputStream dis = new DataInputStream(in);
// Container header, assigned in init() from DataContainers.readHeader(dis).
127 DataContainer header;
// Extensions record deserialized in init(); getExtensions() exposes its map.
128 Extensions extensions;
// Lazily read counts: -1 means "not read yet". Each is filled by its getter
// (getIdentityCount / getStatementCount / getValueCount) on first use.
131 private int identities = -1;
132 private int stmLength = -1;
133 private int valueLength = -1;
/**
 * Creates the source.
 * NOTE(review): presumably calls init() to read the header section — confirm.
 *
 * @throws Exception if construction fails
 */
135 public FileTransferableGraphSource() throws Exception {
/**
 * Reads the leading part of the data in sequence: the container header, a Datatype,
 * the resource count and the Extensions record.
 *
 * @throws Exception propagated from the header read or the deserializers
 */
139 private void init() throws Exception {
142 header = DataContainers.readHeader(dis);
// The Datatype is deserialized only to advance past it; the result is discarded.
144 // Content variant data type
145 Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
// Value later returned by getResourceCount().
147 resourceCount = safeInt();
// Fresh identity context list passed to the Extensions deserializer.
149 List<Object> idcontext = new ArrayList<Object>();
150 extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
/**
 * Resets the enclosing reader and then unconditionally throws
 * UnsupportedOperationException, so this method never returns normally.
 * NOTE(review): confirm the throw after a completed outer reset is intentional.
 *
 * @throws Exception propagated from the enclosing reader's reset()
 * @throws UnsupportedOperationException always, when the outer reset succeeds
 */
155 public void reset() throws Exception {
156 StreamingTransferableGraphFileReader.this.reset();
157 throw new UnsupportedOperationException();
/**
 * Returns the container header.
 * NOTE(review): presumably the header field assigned in init() — confirm.
 */
161 public DataContainer getHeader() throws Exception {
/**
 * Returns the resource count that init() obtained from safeInt().
 */
166 public int getResourceCount() throws Exception {
167 return resourceCount;
/**
 * Returns the identity count, fetching it on first use.
 * The fetch goes through safeInt() on the enclosing reader, so the first call has
 * to happen when the reader is positioned at that value.
 */
171 public int getIdentityCount() throws Exception {
// -1 is the "not read yet" marker; the count is fetched only once.
172 if(identities == -1) {
173 identities = safeInt();
/**
 * Returns the statement length, fetching it on first use via safeInt().
 * forStatements() uses this value as the number of ints to read; main() prints
 * it divided by 4 as the statement count.
 */
179 public int getStatementCount() throws Exception {
// -1 is the "not read yet" marker; the value is fetched only once.
180 if(stmLength == -1) {
181 stmLength = safeInt();
/**
 * Returns the value count, fetching it on first use via safeInt().
 * forValues2() reads the valueLength field that this method populates.
 */
187 public int getValueCount() throws Exception {
// -1 is the "not read yet" marker; the count is fetched only once.
188 if(valueLength == -1) {
189 valueLength = safeInt();
/**
 * Streams the statement ints to the procedure in groups of four.
 *
 * @param graph not used in the visible code
 * @param procedure receives the 4-int buffer; the SAME array instance is passed on
 *                  every call, so the callee must copy it to keep the values
 * @throws Exception propagated from the reader or the procedure
 */
195 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
// Single reusable buffer; slot is selected with (stmIndex & 3).
197 int[] value = new int[4];
// Total number of ints to read (not the number of 4-int statements).
199 int stmLength = getStatementCount();
// No increment in the loop header: stmIndex is advanced inside the body.
201 for(int stmIndex=0;stmIndex<stmLength;) {
// Slow path: one int via safeInt().
203 value[stmIndex & 3] = safeInt();
205 if((stmIndex & 3) == 0) procedure.execute(value);
// Fast path: number of whole ints left before offset SIZE in the bytes buffer,
// capped by the ints still outstanding.
208 int avail = (SIZE-byteIndex) >> 2;
209 int allowed = Math.min(stmLength-stmIndex, avail);
210 for(int index = byteIndex, i=0;i<allowed;i++) {
// Decode a big-endian 32-bit int straight from the buffer.
211 value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
213 if((stmIndex & 3) == 0) procedure.execute(value);
214 // statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
// Account for the bytes consumed by the fast path (4 per int).
216 byteIndex += allowed<<2;
/**
 * Reads getIdentityCount() identities and passes each to the procedure as a new
 * Identity wrapping an External, Internal or Root definition.
 *
 * @param graph not used in the visible code
 * @param procedure receives one Identity per entry
 * @throws UnsupportedOperationException for optional identities
 * @throws IllegalStateException for an unrecognised identity type byte
 * @throws Exception propagated from the reader or the procedure
 */
223 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
225 int identities = getIdentityCount();
227 for(int i=0;i<identities;i++) {
// Type tag is taken directly from the buffer.
230 byte type = bytes[byteIndex++];
// External: parent int, then a name whose byte length is a dynamic uint32.
234 int parent = safeInt();
235 int nameLen = getDynamicUInt32();
// If the name lies below offset SIZE it is decoded in place and byteIndex is
// advanced manually; otherwise the bytes are obtained through safeBytes().
236 if(byteIndex+nameLen < SIZE) {
237 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
238 byteIndex += nameLen;
240 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
// Internal: same layout and same in-place / safeBytes() split as External.
246 int parent = safeInt();
247 int nameLen = getDynamicUInt32();
248 if(byteIndex+nameLen < SIZE) {
249 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
250 byteIndex += nameLen;
252 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
// Root: two length-prefixed strings (name, then type), both read through safeBytes().
259 int nameLen = getDynamicUInt32();
260 String name = utf(safeBytes(nameLen), 0, nameLen);
261 int nameLen2 = getDynamicUInt32();
262 String rType = utf(safeBytes(nameLen2), 0, nameLen2);
263 procedure.execute(new Identity(rid, new Root(name, rType)));
// Optional identities are rejected explicitly.
268 throw new UnsupportedOperationException("Optional identities not supported");
// Any other type byte is treated as corrupt or unknown input.
270 throw new IllegalStateException("Unsupported identity type " + type);
/**
 * Reads getValueCount() values and passes each to the procedure as a new Value
 * holding the resource int and a fully deserialized Variant.
 *
 * @param graph not used in the visible code
 * @param procedure receives one Value per entry
 * @throws Exception propagated from the reader, the serializer or the procedure
 */
278 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
280 int valueLength = getValueCount();
282 Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
// One identity context list is shared across all value deserializations.
284 List<Object> idcontext = new ArrayList<>();
286 for(int i=0;i<valueLength;i++) {
// Each entry is a resource int followed by a serialized Variant.
287 int resource = safeInt();
289 Variant value = (Variant)variantSerializer
290 .deserialize((DataInput)dis, idcontext);
291 procedure.execute(new Value(resource, value));
/**
 * Reads values without materialising them: for each entry the resource int and the
 * Datatype are read here, and the procedure is handed the data input itself.
 * NOTE(review): presumably the procedure must consume exactly the value payload from
 * the input before returning — confirm against the procedure contract.
 *
 * @param graph not used in the visible code
 * @param procedure receives (resource, datatype, input) per entry
 * @throws Exception propagated from the reader, the serializer or the procedure
 */
297 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
299 Binding datatypeBinding = Bindings.getBinding(Datatype.class);
300 Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
// One identity context list is shared across all datatype deserializations.
302 List<Object> idContext = new ArrayList<>();
// NOTE(review): this loops on the valueLength FIELD, unlike forValues() which takes
// a local from getValueCount(). If getValueCount() has not run, the field is still
// -1 and the body never executes — confirm callers always call getValueCount() first.
304 for(int i=0;i<valueLength;i++) {
305 int resource = safeInt();
307 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
308 procedure.execute(resource, type, dis);
/**
 * Returns the map held by the Extensions record that init() deserialized.
 * The map object itself is returned, not a copy.
 */
314 public TreeMap<String, Variant> getExtensions() {
315 return extensions.map;
/**
 * Closes this source.
 * NOTE(review): presumably a no-op, with the enclosing reader owning the underlying
 * resources — confirm.
 */
319 public void close() {
/**
 * Creates a TransferableGraphSource over this reader.
 *
 * @return a new FileTransferableGraphSource, or null when getSize() reports 0
 * @throws Exception propagated from the FileTransferableGraphSource constructor
 */
323 public TransferableGraphSource readTG() throws Exception {
// Nothing to read: callers must be prepared for a null result.
325 if(getSize() == 0) return null;
327 return new FileTransferableGraphSource();
331 public static void main(String[] args) {
335 File file = new File("c:/work/Model.apros");
336 StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
337 reader = new StreamingTransferableGraphFileReader(file);
338 long s = System.nanoTime();
339 TransferableGraphSource tgs = reader.readTG();
340 int ids = tgs.getIdentityCount();
341 System.out.println("identity count " + ids);
342 // tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
343 tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
344 int stats = tgs.getStatementCount();
345 System.out.println("statement count " + stats/4 + " (" + stats + ")");
346 // tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
347 tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
348 int values = tgs.getValueCount();
349 System.out.println("value count " + values);
351 tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
353 public void rawCopy(int resource, int length, DataInput input) throws Exception {
354 System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
355 for (int i = 0; i < length; ++i)
359 public void execute(int resource, Datatype type, DataInput input) throws Exception {
360 Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
361 System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
364 long d = System.nanoTime() - s;
365 System.err.println("Duration=" + 1e-9*d + "s.");
366 } catch (Throwable t) {