X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2FTransientGraph.java;h=3a552ed194a44877ce20f9e4e5e37fb7668df1f3;hb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;hp=94a66bb7416c972993e94b8c94c9347190b856e7;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java index 94a66bb74..3a552ed19 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/TransientGraph.java @@ -1,939 +1,939 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.impl; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.serialization.SerializationException; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.serialization.SerializerConstructionException; -import org.simantics.db.AsyncReadGraph; -import org.simantics.db.AsyncRequestProcessor; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Statement; -import org.simantics.db.VirtualGraphContext; -import org.simantics.db.VirtualGraphSource; -import org.simantics.db.WriteGraph; -import org.simantics.db.WriteOnlyGraph; -import org.simantics.db.common.ByteFileReader; -import org.simantics.db.common.ByteFileWriter; -import org.simantics.db.common.StandardStatement; -import org.simantics.db.common.request.WriteOnlyRequest; -import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.impl.graph.ReadGraphImpl; -import org.simantics.db.impl.support.ResourceSupport; -import org.simantics.db.impl.support.VirtualGraphServerSupport; -import org.simantics.db.procedure.AsyncProcedure; -import org.simantics.db.request.Write; -import org.simantics.db.request.WriteOnly; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.utils.datastructures.Callback; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntObjectHashMap; -import gnu.trove.procedure.TIntObjectProcedure; -import gnu.trove.procedure.TIntProcedure; -import gnu.trove.set.hash.TIntHashSet; - -class VirtualCluster { - static final boolean DEBUG = false; - final static int[] EMPTY = new int[0]; - - private final ArrayList statements = new ArrayList(); - private final TIntHashSet lazy = new TIntHashSet(); - private final TIntObjectHashMap values = new TIntObjectHashMap(); - private final int clusterId; - - public VirtualCluster(int clusterId) { - this.clusterId = clusterId; - } - - public int clusterId() { - return clusterId; - } - - public void trim() { - } - - private TIntArrayList getPredicateMap(int subject) { - - int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject); - if(rId >= statements.size()) return null; - return statements.get(rId); - - } - - public boolean isPending(int subject) { - return lazy.contains(subject); - } - - boolean containsPredicate(TIntArrayList statements, int predicate) { - for(int i=0;i>>1); - if((clusterId & 1) == 0) // Virtual subjects - return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource. - else // Database subjects - return clusterBits; - } - - public int getTransientId(int subject) { - if((clusterId & 1) == 0) // Virtual subjects - return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000; - else // Database subjects - return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject); - } - - /* - * Creates a persistent identifier for given transient graph cluster identifier. - * - * - * For persistent clusters this obtains the random access id for (invalid) - * resource at index 0 of given cluster. LSB in given id indicates persistence - * - * - */ - - public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { - if((clusterKey & 1) == 0)// Virtual subjects - return clusterKey; - else { // Database subjects - int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1); - try { - // Assuming that cluster's first resource is created first and not deleted. - // Assuming that resource index is in LSB bits. - return ss.getRandomAccessId(rk); - } catch (DatabaseException e) { - Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e); - return -1; - } - } - } - - public void saveImpl(final File file, SerialisationSupport ss) throws IOException { - if (DEBUG) - System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath()); - - final ByteFileWriter writer = new ByteFileWriter(file); - - try { - - int stms = 0; - for(TIntArrayList list : statements) { - if(list != null) - stms += list.size(); - } - - writer.write((int)(1.5*stms)); - @SuppressWarnings("unused") - int count = 0; - for(int i=0;i() { - - @Override - public boolean execute(int a, byte[] b) { - writer.write(a); - writer.write(b.length); - writer.write(b); - return true; - } - }); - if (DEBUG) - System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk."); - - } finally { - writer.commit(); -// FileUtils.uncheckedClose(_os); - } - } - - public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length()); - - ByteFileReader reader = null; - try { - - if (!file.exists()) - return; - - reader = new ByteFileReader(file); - int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1); - int stms = reader.readInt(); - for (int i = 0; i < stms; i += 3) { -// int rId = reader.readShort(); -// if(vgss != null) vgss.addVirtual(clusterInt + rId); -// claim(rId, -// serialization.getTransientId(reader.readLong()), -// serialization.getTransientId(reader.readLong())); - int rId = reader.readShort(); - long sId = reader.readLong(); - long oId = reader.readLong(); - int sTransientId = serialization.getTransientId(sId); - int oTransientId = serialization.getTransientId(oId); - - if(vgss != null) - vgss.addVirtual(clusterInt + rId); - - claim(rId, sTransientId, oTransientId); - } - - int values = reader.readInt(); - for (int i = 0; i < values; i++) { - int subject = reader.readInt(); - int length = reader.readInt(); - setValue(subject, reader.readBytes(length), length); - } - if (DEBUG) - System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk."); - - } catch (IOException e) { - throw new DatabaseException(e); - } finally { - if (reader != null) - reader.close(); - } - } - - void listStatements(SerialisationSupport ss, ArrayList result) { - - int clusterKey = getTransientClusterKey(); - - try { - for(int i=0;i result) { - - values.forEachKey(new TIntProcedure() { - - @Override - public boolean execute(int value) { - try { - result.add(ss.getResource(getTransientId(value))); - } catch (DatabaseException e) { - e.printStackTrace(); - } - return true; - } - - }); - - } - -} - -public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext { - private static final boolean DEBUG = VirtualCluster.DEBUG; - final private static int SWAP_LIMIT = 30; - -// final private static byte[] NO_VALUE = new byte[0]; - final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1); - - final private Persistency persistency; - - final private SerialisationSupport serialization; - final private ResourceSupport resourceSupport; - final private VirtualGraphServerSupport virtualGraphServerSupport; - final private RequestProcessor sessionRequestProcessor; - - /* - * Cluster array by index. - * -NO_CLUSTER value means that there is no such virtual cluster - * -null value means that such virtual cluster could be on disk - */ - final private ArrayList clusters = new ArrayList(); - - /* - * A list of resident clusters - */ - final private LinkedList memoryClusters = new LinkedList(); - - private final HashSet sources = new HashSet(); - - final String identifier; - final String databaseId; - - TIntObjectHashMap NO_STATEMENTS = new TIntObjectHashMap(); - - int[] EMPTY = new int[0]; - - public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) throws DatabaseException { - TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE); - graph.load(); - return graph; - } - - public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) { - return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY); - } - - private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier, Persistency persistency) { - this.serialization = ss; - this.virtualGraphServerSupport = vgss; - this.sessionRequestProcessor = srp; - this.resourceSupport = rs; - this.identifier = identifier; - this.databaseId = databaseId; - - this.persistency = persistency; - } - - public String getIdentifier() { - return identifier; - } - - private int transientClusterId(long id) throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: transientClusterId=" + id); - if ((id & 1) == 0) // Cluster of virtual subjects - return (int)id; - // Corresponds to persistent cluster - int rk = serialization.getTransientId(id); - return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1; - } - - private String getPrefix() { - return identifier + "." + persistency.identifier() + "." + databaseId; - } - - private void load() throws DatabaseException { - - String prefix = getPrefix(); - for(String file : virtualGraphServerSupport.storagePath().list()) { - try { - if(file.startsWith(prefix)) { - long clusterLong = Long.parseLong(file.substring(prefix.length()+4)); - int clusterId = transientClusterId(clusterLong); - VirtualCluster cluster = new VirtualCluster(clusterId); - cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null); - clusters.ensureCapacity(clusterId+1); - for(int i=clusters.size(); i 0) { - int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject); - return (ck << 1) | 1; - } - int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject); - return ck << 1; - } - - private VirtualCluster getOrLoad(int virtualClusterKey) { - - if(virtualClusterKey < clusters.size()) { - VirtualCluster cluster = clusters.get(virtualClusterKey); - if(NO_CLUSTER == cluster) return null; - if(cluster != null) return cluster; - } - if (DEBUG) - System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier); - - clusters.ensureCapacity(virtualClusterKey+1); - for(int i=clusters.size(); i 0 ? virtualGraphServerSupport : null); - clusters.set(virtualClusterKey, cluster); - memoryClusters.addFirst(cluster); - swap(); - return cluster; - } catch (DatabaseException e) { - e.printStackTrace(); - // File must be corrupt, lets delete it so we wont load it next time in future - file.delete(); - - clusters.set(virtualClusterKey, NO_CLUSTER); - return null; - } - - } else { - clusters.set(virtualClusterKey, NO_CLUSTER); - return null; - } - } - - private void swap() { - - trimClusters(); - while(memoryClusters.size() > SWAP_LIMIT) { - VirtualCluster remo = memoryClusters.removeLast(); - File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId())); - try { - remo.saveImpl(file, serialization); - } catch (IOException e) { - e.printStackTrace(); - } - clusters.set(remo.clusterId(), null); - } - - } - - private synchronized VirtualCluster getCluster(int subject, boolean create) { - - int clusterId = getVirtualClusterKey(subject); - - VirtualCluster cluster = getOrLoad(clusterId); - if(cluster != null) return cluster; - - if(create) { - - clusters.ensureCapacity(clusterId+1); - for(int i=clusters.size(); i 0) virtualGraphServerSupport.addVirtual(subject); - - } - - private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure procedure) throws DatabaseException { - - VirtualCluster cluster = getCluster(subject, true); - - // This resource becomes a normal resource, all data is requeried - cluster.resetLazy(subject); - - for(VirtualGraphSource source : sources) { - source.getStatements(graph, this, subject); - } - - if(subject > 0) virtualGraphServerSupport.addVirtual(subject); - - } - - private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure procedure) throws DatabaseException { - - for(VirtualGraphSource source : sources) { - source.getStatements(graph, this, subject, predicate); - } - - if(subject > 0) virtualGraphServerSupport.addVirtual(subject); - - } - - @Override - public int getIndex(Resource resource) { - try { - return serialization.getTransientId(resource); - } catch (DatabaseException e) { - e.printStackTrace(); - } - return 0; - } - - @Override - public Resource getResource(int index) { - return new ResourceImpl(resourceSupport, index); - } - - @Override - public void register(final VirtualGraphSource source) { - if(sources.add(source)) { - source.attach(TransientGraph.this); - } - } - - @Override - public void claim(int subject, int predicate, int object) { - - VirtualCluster cluster = getCluster(subject, true); - cluster.claim(subject, predicate, object); - if(subject > 0) virtualGraphServerSupport.addVirtual(subject); - - } - - @Override - public synchronized int[] getObjects(int subject, int predicate) { - VirtualCluster cluster = getCluster(subject, false); - if(cluster == null) return EMPTY; - return cluster.getObjects(subject, predicate); - } - - @Override - public synchronized int[] getPredicates(int subject) { - VirtualCluster cluster = getCluster(subject, false); - if(cluster == null) return EMPTY; - return cluster.getPredicates(subject); - } - - @Override - public synchronized byte[] getValue(int subject) { - VirtualCluster cluster = getCluster(subject, false); - if(cluster == null) return null; - return cluster.getValue(subject); - } - - @Override - public int newResource(boolean isLazy) { - - int id = virtualGraphServerSupport.createVirtual(); - VirtualCluster cluster = getCluster(id, true); - if(isLazy) cluster.setLazy(id); - return id; - - } - - @Override - public void finish(int subject) { - VirtualCluster cluster = getCluster(subject, false); - cluster.finish(subject); - } - - @Override - public void deny(int subject, int predicate, int object) { - - VirtualCluster cluster = getCluster(subject, true); - cluster.deny(subject, predicate, object); - - } - - @Override - public void claimValue(int subject, byte[] data, int length) { - - VirtualCluster cluster = getCluster(subject, true); - cluster.setValue(subject, data, length); - if(subject > 0) virtualGraphServerSupport.addVirtual(subject); - - } - - @Override - public void denyValue(int subject) { - // FIXME: this implementation is probably not proper, Antti needs to work on this. - VirtualCluster cluster = getCluster(subject, true); - cluster.denyValue(subject); - if(subject > 0) virtualGraphServerSupport.removeVirtual(subject); - } - - @Override - public void initialise(final Write write) { - try { - sessionRequestProcessor.syncRequest(new WriteRequest(this) { - - @Override - public void perform(WriteGraph graph) throws DatabaseException { - write.perform(graph); - } - - }); - } catch (DatabaseException e) { - e.printStackTrace(); - } - } - - @Override - public void postModification(AsyncRequestProcessor processor, final WriteOnly request) { - - if(processor == null) processor = sessionRequestProcessor; - - processor.asyncRequest(new WriteOnlyRequest(this) { - - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - request.perform(graph); - } - - }); - - } - - @Override - public void updateStatements(int resource, int[] statements) { - applyStatements(resource, statements); - } - - @Override - public void updateValue(int resource, Object value, Binding binding) { - applyValue(resource, value, binding); - } - - @Override - public boolean isPending(int subject) { - - VirtualCluster cluster = getCluster(subject, false); - if(cluster == null) return false; - else return cluster.isPending(subject); - - } - - @Override - public boolean isPending(int subject, int predicate) { - - VirtualCluster cluster = getCluster(subject, false); - if(cluster == null) return false; - else return cluster.isPending(subject, predicate); - - } - - @Override - public void load(ReadGraphImpl graph, int resource, int predicate, final Callback callback) throws DatabaseException { - producePartialStatements(graph, resource, predicate, new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph graph, Object result) { - callback.run((ReadGraphImpl)graph); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - callback.run((ReadGraphImpl)graph); - } - - }); - } - - @Override - public void load(ReadGraphImpl graph, int resource, final Callback callback) throws DatabaseException { - produceAllStatements(graph, resource, new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph graph, Object result) { - callback.run((ReadGraphImpl)graph); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - callback.run((ReadGraphImpl)graph); - } - - }); - } - - public Collection listStatements() { - ArrayList result = new ArrayList(); - for(int i=0;i listValues() { - ArrayList result = new ArrayList(); - for(int i=0;i statements = new ArrayList(); + private final TIntHashSet lazy = new TIntHashSet(); + private final TIntObjectHashMap values = new TIntObjectHashMap(); + private final int clusterId; + + public VirtualCluster(int clusterId) { + this.clusterId = clusterId; + } + + public int clusterId() { + return clusterId; + } + + public void trim() { + } + + private TIntArrayList getPredicateMap(int subject) { + + int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject); + if(rId >= statements.size()) return null; + return statements.get(rId); + + } + + public boolean isPending(int subject) { + return lazy.contains(subject); + } + + boolean containsPredicate(TIntArrayList statements, int predicate) { + for(int i=0;i>>1); + if((clusterId & 1) == 0) // Virtual subjects + return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource. + else // Database subjects + return clusterBits; + } + + public int getTransientId(int subject) { + if((clusterId & 1) == 0) // Virtual subjects + return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000; + else // Database subjects + return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject); + } + + /* + * Creates a persistent identifier for given transient graph cluster identifier. + * + * + * For persistent clusters this obtains the random access id for (invalid) + * resource at index 0 of given cluster. LSB in given id indicates persistence + * + * + */ + + public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { + if((clusterKey & 1) == 0)// Virtual subjects + return clusterKey; + else { // Database subjects + int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1); + try { + // Assuming that cluster's first resource is created first and not deleted. + // Assuming that resource index is in LSB bits. + return ss.getRandomAccessId(rk); + } catch (DatabaseException e) { + Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e); + return -1; + } + } + } + + public void saveImpl(final File file, SerialisationSupport ss) throws IOException { + if (DEBUG) + System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath()); + + final ByteFileWriter writer = new ByteFileWriter(file); + + try { + + int stms = 0; + for(TIntArrayList list : statements) { + if(list != null) + stms += list.size(); + } + + writer.write((int)(1.5*stms)); + @SuppressWarnings("unused") + int count = 0; + for(int i=0;i() { + + @Override + public boolean execute(int a, byte[] b) { + writer.write(a); + writer.write(b.length); + writer.write(b); + return true; + } + }); + if (DEBUG) + System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk."); + + } finally { + writer.commit(); +// FileUtils.uncheckedClose(_os); + } + } + + public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length()); + + ByteFileReader reader = null; + try { + + if (!file.exists()) + return; + + reader = new ByteFileReader(file); + int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1); + int stms = reader.readInt(); + for (int i = 0; i < stms; i += 3) { +// int rId = reader.readShort(); +// if(vgss != null) vgss.addVirtual(clusterInt + rId); +// claim(rId, +// serialization.getTransientId(reader.readLong()), +// serialization.getTransientId(reader.readLong())); + int rId = reader.readShort(); + long sId = reader.readLong(); + long oId = reader.readLong(); + int sTransientId = serialization.getTransientId(sId); + int oTransientId = serialization.getTransientId(oId); + + if(vgss != null) + vgss.addVirtual(clusterInt + rId); + + claim(rId, sTransientId, oTransientId); + } + + int values = reader.readInt(); + for (int i = 0; i < values; i++) { + int subject = reader.readInt(); + int length = reader.readInt(); + setValue(subject, reader.readBytes(length), length); + } + if (DEBUG) + System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk."); + + } catch (IOException e) { + throw new DatabaseException(e); + } finally { + if (reader != null) + reader.close(); + } + } + + void listStatements(SerialisationSupport ss, ArrayList result) { + + int clusterKey = getTransientClusterKey(); + + try { + for(int i=0;i result) { + + values.forEachKey(new TIntProcedure() { + + @Override + public boolean execute(int value) { + try { + result.add(ss.getResource(getTransientId(value))); + } catch (DatabaseException e) { + e.printStackTrace(); + } + return true; + } + + }); + + } + +} + +public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext { + private static final boolean DEBUG = VirtualCluster.DEBUG; + final private static int SWAP_LIMIT = 30; + +// final private static byte[] NO_VALUE = new byte[0]; + final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1); + + final private Persistency persistency; + + final private SerialisationSupport serialization; + final private ResourceSupport resourceSupport; + final private VirtualGraphServerSupport virtualGraphServerSupport; + final private RequestProcessor sessionRequestProcessor; + + /* + * Cluster array by index. + * -NO_CLUSTER value means that there is no such virtual cluster + * -null value means that such virtual cluster could be on disk + */ + final private ArrayList clusters = new ArrayList(); + + /* + * A list of resident clusters + */ + final private LinkedList memoryClusters = new LinkedList(); + + private final HashSet sources = new HashSet(); + + final String identifier; + final String databaseId; + + TIntObjectHashMap NO_STATEMENTS = new TIntObjectHashMap(); + + int[] EMPTY = new int[0]; + + public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) throws DatabaseException { + TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE); + graph.load(); + return graph; + } + + public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier) { + return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY); + } + + private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, RequestProcessor srp, String databaseId, String identifier, Persistency persistency) { + this.serialization = ss; + this.virtualGraphServerSupport = vgss; + this.sessionRequestProcessor = srp; + this.resourceSupport = rs; + this.identifier = identifier; + this.databaseId = databaseId; + + this.persistency = persistency; + } + + public String getIdentifier() { + return identifier; + } + + private int transientClusterId(long id) throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: transientClusterId=" + id); + if ((id & 1) == 0) // Cluster of virtual subjects + return (int)id; + // Corresponds to persistent cluster + int rk = serialization.getTransientId(id); + return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1; + } + + private String getPrefix() { + return identifier + "." + persistency.identifier() + "." + databaseId; + } + + private void load() throws DatabaseException { + + String prefix = getPrefix(); + for(String file : virtualGraphServerSupport.storagePath().list()) { + try { + if(file.startsWith(prefix)) { + long clusterLong = Long.parseLong(file.substring(prefix.length()+4)); + int clusterId = transientClusterId(clusterLong); + VirtualCluster cluster = new VirtualCluster(clusterId); + cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null); + clusters.ensureCapacity(clusterId+1); + for(int i=clusters.size(); i 0) { + int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject); + return (ck << 1) | 1; + } + int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject); + return ck << 1; + } + + private VirtualCluster getOrLoad(int virtualClusterKey) { + + if(virtualClusterKey < clusters.size()) { + VirtualCluster cluster = clusters.get(virtualClusterKey); + if(NO_CLUSTER == cluster) return null; + if(cluster != null) return cluster; + } + if (DEBUG) + System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier); + + clusters.ensureCapacity(virtualClusterKey+1); + for(int i=clusters.size(); i 0 ? virtualGraphServerSupport : null); + clusters.set(virtualClusterKey, cluster); + memoryClusters.addFirst(cluster); + swap(); + return cluster; + } catch (DatabaseException e) { + e.printStackTrace(); + // File must be corrupt, lets delete it so we wont load it next time in future + file.delete(); + + clusters.set(virtualClusterKey, NO_CLUSTER); + return null; + } + + } else { + clusters.set(virtualClusterKey, NO_CLUSTER); + return null; + } + } + + private void swap() { + + trimClusters(); + while(memoryClusters.size() > SWAP_LIMIT) { + VirtualCluster remo = memoryClusters.removeLast(); + File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId())); + try { + remo.saveImpl(file, serialization); + } catch (IOException e) { + e.printStackTrace(); + } + clusters.set(remo.clusterId(), null); + } + + } + + private synchronized VirtualCluster getCluster(int subject, boolean create) { + + int clusterId = getVirtualClusterKey(subject); + + VirtualCluster cluster = getOrLoad(clusterId); + if(cluster != null) return cluster; + + if(create) { + + clusters.ensureCapacity(clusterId+1); + for(int i=clusters.size(); i 0) virtualGraphServerSupport.addVirtual(subject); + + } + + private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure procedure) throws DatabaseException { + + VirtualCluster cluster = getCluster(subject, true); + + // This resource becomes a normal resource, all data is requeried + cluster.resetLazy(subject); + + for(VirtualGraphSource source : sources) { + source.getStatements(graph, this, subject); + } + + if(subject > 0) virtualGraphServerSupport.addVirtual(subject); + + } + + private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure procedure) throws DatabaseException { + + for(VirtualGraphSource source : sources) { + source.getStatements(graph, this, subject, predicate); + } + + if(subject > 0) virtualGraphServerSupport.addVirtual(subject); + + } + + @Override + public int getIndex(Resource resource) { + try { + return serialization.getTransientId(resource); + } catch (DatabaseException e) { + e.printStackTrace(); + } + return 0; + } + + @Override + public Resource getResource(int index) { + return new ResourceImpl(resourceSupport, index); + } + + @Override + public void register(final VirtualGraphSource source) { + if(sources.add(source)) { + source.attach(TransientGraph.this); + } + } + + @Override + public void claim(int subject, int predicate, int object) { + + VirtualCluster cluster = getCluster(subject, true); + cluster.claim(subject, predicate, object); + if(subject > 0) virtualGraphServerSupport.addVirtual(subject); + + } + + @Override + public synchronized int[] getObjects(int subject, int predicate) { + VirtualCluster cluster = getCluster(subject, false); + if(cluster == null) return EMPTY; + return cluster.getObjects(subject, predicate); + } + + @Override + public synchronized int[] getPredicates(int subject) { + VirtualCluster cluster = getCluster(subject, false); + if(cluster == null) return EMPTY; + return cluster.getPredicates(subject); + } + + @Override + public synchronized byte[] getValue(int subject) { + VirtualCluster cluster = getCluster(subject, false); + if(cluster == null) return null; + return cluster.getValue(subject); + } + + @Override + public int newResource(boolean isLazy) { + + int id = virtualGraphServerSupport.createVirtual(); + VirtualCluster cluster = getCluster(id, true); + if(isLazy) cluster.setLazy(id); + return id; + + } + + @Override + public void finish(int subject) { + VirtualCluster cluster = getCluster(subject, false); + cluster.finish(subject); + } + + @Override + public void deny(int subject, int predicate, int object) { + + VirtualCluster cluster = getCluster(subject, true); + cluster.deny(subject, predicate, object); + + } + + @Override + public void claimValue(int subject, byte[] data, int length) { + + VirtualCluster cluster = getCluster(subject, true); + cluster.setValue(subject, data, length); + if(subject > 0) virtualGraphServerSupport.addVirtual(subject); + + } + + @Override + public void denyValue(int subject) { + // FIXME: this implementation is probably not proper, Antti needs to work on this. + VirtualCluster cluster = getCluster(subject, true); + cluster.denyValue(subject); + if(subject > 0) virtualGraphServerSupport.removeVirtual(subject); + } + + @Override + public void initialise(final Write write) { + try { + sessionRequestProcessor.syncRequest(new WriteRequest(this) { + + @Override + public void perform(WriteGraph graph) throws DatabaseException { + write.perform(graph); + } + + }); + } catch (DatabaseException e) { + e.printStackTrace(); + } + } + + @Override + public void postModification(AsyncRequestProcessor processor, final WriteOnly request) { + + if(processor == null) processor = sessionRequestProcessor; + + processor.asyncRequest(new WriteOnlyRequest(this) { + + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + request.perform(graph); + } + + }); + + } + + @Override + public void updateStatements(int resource, int[] statements) { + applyStatements(resource, statements); + } + + @Override + public void updateValue(int resource, Object value, Binding binding) { + applyValue(resource, value, binding); + } + + @Override + public boolean isPending(int subject) { + + VirtualCluster cluster = getCluster(subject, false); + if(cluster == null) return false; + else return cluster.isPending(subject); + + } + + @Override + public boolean isPending(int subject, int predicate) { + + VirtualCluster cluster = getCluster(subject, false); + if(cluster == null) return false; + else return cluster.isPending(subject, predicate); + + } + + @Override + public void load(ReadGraphImpl graph, int resource, int predicate, final Callback callback) throws DatabaseException { + producePartialStatements(graph, resource, predicate, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, Object result) { + callback.run((ReadGraphImpl)graph); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + callback.run((ReadGraphImpl)graph); + } + + }); + } + + @Override + public void load(ReadGraphImpl graph, int resource, final Callback callback) throws DatabaseException { + produceAllStatements(graph, resource, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, Object result) { + callback.run((ReadGraphImpl)graph); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + callback.run((ReadGraphImpl)graph); + } + + }); + } + + public Collection listStatements() { + ArrayList result = new ArrayList(); + for(int i=0;i listValues() { + ArrayList result = new ArrayList(); + for(int i=0;i