]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphFileReader.java
eb6294e3e98cece34c737406ba4ce2caa84128a2
[simantics/platform.git] / bundles / org.simantics.graph.db / src / org / simantics / graph / db / StreamingTransferableGraphFileReader.java
1 package org.simantics.graph.db;
2
3 import java.io.DataInput;
4 import java.io.DataInputStream;
5 import java.io.File;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.channels.ReadableByteChannel;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.List;
12 import java.util.TreeMap;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.binding.mutable.Variant;
17 import org.simantics.databoard.container.DataContainer;
18 import org.simantics.databoard.container.DataContainers;
19 import org.simantics.databoard.serialization.Serializer;
20 import org.simantics.databoard.type.Datatype;
21 import org.simantics.db.ReadGraph;
22 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
23 import org.simantics.graph.representation.ByteFileReader;
24 import org.simantics.graph.representation.Extensions;
25 import org.simantics.graph.representation.External;
26 import org.simantics.graph.representation.Identity;
27 import org.simantics.graph.representation.InputChannel;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
31
32
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
34         
35         private static boolean init = true;
36         
37         final private static int SIZE = 1<<18;
38         
39         private boolean deleteOnClose = false;
40         private File file;
41         
42         public StreamingTransferableGraphFileReader(File file) throws Exception {
43                 this(file,false);
44         }
45         
46         public StreamingTransferableGraphFileReader(File file, boolean deleteOnClose) throws Exception {
47                 super(file, SIZE);
48                 if(init) {
49                         init=false;
50                         @SuppressWarnings("resource")
51                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
52                         for(int i=0;i<40000;i++) r.readTG();
53                         r.close();
54                 }
55                 this.file = file;
56                 this.deleteOnClose = deleteOnClose;
57         }
58         
59         public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
60                 super(null, new InputChannel(stream), SIZE);
61                 if(init) {
62                         init=false;
63                         @SuppressWarnings("resource")
64                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
65                         for(int i=0;i<40000;i++) r.readTG();
66                         r.close();
67                 }
68         }
69
70         public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
71                 super(null, channel, SIZE);
72                 if(init) {
73                         init=false;
74                         @SuppressWarnings("resource")
75                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
76                         for(int i=0;i<40000;i++) r.readTG();
77                         r.close();
78                 }
79         }
80
81         public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
82                 super(null, channel, SIZE);
83         }
84
85         public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
86                 super(null, new InputChannel(stream), size);
87         }
88
89         public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
90                 super(file, size);
91         }
92         
93         @Override
94         public void close() throws IOException {
95                 super.close();
96                 if (deleteOnClose && file != null && file.exists()) {
97                         file.delete();
98                 }
99         }
100
101         class FileTransferableGraphSource implements TransferableGraphSource {
102
103                 InputStream in = new InputStream() {
104
105             @Override
106             public int read() throws IOException {
107                 return getByte();
108             }
109             
110             @Override
111             public int read(byte[] b) throws IOException {
112                 // FIXME not correctly implemented                
113                 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
114                 return b.length;
115             }
116             
117             @Override
118             public int read(byte[] b, int off, int len) throws IOException {
119                 for(int i=0;i<len;i++)
120                     b[off++] = (byte)getByte();
121                 return len;
122             }
123                     
124                 };
125                 DataInputStream dis = new DataInputStream(in);
126                 
127                 DataContainer header;
128                 Extensions extensions;
129                 int resourceCount;
130                 
131                 private int identities = -1;
132                 private int stmLength = -1;
133                 private int valueLength = -1;
134                 
135                 public FileTransferableGraphSource() throws Exception {
136                     init();
137                 }
138                 
139                 private void init() throws Exception {
140
141             // Header
142             header = DataContainers.readHeader(dis);
143             
144             // Content variant data type
145             Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
146
147             resourceCount = safeInt();
148             
149             List<Object> idcontext = new ArrayList<Object>(); 
150             extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
151                     
152                 }
153                 
154         @Override
155         public void reset() throws Exception {
156             StreamingTransferableGraphFileReader.this.reset();
157             throw new UnsupportedOperationException();
158         }
159         
160                 @Override
161                 public DataContainer getHeader() throws Exception {
162                     return header;
163                 }
164                 
165                 @Override
166                 public int getResourceCount() throws Exception {
167                         return resourceCount;
168                 }
169
170                 @Override
171                 public int getIdentityCount() throws Exception {
172                         if(identities == -1) {
173                                 identities = safeInt();
174                         }
175                         return identities;
176                 }
177
178                 @Override
179                 public int getStatementCount() throws Exception {
180                         if(stmLength == -1) {
181                                 stmLength = safeInt();
182                         }
183                         return stmLength;
184                 }
185
186                 @Override
187                 public int getValueCount() throws Exception {
188                         if(valueLength == -1) {
189                                 valueLength = safeInt();
190                         }
191                         return valueLength;
192                 }
193
194                 @Override
195                 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
196
197                         int[] value = new int[4];
198
199                         int stmLength = getStatementCount();
200
201                         for(int stmIndex=0;stmIndex<stmLength;) {
202
203                                 value[stmIndex & 3] = safeInt();
204                                 stmIndex++;
205                                 if((stmIndex & 3) == 0) procedure.execute(value);
206
207                                 // Cached bytes 
208                                 int avail = (SIZE-byteIndex) >> 2;
209                                 int allowed = Math.min(stmLength-stmIndex, avail);
210                                 for(int index = byteIndex, i=0;i<allowed;i++) {
211                                         value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
212                                         stmIndex++;
213                                         if((stmIndex & 3) == 0) procedure.execute(value);
214 //                                      statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));                              
215                                 }
216                                 byteIndex += allowed<<2;
217
218                         }
219
220                 }
221
222                 @Override
223                 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
224
225                         int identities = getIdentityCount();
226
227                         for(int i=0;i<identities;i++) {
228
229                                 int rid = safeInt();
230                                 byte type = bytes[byteIndex++];
231                                 // External
232                                 if(type == 1) {
233
234                                         int parent = safeInt();
235                                         int nameLen = getDynamicUInt32();
236                                         if(byteIndex+nameLen < SIZE) {
237                                                 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
238                                                 byteIndex += nameLen;
239                                         } else {
240                                                 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
241                                         }
242                                 } 
243                                 // Internal
244                                 else if(type == 3) {
245
246                                         int parent = safeInt();
247                                         int nameLen = getDynamicUInt32();
248                                         if(byteIndex+nameLen < SIZE) {
249                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
250                                                 byteIndex += nameLen;
251                                         } else {
252                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
253                                         }
254
255                                 }
256                                 // Root
257                                 else if(type == 0) {
258
259                                         int nameLen = getDynamicUInt32();
260                                         String name = utf(safeBytes(nameLen), 0, nameLen);
261                                         int nameLen2 = getDynamicUInt32();
262                                         String rType = utf(safeBytes(nameLen2), 0, nameLen2);
263                                         procedure.execute(new Identity(rid, new Root(name, rType)));
264
265                                 }
266                                 // Optional
267                                 else if(type == 2) {
268                                         throw new UnsupportedOperationException("Optional identities not supported");
269                                 } else {
270                                         throw new IllegalStateException("Unsupported identity type " + type);
271                                 }
272
273                         }
274
275                 }
276
277                 @Override
278                 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
279
280                         int valueLength = getValueCount();
281
282                         Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
283
284                         List<Object> idcontext = new ArrayList<>(); 
285
286                         for(int i=0;i<valueLength;i++) {
287                                 int resource = safeInt();
288                                 idcontext.clear();
289                                 Variant value = (Variant)variantSerializer
290                                                 .deserialize((DataInput)dis, idcontext);
291                                 procedure.execute(new Value(resource, value));
292                         }
293
294                 }
295
296                 @Override
297                 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
298
299                     Binding datatypeBinding = Bindings.getBinding(Datatype.class);
300             Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
301
302             List<Object> idContext = new ArrayList<>(); 
303
304             for(int i=0;i<valueLength;i++) {
305                 int resource = safeInt();
306                 idContext.clear();
307                 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
308                 procedure.execute(resource, type, dis);
309             }
310
311                 }
312
313                 @Override
314                 public TreeMap<String, Variant> getExtensions() {
315                         return extensions.map;
316                 }
317
318                 @Override
319                 public void close() {
320                 }
321         }
322         
323         public TransferableGraphSource readTG() throws Exception {
324
325                 if(getSize() == 0) return null;
326
327                 return new FileTransferableGraphSource();
328
329         }
330         
331     public static void main(String[] args) {
332
333         try {
334
335             File file = new File("c:/work/Model.apros");
336             StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
337             reader = new StreamingTransferableGraphFileReader(file);
338             long s = System.nanoTime();
339             TransferableGraphSource tgs = reader.readTG();
340             int ids = tgs.getIdentityCount();
341             System.out.println("identity count " + ids);
342 //            tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
343             tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
344             int stats = tgs.getStatementCount();
345             System.out.println("statement count " + stats/4 + " (" + stats + ")");
346 //            tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
347             tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
348             int values = tgs.getValueCount();
349             System.out.println("value count " + values);
350             int[] valueNo = {0};
351             tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
352                 @Override
353                 public void rawCopy(int resource, int length, DataInput input) throws Exception {
354                     System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
355                     for (int i = 0; i < length; ++i)
356                         input.readByte();
357                 }
358                 @Override
359                 public void execute(int resource, Datatype type, DataInput input) throws Exception {
360                     Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
361                     System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
362                 }
363             });
364             long d = System.nanoTime() - s;
365             System.err.println("Duration=" + 1e-9*d + "s.");
366         } catch (Throwable t) {
367             t.printStackTrace();
368         }
369     }
370
371 }