Delete temporary files after use in delayed writes and model TG export
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / DelayedWriteGraph.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14
15 import java.io.Closeable;
16 import java.io.File;
17 import java.io.FileInputStream;
18 import java.io.FileOutputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Files;
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.Set;
28 import java.util.TreeMap;
29 import java.util.UUID;
30
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.databoard.accessor.Accessor;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.error.BindingConstructionException;
36 import org.simantics.databoard.serialization.Serializer;
37 import org.simantics.databoard.type.Datatype;
38 import org.simantics.databoard.util.binary.RandomAccessBinary;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.ReadGraph;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.WriteGraph;
46 import org.simantics.db.WriteOnlyGraph;
47 import org.simantics.db.common.MetadataUtils;
48 import org.simantics.db.common.request.WriteOnlyRequest;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.exception.BindingException;
51 import org.simantics.db.exception.ClusterSetExistException;
52 import org.simantics.db.exception.DatabaseException;
53 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
54 import org.simantics.db.exception.NoSingleResultException;
55 import org.simantics.db.exception.ResourceNotFoundException;
56 import org.simantics.db.exception.ServiceException;
57 import org.simantics.db.impl.DebugPolicy;
58 import org.simantics.db.impl.ResourceImpl;
59 import org.simantics.db.request.DelayedWrite;
60 import org.simantics.db.request.WriteOnly;
61 import org.simantics.db.request.WriteResult;
62 import org.simantics.db.request.WriteTraits;
63 import org.simantics.db.service.ByteReader;
64 import org.simantics.db.service.ClusteringSupport;
65 import org.simantics.db.service.TransferableGraphSupport;
66 import org.simantics.db.service.XSupport;
67 import org.simantics.layer0.Layer0;
68 import org.simantics.utils.datastructures.MapList;
69
70 import gnu.trove.map.hash.TIntIntHashMap;
71 import gnu.trove.map.hash.TObjectIntHashMap;
72
73 /**
74  * Write graph implementation that does not modify the database
75  * immediately but with an explicit commit method. All read operations
76  * return results based on the old graph. 
77  */
78 public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable {
79
80         private static final boolean DEBUG = false;
81         private static final int BUFFER_SIZE = 512*1024;
82
83         private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding();
84
85         private final int TERM = 0;
86         private final int CLAIM = 1;
87     private final int CLAIM_NOINVERSE = 2;
88         private final int CLAIM_VALUE_B = 4;
89     private final int DENY = 5;
90     private final int DENY_VALUE = 6;
91     private final int COMMIT_AND_CONTINUE = 7;
92
93     private static class State {
94         public File tempFile;
95         public FileOutputStream out;
96         public FileInputStream in;
97         public ArrayList<Resource> idToResource = new ArrayList<Resource>();
98         public TIntIntHashMap externalToId = new TIntIntHashMap(); 
99         public ArrayList<Binding> idToBinding = new ArrayList<Binding>();
100         public TObjectIntHashMap<Binding> bindingToId = new TObjectIntHashMap<Binding>();
101         public Set<Resource> clusterSets = new HashSet<Resource>();
102         public Set<Resource> clusterSetsForExistingResources = new HashSet<Resource>();
103         public int clusterCount = 0;
104         public long defaultCluster;
105         public Resource defaultClusterSet;
106         public int statementCount = 0;
107         public int valueCount = 0;
108         public int fileCount = 0;
109     }
110
111     private State writeState;
112     private TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();
113
114     private FileChannel channel;
115     private byte[] bytes = new byte[BUFFER_SIZE];
116     private ByteBuffer bb = ByteBuffer.wrap(bytes);
117     private int byteIndex = 0;
118
119     private Layer0 b;
120     private Session session;
121
122     public static Resource convertDelayedResource(Resource r) {
123         if (r instanceof InternalResource)
124             return ((InternalResource) r).resource;
125         return r;
126     }
127
128     private static class InternalResource implements Resource {
129
130         int id;
131         long clusterId = 0;
132         Resource resource = null;
133         Resource clusterSet = null;
134
135         public InternalResource(int id, long clusterId) {
136             this.id = id;
137             this.clusterId = clusterId;
138         }
139
140         public InternalResource(int id, Resource clusterSet) {
141             this.id = id;
142             this.clusterSet = clusterSet;
143         }
144
145         @Override
146         public long getResourceId() {
147             throw new UnsupportedOperationException();
148         }
149
150         @Override
151         public Resource get() {
152             return this;
153         }
154
155         @Override
156         public boolean isPersistent() {
157             return false;
158         }
159
160         @Override
161         public int compareTo(Resource o) {
162             if(o instanceof InternalResource)
163                 return Integer.compare(id, ((InternalResource)o).id);
164             return -1;
165         }
166
167         @Override
168         public int hashCode() {
169             final int prime = 31;
170             int result = 1;
171             result = prime * result + id;
172             return result;
173         }
174
175         @Override
176         public int getThreadHash() {
177             return hashCode();
178         }
179
180         @Override
181         public boolean equals(Object obj) {
182             if (this == obj)
183                 return true;
184             if (obj == null)
185                 return false;
186             if (!(obj instanceof InternalResource))
187                 return false;
188             InternalResource other = (InternalResource) obj;
189             if (id != other.id)
190                 return false;
191             return true;
192         }
193
194         @Override
195         public boolean equalsResource(Resource other) {
196             return equals(other);
197         }
198
199         @Override
200         public String toString() {
201             StringBuilder sb = new StringBuilder(32);
202             if(DebugPolicy.VERBOSE) {
203                 sb.append("[delayed id=");
204                 sb.append(id);
205                 sb.append("]");
206             } else {
207                 sb.append("[did=");
208                 sb.append(id);
209                 sb.append("]");
210             }
211             return sb.toString();
212         }
213     }
214
215         private int getId(Resource resource) {
216                 if(resource instanceof InternalResource)
217                         return ((InternalResource)resource).id;
218                 else {
219                         ResourceImpl r = (ResourceImpl)resource;
220                         int id = writeState.externalToId.get(r.id);
221                         if(id != 0) {
222                                 return id;
223                         } else {
224                                 id = writeState.idToResource.size();
225                                 writeState.idToResource.add(resource);
226                                 writeState.externalToId.put(r.id, id);
227                                 return id;
228                         }
229                 }
230         }
231         
232         private Resource getResource(int id) {
233                 return writeState.idToResource.get(id);
234         }
235         
236         private int getBindingId(Binding binding) {
237                 if(writeState.bindingToId.contains(binding))
238                         return writeState.bindingToId.get(binding);
239                 else {
240                         int id = writeState.idToBinding.size();
241                         writeState.idToBinding.add(binding);
242                         writeState.bindingToId.put(binding, id);
243                         return id;
244                 }
245         }
246
247         public DelayedWriteGraph(ReadGraph g) throws IOException {
248                 super((ReadGraphImpl)g);
249                 writeState = new State();
250                 session = g.getSession();
251                 b = Layer0.getInstance(g);
252                 writeState.defaultCluster = newCluster();
253         }       
254         
255     public DelayedWriteGraph(ReadGraph g, State state) {
256         super((ReadGraphImpl)g);
257         session = g.getSession();
258                 b = Layer0.getInstance(g);
259         this.writeState = state;
260     }
261
262         public DelayedWriteGraph newSync() {
263         return new DelayedWriteGraph(this, writeState);
264     }
265         
266         @Override
267         public void claim(Resource subject, Resource predicate, Resource object)
268                         throws ServiceException {
269                 assert(subject != null);
270         assert(predicate != null);
271         assert(object != null);
272         
273         Resource inverse = getPossibleInverse(predicate);
274         claim(subject, predicate, inverse, object);
275         }
276
277     @Override
278     public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value,
279             Binding binding) throws BindingException,
280             ManyObjectsForFunctionalRelationException, ServiceException {
281         Resource valueResource = newResource();
282         claimValue(valueResource, value, binding);
283         claim(valueResource, b.InstanceOf, null, type);
284         claim(resource, predicate, inverse, valueResource);  
285     }
286     
287     @Override
288     public void addLiteral(Resource resource, Resource predicate,
289                 Resource inverse, Object value, Binding binding)
290                 throws BindingException, ManyObjectsForFunctionalRelationException,
291                 ServiceException {
292         
293         Resource type = getType(value);
294         if(type == null) {
295             Resource literal = newResource();
296             type = b.Literal;
297             Resource dataType = newResource();
298             claim(dataType, b.InstanceOf, null, b.DataType);
299             claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
300             claim(literal, b.HasDataType, null, dataType);
301             claim(literal, b.InstanceOf, null, type);
302             claimValue(literal, value, binding); 
303             claim(resource, predicate, inverse, literal);
304         } else {
305                 addLiteral(resource, predicate, inverse, type, value, binding);
306         }
307         
308     }
309
310     @Override
311     public <T extends Accessor> T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue)
312     throws DatabaseException {
313
314         throw new UnsupportedOperationException();
315
316     }
317
318     @Override
319     public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException {
320         throw new UnsupportedOperationException();
321     }
322     
323     @Override
324     public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException {
325         throw new UnsupportedOperationException();
326     }
327
328     @Override
329     public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException {
330
331                 try {
332                         Binding b = Bindings.getBinding(value.getClass());
333                 claimLiteral(resource, predicate, value, b);
334                 } catch (BindingConstructionException e) {
335                         throw new IllegalArgumentException(e);
336                 } catch (BindingException e) {
337                         throw new IllegalArgumentException(e);
338                 }
339
340     }
341     
342     @Override
343     public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException,
344                 ManyObjectsForFunctionalRelationException, ServiceException {
345
346         Statement valueStatement = null;
347         if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate);
348
349         if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
350
351             claimValue(valueStatement.getObject(), value, binding);
352
353         } else {
354
355             Resource type = getType(value);
356             Resource literal = newResource();
357             if (type == null) {
358                 type = b.Literal;
359                 Resource dataType = newResource();
360                 claim(dataType, b.InstanceOf, null, b.DataType);
361                 claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
362                 claim(literal, b.HasDataType, dataType);
363             }
364             claim(literal, b.InstanceOf, null, type);
365             claimValue(literal, value, binding); 
366             claim(resource, predicate, literal);
367             
368         }
369
370     }
371     
372     @Override
373     public void claimLiteral(Resource resource, Resource predicate,
374                 Resource inverse, Resource type, Object value)
375                 throws BindingException, ManyObjectsForFunctionalRelationException,
376                 ServiceException {
377
378                 try {
379                         Binding b = Bindings.getBinding(value.getClass());
380                 claimLiteral(resource, predicate, inverse, type, value, b);
381                 } catch (BindingConstructionException e) {
382                         throw new IllegalArgumentException(e);
383                 } catch (BindingException e) {
384                         throw new IllegalArgumentException(e);
385                 }
386         
387     }
388     
389     @Override
390     public void claimLiteral(Resource resource, Resource predicate,
391                 Resource inverse, Resource type, Object value, Binding binding)
392                 throws BindingException, ManyObjectsForFunctionalRelationException,
393                 ServiceException {
394
395         Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate);
396
397         if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
398
399             claimValue(valueStatement.getObject(), value, binding);
400
401         } else {
402
403             Resource valueResource = newResource();
404             claim(valueResource, b.InstanceOf, null, type);
405             claim(resource, predicate, inverse, valueResource);
406             claimValue(valueResource, value, binding);
407
408         }
409
410     }
411
412     @Override
413     public void claimLiteral(Resource resource, Resource predicate,
414                 Resource type, Object value) throws BindingException,
415                 ManyObjectsForFunctionalRelationException, ServiceException {
416
417                 try {
418                         Binding b = Bindings.getBinding(value.getClass());
419                 claimLiteral(resource, predicate, type, value, b);
420                 } catch (BindingConstructionException e) {
421                         throw new IllegalArgumentException(e);
422                 } catch (BindingException e) {
423                         throw new IllegalArgumentException(e);
424                 }
425         
426     }
427     
428     @Override
429     public void claimLiteral(Resource resource, Resource predicate,
430                 Resource type, Object value, Binding binding)
431                 throws BindingException, ManyObjectsForFunctionalRelationException,
432                 ServiceException {
433
434                 try {
435                 Resource inverse = getSingleObject(predicate, b.InverseOf);
436                 claimLiteral(resource, predicate, inverse, type, value, binding);
437                 } catch (NoSingleResultException e) {
438                         throw new ServiceException(e);
439                 }
440         
441     }
442     
443         @Override
444         public void deny(Resource subject) throws ServiceException {
445                 assert(subject != null);
446                 if(!(subject instanceof InternalResource)) {
447                         try {
448                                 for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) {
449                                         deny(statement);
450                                 }
451                         } catch (ManyObjectsForFunctionalRelationException e) {
452                                 throw new ServiceException(e);
453                         }
454                 }
455         }
456
457         @Override
458         public void deny(Resource subject, Resource predicate)
459                         throws ServiceException {
460                 assert(subject != null);
461                 if(!(subject instanceof InternalResource)) {
462                         for (Resource object : getObjects(subject, predicate)) {
463                                 deny(subject, predicate, object);
464                         }
465                 }
466         }
467
468         @Override
469         public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException {
470                 denyStatement(subject, predicate, object);
471         }
472
473         @Override
474         public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException {
475                 deny(subject, predicate, getPossibleInverse(predicate), object);
476         }
477
478         @Override
479         public void deny(Statement statement) throws ServiceException {
480                 Resource predicate = statement.getPredicate();
481                 deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject());
482         }
483
484         @Override
485         public void denyValue(Resource resource, Resource predicate)
486                         throws ManyObjectsForFunctionalRelationException, ServiceException {
487                 assert(resource != null);
488                 assert(predicate != null);
489
490                 if(!(resource instanceof InternalResource)) {
491                 Statement valueStatement = getPossibleStatement(resource, predicate);
492
493                 if (valueStatement != null && !valueStatement.isAsserted(resource)) {
494                     Resource value = valueStatement.getObject();
495                                 denyValue(value);
496                 }
497                 }
498         }
499
500         @Override
501         public Resource newResource() throws ServiceException {
502                 if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet);
503                 else return newResource(writeState.defaultCluster);
504         }
505
506         @Override
507         public Resource newResource(long clusterId) throws ServiceException {
508                 int id = writeState.idToResource.size();
509                 InternalResource ret = new InternalResource(id, clusterId);
510                 writeState.idToResource.add(ret);
511                 return ret;
512         }
513
514     @Override
515     public Resource newResource(Resource clusterSet) throws ServiceException {
516         
517         if ((clusterSet instanceof InternalResource)) {
518                 if(!writeState.clusterSets.contains(clusterSet))
519                         throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
520         } else {
521                 WriteSupport ws = session.getService(WriteSupport.class);
522                 if (!ws.hasClusterSet(null, clusterSet))
523                         if(!writeState.clusterSetsForExistingResources.contains(clusterSet))
524                         throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
525         }
526         
527         int id = writeState.idToResource.size();
528         InternalResource ret = new InternalResource(id, clusterSet);
529         writeState.idToResource.add(ret);
530         return ret;
531     }
532
533     @Override
534     public void newClusterSet(Resource clusterSet) throws ServiceException {
535         if (DEBUG)
536             System.out.println("new cluster set=" + clusterSet);
537         boolean existingResource = !(clusterSet instanceof InternalResource);
538         if (existingResource) {
539                 WriteSupport ws = session.getService(WriteSupport.class);
540                 if (ws.hasClusterSet(null, clusterSet))
541                         throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
542                 writeState.clusterSetsForExistingResources.add(clusterSet);             
543         } else {
544                 if(!writeState.clusterSets.add(clusterSet))
545                         throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
546         }
547     }
548     @Override
549     public Resource setClusterSet4NewResource(Resource clusterSet)
550     throws ServiceException {
551         Resource existing = writeState.defaultClusterSet; 
552         writeState.defaultClusterSet = clusterSet;
553         return existing;
554     }
555         
556         @Override
557         public void claim(Resource subject, Resource predicate, Resource inverse,
558                         Resource object) throws ServiceException {
559                 assert(subject != null);
560                 assert(predicate != null);
561                 assert(object != null);
562                 try {
563                     if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) {
564                         writeByte(CLAIM);
565                         writeInt(getId(subject));
566                         writeInt(getId(predicate));
567                         writeInt(getId(inverse));
568                         writeInt(getId(object));
569                     } else {
570                         writeByte(CLAIM_NOINVERSE);
571                         writeInt(getId(subject));
572                         writeInt(getId(predicate));
573                         writeInt(getId(object));
574                     }
575                 } catch(Exception e) {
576                         throw new ServiceException(e);
577                 }
578         }
579
580     @Override
581     public void deny(Resource subject, Resource predicate, Resource inverse,
582             Resource object) throws ServiceException {
583         assert(subject != null);
584         assert(predicate != null);
585         assert(object != null);
586         try {
587                 writeByte(DENY);
588                 writeInt(getId(subject));
589                 writeInt(getId(predicate));
590             if(inverse != null) writeInt(getId(inverse));
591             else writeInt(0);
592                 writeInt(getId(object));
593         } catch(Exception e) {
594             throw new ServiceException(e);
595         }
596     }
597
598     @Override
599     public void deny(Resource subject, Resource predicate, Resource inverse,
600             Resource object, VirtualGraph graph) throws ServiceException {
601         throw new UnsupportedOperationException();
602     }
603     
604         @Override
605         public void claimValue(Resource resource, Object value)
606                         throws ServiceException {
607                 try {
608                     Binding binding = Bindings.getBinding(value.getClass());
609                     claimValue(resource, value, binding);
610                 } catch (BindingConstructionException e) {
611                         throw new ServiceException(e);
612                 }
613         }
614
615         private OutputStream valueWriter = new OutputStream() {
616                 @Override
617                 public void write(int b) throws IOException {
618                         writeByte(b);
619                 }
620         };
621
622         @Override
623         public void claimValue(Resource resource, Object value, Binding binding)
624                         throws ServiceException {
625                 try {
626
627                 writeByte(CLAIM_VALUE_B);
628                 writeInt(getId(resource));
629                 Serializer serializer = binding.serializer();
630                 int size = serializer.getSize(value);
631             writeInt(size);
632             serializer.serialize(valueWriter, value);
633
634                 } catch(IOException e) {
635                 Logger.defaultLogError(e);
636                         throw new ServiceException(e);
637                 }
638         }
639
640         @Override
641         public void denyValue(Resource resource) throws ServiceException {
642                 writeByte(DENY_VALUE);
643                 writeInt(getId(resource));
644         }
645
646         @Override
647         public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException {
648         throw new UnsupportedOperationException();
649         }
650
651         @Override
652         public void flushCluster() throws ServiceException {
653         writeState.defaultCluster = newCluster();
654         }
655
656     @Override
657     public void flushCluster(Resource r) throws ServiceException {
658         throw new ServiceException("Operation flushCluster(" + r + " not implemented.");
659     }
660
661     private void writeReset(int size) {
662         byteIndex = 0;
663         bb.position(0);
664         bb.limit(size);
665         try {
666             if (writeState.tempFile == null) {
667                 File workspace = Platform.getLocation().toFile();
668                 File temp = new File(workspace, "tempFiles");
669                 File base = new File(temp, "delayed");
670                 Files.createDirectories(base.toPath());
671                 writeState.tempFile = new File(base, UUID.randomUUID().toString());
672                 writeState.out = new FileOutputStream(writeState.tempFile);
673                 channel = writeState.out.getChannel();
674             }
675
676             for (int got=0;got < size;) {
677                 int n = channel.write(bb); 
678                 if (n <= 0) {
679                     Logger.defaultLogError(new Exception("FileChannel.write returned " + n));
680                     return;
681                 }
682                 got += n;
683             }
684         } catch (IOException e) {
685             Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e);
686         }
687     }
688
689     private void reset() {
690         byteIndex = 0;
691         bb.clear();
692         if (channel != null) {
693             try {
694                 for(int got=0; got < BUFFER_SIZE;) {
695                     int n = channel.read(bb); 
696                     if (n <= 0)
697                         return;
698                     got += n;
699                 }
700             } catch (IOException e) {
701                 Logger.defaultLogError("FileChannel.read failed", e);
702             }
703         }
704     }
705
706     private void writeInt(int i) {
707         if(byteIndex < (BUFFER_SIZE-4)) {
708                 bytes[byteIndex++] = (byte)(i&0xff);
709                 bytes[byteIndex++] = (byte)((i>>>8)&0xff);
710                 bytes[byteIndex++] = (byte)((i>>>16)&0xff);
711                 bytes[byteIndex++] = (byte)((i>>>24)&0xff);
712                 if (byteIndex == BUFFER_SIZE)
713                     writeReset(BUFFER_SIZE);
714         } else {
715                 int has = BUFFER_SIZE-byteIndex;
716                 if(has == 0) writeReset(BUFFER_SIZE);
717                 bytes[byteIndex++] = (byte)(i&0xff);
718                 if(has == 1) writeReset(BUFFER_SIZE);
719                 bytes[byteIndex++] = (byte)((i>>>8)&0xff);
720                 if(has == 2) writeReset(BUFFER_SIZE);
721                 bytes[byteIndex++] = (byte)((i>>>16)&0xff);
722                 if(has == 3) writeReset(BUFFER_SIZE);
723                 bytes[byteIndex++] = (byte)((i>>>24)&0xff);
724                 if(has == 4) writeReset(BUFFER_SIZE);
725         }
726     }
727
728     private int readInt() {
729         if(byteIndex < (BUFFER_SIZE-4)) {
730                 int result = (int) 
731                         ((bytes[byteIndex++] & 0xff) | 
732                         ((bytes[byteIndex++] & 0xff)<<8) | 
733                         ((bytes[byteIndex++] & 0xff)<<16) | 
734                         ((bytes[byteIndex++] & 0xff)<<24)); 
735                 return result;
736         } else {
737                 int has = BUFFER_SIZE-byteIndex;
738                 int result = 0;
739                 if(has == 0) reset();
740                         result = (int)(bytes[byteIndex++] & 0xff);
741                 if(has == 1) reset();
742                 result |= (int)((bytes[byteIndex++] & 0xff) <<8);
743                 if(has == 2) reset();
744                 result |= (int)((bytes[byteIndex++] & 0xff) <<16);
745                 if(has == 3) reset();
746                 result |= (int)((bytes[byteIndex++] & 0xff) <<24);
747                 if(has == 4) reset();
748                 return result;
749         }
750     }
751
752     private byte readByte() {
753         byte result = bytes[byteIndex++]; 
754         if(byteIndex == BUFFER_SIZE) reset();
755         return result;
756     }
757
758     private void writeByte(int b) {
759         bytes[byteIndex++] = (byte)b;
760         if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE);
761     }
762
763     private void writeBytes(byte[] data) {
764                 int has = BUFFER_SIZE-byteIndex;
765                 int amount = data.length;
766                 if(has > amount) {
767                         System.arraycopy(data, 0, bytes, byteIndex, amount);
768                         byteIndex += amount;
769                 } else {
770                         System.arraycopy(data, 0, bytes, byteIndex, has);
771                         writeReset(BUFFER_SIZE);
772                         ByteBuffer bb2 = ByteBuffer.wrap(data);
773                         bb2.position(has);
774                         try {
775                                 channel.write(bb2);
776                         } catch (IOException e) {
777                                 Logger.defaultLogError("FileChannel.write failed", e);
778                         }
779                 }
780     }
781
782     public byte[] readBytes(byte[] result, int amount) {
783         if(result == null) result = new byte[amount];
784                 int has = BUFFER_SIZE-byteIndex;
785                 if(has > amount) {
786                         System.arraycopy(bytes, byteIndex, result, 0, amount);
787                         byteIndex += amount;
788                 } else {
789                         System.arraycopy(bytes, byteIndex, result, 0, has);
790                         ByteBuffer bb2 = ByteBuffer.wrap(result);
791                         bb2.position(has);
792                         for(int got=has;got<amount;)
793                                 try {
794                                         got += channel.read(bb2);
795                                         if(got == -1) {
796                                                 // End-of-stream, why log this?
797                                                 return result;
798                                         }
799                                 } catch (IOException e) {
800                                         Logger.defaultLogError("FileChannel.read failed", e);
801                                 }
802                         reset();
803                 }
804                 return result;
805     }
806
807     public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException {
808             writeState.bindingToId = null;
809             writeState.externalToId = null;
810                 writeByte(TERM);
811
812                 if (writeState.out != null) {
813                         // Flush current buffer to file only if backing file has already
814                         // been taken into use.
815                         if (byteIndex > 0)
816                                 writeReset(byteIndex);
817
818                         try (OutputStream out = writeState.out) {
819                                 channel.force(false);
820                         } catch (IOException e) {
821                                 throw new ServiceException(e);
822                         } finally {
823                                 writeState.out = null;
824                         }
825
826                         try {
827                                 writeState.in = new FileInputStream(writeState.tempFile);
828                                 channel = writeState.in.getChannel();
829                         } catch (IOException e) {
830                                 throw new ServiceException(e);
831                         }
832                 }
833
834                 w.getMetadata().putAll(metadata);
835
836                 TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class);
837
838                 // First create all resources defined by clusterId
839                 MapList<Long,InternalResource> clusterAssignment = new MapList<Long,InternalResource>();
840                 for(Resource r : writeState.idToResource) {
841                         if(r instanceof InternalResource) {
842                                 InternalResource ir = (InternalResource)r;
843                                 if(ir.clusterId < 0) {
844                                 if (DEBUG)
845                                         System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId);
846                                         clusterAssignment.add(ir.clusterId, ir);
847                                 } else if(ir.clusterId > 0) {
848                                 if (DEBUG)
849                                         System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir);
850                                         ir.resource = w.newResource(ir.clusterId);
851                                         writeState.idToResource.set(ir.id, ir.resource);
852                                         if (writeState.clusterSets.contains(ir)) {
853                                                 w.newClusterSet(ir.resource);
854                                         if (DEBUG)
855                                                 System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
856                                         }
857                                         
858                                 }
859                         }
860                 }
861                 
862                 for(Long clusterKey : clusterAssignment.getKeys()) {
863                 if (DEBUG)
864                         System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey);
865                         w.flushCluster();
866                         for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) {
867                         if (DEBUG)
868                                 System.out.println("-CREATED RESOURCE: " + ir);
869                                 ir.resource = w.newResource();
870                                 writeState.idToResource.set(ir.id, ir.resource);
871                                 if (writeState.clusterSets.contains(ir)) {
872                                         w.newClusterSet(ir.resource);
873                                 if (DEBUG)
874                                         System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
875                                 }
876                         }
877                 }
878                 
879                 // Create cluster sets for all existing resources (not InternalResource)
880                 // before proceeding to create resources.
881                 for(Resource existingResource : writeState.clusterSetsForExistingResources) {
882                         w.newClusterSet(existingResource);
883                 if (DEBUG)
884                         System.out.println("CREATED NEW CLUSTER SET: " + existingResource);
885                 }
886                 
887                 // Then create all resources defined by cluster set
888                 for(Resource r : writeState.idToResource) {
889                         if(r instanceof InternalResource) {
890                                 InternalResource ir = (InternalResource)r;
891                                 Resource clusterSet = ir.clusterSet;
892                                 
893                                 if (clusterSet != null) {
894                                 if (DEBUG)
895                                         System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet);
896                                         if(clusterSet instanceof InternalResource) {
897                                                 ir.resource = w.newResource(((InternalResource)clusterSet).resource);
898                                         } else {
899                                                 ir.resource = w.newResource(clusterSet);
900                                         }
901                                 if (DEBUG)
902                                         System.out.println(" => " + ir.resource);
903                                         writeState.idToResource.set(ir.id, ir.resource);
904                                         if(writeState.clusterSets.contains(ir)) {
905                                         if (DEBUG)
906                                                 System.out.println(" ==> NEW CLUSTER SET");
907                                                 w.newClusterSet(ir.resource);
908                                         }
909                                 }
910                                 
911                         }
912                 }
913                 
914                 reset();
915                 bb.limit(BUFFER_SIZE);
916                 
917                 try {
918                         while(true) {
919                                 byte method = readByte();
920                                 switch(method) {
921                                 case TERM: {
922                                         if (DEBUG) {
923                                                 System.out.println("Resources:  " + writeState.idToResource.size());
924                                                 System.out.println("Statements: " + writeState.statementCount);
925                                                 System.out.println("Values:     " + writeState.valueCount);
926                                                 System.out.println("Files:      " + writeState.fileCount);
927                                                 System.out.println("Clusters:   " + writeState.clusterCount);
928                                         }
929                                         return;
930                                 }
931                                 case CLAIM: {
932                                         writeState.statementCount += 2;
933                                         Resource subject = getResource(readInt());
934                                         Resource predicate = getResource(readInt());
935                     Resource inverse = getResource(readInt());
936                                         Resource object = getResource(readInt());
937                                         w.claim(subject, predicate, inverse, object);
938                                 } break;
939                 case CLAIM_NOINVERSE: {
940                     ++writeState.statementCount;
941                     Resource subject = getResource(readInt());
942                     Resource predicate = getResource(readInt());
943                     Resource object = getResource(readInt());
944                     w.claim(subject, predicate, null, object);
945                 } break;
946                 case DENY: {
947                     Resource subject = getResource(readInt());
948                     Resource predicate = getResource(readInt());
949                     int inv = readInt();
950                     Resource inverse = null;
951                     if(inv > 0) inverse = getResource(inv);
952                     Resource object = getResource(readInt());
953                     if(!subject.isPersistent() || !object.isPersistent()) {
954                         VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object);
955                         if(inv > 0) {
956                                 VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject);
957                                 if(statementProvider2 != null)
958                                         w.deny(object, inverse, null, subject, statementProvider2);
959                         }
960                         if(statementProvider1 != null)
961                                 w.deny(subject, predicate, null, object, statementProvider1);
962                     } else {
963                         w.deny(subject, predicate, inverse, object, null);
964                     }
965                 } break;
966                 case DENY_VALUE: {
967                     Resource subject = getResource(readInt());
968                     if(!subject.isPersistent()) {
969                         VirtualGraph provider = processor.getValueProvider(subject);
970                         if(provider != null)
971                                 w.denyValue(subject, provider);
972                     } else {
973                         w.denyValue(subject);
974                     }
975                 } break;
976                                 case CLAIM_VALUE_B: {
977                                         ++writeState.valueCount;
978                                         Resource resource = getResource(readInt());
979                                         int len = readInt();
980                                         tgs.setValue(w, resource, null, this, len);
981                                 } break;
982                                 case COMMIT_AND_CONTINUE: {
983                                         XSupport xs = w.getService(XSupport.class);
984                                         xs.commitAndContinue(w, traits);
985                                 } break;
986                                 }
987                         }
988                 } catch(Exception e) {
989                         if(e instanceof ServiceException)
990                                 throw (ServiceException)e;
991                         else
992                                 throw new ServiceException(e);
993                 } finally {
994                         channel = null;
995                         if (writeState.in != null) {
996                                 try (InputStream in = writeState.in) {
997                                 } catch (IOException e) {
998                                         throw new ServiceException(e);
999                                 } finally {
1000                                         writeState.in = null;
1001                                         writeState.tempFile.delete();
1002                                 }
1003                         }
1004                 }
1005         }
1006
1007         private Resource getType(Object value) {
1008                 Class<?> clazz = value.getClass();
1009                 Resource dataType = 
1010                           clazz == Float.class ? b.Float
1011                         : clazz == Double.class ? b.Double
1012                         : clazz == Integer.class ? b.Integer
1013                         : clazz == String.class ? b.String
1014                         : clazz == Boolean.class ? b.Boolean
1015                         : clazz == Byte.class ? b.Byte
1016                         : clazz == Long.class ? b.Long
1017                         : clazz == float[].class ? b.FloatArray
1018                         : clazz == double[].class ? b.DoubleArray
1019                         : clazz == int[].class ? b.IntegerArray
1020                         : clazz == String[].class ? b.StringArray
1021                         : clazz == boolean[].class ? b.BooleanArray
1022                         : clazz == byte[].class ? b.ByteArray
1023                         : clazz == long[].class ? b.LongArray
1024                         : null
1025                         ;
1026                 return dataType;
1027         }
1028
1029         public long newCluster() {
1030                 return -1 - (++writeState.clusterCount);
1031         }
1032
1033         public long getDefaultCluster() {
1034                 return writeState.defaultCluster;
1035         }
1036
1037         public void setDefaultCluster(long cluster) {
1038             writeState.defaultCluster = cluster;
1039         }
1040
1041     @Override
1042     public void syncRequest(final DelayedWrite request) throws DatabaseException {
1043
1044         try {
1045
1046                 final DelayedWriteGraph dwg = new DelayedWriteGraph(this);
1047                 request.perform(dwg);
1048
1049                 syncRequest(new WriteOnlyRequest() {
1050
1051                         @Override
1052                         public void perform(WriteOnlyGraph graph) throws DatabaseException {
1053                                 dwg.commit(graph, request);
1054                         }
1055
1056                 });
1057
1058         } catch (DatabaseException e) {
1059
1060                 throw e;
1061
1062         } catch (Throwable e) {
1063
1064                 throw new DatabaseException(e);
1065
1066         } finally {
1067         }
1068
1069     }
1070
1071     @Override
1072     public void syncRequest(WriteOnly request) throws DatabaseException {
1073
1074         Resource defaultClusterSet = setClusterSet4NewResource(null);
1075
1076         try {
1077             WriteSupport ws = session.getService(WriteSupport.class);
1078             ws.performWriteRequest(this, request);
1079         } catch (DatabaseException e) {
1080             throw e;
1081         } catch (Throwable t) {
1082             throw new DatabaseException(t);
1083         }  finally {
1084             setClusterSet4NewResource(defaultClusterSet);
1085         }
1086
1087     }
1088
1089     @SuppressWarnings("unchecked")
1090     @Override
1091     public <T> T getService(Class<T> api) {
1092
1093         if(ClusteringSupport.class == api) {
1094
1095             final ClusteringSupport support = (ClusteringSupport)super.getService(api); 
1096
1097             return (T)new ClusteringSupport() {
1098
1099                 @Override
1100                 public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId)
1101                         throws DatabaseException, ResourceNotFoundException {
1102                     return support.getResourceByIndexAndCluster(resourceIndex, clusterId);
1103                 }
1104
1105                 @Override
1106                 public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException {
1107                     return support.getResourceByKey(resourceKey);
1108                 }
1109
1110                 @Override
1111                 public int getNumberOfResources(long clusterId)
1112                                 throws DatabaseException {
1113                         return support.getNumberOfResources(clusterId);
1114                 }
1115
1116                 @Override
1117                 public long getCluster(Resource r) {
1118                     return support.getCluster(r);
1119                 }
1120
1121                 @Override
1122                 public long createCluster() {
1123                     return newCluster();
1124                 }
1125
1126                 @Override
1127                 public boolean isClusterSet(Resource r) throws DatabaseException {
1128                         return support.isClusterSet(r);
1129                 }
1130
1131                 @Override
1132                 public Resource getClusterSetOfCluster(Resource r) throws DatabaseException {
1133                         return support.getClusterSetOfCluster(r);
1134                 }
1135
1136                 @Override
1137                 public Resource getClusterSetOfCluster(long cluster) throws DatabaseException {
1138                         return support.getClusterSetOfCluster(cluster);
1139                 }
1140
1141             };
1142
1143         } else if (TransferableGraphSupport.class == api) {
1144
1145                 final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class);
1146
1147                 return (T)new TransferableGraphSupport() {
1148
1149                         @Override
1150                         public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) {
1151                                 writeByte(CLAIM_VALUE_B);
1152                                 writeInt(getId(resource));
1153                                 writeInt(raw.length);
1154                                 writeBytes(raw);
1155                                 writeInt(getBindingId(PASSTHROUGH));
1156                         }
1157
1158                         @Override
1159                         public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount)
1160                                 throws DatabaseException {
1161                     writeByte(CLAIM_VALUE_B);
1162                     writeInt(getId(resource));
1163                     writeInt(amount);
1164                     writeBytes(reader.readBytes(null, amount));
1165                     writeInt(getBindingId(PASSTHROUGH));
1166                         }
1167
1168                         @Override
1169                         public byte[] getValue(ReadGraph graph, Resource resource) {
1170                                 return parentSupport.getValue(graph, resource);
1171                         }
1172
1173                         @Override
1174                         public InputStream getValueStream(ReadGraph graph, Resource resource) {
1175                                 return parentSupport.getValueStream(graph, resource);
1176                         }
1177
1178                 };
1179
1180         }
1181
1182         return super.getService(api);
1183
1184     }
1185
1186     @Override
1187     public <T> void addMetadata(Metadata data) throws ServiceException {
1188         MetadataUtils.addMetadata(session, metadata, data);
1189     }
1190     
1191     public void addCommitAndContinue() {
1192                 writeByte(COMMIT_AND_CONTINUE);
1193     }
1194
1195     @Override
1196     public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1197         return MetadataUtils.getMetadata(session, metadata, clazz);
1198     }
1199
1200     @Override
1201     public TreeMap<String, byte[]> getMetadata() {
1202         return metadata;
1203     }
1204     @Override
1205     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1206         
1207         return request.perform(this);
1208         
1209     }
1210
1211     @Override
1212     public VirtualGraph getProvider() {
1213         return null;
1214     }
1215
1216     @Override
1217     public void clearUndoList(WriteTraits writeTraits) {
1218         WriteSupport ws = session.getService(WriteSupport.class);
1219         if (null != ws)
1220             ws.clearUndoList(writeTraits);
1221     }
1222
1223         @Override
1224         public void markUndoPoint() {
1225                 // TODO Auto-generated method stub
1226                 
1227         }
1228
1229         @Override
1230         public <T> T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding)
1231                         throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException {
1232         if(!(subject instanceof InternalResource)) {
1233                 return super.getPossibleRelatedValue(subject, relation, binding);
1234         } else {
1235                 return null;
1236         }
1237         }
1238         
1239         @Override
1240         final public Resource getPossibleObject(final Resource subject, final Resource relation)
1241                         throws ManyObjectsForFunctionalRelationException, ServiceException {
1242         if(!(subject instanceof InternalResource)) {
1243                 return super.getPossibleObject(subject, relation);
1244         } else {
1245                 return null;
1246         }
1247         }
1248
1249     public void close() {
1250         if (writeState.out != null) {
1251             try (OutputStream out = writeState.out) {
1252             } catch (IOException e) {
1253                 Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e);
1254             } finally {
1255                 writeState.out = null;
1256             }
1257         }
1258         if (writeState.in != null) {
1259             try (InputStream in = writeState.in) {
1260             } catch (IOException e) {
1261                 Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e);
1262             } finally {
1263                 writeState.in = null;
1264             }
1265         }
1266         if (writeState.tempFile != null) {
1267             writeState.tempFile.delete();
1268             writeState.tempFile = null;
1269         }
1270     }
1271
1272 }