]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphFileReader.java
68a94e304581f71d55d1c57d9e3220a5da6bac3c
[simantics/platform.git] / bundles / org.simantics.graph.db / src / org / simantics / graph / db / StreamingTransferableGraphFileReader.java
1 package org.simantics.graph.db;
2
3 import java.io.DataInput;
4 import java.io.DataInputStream;
5 import java.io.File;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.ByteBuffer;
9 import java.nio.channels.ReadableByteChannel;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.List;
13 import java.util.TreeMap;
14
15 import org.simantics.databoard.Bindings;
16 import org.simantics.databoard.binding.Binding;
17 import org.simantics.databoard.binding.mutable.Variant;
18 import org.simantics.databoard.container.DataContainer;
19 import org.simantics.databoard.container.DataContainers;
20 import org.simantics.databoard.serialization.Serializer;
21 import org.simantics.databoard.type.Datatype;
22 import org.simantics.db.ReadGraph;
23 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
24 import org.simantics.graph.representation.ByteFileReader;
25 import org.simantics.graph.representation.Extensions;
26 import org.simantics.graph.representation.External;
27 import org.simantics.graph.representation.Identity;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
31
32
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
34
35         final static class InputChannel implements ReadableByteChannel {
36
37                 final private InputStream stream;
38                 
39                 public InputChannel(InputStream stream) {
40                         this.stream = stream;
41                 }
42                 
43                 @Override
44                 public boolean isOpen() {
45                         return true;
46                 }
47
48                 @Override
49                 public void close() throws IOException {
50                 }
51
52                 @Override
53                 public int read(ByteBuffer dst) throws IOException {
54                         int pos = dst.position();
55                         int limit = dst.limit();
56                         int i=stream.read(dst.array(), pos, limit-pos);
57                         return i;
58                 }
59                 
60         }
61         
62         private static boolean init = true;
63         
64         final private static int SIZE = 1<<18;
65         
66         public StreamingTransferableGraphFileReader(File file) throws Exception {
67                 super(file, SIZE);
68                 if(init) {
69                         init=false;
70                         @SuppressWarnings("resource")
71                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
72                         for(int i=0;i<40000;i++) r.readTG();
73                 }
74         }
75         
76         public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
77                 super(null, new InputChannel(stream), SIZE);
78                 if(init) {
79                         init=false;
80                         @SuppressWarnings("resource")
81                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
82                         for(int i=0;i<40000;i++) r.readTG();
83                 }
84         }
85
86         public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
87                 super(null, channel, SIZE);
88                 if(init) {
89                         init=false;
90                         @SuppressWarnings("resource")
91                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
92                         for(int i=0;i<40000;i++) r.readTG();
93                 }
94         }
95
96         public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
97                 super(null, channel, SIZE);
98         }
99
100         public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
101                 super(null, new InputChannel(stream), size);
102         }
103
104         public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
105                 super(file, size);
106         }
107
108         class FileTransferableGraphSource implements TransferableGraphSource {
109
110                 InputStream in = new InputStream() {
111
112             @Override
113             public int read() throws IOException {
114                 return getByte();
115             }
116             
117             @Override
118             public int read(byte[] b) throws IOException {
119                 // FIXME not correctly implemented                
120                 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
121                 return b.length;
122             }
123             
124             @Override
125             public int read(byte[] b, int off, int len) throws IOException {
126                 for(int i=0;i<len;i++)
127                     b[off++] = (byte)getByte();
128                 return len;
129             }
130                     
131                 };
132                 DataInputStream dis = new DataInputStream(in);
133                 
134                 DataContainer header;
135                 Extensions extensions;
136                 int resourceCount;
137                 
138                 private int identities = -1;
139                 private int stmLength = -1;
140                 private int valueLength = -1;
141                 
142                 public FileTransferableGraphSource() throws Exception {
143                     init();
144                 }
145                 
146                 private void init() throws Exception {
147
148             // Header
149             header = DataContainers.readHeader(dis);
150             
151             // Content variant data type
152             Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
153
154             resourceCount = safeInt();
155             
156             List<Object> idcontext = new ArrayList<Object>(); 
157             extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
158                     
159                 }
160                 
161         @Override
162         public void reset() throws Exception {
163             StreamingTransferableGraphFileReader.this.reset();
164             throw new UnsupportedOperationException();
165         }
166         
167                 @Override
168                 public DataContainer getHeader() throws Exception {
169                     return header;
170                 }
171                 
172                 @Override
173                 public int getResourceCount() throws Exception {
174                         return resourceCount;
175                 }
176
177                 @Override
178                 public int getIdentityCount() throws Exception {
179                         if(identities == -1) {
180                                 identities = safeInt();
181                         }
182                         return identities;
183                 }
184
185                 @Override
186                 public int getStatementCount() throws Exception {
187                         if(stmLength == -1) {
188                                 stmLength = safeInt();
189                         }
190                         return stmLength;
191                 }
192
193                 @Override
194                 public int getValueCount() throws Exception {
195                         if(valueLength == -1) {
196                                 valueLength = safeInt();
197                         }
198                         return valueLength;
199                 }
200
201                 @Override
202                 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
203
204                         int[] value = new int[4];
205
206                         int stmLength = getStatementCount();
207
208                         for(int stmIndex=0;stmIndex<stmLength;) {
209
210                                 value[stmIndex & 3] = safeInt();
211                                 stmIndex++;
212                                 if((stmIndex & 3) == 0) procedure.execute(value);
213
214                                 // Cached bytes 
215                                 int avail = (SIZE-byteIndex) >> 2;
216                                 int allowed = Math.min(stmLength-stmIndex, avail);
217                                 for(int index = byteIndex, i=0;i<allowed;i++) {
218                                         value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
219                                         stmIndex++;
220                                         if((stmIndex & 3) == 0) procedure.execute(value);
221 //                                      statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));                              
222                                 }
223                                 byteIndex += allowed<<2;
224
225                         }
226
227                 }
228
229                 @Override
230                 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
231
232                         int identities = getIdentityCount();
233
234                         for(int i=0;i<identities;i++) {
235
236                                 int rid = safeInt();
237                                 byte type = bytes[byteIndex++];
238                                 // External
239                                 if(type == 1) {
240
241                                         int parent = safeInt();
242                                         int nameLen = getDynamicUInt32();
243                                         if(byteIndex+nameLen < SIZE) {
244                                                 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
245                                                 byteIndex += nameLen;
246                                         } else {
247                                                 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
248                                         }
249                                 } 
250                                 // Internal
251                                 else if(type == 3) {
252
253                                         int parent = safeInt();
254                                         int nameLen = getDynamicUInt32();
255                                         if(byteIndex+nameLen < SIZE) {
256                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
257                                                 byteIndex += nameLen;
258                                         } else {
259                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
260                                         }
261
262                                 }
263                                 // Root
264                                 else if(type == 0) {
265
266                                         int nameLen = getDynamicUInt32();
267                                         String name = utf(safeBytes(nameLen), 0, nameLen);
268                                         int nameLen2 = getDynamicUInt32();
269                                         String rType = utf(safeBytes(nameLen2), 0, nameLen2);
270                                         procedure.execute(new Identity(rid, new Root(name, rType)));
271
272                                 }
273                                 // Optional
274                                 else if(type == 2) {
275                                         throw new UnsupportedOperationException("Optional identities not supported");
276                                 } else {
277                                         throw new IllegalStateException("Unsupported identity type " + type);
278                                 }
279
280                         }
281
282                 }
283
284                 @Override
285                 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
286
287                         int valueLength = getValueCount();
288
289                         Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
290
291                         List<Object> idcontext = new ArrayList<>(); 
292
293                         for(int i=0;i<valueLength;i++) {
294                                 int resource = safeInt();
295                                 idcontext.clear();
296                                 Variant value = (Variant)variantSerializer
297                                                 .deserialize((DataInput)dis, idcontext);
298                                 procedure.execute(new Value(resource, value));
299                         }
300
301                 }
302
303                 @Override
304                 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
305
306                     Binding datatypeBinding = Bindings.getBinding(Datatype.class);
307             Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
308
309             List<Object> idContext = new ArrayList<>(); 
310
311             for(int i=0;i<valueLength;i++) {
312                 int resource = safeInt();
313                 idContext.clear();
314                 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
315                 procedure.execute(resource, type, dis);
316             }
317
318                 }
319
320                 @Override
321                 public TreeMap<String, Variant> getExtensions() {
322                         return extensions.map;
323                 }
324
325                 @Override
326                 public void close() {
327                 }
328         }
329         
330         public TransferableGraphSource readTG() throws Exception {
331
332                 if(getSize() == 0) return null;
333
334                 return new FileTransferableGraphSource();
335
336         }
337         
338     public static void main(String[] args) {
339
340         try {
341
342             File file = new File("c:/work/Model.apros");
343             StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
344             reader = new StreamingTransferableGraphFileReader(file);
345             long s = System.nanoTime();
346             TransferableGraphSource tgs = reader.readTG();
347             int ids = tgs.getIdentityCount();
348             System.out.println("identity count " + ids);
349 //            tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
350             tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
351             int stats = tgs.getStatementCount();
352             System.out.println("statement count " + stats/4 + " (" + stats + ")");
353 //            tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
354             tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
355             int values = tgs.getValueCount();
356             System.out.println("value count " + values);
357             int[] valueNo = {0};
358             tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
359                 @Override
360                 public void rawCopy(int resource, int length, DataInput input) throws Exception {
361                     System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
362                     for (int i = 0; i < length; ++i)
363                         input.readByte();
364                 }
365                 @Override
366                 public void execute(int resource, Datatype type, DataInput input) throws Exception {
367                     Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
368                     System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
369                 }
370             });
371             long d = System.nanoTime() - s;
372             System.err.println("Duration=" + 1e-9*d + "s.");
373         } catch (Throwable t) {
374             t.printStackTrace();
375         }
376     }
377
378 }