-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.impl;\r
-\r
-import java.io.Closeable;\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.util.ArrayList;\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.HashSet;\r
-import java.util.LinkedList;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.serialization.SerializationException;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.serialization.SerializerConstructionException;\r
-import org.simantics.db.AsyncReadGraph;\r
-import org.simantics.db.AsyncRequestProcessor;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Statement;\r
-import org.simantics.db.VirtualGraphContext;\r
-import org.simantics.db.VirtualGraphSource;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.WriteOnlyGraph;\r
-import org.simantics.db.common.ByteFileReader;\r
-import org.simantics.db.common.ByteFileWriter;\r
-import org.simantics.db.common.StandardStatement;\r
-import org.simantics.db.common.request.WriteOnlyRequest;\r
-import org.simantics.db.common.request.WriteRequest;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.impl.graph.ReadGraphImpl;\r
-import org.simantics.db.impl.support.ResourceSupport;\r
-import org.simantics.db.impl.support.VirtualGraphServerSupport;\r
-import org.simantics.db.procedure.AsyncProcedure;\r
-import org.simantics.db.request.Write;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.utils.datastructures.Callback;\r
-\r
-import gnu.trove.list.array.TIntArrayList;\r
-import gnu.trove.map.hash.TIntObjectHashMap;\r
-import gnu.trove.procedure.TIntObjectProcedure;\r
-import gnu.trove.procedure.TIntProcedure;\r
-import gnu.trove.set.hash.TIntHashSet;\r
-\r
-class VirtualCluster {\r
- static final boolean DEBUG = false;\r
- final static int[] EMPTY = new int[0];\r
-\r
- private final ArrayList<TIntArrayList> statements = new ArrayList<TIntArrayList>();\r
- private final TIntHashSet lazy = new TIntHashSet();\r
- private final TIntObjectHashMap<byte[]> values = new TIntObjectHashMap<byte[]>();\r
- private final int clusterId;\r
-\r
- public VirtualCluster(int clusterId) {\r
- this.clusterId = clusterId;\r
- }\r
- \r
- public int clusterId() {\r
- return clusterId;\r
- }\r
- \r
- public void trim() {\r
- }\r
- \r
- private TIntArrayList getPredicateMap(int subject) {\r
- \r
- int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);\r
- if(rId >= statements.size()) return null;\r
- return statements.get(rId);\r
- \r
- }\r
- \r
- public boolean isPending(int subject) {\r
- return lazy.contains(subject);\r
- }\r
-\r
- boolean containsPredicate(TIntArrayList statements, int predicate) {\r
- for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate) return true;\r
- return false;\r
- }\r
-\r
- int containsStatement(TIntArrayList statements, int predicate, int object) {\r
- for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate && statements.getQuick(i+1) == object) return i;\r
- return -1;\r
- }\r
- \r
- public boolean isPending(int subject, int predicate) {\r
- \r
- if(!lazy.contains(subject)) return false;\r
- TIntArrayList predicateMap = getPredicateMap(subject);\r
- if(predicateMap == null) return true;\r
- return !containsPredicate(predicateMap, predicate);\r
-// return !predicateMap.contains(predicate);\r
- \r
- }\r
- \r
- public void resetLazy(int subject) {\r
-\r
- lazy.remove(subject);\r
- // Query all data from scratch\r
- int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);\r
- statements.set(ri, new TIntArrayList());\r
-\r
- }\r
- \r
- public void setLazy(int subject) {\r
-\r
- lazy.add(subject);\r
- \r
- }\r
-\r
- public void finish(int subject) {\r
-\r
- lazy.remove(subject);\r
- \r
- }\r
- \r
- public void setValue(int subject, byte[] data, int length) {\r
-\r
- values.put(subject, Arrays.copyOf(data, length));\r
- \r
- }\r
-\r
- public void denyValue(int subject) {\r
- \r
- values.remove(subject);\r
- \r
- }\r
-\r
- public void addStatements(int subject, int[] data) {\r
- \r
- TIntArrayList result = getPredicateMap(subject);\r
- if(result == null) {\r
- result = new TIntArrayList();\r
- int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);\r
- statements.ensureCapacity(rId+1);\r
- for(int i=statements.size();i<rId+1;i++) statements.add(null);\r
- statements.set(rId, result);\r
- }\r
- \r
- for(int i=0;i<data.length;i+=2) {\r
- int predicate = data[i];\r
- int object = data[i+1];\r
- if(containsStatement(result, predicate, object) < 0) {\r
- result.add(predicate);\r
- result.add(object);\r
- }\r
- }\r
- \r
- }\r
- \r
- public void claim(int subject, int predicate, int object) {\r
- \r
- TIntArrayList predicates = getPredicateMap(subject);\r
- if(predicates == null) {\r
- predicates = new TIntArrayList();\r
- int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);\r
- statements.ensureCapacity(rId+1);\r
- for(int i=statements.size();i<rId+1;i++) statements.add(null);\r
- statements.set(rId, predicates);\r
- }\r
- \r
- if(containsStatement(predicates, predicate, object) < 0) {\r
- predicates.add(predicate);\r
- predicates.add(object);\r
- }\r
- \r
- }\r
- \r
- public void deny(int subject, int predicate, int object) {\r
- \r
- TIntArrayList predicates = getPredicateMap(subject);\r
- if(predicates == null) return;\r
- int index = containsStatement(predicates, predicate, object);\r
- if(index < 0) return;\r
- predicates.remove(index, 2);\r
- \r
- }\r
- \r
- int[] getObjects(int subject, int predicate) {\r
- \r
- TIntArrayList predicates = getPredicateMap(subject);\r
- if(predicates == null) return EMPTY;\r
- TIntArrayList result = new TIntArrayList();\r
- for(int i=0;i<predicates.size();i+=2) {\r
- if(predicates.getQuick(i) == predicate) {\r
- result.add(predicates.getQuick(i+1));\r
- }\r
- }\r
- return result.toArray();\r
- \r
- }\r
- \r
- int[] getPredicates(int subject) {\r
-\r
- TIntArrayList predicates = getPredicateMap(subject);\r
- if(predicates == null) return EMPTY;\r
- TIntHashSet result = new TIntHashSet();\r
- for(int i=0;i<predicates.size();i+=2) {\r
- result.add(predicates.getQuick(i));\r
- }\r
- return result.toArray();\r
- \r
- }\r
- \r
- byte[] getValue(int subject) {\r
- return values.get(subject);\r
- }\r
-\r
- public int getTransientClusterKey() {\r
- int clusterBits = ClusterTraitsBase.getClusterBits(clusterId>>>1);\r
- if((clusterId & 1) == 0) // Virtual subjects\r
- return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource.\r
- else // Database subjects\r
- return clusterBits;\r
- }\r
-\r
- public int getTransientId(int subject) {\r
- if((clusterId & 1) == 0) // Virtual subjects\r
- return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000;\r
- else // Database subjects\r
- return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject);\r
- }\r
- \r
- /*\r
- * Creates a persistent identifier for given transient graph cluster identifier.\r
- * \r
- * \r
- * For persistent clusters this obtains the random access id for (invalid)\r
- * resource at index 0 of given cluster. LSB in given id indicates persistence\r
- * \r
- * \r
- */\r
- \r
- public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { \r
- if((clusterKey & 1) == 0)// Virtual subjects\r
- return clusterKey;\r
- else { // Database subjects\r
- int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1);\r
- try {\r
- // Assuming that cluster's first resource is created first and not deleted.\r
- // Assuming that resource index is in LSB bits.\r
- return ss.getRandomAccessId(rk);\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e);\r
- return -1;\r
- }\r
- }\r
- }\r
-\r
- public void saveImpl(final File file, SerialisationSupport ss) throws IOException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath());\r
- \r
- final ByteFileWriter writer = new ByteFileWriter(file);\r
- \r
- try {\r
-\r
- int stms = 0;\r
- for(TIntArrayList list : statements) {\r
- if(list != null)\r
- stms += list.size();\r
- }\r
-\r
- writer.write((int)(1.5*stms));\r
- @SuppressWarnings("unused")\r
- int count = 0;\r
- for(int i=0;i<statements.size();i++) {\r
- TIntArrayList list = statements.get(i);\r
- if(list != null) {\r
- for(int j=0;j<list.size();j+=2) {\r
- try {\r
- writer.write((short)i);\r
- int rk = list.getQuick(j);\r
- long rid = ss.getRandomAccessId(rk);\r
- writer.write(rid);\r
- rk = list.getQuick(j+1);\r
- rid = ss.getRandomAccessId(rk);\r
- writer.write(rid);\r
- count++;\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
- }\r
- }\r
-\r
- writer.write(values.size());\r
-\r
- values.forEachEntry(new TIntObjectProcedure<byte[]>() {\r
-\r
- @Override\r
- public boolean execute(int a, byte[] b) {\r
- writer.write(a);\r
- writer.write(b.length);\r
- writer.write(b);\r
- return true;\r
- }\r
- });\r
- if (DEBUG)\r
- System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk.");\r
-\r
- } finally {\r
- writer.commit();\r
-// FileUtils.uncheckedClose(_os);\r
- }\r
- }\r
- \r
- public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length());\r
- \r
- ByteFileReader reader = null;\r
- try {\r
- \r
- if (!file.exists())\r
- return;\r
-\r
- reader = new ByteFileReader(file);\r
- int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1);\r
- int stms = reader.readInt();\r
- for (int i = 0; i < stms; i += 3) {\r
-// int rId = reader.readShort();\r
-// if(vgss != null) vgss.addVirtual(clusterInt + rId);\r
-// claim(rId,\r
-// serialization.getTransientId(reader.readLong()),\r
-// serialization.getTransientId(reader.readLong()));\r
- int rId = reader.readShort();\r
- long sId = reader.readLong();\r
- long oId = reader.readLong();\r
- int sTransientId = serialization.getTransientId(sId);\r
- int oTransientId = serialization.getTransientId(oId);\r
- \r
- if(vgss != null)\r
- vgss.addVirtual(clusterInt + rId);\r
- \r
- claim(rId, sTransientId, oTransientId);\r
- }\r
-\r
- int values = reader.readInt();\r
- for (int i = 0; i < values; i++) {\r
- int subject = reader.readInt();\r
- int length = reader.readInt();\r
- setValue(subject, reader.readBytes(length), length);\r
- }\r
- if (DEBUG)\r
- System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk.");\r
- \r
- } catch (IOException e) {\r
- throw new DatabaseException(e);\r
- } finally {\r
- if (reader != null)\r
- reader.close();\r
- }\r
- }\r
- \r
- void listStatements(SerialisationSupport ss, ArrayList<Statement> result) {\r
-\r
- int clusterKey = getTransientClusterKey();\r
- \r
- try {\r
- for(int i=0;i<statements.size();i++) {\r
- TIntArrayList list = statements.get(i);\r
- if(list != null) {\r
- Resource subject = ss.getResource(clusterKey | i);\r
- for(int j=0;j<list.size();j+=2) {\r
- Resource p = ss.getResource(list.getQuick(j));\r
- Resource o = ss.getResource(list.getQuick(j+1));\r
- result.add(new StandardStatement(subject, p, o));\r
- }\r
-\r
- }\r
- }\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- \r
- }\r
-\r
- void listValues(final SerialisationSupport ss, final ArrayList<Resource> result) {\r
-\r
- values.forEachKey(new TIntProcedure() {\r
-\r
- @Override\r
- public boolean execute(int value) {\r
- try {\r
- result.add(ss.getResource(getTransientId(value)));\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- return true;\r
- }\r
-\r
- });\r
- \r
- }\r
-\r
-}\r
-\r
-public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext {\r
- private static final boolean DEBUG = VirtualCluster.DEBUG;\r
- final private static int SWAP_LIMIT = 30;\r
- \r
-// final private static byte[] NO_VALUE = new byte[0];\r
- final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1);\r
- \r
- final private Persistency persistency;\r
- \r
- final private SerialisationSupport serialization;\r
- final private ResourceSupport resourceSupport;\r
- final private VirtualGraphServerSupport virtualGraphServerSupport;\r
- final private RequestProcessor sessionRequestProcessor;\r
- \r
- /*\r
- * Cluster array by index.\r
- * -NO_CLUSTER value means that there is no such virtual cluster\r
- * -null value means that such virtual cluster could be on disk\r
- */\r
- final private ArrayList<VirtualCluster> clusters = new ArrayList<VirtualCluster>();\r
-\r
- /*\r
- * A list of resident clusters\r
- */\r
- final private LinkedList<VirtualCluster> memoryClusters = new LinkedList<VirtualCluster>();\r
- \r
- private final HashSet<VirtualGraphSource> sources = new HashSet<VirtualGraphSource>();\r
- \r
- final String identifier;\r
- final String databaseId;\r
-\r
- TIntObjectHashMap<TIntHashSet> NO_STATEMENTS = new TIntObjectHashMap<TIntHashSet>();\r
-\r
- int[] EMPTY = new int[0];\r
-\r
- public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) throws DatabaseException {\r
- TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE);\r
- graph.load();\r
- return graph;\r
- }\r
- \r
- public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) {\r
- return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY);\r
- }\r
- \r
- private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier, Persistency persistency) {\r
- this.serialization = ss;\r
- this.virtualGraphServerSupport = vgss;\r
- this.sessionRequestProcessor = srp;\r
- this.resourceSupport = rs;\r
- this.identifier = identifier;\r
- this.databaseId = databaseId;\r
- \r
- this.persistency = persistency;\r
- }\r
-\r
- public String getIdentifier() {\r
- return identifier;\r
- }\r
- \r
- private int transientClusterId(long id) throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: transientClusterId=" + id);\r
- if ((id & 1) == 0) // Cluster of virtual subjects\r
- return (int)id;\r
- // Corresponds to persistent cluster\r
- int rk = serialization.getTransientId(id);\r
- return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1;\r
- }\r
-\r
- private String getPrefix() {\r
- return identifier + "." + persistency.identifier() + "." + databaseId;\r
- }\r
- \r
- private void load() throws DatabaseException {\r
-\r
- String prefix = getPrefix();\r
- for(String file : virtualGraphServerSupport.storagePath().list()) {\r
- try {\r
- if(file.startsWith(prefix)) {\r
- long clusterLong = Long.parseLong(file.substring(prefix.length()+4));\r
- int clusterId = transientClusterId(clusterLong);\r
- VirtualCluster cluster = new VirtualCluster(clusterId);\r
- cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null);\r
- clusters.ensureCapacity(clusterId+1);\r
- for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);\r
- clusters.set(clusterId, cluster);\r
- memoryClusters.addLast(cluster);\r
- }\r
- } catch (DatabaseException t) {\r
- // file is assumably broken, delete it\r
- File filee = new File(virtualGraphServerSupport.storagePath(), file);\r
- if (!filee.delete()) {\r
- System.err.println("Could not delete file " + filee.getAbsolutePath());\r
- }\r
- throw t;\r
- }\r
- }\r
-\r
- }\r
-\r
- public void dispose() {\r
- try {\r
- saveImpl(serialization);\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- public void save() {\r
- \r
- try {\r
- saveImpl(serialization);\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- \r
- }\r
-\r
- public void saveImpl(final SerialisationSupport ss) throws IOException {\r
- \r
- for(VirtualCluster cluster : memoryClusters) {\r
- String prefix = getPrefix();\r
- File file = new File(virtualGraphServerSupport.storagePath(), prefix + ".vg." + VirtualCluster.getClusterIdentifier(ss, cluster.clusterId()));\r
- cluster.saveImpl(file, ss);\r
- }\r
- \r
- }\r
-\r
- /**\r
- * Closes a stream and ignores any resulting exception. This is useful\r
- * when doing stream cleanup in a finally block where secondary exceptions\r
- * are not worth logging.\r
- */\r
- static void uncheckedClose(Closeable closeable) {\r
- try {\r
- if (closeable != null)\r
- closeable.close();\r
- } catch (IOException e) {\r
- //ignore\r
- }\r
- }\r
-\r
- private void trimClusters() {\r
- for(VirtualCluster cluster : memoryClusters) {\r
- cluster.trim();\r
- }\r
- }\r
- \r
- /*\r
- * Returns a transient cluster index\r
- * -Transient clusters for persistent resources have index with LSB=1\r
- * -Transient clusters for virtual resources have index with LSB=0\r
- * \r
- * @param subject is a DB client transient id\r
- * \r
- * For persistent resources cluster id is 2*clusterKey+1\r
- * For virtual resources transient ids are persistent and are directly chunked into 14-bit clusters.\r
- * \r
- */\r
- public static int getVirtualClusterKey(int subject) {\r
- if (subject > 0) {\r
- int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject);\r
- return (ck << 1) | 1;\r
- }\r
- int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject);\r
- return ck << 1;\r
- }\r
- \r
- private VirtualCluster getOrLoad(int virtualClusterKey) {\r
-\r
- if(virtualClusterKey < clusters.size()) {\r
- VirtualCluster cluster = clusters.get(virtualClusterKey);\r
- if(NO_CLUSTER == cluster) return null;\r
- if(cluster != null) return cluster;\r
- }\r
- if (DEBUG)\r
- System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier);\r
-\r
- clusters.ensureCapacity(virtualClusterKey+1);\r
- for(int i=clusters.size(); i<virtualClusterKey+1; i++) clusters.add(null);\r
-\r
- //if(!VirtualCluster.isValidVirtualClusterKey(serialization, virtualClusterKey)) return null;\r
-\r
- long clusterIdentifier = VirtualCluster.getClusterIdentifier(serialization, virtualClusterKey);\r
- File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + clusterIdentifier);\r
- if(file.exists()) {\r
- VirtualCluster cluster = new VirtualCluster(virtualClusterKey);\r
-// System.out.println("Loading virtual cluster2 " + virtualClusterKey + " for " + identifier);\r
- try {\r
- cluster.load(file, serialization, (virtualClusterKey & 1) > 0 ? virtualGraphServerSupport : null);\r
- clusters.set(virtualClusterKey, cluster);\r
- memoryClusters.addFirst(cluster);\r
- swap();\r
- return cluster;\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- // File must be corrupt, lets delete it so we wont load it next time in future\r
- file.delete();\r
- \r
- clusters.set(virtualClusterKey, NO_CLUSTER);\r
- return null;\r
- }\r
-\r
- } else {\r
- clusters.set(virtualClusterKey, NO_CLUSTER);\r
- return null;\r
- }\r
- }\r
-\r
- private void swap() {\r
- \r
- trimClusters();\r
- while(memoryClusters.size() > SWAP_LIMIT) {\r
- VirtualCluster remo = memoryClusters.removeLast();\r
- File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId()));\r
- try {\r
- remo.saveImpl(file, serialization);\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- clusters.set(remo.clusterId(), null);\r
- }\r
- \r
- }\r
- \r
- private synchronized VirtualCluster getCluster(int subject, boolean create) {\r
-\r
- int clusterId = getVirtualClusterKey(subject);\r
- \r
- VirtualCluster cluster = getOrLoad(clusterId);\r
- if(cluster != null) return cluster;\r
- \r
- if(create) {\r
-\r
- clusters.ensureCapacity(clusterId+1);\r
- for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);\r
- cluster = new VirtualCluster(clusterId);\r
- clusters.set(clusterId, cluster);\r
- memoryClusters.addFirst(cluster);\r
- swap();\r
- return cluster;\r
-\r
- } else {\r
- return null;\r
- }\r
- \r
- }\r
- \r
- private synchronized void applyValue(int subject, Object value, Binding binding) {\r
-\r
- try {\r
- Serializer serializer = Bindings.getSerializer( binding );\r
- byte[] serialized = serializer.serialize(value);\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.setValue(subject, serialized, serialized.length);\r
- } catch (SerializationException e) {\r
- e.printStackTrace();\r
- } catch (SerializerConstructionException e) {\r
- e.printStackTrace();\r
- } catch (IOException e) {\r
- // TODO Auto-generated catch block\r
- e.printStackTrace();\r
- }\r
-\r
- }\r
-\r
- private synchronized void applyStatements(int subject, int[] statements) {\r
-\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.addStatements(subject, statements);\r
- \r
- \r
- if(subject > 0) virtualGraphServerSupport.addVirtual(subject);\r
-\r
- }\r
-\r
- private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure<Object> procedure) throws DatabaseException {\r
-\r
- VirtualCluster cluster = getCluster(subject, true);\r
-\r
- // This resource becomes a normal resource, all data is requeried\r
- cluster.resetLazy(subject);\r
- \r
- for(VirtualGraphSource source : sources) {\r
- source.getStatements(graph, this, subject);\r
- }\r
- \r
- if(subject > 0) virtualGraphServerSupport.addVirtual(subject);\r
- \r
- }\r
-\r
- private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure<Object> procedure) throws DatabaseException {\r
-\r
- for(VirtualGraphSource source : sources) {\r
- source.getStatements(graph, this, subject, predicate);\r
- }\r
- \r
- if(subject > 0) virtualGraphServerSupport.addVirtual(subject);\r
- \r
- }\r
-\r
- @Override\r
- public int getIndex(Resource resource) {\r
- try {\r
- return serialization.getTransientId(resource);\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- return 0;\r
- }\r
-\r
- @Override\r
- public Resource getResource(int index) {\r
- return new ResourceImpl(resourceSupport, index);\r
- }\r
-\r
- @Override\r
- public void register(final VirtualGraphSource source) {\r
- if(sources.add(source)) {\r
- source.attach(TransientGraph.this);\r
- }\r
- }\r
-\r
- @Override\r
- public void claim(int subject, int predicate, int object) {\r
-\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.claim(subject, predicate, object);\r
- if(subject > 0) virtualGraphServerSupport.addVirtual(subject);\r
-\r
- }\r
-\r
- @Override\r
- public synchronized int[] getObjects(int subject, int predicate) {\r
- VirtualCluster cluster = getCluster(subject, false);\r
- if(cluster == null) return EMPTY;\r
- return cluster.getObjects(subject, predicate);\r
- }\r
-\r
- @Override\r
- public synchronized int[] getPredicates(int subject) {\r
- VirtualCluster cluster = getCluster(subject, false);\r
- if(cluster == null) return EMPTY;\r
- return cluster.getPredicates(subject);\r
- }\r
-\r
- @Override\r
- public synchronized byte[] getValue(int subject) {\r
- VirtualCluster cluster = getCluster(subject, false);\r
- if(cluster == null) return null;\r
- return cluster.getValue(subject);\r
- }\r
-\r
- @Override\r
- public int newResource(boolean isLazy) {\r
- \r
- int id = virtualGraphServerSupport.createVirtual();\r
- VirtualCluster cluster = getCluster(id, true);\r
- if(isLazy) cluster.setLazy(id);\r
- return id;\r
-\r
- }\r
- \r
- @Override\r
- public void finish(int subject) {\r
- VirtualCluster cluster = getCluster(subject, false);\r
- cluster.finish(subject);\r
- }\r
-\r
- @Override\r
- public void deny(int subject, int predicate, int object) {\r
-\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.deny(subject, predicate, object);\r
-\r
- }\r
-\r
- @Override\r
- public void claimValue(int subject, byte[] data, int length) {\r
-\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.setValue(subject, data, length);\r
- if(subject > 0) virtualGraphServerSupport.addVirtual(subject);\r
-\r
- }\r
-\r
- @Override\r
- public void denyValue(int subject) {\r
- // FIXME: this implementation is probably not proper, Antti needs to work on this.\r
- VirtualCluster cluster = getCluster(subject, true);\r
- cluster.denyValue(subject);\r
- if(subject > 0) virtualGraphServerSupport.removeVirtual(subject);\r
- }\r
-\r
- @Override\r
- public void initialise(final Write write) {\r
- try {\r
- sessionRequestProcessor.syncRequest(new WriteRequest(this) {\r
-\r
- @Override\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
- write.perform(graph);\r
- }\r
-\r
- });\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- @Override\r
- public void postModification(AsyncRequestProcessor processor, final WriteOnly request) {\r
-\r
- if(processor == null) processor = sessionRequestProcessor;\r
-\r
- processor.asyncRequest(new WriteOnlyRequest(this) {\r
-\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- request.perform(graph);\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public void updateStatements(int resource, int[] statements) {\r
- applyStatements(resource, statements);\r
- }\r
-\r
- @Override\r
- public void updateValue(int resource, Object value, Binding binding) {\r
- applyValue(resource, value, binding);\r
- }\r
-\r
- @Override\r
- public boolean isPending(int subject) {\r
- \r
- VirtualCluster cluster = getCluster(subject, false);\r
- if(cluster == null) return false;\r
- else return cluster.isPending(subject);\r
- \r
- }\r
-\r
- @Override\r
- public boolean isPending(int subject, int predicate) {\r
-\r
- VirtualCluster cluster = getCluster(subject, false);\r
- if(cluster == null) return false;\r
- else return cluster.isPending(subject, predicate);\r
-\r
- }\r
- \r
- @Override\r
- public void load(ReadGraphImpl graph, int resource, int predicate, final Callback<ReadGraphImpl> callback) throws DatabaseException {\r
- producePartialStatements(graph, resource, predicate, new AsyncProcedure<Object>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Object result) {\r
- callback.run((ReadGraphImpl)graph);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- callback.run((ReadGraphImpl)graph);\r
- }\r
- \r
- });\r
- }\r
-\r
- @Override\r
- public void load(ReadGraphImpl graph, int resource, final Callback<ReadGraphImpl> callback) throws DatabaseException {\r
- produceAllStatements(graph, resource, new AsyncProcedure<Object>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, Object result) {\r
- callback.run((ReadGraphImpl)graph);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- callback.run((ReadGraphImpl)graph);\r
- }\r
- \r
- });\r
- }\r
- \r
- public Collection<Statement> listStatements() {\r
- ArrayList<Statement> result = new ArrayList<Statement>();\r
- for(int i=0;i<clusters.size();i++) {\r
- VirtualCluster cluster = getOrLoad(i);\r
- if(cluster != null) {\r
- cluster.listStatements(serialization, result);\r
- }\r
- }\r
- return result;\r
- }\r
-\r
- public Collection<Resource> listValues() {\r
- ArrayList<Resource> result = new ArrayList<Resource>();\r
- for(int i=0;i<clusters.size();i++) {\r
- VirtualCluster cluster = getOrLoad(i);\r
- if(cluster != null) {\r
- cluster.listValues(serialization, result);\r
- }\r
- }\r
- return result;\r
- }\r
-\r
- @Override\r
- public Persistency getPersistency() {\r
- return persistency;\r
- }\r
- \r
- @Override\r
- public String toString() {\r
- String result = "'" + identifier + "'";\r
- if(Persistency.WORKSPACE == persistency) result += " (W)";\r
- else if(Persistency.MEMORY == persistency) result += " (M)";\r
- return result;\r
- }\r
-\r
-}\r
-\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.function.Consumer;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.serialization.SerializationException;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.serialization.SerializerConstructionException;
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.AsyncRequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Statement;
+import org.simantics.db.VirtualGraphContext;
+import org.simantics.db.VirtualGraphSource;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.WriteOnlyGraph;
+import org.simantics.db.common.ByteFileReader;
+import org.simantics.db.common.ByteFileWriter;
+import org.simantics.db.common.StandardStatement;
+import org.simantics.db.common.request.WriteOnlyRequest;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.support.ResourceSupport;
+import org.simantics.db.impl.support.VirtualGraphServerSupport;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.request.Write;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.service.SerialisationSupport;
+
+import gnu.trove.list.array.TIntArrayList;
+import gnu.trove.map.hash.TIntObjectHashMap;
+import gnu.trove.procedure.TIntObjectProcedure;
+import gnu.trove.procedure.TIntProcedure;
+import gnu.trove.set.hash.TIntHashSet;
+
+class VirtualCluster {
+ static final boolean DEBUG = false;
+ final static int[] EMPTY = new int[0];
+
+ private final ArrayList<TIntArrayList> statements = new ArrayList<TIntArrayList>();
+ private final TIntHashSet lazy = new TIntHashSet();
+ private final TIntObjectHashMap<byte[]> values = new TIntObjectHashMap<byte[]>();
+ private final int clusterId;
+
+ public VirtualCluster(int clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public int clusterId() {
+ return clusterId;
+ }
+
+ public void trim() {
+ }
+
+ private TIntArrayList getPredicateMap(int subject) {
+
+ int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
+ if(rId >= statements.size()) return null;
+ return statements.get(rId);
+
+ }
+
+ public boolean isPending(int subject) {
+ return lazy.contains(subject);
+ }
+
+ boolean containsPredicate(TIntArrayList statements, int predicate) {
+ for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate) return true;
+ return false;
+ }
+
+ int containsStatement(TIntArrayList statements, int predicate, int object) {
+ for(int i=0;i<statements.size();i+=2) if(statements.getQuick(i) == predicate && statements.getQuick(i+1) == object) return i;
+ return -1;
+ }
+
+ public boolean isPending(int subject, int predicate) {
+
+ if(!lazy.contains(subject)) return false;
+ TIntArrayList predicateMap = getPredicateMap(subject);
+ if(predicateMap == null) return true;
+ return !containsPredicate(predicateMap, predicate);
+// return !predicateMap.contains(predicate);
+
+ }
+
+ public void resetLazy(int subject) {
+
+ lazy.remove(subject);
+ // Query all data from scratch
+ int ri = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
+ statements.set(ri, new TIntArrayList());
+
+ }
+
+ public void setLazy(int subject) {
+
+ lazy.add(subject);
+
+ }
+
+ public void finish(int subject) {
+
+ lazy.remove(subject);
+
+ }
+
+ public void setValue(int subject, byte[] data, int length) {
+
+ values.put(subject, Arrays.copyOf(data, length));
+
+ }
+
+ public void denyValue(int subject) {
+
+ values.remove(subject);
+
+ }
+
+ public void addStatements(int subject, int[] data) {
+
+ TIntArrayList result = getPredicateMap(subject);
+ if(result == null) {
+ result = new TIntArrayList();
+ int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
+ statements.ensureCapacity(rId+1);
+ for(int i=statements.size();i<rId+1;i++) statements.add(null);
+ statements.set(rId, result);
+ }
+
+ for(int i=0;i<data.length;i+=2) {
+ int predicate = data[i];
+ int object = data[i+1];
+ if(containsStatement(result, predicate, object) < 0) {
+ result.add(predicate);
+ result.add(object);
+ }
+ }
+
+ }
+
+ public void claim(int subject, int predicate, int object) {
+
+ TIntArrayList predicates = getPredicateMap(subject);
+ if(predicates == null) {
+ predicates = new TIntArrayList();
+ int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject);
+ statements.ensureCapacity(rId+1);
+ for(int i=statements.size();i<rId+1;i++) statements.add(null);
+ statements.set(rId, predicates);
+ }
+
+ if(containsStatement(predicates, predicate, object) < 0) {
+ predicates.add(predicate);
+ predicates.add(object);
+ }
+
+ }
+
+ public void deny(int subject, int predicate, int object) {
+
+ TIntArrayList predicates = getPredicateMap(subject);
+ if(predicates == null) return;
+ int index = containsStatement(predicates, predicate, object);
+ if(index < 0) return;
+ predicates.remove(index, 2);
+
+ }
+
+ int[] getObjects(int subject, int predicate) {
+
+ TIntArrayList predicates = getPredicateMap(subject);
+ if(predicates == null) return EMPTY;
+ TIntArrayList result = new TIntArrayList();
+ for(int i=0;i<predicates.size();i+=2) {
+ if(predicates.getQuick(i) == predicate) {
+ result.add(predicates.getQuick(i+1));
+ }
+ }
+ return result.toArray();
+
+ }
+
+ int[] getPredicates(int subject) {
+
+ TIntArrayList predicates = getPredicateMap(subject);
+ if(predicates == null) return EMPTY;
+ TIntHashSet result = new TIntHashSet();
+ for(int i=0;i<predicates.size();i+=2) {
+ result.add(predicates.getQuick(i));
+ }
+ return result.toArray();
+
+ }
+
+ byte[] getValue(int subject) {
+ return values.get(subject);
+ }
+
+ public int getTransientClusterKey() {
+ int clusterBits = ClusterTraitsBase.getClusterBits(clusterId>>>1);
+ if((clusterId & 1) == 0) // Virtual subjects
+ return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource.
+ else // Database subjects
+ return clusterBits;
+ }
+
+ public int getTransientId(int subject) {
+ if((clusterId & 1) == 0) // Virtual subjects
+ return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000;
+ else // Database subjects
+ return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject);
+ }
+
+ /*
+ * Creates a persistent identifier for given transient graph cluster identifier.
+ *
+ *
+ * For persistent clusters this obtains the random access id for (invalid)
+ * resource at index 0 of given cluster. LSB in given id indicates persistence
+ *
+ *
+ */
+
+ public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) {
+ if((clusterKey & 1) == 0)// Virtual subjects
+ return clusterKey;
+ else { // Database subjects
+ int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1);
+ try {
+ // Assuming that cluster's first resource is created first and not deleted.
+ // Assuming that resource index is in LSB bits.
+ return ss.getRandomAccessId(rk);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e);
+ return -1;
+ }
+ }
+ }
+
+ public void saveImpl(final File file, SerialisationSupport ss) throws IOException {
+ if (DEBUG)
+ System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath());
+
+ final ByteFileWriter writer = new ByteFileWriter(file);
+
+ try {
+
+ int stms = 0;
+ for(TIntArrayList list : statements) {
+ if(list != null)
+ stms += list.size();
+ }
+
+ writer.write((int)(1.5*stms));
+ @SuppressWarnings("unused")
+ int count = 0;
+ for(int i=0;i<statements.size();i++) {
+ TIntArrayList list = statements.get(i);
+ if(list != null) {
+ for(int j=0;j<list.size();j+=2) {
+ try {
+ writer.write((short)i);
+ int rk = list.getQuick(j);
+ long rid = ss.getRandomAccessId(rk);
+ writer.write(rid);
+ rk = list.getQuick(j+1);
+ rid = ss.getRandomAccessId(rk);
+ writer.write(rid);
+ count++;
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ writer.write(values.size());
+
+ values.forEachEntry(new TIntObjectProcedure<byte[]>() {
+
+ @Override
+ public boolean execute(int a, byte[] b) {
+ writer.write(a);
+ writer.write(b.length);
+ writer.write(b);
+ return true;
+ }
+ });
+ if (DEBUG)
+ System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk.");
+
+ } finally {
+ writer.commit();
+// FileUtils.uncheckedClose(_os);
+ }
+ }
+
+ public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length());
+
+ ByteFileReader reader = null;
+ try {
+
+ if (!file.exists())
+ return;
+
+ reader = new ByteFileReader(file);
+ int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1);
+ int stms = reader.readInt();
+ for (int i = 0; i < stms; i += 3) {
+// int rId = reader.readShort();
+// if(vgss != null) vgss.addVirtual(clusterInt + rId);
+// claim(rId,
+// serialization.getTransientId(reader.readLong()),
+// serialization.getTransientId(reader.readLong()));
+ int rId = reader.readShort();
+ long sId = reader.readLong();
+ long oId = reader.readLong();
+ int sTransientId = serialization.getTransientId(sId);
+ int oTransientId = serialization.getTransientId(oId);
+
+ if(vgss != null)
+ vgss.addVirtual(clusterInt + rId);
+
+ claim(rId, sTransientId, oTransientId);
+ }
+
+ int values = reader.readInt();
+ for (int i = 0; i < values; i++) {
+ int subject = reader.readInt();
+ int length = reader.readInt();
+ setValue(subject, reader.readBytes(length), length);
+ }
+ if (DEBUG)
+ System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk.");
+
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+ }
+
+ void listStatements(SerialisationSupport ss, ArrayList<Statement> result) {
+
+ int clusterKey = getTransientClusterKey();
+
+ try {
+ for(int i=0;i<statements.size();i++) {
+ TIntArrayList list = statements.get(i);
+ if(list != null) {
+ Resource subject = ss.getResource(clusterKey | i);
+ for(int j=0;j<list.size();j+=2) {
+ Resource p = ss.getResource(list.getQuick(j));
+ Resource o = ss.getResource(list.getQuick(j+1));
+ result.add(new StandardStatement(subject, p, o));
+ }
+
+ }
+ }
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ void listValues(final SerialisationSupport ss, final ArrayList<Resource> result) {
+
+ values.forEachKey(new TIntProcedure() {
+
+ @Override
+ public boolean execute(int value) {
+ try {
+ result.add(ss.getResource(getTransientId(value)));
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ });
+
+ }
+
+}
+
+public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext {
+ private static final boolean DEBUG = VirtualCluster.DEBUG;
+ final private static int SWAP_LIMIT = 30;
+
+// final private static byte[] NO_VALUE = new byte[0];
+ final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1);
+
+ final private Persistency persistency;
+
+ final private SerialisationSupport serialization;
+ final private ResourceSupport resourceSupport;
+ final private VirtualGraphServerSupport virtualGraphServerSupport;
+ final private AsyncRequestProcessor sessionRequestProcessor;
+
+ /*
+ * Cluster array by index.
+ * -NO_CLUSTER value means that there is no such virtual cluster
+ * -null value means that such virtual cluster could be on disk
+ */
+ final private ArrayList<VirtualCluster> clusters = new ArrayList<VirtualCluster>();
+
+ /*
+ * A list of resident clusters
+ */
+ final private LinkedList<VirtualCluster> memoryClusters = new LinkedList<VirtualCluster>();
+
+ private final HashSet<VirtualGraphSource> sources = new HashSet<VirtualGraphSource>();
+
+ final String identifier;
+ final String databaseId;
+
+ TIntObjectHashMap<TIntHashSet> NO_STATEMENTS = new TIntObjectHashMap<TIntHashSet>();
+
+ int[] EMPTY = new int[0];
+
+ public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) throws DatabaseException {
+ TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE);
+ graph.load();
+ return graph;
+ }
+
+ public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) {
+ return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY);
+ }
+
+ private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier, Persistency persistency) {
+ this.serialization = ss;
+ this.virtualGraphServerSupport = vgss;
+ this.sessionRequestProcessor = srp;
+ this.resourceSupport = rs;
+ this.identifier = identifier;
+ this.databaseId = databaseId;
+
+ this.persistency = persistency;
+ }
+
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ private int transientClusterId(long id) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: transientClusterId=" + id);
+ if ((id & 1) == 0) // Cluster of virtual subjects
+ return (int)id;
+ // Corresponds to persistent cluster
+ int rk = serialization.getTransientId(id);
+ return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1;
+ }
+
+ private String getPrefix() {
+ return identifier + "." + persistency.identifier() + "." + databaseId;
+ }
+
+ private void load() throws DatabaseException {
+
+ String prefix = getPrefix();
+ for(String file : virtualGraphServerSupport.storagePath().list()) {
+ try {
+ if(file.startsWith(prefix)) {
+ long clusterLong = Long.parseLong(file.substring(prefix.length()+4));
+ int clusterId = transientClusterId(clusterLong);
+ VirtualCluster cluster = new VirtualCluster(clusterId);
+ cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null);
+ clusters.ensureCapacity(clusterId+1);
+ for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
+ clusters.set(clusterId, cluster);
+ memoryClusters.addLast(cluster);
+ }
+ } catch (DatabaseException t) {
+ // file is assumably broken, delete it
+ File filee = new File(virtualGraphServerSupport.storagePath(), file);
+ if (!filee.delete()) {
+ System.err.println("Could not delete file " + filee.getAbsolutePath());
+ }
+ throw t;
+ }
+ }
+
+ }
+
+ public void dispose() {
+ try {
+ saveImpl(serialization);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void save() {
+
+ try {
+ saveImpl(serialization);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void saveImpl(final SerialisationSupport ss) throws IOException {
+
+ for(VirtualCluster cluster : memoryClusters) {
+ String prefix = getPrefix();
+ File file = new File(virtualGraphServerSupport.storagePath(), prefix + ".vg." + VirtualCluster.getClusterIdentifier(ss, cluster.clusterId()));
+ cluster.saveImpl(file, ss);
+ }
+
+ }
+
+ /**
+ * Closes a stream and ignores any resulting exception. This is useful
+ * when doing stream cleanup in a finally block where secondary exceptions
+ * are not worth logging.
+ */
+ static void uncheckedClose(Closeable closeable) {
+ try {
+ if (closeable != null)
+ closeable.close();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+
+ private void trimClusters() {
+ for(VirtualCluster cluster : memoryClusters) {
+ cluster.trim();
+ }
+ }
+
+ /*
+ * Returns a transient cluster index
+ * -Transient clusters for persistent resources have index with LSB=1
+ * -Transient clusters for virtual resources have index with LSB=0
+ *
+ * @param subject is a DB client transient id
+ *
+ * For persistent resources cluster id is 2*clusterKey+1
+ * For virtual resources transient ids are persistent and are directly chunked into 14-bit clusters.
+ *
+ */
+ public static int getVirtualClusterKey(int subject) {
+ if (subject > 0) {
+ int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject);
+ return (ck << 1) | 1;
+ }
+ int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject);
+ return ck << 1;
+ }
+
+ private VirtualCluster getOrLoad(int virtualClusterKey) {
+
+ if(virtualClusterKey < clusters.size()) {
+ VirtualCluster cluster = clusters.get(virtualClusterKey);
+ if(NO_CLUSTER == cluster) return null;
+ if(cluster != null) return cluster;
+ }
+ if (DEBUG)
+ System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier);
+
+ clusters.ensureCapacity(virtualClusterKey+1);
+ for(int i=clusters.size(); i<virtualClusterKey+1; i++) clusters.add(null);
+
+ //if(!VirtualCluster.isValidVirtualClusterKey(serialization, virtualClusterKey)) return null;
+
+ long clusterIdentifier = VirtualCluster.getClusterIdentifier(serialization, virtualClusterKey);
+ File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + clusterIdentifier);
+ if(file.exists()) {
+ VirtualCluster cluster = new VirtualCluster(virtualClusterKey);
+// System.out.println("Loading virtual cluster2 " + virtualClusterKey + " for " + identifier);
+ try {
+ cluster.load(file, serialization, (virtualClusterKey & 1) > 0 ? virtualGraphServerSupport : null);
+ clusters.set(virtualClusterKey, cluster);
+ memoryClusters.addFirst(cluster);
+ swap();
+ return cluster;
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ // File must be corrupt, lets delete it so we wont load it next time in future
+ file.delete();
+
+ clusters.set(virtualClusterKey, NO_CLUSTER);
+ return null;
+ }
+
+ } else {
+ clusters.set(virtualClusterKey, NO_CLUSTER);
+ return null;
+ }
+ }
+
+ private void swap() {
+
+ trimClusters();
+ while(memoryClusters.size() > SWAP_LIMIT) {
+ VirtualCluster remo = memoryClusters.removeLast();
+ File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId()));
+ try {
+ remo.saveImpl(file, serialization);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ clusters.set(remo.clusterId(), null);
+ }
+
+ }
+
+ private synchronized VirtualCluster getCluster(int subject, boolean create) {
+
+ int clusterId = getVirtualClusterKey(subject);
+
+ VirtualCluster cluster = getOrLoad(clusterId);
+ if(cluster != null) return cluster;
+
+ if(create) {
+
+ clusters.ensureCapacity(clusterId+1);
+ for(int i=clusters.size(); i<clusterId+1; i++) clusters.add(null);
+ cluster = new VirtualCluster(clusterId);
+ clusters.set(clusterId, cluster);
+ memoryClusters.addFirst(cluster);
+ swap();
+ return cluster;
+
+ } else {
+ return null;
+ }
+
+ }
+
+ private synchronized void applyValue(int subject, Object value, Binding binding) {
+
+ try {
+ Serializer serializer = Bindings.getSerializer( binding );
+ byte[] serialized = serializer.serialize(value);
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.setValue(subject, serialized, serialized.length);
+ } catch (SerializationException e) {
+ e.printStackTrace();
+ } catch (SerializerConstructionException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+ private synchronized void applyStatements(int subject, int[] statements) {
+
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.addStatements(subject, statements);
+
+
+ if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
+
+ }
+
+ private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure<Object> procedure) throws DatabaseException {
+
+ VirtualCluster cluster = getCluster(subject, true);
+
+ // This resource becomes a normal resource, all data is requeried
+ cluster.resetLazy(subject);
+
+ for(VirtualGraphSource source : sources) {
+ source.getStatements(graph, this, subject);
+ }
+
+ if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
+
+ }
+
+ private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure<Object> procedure) throws DatabaseException {
+
+ for(VirtualGraphSource source : sources) {
+ source.getStatements(graph, this, subject, predicate);
+ }
+
+ if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
+
+ }
+
+ @Override
+ public int getIndex(Resource resource) {
+ try {
+ return serialization.getTransientId(resource);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ @Override
+ public Resource getResource(int index) {
+ return new ResourceImpl(resourceSupport, index);
+ }
+
+ @Override
+ public void register(final VirtualGraphSource source) {
+ if(sources.add(source)) {
+ source.attach(TransientGraph.this);
+ }
+ }
+
+ @Override
+ public void claim(int subject, int predicate, int object) {
+
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.claim(subject, predicate, object);
+ if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
+
+ }
+
+ @Override
+ public synchronized int[] getObjects(int subject, int predicate) {
+ VirtualCluster cluster = getCluster(subject, false);
+ if(cluster == null) return EMPTY;
+ return cluster.getObjects(subject, predicate);
+ }
+
+ @Override
+ public synchronized int[] getPredicates(int subject) {
+ VirtualCluster cluster = getCluster(subject, false);
+ if(cluster == null) return EMPTY;
+ return cluster.getPredicates(subject);
+ }
+
+ @Override
+ public synchronized byte[] getValue(int subject) {
+ VirtualCluster cluster = getCluster(subject, false);
+ if(cluster == null) return null;
+ return cluster.getValue(subject);
+ }
+
+ @Override
+ public int newResource(boolean isLazy) {
+
+ int id = virtualGraphServerSupport.createVirtual();
+ VirtualCluster cluster = getCluster(id, true);
+ if(isLazy) cluster.setLazy(id);
+ return id;
+
+ }
+
+ @Override
+ public void finish(int subject) {
+ VirtualCluster cluster = getCluster(subject, false);
+ cluster.finish(subject);
+ }
+
+ @Override
+ public void deny(int subject, int predicate, int object) {
+
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.deny(subject, predicate, object);
+
+ }
+
+ @Override
+ public void claimValue(int subject, byte[] data, int length) {
+
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.setValue(subject, data, length);
+ if(subject > 0) virtualGraphServerSupport.addVirtual(subject);
+
+ }
+
+ @Override
+ public void denyValue(int subject) {
+ // FIXME: this implementation is probably not proper, Antti needs to work on this.
+ VirtualCluster cluster = getCluster(subject, true);
+ cluster.denyValue(subject);
+ if(subject > 0) virtualGraphServerSupport.removeVirtual(subject);
+ }
+
+ @Override
+ public void initialise(final Write write) {
+ try {
+ sessionRequestProcessor.syncRequest(new WriteRequest(this) {
+
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ write.perform(graph);
+ }
+
+ });
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void postModification(AsyncRequestProcessor processor, final WriteOnly request) {
+
+ if(processor == null) processor = sessionRequestProcessor;
+
+ processor.asyncRequest(new WriteOnlyRequest(this) {
+
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ request.perform(graph);
+ }
+
+ });
+
+ }
+
+ @Override
+ public void updateStatements(int resource, int[] statements) {
+ applyStatements(resource, statements);
+ }
+
+ @Override
+ public void updateValue(int resource, Object value, Binding binding) {
+ applyValue(resource, value, binding);
+ }
+
+ @Override
+ public boolean isPending(int subject) {
+
+ VirtualCluster cluster = getCluster(subject, false);
+ if(cluster == null) return false;
+ else return cluster.isPending(subject);
+
+ }
+
+ @Override
+ public boolean isPending(int subject, int predicate) {
+
+ VirtualCluster cluster = getCluster(subject, false);
+ if(cluster == null) return false;
+ else return cluster.isPending(subject, predicate);
+
+ }
+
+ @Override
+ public void load(ReadGraphImpl graph, int resource, int predicate, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
+ producePartialStatements(graph, resource, predicate, new AsyncProcedure<Object>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, Object result) {
+ callback.accept((ReadGraphImpl)graph);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ callback.accept((ReadGraphImpl)graph);
+ }
+
+ });
+ }
+
+ @Override
+ public void load(ReadGraphImpl graph, int resource, final Consumer<ReadGraphImpl> callback) throws DatabaseException {
+ produceAllStatements(graph, resource, new AsyncProcedure<Object>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, Object result) {
+ callback.accept((ReadGraphImpl)graph);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ callback.accept((ReadGraphImpl)graph);
+ }
+
+ });
+ }
+
+ public Collection<Statement> listStatements() {
+ ArrayList<Statement> result = new ArrayList<Statement>();
+ for(int i=0;i<clusters.size();i++) {
+ VirtualCluster cluster = getOrLoad(i);
+ if(cluster != null) {
+ cluster.listStatements(serialization, result);
+ }
+ }
+ return result;
+ }
+
+ public Collection<Resource> listValues() {
+ ArrayList<Resource> result = new ArrayList<Resource>();
+ for(int i=0;i<clusters.size();i++) {
+ VirtualCluster cluster = getOrLoad(i);
+ if(cluster != null) {
+ cluster.listValues(serialization, result);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Persistency getPersistency() {
+ return persistency;
+ }
+
+ @Override
+ public String toString() {
+ String result = "'" + identifier + "'";
+ if(Persistency.WORKSPACE == persistency) result += " (W)";
+ else if(Persistency.MEMORY == persistency) result += " (M)";
+ return result;
+ }
+
+}
+