]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java
Merge "Multiple reader thread support for db client"
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / TransientGraph.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl;
13
14 import java.io.Closeable;
15 import java.io.File;
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.LinkedList;
22 import java.util.function.Consumer;
23
24 import org.simantics.databoard.Bindings;
25 import org.simantics.databoard.binding.Binding;
26 import org.simantics.databoard.serialization.SerializationException;
27 import org.simantics.databoard.serialization.Serializer;
28 import org.simantics.databoard.serialization.SerializerConstructionException;
29 import org.simantics.db.AsyncReadGraph;
30 import org.simantics.db.AsyncRequestProcessor;
31 import org.simantics.db.Resource;
32 import org.simantics.db.Statement;
33 import org.simantics.db.VirtualGraphContext;
34 import org.simantics.db.VirtualGraphSource;
35 import org.simantics.db.WriteGraph;
36 import org.simantics.db.WriteOnlyGraph;
37 import org.simantics.db.common.ByteFileReader;
38 import org.simantics.db.common.ByteFileWriter;
39 import org.simantics.db.common.StandardStatement;
40 import org.simantics.db.common.request.WriteOnlyRequest;
41 import org.simantics.db.common.request.WriteRequest;
42 import org.simantics.db.common.utils.Logger;
43 import org.simantics.db.exception.DatabaseException;
44 import org.simantics.db.impl.graph.ReadGraphImpl;
45 import org.simantics.db.impl.support.ResourceSupport;
46 import org.simantics.db.impl.support.VirtualGraphServerSupport;
47 import org.simantics.db.procedure.AsyncProcedure;
48 import org.simantics.db.request.Write;
49 import org.simantics.db.request.WriteOnly;
50 import org.simantics.db.service.SerialisationSupport;
51
52 import gnu.trove.list.array.TIntArrayList;
53 import gnu.trove.map.hash.TIntObjectHashMap;
54 import gnu.trove.procedure.TIntObjectProcedure;
55 import gnu.trove.procedure.TIntProcedure;
56 import gnu.trove.set.hash.TIntHashSet;
57
58 class VirtualCluster {
59     static final boolean DEBUG = false;
60     final static int[] EMPTY = new int[0];
61
62     private final ArrayList<TIntArrayList> statements = new ArrayList<TIntArrayList>();
63     private final TIntHashSet lazy = new TIntHashSet();
64     private final TIntObjectHashMap<byte[]> values = new TIntObjectHashMap<byte[]>();
65     private final int clusterId;
66
67     public VirtualCluster(int clusterId) {
68         this.clusterId = clusterId;
69     }
70     
71     public int clusterId() {
72         return clusterId;
73     }
74     
75     public void trim() {
76     }
77     
78     private TIntArrayList getPredicateMap(int subject) {
79         
80         int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
81         if(rId >= statements.size()) return null;
82         return statements.get(rId);
83         
84     }
85     
86     public boolean isPending(int subject) {
87         return lazy.contains(subject);
88     }
89
90     boolean containsPredicate(TIntArrayList statements, int predicate) {
91         for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate) return true;
92         return false;
93     }
94
95     int containsStatement(TIntArrayList statements, int predicate, int object) {
96         for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate && statements.getQuick(i+1) == object) return i;
97         return -1;
98     }
99     
100     public boolean isPending(int subject, int predicate) {
101         
102         if(!lazy.contains(subject)) return false;
103         TIntArrayList predicateMap = getPredicateMap(subject);
104         if(predicateMap == null) return true;
105         return !containsPredicate(predicateMap, predicate);
106 //        return !predicateMap.contains(predicate);
107         
108     }
109     
110     public void resetLazy(int subject) {
111
112         lazy.remove(subject);
113         // Query all data from scratch
114         int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
115         statements.set(ri, new TIntArrayList());
116
117     }
118     
119     public void setLazy(int subject) {
120
121         lazy.add(subject);
122         
123     }
124
125     public void finish(int subject) {
126
127         lazy.remove(subject);
128         
129     }
130     
131     public void setValue(int subject, byte[] data, int length) {
132
133         values.put(subject, Arrays.copyOf(data, length));
134         
135     }
136
137     public void denyValue(int subject) {
138         
139         values.remove(subject);
140         
141     }
142
143     public void addStatements(int subject, int[] data) {
144         
145         TIntArrayList result = getPredicateMap(subject);
146         if(result == null) {
147             result = new TIntArrayList();
148             int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
149             statements.ensureCapacity(rId+1);
150             for(int i=statements.size();i<rId+1;i++) statements.add(null);
151             statements.set(rId, result);
152         }
153         
154         for(int i=0;i<data.length;i+=2) {
155             int predicate =  data[i];
156             int object =  data[i+1];
157             if(containsStatement(result, predicate, object) < 0) {
158                 result.add(predicate);
159                 result.add(object);
160             }
161         }
162         
163     }
164     
165     public void claim(int subject, int predicate, int object) {
166         
167         TIntArrayList predicates = getPredicateMap(subject);
168         if(predicates == null) {
169             predicates = new TIntArrayList();
170             int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
171             statements.ensureCapacity(rId+1);
172             for(int i=statements.size();i<rId+1;i++) statements.add(null);
173             statements.set(rId, predicates);
174         }
175         
176         if(containsStatement(predicates, predicate, object) < 0) {
177             predicates.add(predicate);
178             predicates.add(object);
179         }
180         
181     }
182     
183     public void deny(int subject, int predicate, int object) {
184     
185         TIntArrayList predicates = getPredicateMap(subject);
186         if(predicates == null) return;
187         int index = containsStatement(predicates, predicate, object);
188         if(index < 0) return;
189         predicates.remove(index, 2);
190         
191     }
192     
193     int[] getObjects(int subject, int predicate) {
194         
195         TIntArrayList predicates = getPredicateMap(subject);
196         if(predicates == null) return EMPTY;
197         TIntArrayList result = new TIntArrayList();
198         for(int i=0;i<predicates.size();i+=2) {
199             if(predicates.getQuick(i) == predicate) {
200                 result.add(predicates.getQuick(i+1));
201             }
202         }
203         return result.toArray();
204         
205     }
206     
207     int[] getPredicates(int subject) {
208
209         TIntArrayList predicates = getPredicateMap(subject);
210         if(predicates == null) return EMPTY;
211         TIntHashSet result = new TIntHashSet();
212         for(int i=0;i<predicates.size();i+=2) {
213             result.add(predicates.getQuick(i));
214         }
215         return result.toArray();
216         
217     }
218     
219     byte[] getValue(int subject) {
220         return values.get(subject);
221     }
222
223     public int getTransientClusterKey() {
224         int clusterBits = ClusterTraitsBase.getClusterBits(clusterId>>>1);
225         if((clusterId & 1) == 0) // Virtual subjects
226             return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource.
227         else // Database subjects
228             return clusterBits;
229     }
230
231     public int getTransientId(int subject) {
232         if((clusterId & 1) == 0) // Virtual subjects
233             return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000;
234         else // Database subjects
235             return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject);
236     }
237     
238     /*
239      * Creates a persistent identifier for given transient graph cluster identifier.
240      * 
241      * 
242      * For persistent clusters this obtains the random access id for (invalid)
243      * resource at index 0 of given cluster. LSB in given id indicates persistence
244      * 
245      * 
246      */
247     
248     public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { 
249         if((clusterKey & 1) == 0)// Virtual subjects
250             return clusterKey;
251         else { // Database subjects
252             int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1);
253             try {
254                 // Assuming that cluster's first resource is created first and not deleted.
255                 // Assuming that resource index is in LSB bits.
256                 return ss.getRandomAccessId(rk);
257             } catch (DatabaseException e) {
258                 Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e);
259                 return -1;
260             }
261         }
262     }
263
264     public void saveImpl(final File file, SerialisationSupport ss) throws IOException {
265         if (DEBUG)
266             System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath());
267          
268         final ByteFileWriter writer = new ByteFileWriter(file);
269         
270         try {
271
272             int stms = 0;
273             for(TIntArrayList list : statements) {
274                 if(list != null)
275                     stms += list.size();
276             }
277
278             writer.write((int)(1.5*stms));
279             @SuppressWarnings("unused")
280             int count = 0;
281             for(int i=0;i<statements.size();i++) {
282                 TIntArrayList list = statements.get(i);
283                 if(list != null) {
284                     for(int j=0;j<list.size();j+=2) {
285                         try {
286                             writer.write((short)i);
287                             int rk = list.getQuick(j);
288                             long rid = ss.getRandomAccessId(rk);
289                             writer.write(rid);
290                             rk = list.getQuick(j+1);
291                             rid = ss.getRandomAccessId(rk);
292                             writer.write(rid);
293                             count++;
294                         } catch (DatabaseException e) {
295                             e.printStackTrace();
296                         }
297                     }
298                 }
299             }
300
301             writer.write(values.size());
302
303             values.forEachEntry(new TIntObjectProcedure<byte[]>() {
304
305                 @Override
306                 public boolean execute(int a, byte[] b) {
307                     writer.write(a);
308                     writer.write(b.length);
309                     writer.write(b);
310                     return true;
311                 }
312             });
313             if (DEBUG)
314                 System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk.");
315
316         } finally {
317             writer.commit();
318 //            FileUtils.uncheckedClose(_os);
319         }
320     }
321     
322     public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException {
323         if (DEBUG)
324             System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length());
325         
326         ByteFileReader reader = null;
327         try {
328             
329             if (!file.exists())
330                 return;
331
332             reader = new ByteFileReader(file);
333             int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1);
334             int stms = reader.readInt();
335             for (int i = 0; i < stms; i += 3) {
336 //                int rId = reader.readShort();
337 //                if(vgss != null) vgss.addVirtual(clusterInt + rId);
338 //                claim(rId,
339 //                        serialization.getTransientId(reader.readLong()),
340 //                        serialization.getTransientId(reader.readLong()));
341                 int rId = reader.readShort();
342                 long sId = reader.readLong();
343                 long oId = reader.readLong();
344                 int sTransientId = serialization.getTransientId(sId);
345                 int oTransientId = serialization.getTransientId(oId);
346                 
347                 if(vgss != null)
348                     vgss.addVirtual(clusterInt + rId);
349                 
350                 claim(rId, sTransientId, oTransientId);
351             }
352
353             int values = reader.readInt();
354             for (int i = 0; i < values; i++) {
355                 int subject = reader.readInt();
356                 int length = reader.readInt();
357                 setValue(subject, reader.readBytes(length), length);
358             }
359             if (DEBUG)
360                 System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk.");
361             
362         } catch (IOException e) {
363             throw new DatabaseException(e);
364         } finally {
365             if (reader != null)
366                 reader.close();
367         }
368     }
369     
370     void listStatements(SerialisationSupport ss, ArrayList<Statement> result) {
371
372         int clusterKey = getTransientClusterKey();
373         
374         try {
375             for(int i=0;i<statements.size();i++) {
376                 TIntArrayList list = statements.get(i);
377                 if(list != null) {
378                     Resource subject = ss.getResource(clusterKey | i);
379                     for(int j=0;j<list.size();j+=2) {
380                         Resource p = ss.getResource(list.getQuick(j));
381                         Resource o = ss.getResource(list.getQuick(j+1));
382                         result.add(new StandardStatement(subject, p, o));
383                     }
384
385                 }
386             }
387         } catch (DatabaseException e) {
388             e.printStackTrace();
389         }
390         
391     }
392
393     void listValues(final SerialisationSupport ss, final ArrayList<Resource> result) {
394
395         values.forEachKey(new TIntProcedure() {
396
397             @Override
398             public boolean execute(int value) {
399                 try {
400                     result.add(ss.getResource(getTransientId(value)));
401                 } catch (DatabaseException e) {
402                     e.printStackTrace();
403                 }
404                 return true;
405             }
406
407         });
408         
409     }
410
411 }
412
413 public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext {
414     private static final boolean DEBUG = VirtualCluster.DEBUG;
415     final private static int SWAP_LIMIT = 30;
416     
417 //    final private static byte[] NO_VALUE = new byte[0];
418     final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1);
419     
420     final private Persistency persistency;
421     
422     final private SerialisationSupport serialization;
423     final private ResourceSupport resourceSupport;
424     final private VirtualGraphServerSupport virtualGraphServerSupport;
425     final private AsyncRequestProcessor sessionRequestProcessor;
426     
427     /*
428      * Cluster array by index.
429      * -NO_CLUSTER value means that there is no such virtual cluster
430      * -null value means that such virtual cluster could be on disk
431      */
432     final private ArrayList<VirtualCluster> clusters = new ArrayList<VirtualCluster>();
433
434     /*
435      * A list of resident clusters
436      */
437     final private LinkedList<VirtualCluster> memoryClusters = new LinkedList<VirtualCluster>();
438     
439     private final HashSet<VirtualGraphSource> sources = new HashSet<VirtualGraphSource>();
440     
441     final String identifier;
442     final String databaseId;
443
444     TIntObjectHashMap<TIntHashSet> NO_STATEMENTS = new TIntObjectHashMap<TIntHashSet>();
445
446     int[] EMPTY = new int[0];
447
448     public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) throws DatabaseException {
449         TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE);
450         graph.load();
451         return graph;
452     }
453     
454     public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) {
455         return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY);
456     }
457     
458     private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier, Persistency persistency) {
459         this.serialization = ss;
460         this.virtualGraphServerSupport = vgss;
461         this.sessionRequestProcessor = srp;
462         this.resourceSupport = rs;
463         this.identifier = identifier;
464         this.databaseId = databaseId;
465         
466         this.persistency = persistency;
467     }
468
469     public String getIdentifier()  {
470         return identifier;
471     }
472     
473     private int transientClusterId(long id) throws DatabaseException {
474         if (DEBUG)
475             System.out.println("DEBUG: transientClusterId=" + id);
476         if ((id & 1) == 0) // Cluster of virtual subjects
477             return (int)id;
478         // Corresponds to persistent cluster
479         int rk = serialization.getTransientId(id);
480         return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1;
481     }
482
483     private String getPrefix() {
484         return identifier + "." + persistency.identifier() + "." + databaseId;
485     }
486     
487     private void load() throws DatabaseException {
488
489         String prefix = getPrefix();
490         for(String file : virtualGraphServerSupport.storagePath().list()) {
491             try {
492                 if(file.startsWith(prefix)) {
493                     long clusterLong = Long.parseLong(file.substring(prefix.length()+4));
494                     int clusterId = transientClusterId(clusterLong);
495                     VirtualCluster cluster = new VirtualCluster(clusterId);
496                     cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null);
497                     clusters.ensureCapacity(clusterId+1);
498                     for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
499                     clusters.set(clusterId, cluster);
500                     memoryClusters.addLast(cluster);
501                 }
502             } catch (DatabaseException t) {
503                 // file is assumably broken, delete it
504                 File filee = new File(virtualGraphServerSupport.storagePath(), file);
505                 if (!filee.delete()) {
506                     System.err.println("Could not delete file " + filee.getAbsolutePath());
507                 }
508                 throw t;
509             }
510         }
511
512     }
513
514     public void dispose() {
515         try {
516             saveImpl(serialization);
517         } catch (IOException e) {
518             e.printStackTrace();
519         }
520     }
521
522     public void save() {
523         
524         try {
525             saveImpl(serialization);
526         } catch (IOException e) {
527             e.printStackTrace();
528         }
529         
530     }
531
532     public void saveImpl(final SerialisationSupport ss) throws IOException {
533         
534         for(VirtualCluster cluster : memoryClusters) {
535                 String prefix = getPrefix();
536             File file = new File(virtualGraphServerSupport.storagePath(), prefix + ".vg." + VirtualCluster.getClusterIdentifier(ss, cluster.clusterId()));
537             cluster.saveImpl(file, ss);
538         }
539         
540     }
541
542     /**
543      * Closes a stream and ignores any resulting exception. This is useful
544      * when doing stream cleanup in a finally block where secondary exceptions
545      * are not worth logging.
546      */
547     static void uncheckedClose(Closeable closeable) {
548         try {
549             if (closeable != null)
550                 closeable.close();
551         } catch (IOException e) {
552             //ignore
553         }
554     }
555
556     private void trimClusters() {
557         for(VirtualCluster cluster : memoryClusters) {
558             cluster.trim();
559         }
560     }
561     
562     /*
563      * Returns a transient cluster index
564      * -Transient clusters for persistent resources have index with LSB=1
565      * -Transient clusters for virtual resources have index with LSB=0
566      * 
567      * @param subject is a DB client transient id
568      * 
569      * For persistent resources cluster id is 2*clusterKey+1
570      * For virtual resources transient ids are persistent and are directly chunked into 14-bit clusters.
571      * 
572      */
573     public static int getVirtualClusterKey(int subject) {
574         if (subject > 0) {
575             int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject);
576             return (ck << 1) | 1;
577         }
578         int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject);
579         return ck << 1;
580     }
581     
582     private VirtualCluster getOrLoad(int virtualClusterKey) {
583
584         if(virtualClusterKey < clusters.size()) {
585             VirtualCluster cluster = clusters.get(virtualClusterKey);
586             if(NO_CLUSTER == cluster) return null;
587             if(cluster != null) return cluster;
588         }
589         if (DEBUG)
590             System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier);
591
592         clusters.ensureCapacity(virtualClusterKey+1);
593         for(int i=clusters.size(); i<virtualClusterKey+1; i++) clusters.add(null);
594
595         //if(!VirtualCluster.isValidVirtualClusterKey(serialization, virtualClusterKey)) return null;
596
597         long clusterIdentifier = VirtualCluster.getClusterIdentifier(serialization, virtualClusterKey);
598         File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + clusterIdentifier);
599         if(file.exists()) {
600             VirtualCluster cluster = new VirtualCluster(virtualClusterKey);
601 //      System.out.println("Loading virtual cluster2 " + virtualClusterKey + " for " + identifier);
602             try {
603                 cluster.load(file, serialization, (virtualClusterKey & 1) > 0 ? virtualGraphServerSupport : null);
604                 clusters.set(virtualClusterKey, cluster);
605                 memoryClusters.addFirst(cluster);
606                 swap();
607                 return cluster;
608             } catch (DatabaseException e) {
609                 e.printStackTrace();
610                 // File must be corrupt, lets delete it so we wont load it next time in future
611                 file.delete();
612                 
613                 clusters.set(virtualClusterKey, NO_CLUSTER);
614                 return null;
615             }
616
617         } else {
618             clusters.set(virtualClusterKey, NO_CLUSTER);
619             return null;
620         }
621     }
622
623     private void swap() {
624         
625         trimClusters();
626         while(memoryClusters.size() > SWAP_LIMIT) {
627             VirtualCluster remo = memoryClusters.removeLast();
628             File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId()));
629             try {
630                 remo.saveImpl(file, serialization);
631             } catch (IOException e) {
632                 e.printStackTrace();
633             }
634             clusters.set(remo.clusterId(), null);
635         }
636         
637     }
638     
639     private synchronized VirtualCluster getCluster(int subject, boolean create) {
640
641         int clusterId = getVirtualClusterKey(subject);
642         
643         VirtualCluster cluster = getOrLoad(clusterId);
644         if(cluster != null) return cluster;
645         
646         if(create) {
647
648             clusters.ensureCapacity(clusterId+1);
649             for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
650             cluster = new VirtualCluster(clusterId);
651             clusters.set(clusterId, cluster);
652             memoryClusters.addFirst(cluster);
653             swap();
654             return cluster;
655
656         } else {
657             return null;
658         }
659         
660     }
661     
662     private synchronized void applyValue(int subject, Object value, Binding binding) {
663
664         try {
665             Serializer serializer = Bindings.getSerializer( binding );
666             byte[] serialized = serializer.serialize(value);
667             VirtualCluster cluster = getCluster(subject, true);
668             cluster.setValue(subject, serialized, serialized.length);
669         } catch (SerializationException e) {
670             e.printStackTrace();
671         } catch (SerializerConstructionException e) {
672             e.printStackTrace();
673         } catch (IOException e) {
674             // TODO Auto-generated catch block
675             e.printStackTrace();
676         }
677
678     }
679
680     private synchronized void applyStatements(int subject, int[] statements) {
681
682         VirtualCluster cluster = getCluster(subject, true);
683         cluster.addStatements(subject, statements);
684         
685         
686         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
687
688     }
689
690     private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure<Object> procedure) throws DatabaseException {
691
692         VirtualCluster cluster = getCluster(subject, true);
693
694         // This resource becomes a normal resource, all data is requeried
695         cluster.resetLazy(subject);
696         
697         for(VirtualGraphSource source : sources) {
698             source.getStatements(graph, this, subject);
699         }
700         
701         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
702         
703     }
704
705     private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure<Object> procedure) throws DatabaseException {
706
707         for(VirtualGraphSource source : sources) {
708             source.getStatements(graph, this, subject, predicate);
709         }
710         
711         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
712         
713     }
714
715     @Override
716     public int getIndex(Resource resource) {
717         try {
718             return serialization.getTransientId(resource);
719         } catch (DatabaseException e) {
720             e.printStackTrace();
721         }
722         return 0;
723     }
724
725     @Override
726     public Resource getResource(int index) {
727         return new ResourceImpl(resourceSupport, index);
728     }
729
730     @Override
731     public void register(final VirtualGraphSource source) {
732         if(sources.add(source)) {
733             source.attach(TransientGraph.this);
734         }
735     }
736
737     @Override
738     public void claim(int subject, int predicate, int object) {
739
740         VirtualCluster cluster = getCluster(subject, true);
741         cluster.claim(subject, predicate, object);
742         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
743
744     }
745
746     @Override
747     public synchronized int[] getObjects(int subject, int predicate) {
748         VirtualCluster cluster = getCluster(subject, false);
749         if(cluster == null) return EMPTY;
750         return cluster.getObjects(subject, predicate);
751     }
752
753     @Override
754     public synchronized int[] getPredicates(int subject) {
755         VirtualCluster cluster = getCluster(subject, false);
756         if(cluster == null) return EMPTY;
757         return cluster.getPredicates(subject);
758     }
759
760     @Override
761     public synchronized byte[] getValue(int subject) {
762         VirtualCluster cluster = getCluster(subject, false);
763         if(cluster == null) return null;
764         return cluster.getValue(subject);
765     }
766
767     @Override
768     public int newResource(boolean isLazy) {
769         
770         int id = virtualGraphServerSupport.createVirtual();
771         VirtualCluster cluster = getCluster(id, true);
772         if(isLazy) cluster.setLazy(id);
773         return id;
774
775     }
776     
777     @Override
778     public void finish(int subject) {
779         VirtualCluster cluster = getCluster(subject, false);
780         cluster.finish(subject);
781     }
782
783     @Override
784     public void deny(int subject, int predicate, int object) {
785
786         VirtualCluster cluster = getCluster(subject, true);
787         cluster.deny(subject, predicate, object);
788
789     }
790
791     @Override
792     public void claimValue(int subject, byte[] data, int length) {
793
794         VirtualCluster cluster = getCluster(subject, true);
795         cluster.setValue(subject, data, length);
796         if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
797
798     }
799
800     @Override
801     public void denyValue(int subject) {
802         // FIXME: this implementation is probably not proper, Antti needs to work on this.
803         VirtualCluster cluster = getCluster(subject, true);
804         cluster.denyValue(subject);
805         if(subject > 0) virtualGraphServerSupport.removeVirtual(subject);
806     }
807
808     @Override
809     public void initialise(final Write write) {
810         try {
811             sessionRequestProcessor.syncRequest(new WriteRequest(this) {
812
813                 @Override
814                 public void perform(WriteGraph graph) throws DatabaseException {
815                     write.perform(graph);
816                 }
817
818             });
819         } catch (DatabaseException e) {
820             e.printStackTrace();
821         }
822     }
823
824     @Override
825     public void postModification(AsyncRequestProcessor processor, final WriteOnly request) {
826
827         if(processor == null) processor = sessionRequestProcessor;
828
829         processor.asyncRequest(new WriteOnlyRequest(this) {
830
831             @Override
832             public void perform(WriteOnlyGraph graph) throws DatabaseException {
833                 request.perform(graph);
834             }
835
836         });
837
838     }
839
840     @Override
841     public void updateStatements(int resource, int[] statements) {
842         applyStatements(resource, statements);
843     }
844
845     @Override
846     public void updateValue(int resource, Object value, Binding binding) {
847         applyValue(resource, value, binding);
848     }
849
850     @Override
851     public boolean isPending(int subject) {
852         
853         VirtualCluster cluster = getCluster(subject, false);
854         if(cluster == null) return false;
855         else return cluster.isPending(subject);
856         
857     }
858
859     @Override
860     public boolean isPending(int subject, int predicate) {
861
862         VirtualCluster cluster = getCluster(subject, false);
863         if(cluster == null) return false;
864         else return cluster.isPending(subject, predicate);
865
866     }
867     
868     @Override
869     public void load(ReadGraphImpl graph, int resource, int predicate, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
870         producePartialStatements(graph, resource, predicate, new AsyncProcedure<Object>() {
871
872             @Override
873             public void execute(AsyncReadGraph graph, Object result) {
874                 callback.accept((ReadGraphImpl)graph);
875             }
876
877             @Override
878             public void exception(AsyncReadGraph graph, Throwable throwable) {
879                 callback.accept((ReadGraphImpl)graph);
880             }
881             
882         });
883     }
884
885     @Override
886     public void load(ReadGraphImpl graph, int resource, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
887         produceAllStatements(graph, resource, new AsyncProcedure<Object>() {
888
889             @Override
890             public void execute(AsyncReadGraph graph, Object result) {
891                 callback.accept((ReadGraphImpl)graph);
892             }
893
894             @Override
895             public void exception(AsyncReadGraph graph, Throwable throwable) {
896                 callback.accept((ReadGraphImpl)graph);
897             }
898             
899         });
900     }
901     
902     public Collection<Statement> listStatements() {
903         ArrayList<Statement> result = new ArrayList<Statement>();
904         for(int i=0;i<clusters.size();i++) {
905             VirtualCluster cluster = getOrLoad(i);
906             if(cluster != null) {
907                 cluster.listStatements(serialization, result);
908             }
909         }
910         return result;
911     }
912
913     public Collection<Resource> listValues() {
914         ArrayList<Resource> result = new ArrayList<Resource>();
915         for(int i=0;i<clusters.size();i++) {
916             VirtualCluster cluster = getOrLoad(i);
917             if(cluster != null) {
918                 cluster.listValues(serialization, result);
919             }
920         }
921         return result;
922     }
923
924     @Override
925     public Persistency getPersistency() {
926         return persistency;
927     }
928     
929     @Override
930     public String toString() {
931         String result = "'" + identifier + "'";
932         if(Persistency.WORKSPACE == persistency) result += " (W)";
933         else if(Persistency.MEMORY == persistency) result += " (M)";
934         return result;
935     }
936
937 }
938