]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java
f4e722fe5adea864496e4a7864197ae7fb6e877d
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / TransientGraph.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl;
13
14 import java.io.Closeable;
15 import java.io.File;
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedList;
22 import java.util.function.Consumer;
23
24 import org.simantics.databoard.Bindings;
25 import org.simantics.databoard.binding.Binding;
26 import org.simantics.databoard.serialization.SerializationException;
27 import org.simantics.databoard.serialization.Serializer;
28 import org.simantics.databoard.serialization.SerializerConstructionException;
29 import org.simantics.db.AsyncReadGraph;
30 import org.simantics.db.AsyncRequestProcessor;
31 import org.simantics.db.RequestProcessor;
32 import org.simantics.db.Resource;
33 import org.simantics.db.Statement;
34 import org.simantics.db.VirtualGraphContext;
35 import org.simantics.db.VirtualGraphSource;
36 import org.simantics.db.WriteGraph;
37 import org.simantics.db.WriteOnlyGraph;
38 import org.simantics.db.common.ByteFileReader;
39 import org.simantics.db.common.ByteFileWriter;
40 import org.simantics.db.common.StandardStatement;
41 import org.simantics.db.common.request.WriteOnlyRequest;
42 import org.simantics.db.common.request.WriteRequest;
43 import org.simantics.db.common.utils.Logger;
44 import org.simantics.db.exception.DatabaseException;
45 import org.simantics.db.impl.graph.ReadGraphImpl;
46 import org.simantics.db.impl.support.ResourceSupport;
47 import org.simantics.db.impl.support.VirtualGraphServerSupport;
48 import org.simantics.db.procedure.AsyncProcedure;
49 import org.simantics.db.request.Write;
50 import org.simantics.db.request.WriteOnly;
51 import org.simantics.db.service.SerialisationSupport;
52
53 import gnu.trove.list.array.TIntArrayList;
54 import gnu.trove.map.hash.TIntObjectHashMap;
55 import gnu.trove.procedure.TIntObjectProcedure;
56 import gnu.trove.procedure.TIntProcedure;
57 import gnu.trove.set.hash.TIntHashSet;
58
59 class VirtualCluster {
60     static final boolean DEBUG = false;
61     final static int[] EMPTY = new int[0];
62
63     private final ArrayList<TIntArrayList> statements = new ArrayList<TIntArrayList>();
64     private final TIntHashSet lazy = new TIntHashSet();
65     private final TIntObjectHashMap<byte[]> values = new TIntObjectHashMap<byte[]>();
66     private final int clusterId;
67
68     public VirtualCluster(int clusterId) {
69         this.clusterId = clusterId;
70     }
71     
72     public int clusterId() {
73         return clusterId;
74     }
75     
76     public void trim() {
77     }
78     
79     private TIntArrayList getPredicateMap(int subject) {
80         
81         int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
82         if(rId >= statements.size()) return null;
83         return statements.get(rId);
84         
85     }
86     
87     public boolean isPending(int subject) {
88         return lazy.contains(subject);
89     }
90
91     boolean containsPredicate(TIntArrayList statements, int predicate) {
92         for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate) return true;
93         return false;
94     }
95
96     int containsStatement(TIntArrayList statements, int predicate, int object) {
97         for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate && statements.getQuick(i+1) == object) return i;
98         return -1;
99     }
100     
101     public boolean isPending(int subject, int predicate) {
102         
103         if(!lazy.contains(subject)) return false;
104         TIntArrayList predicateMap = getPredicateMap(subject);
105         if(predicateMap == null) return true;
106         return !containsPredicate(predicateMap, predicate);
107 //        return !predicateMap.contains(predicate);
108         
109     }
110     
111     public void resetLazy(int subject) {
112
113         lazy.remove(subject);
114         // Query all data from scratch
115         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
116         statements.set(ri, new TIntArrayList());
117
118     }
119     
120     public void setLazy(int subject) {
121
122         lazy.add(subject);
123         
124     }
125
126     public void finish(int subject) {
127
128         lazy.remove(subject);
129         
130     }
131     
132     public void setValue(int subject, byte[] data, int length) {
133
134         values.put(subject, Arrays.copyOf(data, length));
135         
136     }
137
138     public void denyValue(int subject) {
139         
140         values.remove(subject);
141         
142     }
143
144     public void addStatements(int subject, int[] data) {
145         
146         TIntArrayList result = getPredicateMap(subject);
147         if(result == null) {
148             result = new TIntArrayList();
149             int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
150             statements.ensureCapacity(rId+1);
151             for(int i=statements.size();i<rId+1;i++) statements.add(null);
152             statements.set(rId, result);
153         }
154         
155         for(int i=0;i<data.length;i+=2) {
156             int predicate =  data[i];
157             int object =  data[i+1];
158             if(containsStatement(result, predicate, object) < 0) {
159                 result.add(predicate);
160                 result.add(object);
161             }
162         }
163         
164     }
165     
166     public void claim(int subject, int predicate, int object) {
167         
168         TIntArrayList predicates = getPredicateMap(subject);
169         if(predicates == null) {
170             predicates = new TIntArrayList();
171             int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
172             statements.ensureCapacity(rId+1);
173             for(int i=statements.size();i<rId+1;i++) statements.add(null);
174             statements.set(rId, predicates);
175         }
176         
177         if(containsStatement(predicates, predicate, object) < 0) {
178             predicates.add(predicate);
179             predicates.add(object);
180         }
181         
182     }
183     
184     public void deny(int subject, int predicate, int object) {
185     
186         TIntArrayList predicates = getPredicateMap(subject);
187         if(predicates == null) return;
188         int index = containsStatement(predicates, predicate, object);
189         if(index < 0) return;
190         predicates.remove(index, 2);
191         
192     }
193     
194     int[] getObjects(int subject, int predicate) {
195         
196         TIntArrayList predicates = getPredicateMap(subject);
197         if(predicates == null) return EMPTY;
198         TIntArrayList result = new TIntArrayList();
199         for(int i=0;i<predicates.size();i+=2) {
200             if(predicates.getQuick(i) == predicate) {
201                 result.add(predicates.getQuick(i+1));
202             }
203         }
204         return result.toArray();
205         
206     }
207     
208     int[] getPredicates(int subject) {
209
210         TIntArrayList predicates = getPredicateMap(subject);
211         if(predicates == null) return EMPTY;
212         TIntHashSet result = new TIntHashSet();
213         for(int i=0;i<predicates.size();i+=2) {
214             result.add(predicates.getQuick(i));
215         }
216         return result.toArray();
217         
218     }
219     
220     byte[] getValue(int subject) {
221         return values.get(subject);
222     }
223
224     public int getTransientClusterKey() {
225         int clusterBits = ClusterTraitsBase.getClusterBits(clusterId>>>1);
226         if((clusterId & 1) == 0) // Virtual subjects
227             return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource.
228         else // Database subjects
229             return clusterBits;
230     }
231
232     public int getTransientId(int subject) {
233         if((clusterId & 1) == 0) // Virtual subjects
234             return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000;
235         else // Database subjects
236             return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject);
237     }
238     
239     /*
240      * Creates a persistent identifier for given transient graph cluster identifier.
241      * 
242      * 
243      * For persistent clusters this obtains the random access id for (invalid)
244      * resource at index 0 of given cluster. LSB in given id indicates persistence
245      * 
246      * 
247      */
248     
249     public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { 
250         if((clusterKey & 1) == 0)// Virtual subjects
251             return clusterKey;
252         else { // Database subjects
253             int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1);
254             try {
255                 // Assuming that cluster's first resource is created first and not deleted.
256                 // Assuming that resource index is in LSB bits.
257                 return ss.getRandomAccessId(rk);
258             } catch (DatabaseException e) {
259                 Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e);
260                 return -1;
261             }
262         }
263     }
264
265     public void saveImpl(final File file, SerialisationSupport ss) throws IOException {
266         if (DEBUG)
267             System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath());
268          
269         final ByteFileWriter writer = new ByteFileWriter(file);
270         
271         try {
272
273             int stms = 0;
274             for(TIntArrayList list : statements) {
275                 if(list != null)
276                     stms += list.size();
277             }
278
279             writer.write((int)(1.5*stms));
280             @SuppressWarnings("unused")
281             int count = 0;
282             for(int i=0;i<statements.size();i++) {
283                 TIntArrayList list = statements.get(i);
284                 if(list != null) {
285                     for(int j=0;j<list.size();j+=2) {
286                         try {
287                             writer.write((short)i);
288                             int rk = list.getQuick(j);
289                             long rid = ss.getRandomAccessId(rk);
290                             writer.write(rid);
291                             rk = list.getQuick(j+1);
292                             rid = ss.getRandomAccessId(rk);
293                             writer.write(rid);
294                             count++;
295                         } catch (DatabaseException e) {
296                             e.printStackTrace();
297                         }
298                     }
299                 }
300             }
301
302             writer.write(values.size());
303
304             values.forEachEntry(new TIntObjectProcedure<byte[]>() {
305
306                 @Override
307                 public boolean execute(int a, byte[] b) {
308                     writer.write(a);
309                     writer.write(b.length);
310                     writer.write(b);
311                     return true;
312                 }
313             });
314             if (DEBUG)
315                 System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk.");
316
317         } finally {
318             writer.commit();
319 //            FileUtils.uncheckedClose(_os);
320         }
321     }
322     
323     public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException {
324         if (DEBUG)
325             System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length());
326         
327         ByteFileReader reader = null;
328         try {
329             
330             if (!file.exists())
331                 return;
332
333             reader = new ByteFileReader(file);
334             int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1);
335             int stms = reader.readInt();
336             for (int i = 0; i < stms; i += 3) {
337 //                int rId = reader.readShort();
338 //                if(vgss != null) vgss.addVirtual(clusterInt + rId);
339 //                claim(rId,
340 //                        serialization.getTransientId(reader.readLong()),
341 //                        serialization.getTransientId(reader.readLong()));
342                 int rId = reader.readShort();
343                 long sId = reader.readLong();
344                 long oId = reader.readLong();
345                 int sTransientId = serialization.getTransientId(sId);
346                 int oTransientId = serialization.getTransientId(oId);
347                 
348                 if(vgss != null)
349                     vgss.addVirtual(clusterInt + rId);
350                 
351                 claim(rId, sTransientId, oTransientId);
352             }
353
354             int values = reader.readInt();
355             for (int i = 0; i < values; i++) {
356                 int subject = reader.readInt();
357                 int length = reader.readInt();
358                 setValue(subject, reader.readBytes(length), length);
359             }
360             if (DEBUG)
361                 System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk.");
362             
363         } catch (IOException e) {
364             throw new DatabaseException(e);
365         } finally {
366             if (reader != null)
367                 reader.close();
368         }
369     }
370     
371     void listStatements(SerialisationSupport ss, ArrayList<Statement> result) {
372
373         int clusterKey = getTransientClusterKey();
374         
375         try {
376             for(int i=0;i<statements.size();i++) {
377                 TIntArrayList list = statements.get(i);
378                 if(list != null) {
379                     Resource subject = ss.getResource(clusterKey | i);
380                     for(int j=0;j<list.size();j+=2) {
381                         Resource p = ss.getResource(list.getQuick(j));
382                         Resource o = ss.getResource(list.getQuick(j+1));
383                         result.add(new StandardStatement(subject, p, o));
384                     }
385
386                 }
387             }
388         } catch (DatabaseException e) {
389             e.printStackTrace();
390         }
391         
392     }
393
394     void listValues(final SerialisationSupport ss, final ArrayList<Resource> result) {
395
396         values.forEachKey(new TIntProcedure() {
397
398             @Override
399             public boolean execute(int value) {
400                 try {
401                     result.add(ss.getResource(getTransientId(value)));
402                 } catch (DatabaseException e) {
403                     e.printStackTrace();
404                 }
405                 return true;
406             }
407
408         });
409         
410     }
411
412 }
413
414 public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext {
415     private static final boolean DEBUG = VirtualCluster.DEBUG;
416     final private static int SWAP_LIMIT = 30;
417     
418 //    final private static byte[] NO_VALUE = new byte[0];
419     final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1);
420     
421     final private Persistency persistency;
422     
423     final private SerialisationSupport serialization;
424     final private ResourceSupport resourceSupport;
425     final private VirtualGraphServerSupport virtualGraphServerSupport;
426     final private RequestProcessor sessionRequestProcessor;
427     
428     /*
429      * Cluster array by index.
430      * -NO_CLUSTER value means that there is no such virtual cluster
431      * -null value means that such virtual cluster could be on disk
432      */
433     final private ArrayList<VirtualCluster> clusters = new ArrayList<VirtualCluster>();
434
435     /*
436      * A list of resident clusters
437      */
438     final private LinkedList<VirtualCluster> memoryClusters = new LinkedList<VirtualCluster>();
439     
440     private final HashSet<VirtualGraphSource> sources = new HashSet<VirtualGraphSource>();
441     
442     final String identifier;
443     final String databaseId;
444
445     TIntObjectHashMap<TIntHashSet> NO_STATEMENTS = new TIntObjectHashMap<TIntHashSet>();
446
447     int[] EMPTY = new int[0];
448
449     public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) throws DatabaseException {
450         TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE);
451         graph.load();
452         return graph;
453     }
454     
455     public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) {
456         return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY);
457     }
458     
459     private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier, Persistency persistency) {
460         this.serialization = ss;
461         this.virtualGraphServerSupport = vgss;
462         this.sessionRequestProcessor = srp;
463         this.resourceSupport = rs;
464         this.identifier = identifier;
465         this.databaseId = databaseId;
466         
467         this.persistency = persistency;
468     }
469
470     public String getIdentifier()  {
471         return identifier;
472     }
473     
474     private int transientClusterId(long id) throws DatabaseException {
475         if (DEBUG)
476             System.out.println("DEBUG: transientClusterId=" + id);
477         if ((id & 1) == 0) // Cluster of virtual subjects
478             return (int)id;
479         // Corresponds to persistent cluster
480         int rk = serialization.getTransientId(id);
481         return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1;
482     }
483
484     private String getPrefix() {
485         return identifier + "." + persistency.identifier() + "." + databaseId;
486     }
487     
488     private void load() throws DatabaseException {
489
490         String prefix = getPrefix();
491         for(String file : virtualGraphServerSupport.storagePath().list()) {
492             try {
493                 if(file.startsWith(prefix)) {
494                     long clusterLong = Long.parseLong(file.substring(prefix.length()+4));
495                     int clusterId = transientClusterId(clusterLong);
496                     VirtualCluster cluster = new VirtualCluster(clusterId);
497                     cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null);
498                     clusters.ensureCapacity(clusterId+1);
499                     for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
500                     clusters.set(clusterId, cluster);
501                     memoryClusters.addLast(cluster);
502                 }
503             } catch (DatabaseException t) {
504                 // file is assumably broken, delete it
505                 File filee = new File(virtualGraphServerSupport.storagePath(), file);
506                 if (!filee.delete()) {
507                     System.err.println("Could not delete file " + filee.getAbsolutePath());
508                 }
509                 throw t;
510             }
511         }
512
513     }
514
515     public void dispose() {
516         try {
517             saveImpl(serialization);
518         } catch (IOException e) {
519             e.printStackTrace();
520         }
521     }
522
523     public void save() {
524         
525         try {
526             saveImpl(serialization);
527         } catch (IOException e) {
528             e.printStackTrace();
529         }
530         
531     }
532
533     public void saveImpl(final SerialisationSupport ss) throws IOException {
534         
535         for(VirtualCluster cluster : memoryClusters) {
536                 String prefix = getPrefix();
537             File file = new File(virtualGraphServerSupport.storagePath(), prefix + ".vg." + VirtualCluster.getClusterIdentifier(ss, cluster.clusterId()));
538             cluster.saveImpl(file, ss);
539         }
540         
541     }
542
543     /**
544      * Closes a stream and ignores any resulting exception. This is useful
545      * when doing stream cleanup in a finally block where secondary exceptions
546      * are not worth logging.
547      */
548     static void uncheckedClose(Closeable closeable) {
549         try {
550             if (closeable != null)
551                 closeable.close();
552         } catch (IOException e) {
553             //ignore
554         }
555     }
556
557     private void trimClusters() {
558         for(VirtualCluster cluster : memoryClusters) {
559             cluster.trim();
560         }
561     }
562     
563     /*
564      * Returns a transient cluster index
565      * -Transient clusters for persistent resources have index with LSB=1
566      * -Transient clusters for virtual resources have index with LSB=0
567      * 
568      * @param subject is a DB client transient id
569      * 
570      * For persistent resources cluster id is 2*clusterKey+1
571      * For virtual resources transient ids are persistent and are directly chunked into 14-bit clusters.
572      * 
573      */
574     public static int getVirtualClusterKey(int subject) {
575         if (subject > 0) {
576             int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject);
577             return (ck << 1) | 1;
578         }
579         int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject);
580         return ck << 1;
581     }
582     
583     private VirtualCluster getOrLoad(int virtualClusterKey) {
584
585         if(virtualClusterKey < clusters.size()) {
586             VirtualCluster cluster = clusters.get(virtualClusterKey);
587             if(NO_CLUSTER == cluster) return null;
588             if(cluster != null) return cluster;
589         }
590         if (DEBUG)
591             System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier);
592
593         clusters.ensureCapacity(virtualClusterKey+1);
594         for(int i=clusters.size(); i<virtualClusterKey+1; i++) clusters.add(null);
595
596         //if(!VirtualCluster.isValidVirtualClusterKey(serialization, virtualClusterKey)) return null;
597
598         long clusterIdentifier = VirtualCluster.getClusterIdentifier(serialization, virtualClusterKey);
599         File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + clusterIdentifier);
600         if(file.exists()) {
601             VirtualCluster cluster = new VirtualCluster(virtualClusterKey);
602 //      System.out.println("Loading virtual cluster2 " + virtualClusterKey + " for " + identifier);
603             try {
604                 cluster.load(file, serialization, (virtualClusterKey & 1) > 0 ? virtualGraphServerSupport : null);
605                 clusters.set(virtualClusterKey, cluster);
606                 memoryClusters.addFirst(cluster);
607                 swap();
608                 return cluster;
609             } catch (DatabaseException e) {
610                 e.printStackTrace();
611                 // File must be corrupt, lets delete it so we wont load it next time in future
612                 file.delete();
613                 
614                 clusters.set(virtualClusterKey, NO_CLUSTER);
615                 return null;
616             }
617
618         } else {
619             clusters.set(virtualClusterKey, NO_CLUSTER);
620             return null;
621         }
622     }
623
624     private void swap() {
625         
626         trimClusters();
627         while(memoryClusters.size() > SWAP_LIMIT) {
628             VirtualCluster remo = memoryClusters.removeLast();
629             File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId()));
630             try {
631                 remo.saveImpl(file, serialization);
632             } catch (IOException e) {
633                 e.printStackTrace();
634             }
635             clusters.set(remo.clusterId(), null);
636         }
637         
638     }
639     
640     private synchronized VirtualCluster getCluster(int subject, boolean create) {
641
642         int clusterId = getVirtualClusterKey(subject);
643         
644         VirtualCluster cluster = getOrLoad(clusterId);
645         if(cluster != null) return cluster;
646         
647         if(create) {
648
649             clusters.ensureCapacity(clusterId+1);
650             for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
651             cluster = new VirtualCluster(clusterId);
652             clusters.set(clusterId, cluster);
653             memoryClusters.addFirst(cluster);
654             swap();
655             return cluster;
656
657         } else {
658             return null;
659         }
660         
661     }
662     
663     private synchronized void applyValue(int subject, Object value, Binding binding) {
664
665         try {
666             Serializer serializer = Bindings.getSerializer( binding );
667             byte[] serialized = serializer.serialize(value);
668             VirtualCluster cluster = getCluster(subject, true);
669             cluster.setValue(subject, serialized, serialized.length);
670         } catch (SerializationException e) {
671             e.printStackTrace();
672         } catch (SerializerConstructionException e) {
673             e.printStackTrace();
674         } catch (IOException e) {
675             // TODO Auto-generated catch block
676             e.printStackTrace();
677         }
678
679     }
680
681     private synchronized void applyStatements(int subject, int[] statements) {
682
683         VirtualCluster cluster = getCluster(subject, true);
684         cluster.addStatements(subject, statements);
685         
686         
687         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
688
689     }
690
691     private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure<Object> procedure) throws DatabaseException {
692
693         VirtualCluster cluster = getCluster(subject, true);
694
695         // This resource becomes a normal resource, all data is requeried
696         cluster.resetLazy(subject);
697         
698         for(VirtualGraphSource source : sources) {
699             source.getStatements(graph, this, subject);
700         }
701         
702         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
703         
704     }
705
706     private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure<Object> procedure) throws DatabaseException {
707
708         for(VirtualGraphSource source : sources) {
709             source.getStatements(graph, this, subject, predicate);
710         }
711         
712         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
713         
714     }
715
716     @Override
717     public int getIndex(Resource resource) {
718         try {
719             return serialization.getTransientId(resource);
720         } catch (DatabaseException e) {
721             e.printStackTrace();
722         }
723         return 0;
724     }
725
726     @Override
727     public Resource getResource(int index) {
728         return new ResourceImpl(resourceSupport, index);
729     }
730
731     @Override
732     public void register(final VirtualGraphSource source) {
733         if(sources.add(source)) {
734             source.attach(TransientGraph.this);
735         }
736     }
737
738     @Override
739     public void claim(int subject, int predicate, int object) {
740
741         VirtualCluster cluster = getCluster(subject, true);
742         cluster.claim(subject, predicate, object);
743         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
744
745     }
746
747     @Override
748     public synchronized int[] getObjects(int subject, int predicate) {
749         VirtualCluster cluster = getCluster(subject, false);
750         if(cluster == null) return EMPTY;
751         return cluster.getObjects(subject, predicate);
752     }
753
754     @Override
755     public synchronized int[] getPredicates(int subject) {
756         VirtualCluster cluster = getCluster(subject, false);
757         if(cluster == null) return EMPTY;
758         return cluster.getPredicates(subject);
759     }
760
761     @Override
762     public synchronized byte[] getValue(int subject) {
763         VirtualCluster cluster = getCluster(subject, false);
764         if(cluster == null) return null;
765         return cluster.getValue(subject);
766     }
767
768     @Override
769     public int newResource(boolean isLazy) {
770         
771         int id = virtualGraphServerSupport.createVirtual();
772         VirtualCluster cluster = getCluster(id, true);
773         if(isLazy) cluster.setLazy(id);
774         return id;
775
776     }
777     
778     @Override
779     public void finish(int subject) {
780         VirtualCluster cluster = getCluster(subject, false);
781         cluster.finish(subject);
782     }
783
784     @Override
785     public void deny(int subject, int predicate, int object) {
786
787         VirtualCluster cluster = getCluster(subject, true);
788         cluster.deny(subject, predicate, object);
789
790     }
791
792     @Override
793     public void claimValue(int subject, byte[] data, int length) {
794
795         VirtualCluster cluster = getCluster(subject, true);
796         cluster.setValue(subject, data, length);
797         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
798
799     }
800
801     @Override
802     public void denyValue(int subject) {
803         // FIXME: this implementation is probably not proper, Antti needs to work on this.
804         VirtualCluster cluster = getCluster(subject, true);
805         cluster.denyValue(subject);
806         if(subject > 0) virtualGraphServerSupport.removeVirtual(subject);
807     }
808
809     @Override
810     public void initialise(final Write write) {
811         try {
812             sessionRequestProcessor.syncRequest(new WriteRequest(this) {
813
814                 @Override
815                 public void perform(WriteGraph graph) throws DatabaseException {
816                     write.perform(graph);
817                 }
818
819             });
820         } catch (DatabaseException e) {
821             e.printStackTrace();
822         }
823     }
824
825     @Override
826     public void postModification(AsyncRequestProcessor processor, final WriteOnly request) {
827
828         if(processor == null) processor = sessionRequestProcessor;
829
830         processor.asyncRequest(new WriteOnlyRequest(this) {
831
832             @Override
833             public void perform(WriteOnlyGraph graph) throws DatabaseException {
834                 request.perform(graph);
835             }
836
837         });
838
839     }
840
841     @Override
842     public void updateStatements(int resource, int[] statements) {
843         applyStatements(resource, statements);
844     }
845
846     @Override
847     public void updateValue(int resource, Object value, Binding binding) {
848         applyValue(resource, value, binding);
849     }
850
851     @Override
852     public boolean isPending(int subject) {
853         
854         VirtualCluster cluster = getCluster(subject, false);
855         if(cluster == null) return false;
856         else return cluster.isPending(subject);
857         
858     }
859
860     @Override
861     public boolean isPending(int subject, int predicate) {
862
863         VirtualCluster cluster = getCluster(subject, false);
864         if(cluster == null) return false;
865         else return cluster.isPending(subject, predicate);
866
867     }
868     
869     @Override
870     public void load(ReadGraphImpl graph, int resource, int predicate, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
871         producePartialStatements(graph, resource, predicate, new AsyncProcedure<Object>() {
872
873             @Override
874             public void execute(AsyncReadGraph graph, Object result) {
875                 callback.accept((ReadGraphImpl)graph);
876             }
877
878             @Override
879             public void exception(AsyncReadGraph graph, Throwable throwable) {
880                 callback.accept((ReadGraphImpl)graph);
881             }
882             
883         });
884     }
885
886     @Override
887     public void load(ReadGraphImpl graph, int resource, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
888         produceAllStatements(graph, resource, new AsyncProcedure<Object>() {
889
890             @Override
891             public void execute(AsyncReadGraph graph, Object result) {
892                 callback.accept((ReadGraphImpl)graph);
893             }
894
895             @Override
896             public void exception(AsyncReadGraph graph, Throwable throwable) {
897                 callback.accept((ReadGraphImpl)graph);
898             }
899             
900         });
901     }
902     
903     public Collection<Statement> listStatements() {
904         ArrayList<Statement> result = new ArrayList<Statement>();
905         for(int i=0;i<clusters.size();i++) {
906             VirtualCluster cluster = getOrLoad(i);
907             if(cluster != null) {
908                 cluster.listStatements(serialization, result);
909             }
910         }
911         return result;
912     }
913
914     public Collection<Resource> listValues() {
915         ArrayList<Resource> result = new ArrayList<Resource>();
916         for(int i=0;i<clusters.size();i++) {
917             VirtualCluster cluster = getOrLoad(i);
918             if(cluster != null) {
919                 cluster.listValues(serialization, result);
920             }
921         }
922         return result;
923     }
924
925     @Override
926     public Persistency getPersistency() {
927         return persistency;
928     }
929     
930     @Override
931     public String toString() {
932         String result = "'" + identifier + "'";
933         if(Persistency.WORKSPACE == persistency) result += " (W)";
934         else if(Persistency.MEMORY == persistency) result += " (M)";
935         return result;
936     }
937
938 }
939