]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphFileReader.java
Remove duplicate InputChannel inner classes
[simantics/platform.git] / bundles / org.simantics.graph.db / src / org / simantics / graph / db / StreamingTransferableGraphFileReader.java
1 package org.simantics.graph.db;
2
3 import java.io.DataInput;
4 import java.io.DataInputStream;
5 import java.io.File;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.nio.channels.ReadableByteChannel;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.List;
12 import java.util.TreeMap;
13
14 import org.simantics.databoard.Bindings;
15 import org.simantics.databoard.binding.Binding;
16 import org.simantics.databoard.binding.mutable.Variant;
17 import org.simantics.databoard.container.DataContainer;
18 import org.simantics.databoard.container.DataContainers;
19 import org.simantics.databoard.serialization.Serializer;
20 import org.simantics.databoard.type.Datatype;
21 import org.simantics.db.ReadGraph;
22 import org.simantics.graph.db.TransferableGraphSource.TransferableGraphSourceValueProcedure;
23 import org.simantics.graph.representation.ByteFileReader;
24 import org.simantics.graph.representation.Extensions;
25 import org.simantics.graph.representation.External;
26 import org.simantics.graph.representation.Identity;
27 import org.simantics.graph.representation.InputChannel;
28 import org.simantics.graph.representation.Internal;
29 import org.simantics.graph.representation.Root;
30 import org.simantics.graph.representation.Value;
31
32
33 final public class StreamingTransferableGraphFileReader extends ByteFileReader {
34         
35         private static boolean init = true;
36         
37         final private static int SIZE = 1<<18;
38         
39         public StreamingTransferableGraphFileReader(File file) throws Exception {
40                 super(file, SIZE);
41                 if(init) {
42                         init=false;
43                         @SuppressWarnings("resource")
44                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(file, 0);
45                         for(int i=0;i<40000;i++) r.readTG();
46                 }
47         }
48         
49         public StreamingTransferableGraphFileReader(InputStream stream) throws Exception {
50                 super(null, new InputChannel(stream), SIZE);
51                 if(init) {
52                         init=false;
53                         @SuppressWarnings("resource")
54                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(stream, 0);
55                         for(int i=0;i<40000;i++) r.readTG();
56                 }
57         }
58
59         public StreamingTransferableGraphFileReader(ReadableByteChannel channel) throws Exception {
60                 super(null, channel, SIZE);
61                 if(init) {
62                         init=false;
63                         @SuppressWarnings("resource")
64                         StreamingTransferableGraphFileReader r = new StreamingTransferableGraphFileReader(channel, 0);
65                         for(int i=0;i<40000;i++) r.readTG();
66                 }
67         }
68
69         public StreamingTransferableGraphFileReader(ReadableByteChannel channel, int size) throws IOException {
70                 super(null, channel, SIZE);
71         }
72
73         public StreamingTransferableGraphFileReader(InputStream stream, int size) throws IOException {
74                 super(null, new InputChannel(stream), size);
75         }
76
77         public StreamingTransferableGraphFileReader(File file, int size) throws IOException {
78                 super(file, size);
79         }
80
81         class FileTransferableGraphSource implements TransferableGraphSource {
82
83                 InputStream in = new InputStream() {
84
85             @Override
86             public int read() throws IOException {
87                 return getByte();
88             }
89             
90             @Override
91             public int read(byte[] b) throws IOException {
92                 // FIXME not correctly implemented                
93                 System.arraycopy(safeBytes(b.length), 0, b, 0, b.length);
94                 return b.length;
95             }
96             
97             @Override
98             public int read(byte[] b, int off, int len) throws IOException {
99                 for(int i=0;i<len;i++)
100                     b[off++] = (byte)getByte();
101                 return len;
102             }
103                     
104                 };
105                 DataInputStream dis = new DataInputStream(in);
106                 
107                 DataContainer header;
108                 Extensions extensions;
109                 int resourceCount;
110                 
111                 private int identities = -1;
112                 private int stmLength = -1;
113                 private int valueLength = -1;
114                 
115                 public FileTransferableGraphSource() throws Exception {
116                     init();
117                 }
118                 
119                 private void init() throws Exception {
120
121             // Header
122             header = DataContainers.readHeader(dis);
123             
124             // Content variant data type
125             Bindings.getSerializerUnchecked(Datatype.class).deserialize((DataInput)dis);
126
127             resourceCount = safeInt();
128             
129             List<Object> idcontext = new ArrayList<Object>(); 
130             extensions = (Extensions)Bindings.getSerializerUnchecked(Extensions.class).deserialize((DataInput)dis, idcontext);
131                     
132                 }
133                 
134         @Override
135         public void reset() throws Exception {
136             StreamingTransferableGraphFileReader.this.reset();
137             throw new UnsupportedOperationException();
138         }
139         
140                 @Override
141                 public DataContainer getHeader() throws Exception {
142                     return header;
143                 }
144                 
145                 @Override
146                 public int getResourceCount() throws Exception {
147                         return resourceCount;
148                 }
149
150                 @Override
151                 public int getIdentityCount() throws Exception {
152                         if(identities == -1) {
153                                 identities = safeInt();
154                         }
155                         return identities;
156                 }
157
158                 @Override
159                 public int getStatementCount() throws Exception {
160                         if(stmLength == -1) {
161                                 stmLength = safeInt();
162                         }
163                         return stmLength;
164                 }
165
166                 @Override
167                 public int getValueCount() throws Exception {
168                         if(valueLength == -1) {
169                                 valueLength = safeInt();
170                         }
171                         return valueLength;
172                 }
173
174                 @Override
175                 public void forStatements(ReadGraph graph, TransferableGraphSourceProcedure<int[]> procedure) throws Exception {
176
177                         int[] value = new int[4];
178
179                         int stmLength = getStatementCount();
180
181                         for(int stmIndex=0;stmIndex<stmLength;) {
182
183                                 value[stmIndex & 3] = safeInt();
184                                 stmIndex++;
185                                 if((stmIndex & 3) == 0) procedure.execute(value);
186
187                                 // Cached bytes 
188                                 int avail = (SIZE-byteIndex) >> 2;
189                                 int allowed = Math.min(stmLength-stmIndex, avail);
190                                 for(int index = byteIndex, i=0;i<allowed;i++) {
191                                         value[stmIndex & 3] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));
192                                         stmIndex++;
193                                         if((stmIndex & 3) == 0) procedure.execute(value);
194 //                                      statements[stmIndex++] = ((bytes[index++]&0xff)<<24) | ((bytes[index++]&0xff)<<16) | ((bytes[index++]&0xff)<<8) | ((bytes[index++]&0xff));                              
195                                 }
196                                 byteIndex += allowed<<2;
197
198                         }
199
200                 }
201
202                 @Override
203                 public void forIdentities(ReadGraph graph, TransferableGraphSourceProcedure<Identity> procedure) throws Exception {
204
205                         int identities = getIdentityCount();
206
207                         for(int i=0;i<identities;i++) {
208
209                                 int rid = safeInt();
210                                 byte type = bytes[byteIndex++];
211                                 // External
212                                 if(type == 1) {
213
214                                         int parent = safeInt();
215                                         int nameLen = getDynamicUInt32();
216                                         if(byteIndex+nameLen < SIZE) {
217                                                 procedure.execute(new Identity(rid, new External(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
218                                                 byteIndex += nameLen;
219                                         } else {
220                                                 procedure.execute(new Identity(rid, new External(parent, utf(safeBytes(nameLen), 0, nameLen))));
221                                         }
222                                 } 
223                                 // Internal
224                                 else if(type == 3) {
225
226                                         int parent = safeInt();
227                                         int nameLen = getDynamicUInt32();
228                                         if(byteIndex+nameLen < SIZE) {
229                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(bytes, byteIndex, byteIndex + nameLen))));
230                                                 byteIndex += nameLen;
231                                         } else {
232                                                 procedure.execute(new Identity(rid, new Internal(parent, utf(safeBytes(nameLen), 0, nameLen))));
233                                         }
234
235                                 }
236                                 // Root
237                                 else if(type == 0) {
238
239                                         int nameLen = getDynamicUInt32();
240                                         String name = utf(safeBytes(nameLen), 0, nameLen);
241                                         int nameLen2 = getDynamicUInt32();
242                                         String rType = utf(safeBytes(nameLen2), 0, nameLen2);
243                                         procedure.execute(new Identity(rid, new Root(name, rType)));
244
245                                 }
246                                 // Optional
247                                 else if(type == 2) {
248                                         throw new UnsupportedOperationException("Optional identities not supported");
249                                 } else {
250                                         throw new IllegalStateException("Unsupported identity type " + type);
251                                 }
252
253                         }
254
255                 }
256
257                 @Override
258                 public void forValues(ReadGraph graph, TransferableGraphSourceProcedure<Value> procedure) throws Exception {
259
260                         int valueLength = getValueCount();
261
262                         Serializer variantSerializer = Bindings.getSerializerUnchecked(Bindings.VARIANT);
263
264                         List<Object> idcontext = new ArrayList<>(); 
265
266                         for(int i=0;i<valueLength;i++) {
267                                 int resource = safeInt();
268                                 idcontext.clear();
269                                 Variant value = (Variant)variantSerializer
270                                                 .deserialize((DataInput)dis, idcontext);
271                                 procedure.execute(new Value(resource, value));
272                         }
273
274                 }
275
276                 @Override
277                 public void forValues2(ReadGraph graph, TransferableGraphSourceValueProcedure procedure) throws Exception {
278
279                     Binding datatypeBinding = Bindings.getBinding(Datatype.class);
280             Serializer datatypeSerializer = Bindings.getSerializerUnchecked(datatypeBinding);
281
282             List<Object> idContext = new ArrayList<>(); 
283
284             for(int i=0;i<valueLength;i++) {
285                 int resource = safeInt();
286                 idContext.clear();
287                 Datatype type = (Datatype)datatypeSerializer.deserialize((DataInput)dis, idContext);
288                 procedure.execute(resource, type, dis);
289             }
290
291                 }
292
293                 @Override
294                 public TreeMap<String, Variant> getExtensions() {
295                         return extensions.map;
296                 }
297
298                 @Override
299                 public void close() {
300                 }
301         }
302         
303         public TransferableGraphSource readTG() throws Exception {
304
305                 if(getSize() == 0) return null;
306
307                 return new FileTransferableGraphSource();
308
309         }
310         
311     public static void main(String[] args) {
312
313         try {
314
315             File file = new File("c:/work/Model.apros");
316             StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(file, SIZE);
317             reader = new StreamingTransferableGraphFileReader(file);
318             long s = System.nanoTime();
319             TransferableGraphSource tgs = reader.readTG();
320             int ids = tgs.getIdentityCount();
321             System.out.println("identity count " + ids);
322 //            tgs.forIdentities(null, id -> { /*System.out.println("Identity: " + id);*/ });
323             tgs.forIdentities(null, id -> { System.out.println("Identity: " + id); });
324             int stats = tgs.getStatementCount();
325             System.out.println("statement count " + stats/4 + " (" + stats + ")");
326 //            tgs.forStatements(null, id -> { /*System.out.println(Arrays.toString(id));*/ });
327             tgs.forStatements(null, id -> { System.out.println(Arrays.toString(id)); });
328             int values = tgs.getValueCount();
329             System.out.println("value count " + values);
330             int[] valueNo = {0};
331             tgs.forValues2(null, new TransferableGraphSourceValueProcedure() {
332                 @Override
333                 public void rawCopy(int resource, int length, DataInput input) throws Exception {
334                     System.out.println("value " + (valueNo[0]++) + ": rawCopy("+ resource + ", " + length + ", " + input + ")");
335                     for (int i = 0; i < length; ++i)
336                         input.readByte();
337                 }
338                 @Override
339                 public void execute(int resource, Datatype type, DataInput input) throws Exception {
340                     Object value = Bindings.getSerializer(Bindings.getBinding(type)).deserialize(input);
341                     System.out.println("value " + (valueNo[0]++) + ": execute("+ resource + ", " + type.toSingleLineString() + ", " + input + "): " + value);
342                 }
343             });
344             long d = System.nanoTime() - s;
345             System.err.println("Duration=" + 1e-9*d + "s.");
346         } catch (Throwable t) {
347             t.printStackTrace();
348         }
349     }
350
351 }