/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; import java.util.function.Consumer; import org.simantics.databoard.Bindings; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.serialization.SerializerConstructionException; import org.simantics.db.AsyncReadGraph; import org.simantics.db.AsyncRequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Statement; import org.simantics.db.VirtualGraphContext; import org.simantics.db.VirtualGraphSource; import org.simantics.db.WriteGraph; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.common.ByteFileReader; import org.simantics.db.common.ByteFileWriter; import org.simantics.db.common.StandardStatement; import org.simantics.db.common.request.WriteOnlyRequest; import org.simantics.db.common.request.WriteRequest; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.support.ResourceSupport; import org.simantics.db.impl.support.VirtualGraphServerSupport; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.request.Write; import org.simantics.db.request.WriteOnly; import org.simantics.db.service.SerialisationSupport; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntObjectHashMap; import gnu.trove.procedure.TIntObjectProcedure; import gnu.trove.procedure.TIntProcedure; import gnu.trove.set.hash.TIntHashSet; class VirtualCluster { static final boolean DEBUG = false; final static int[] EMPTY = new int[0]; private final ArrayList statements = new ArrayList(); private final TIntHashSet lazy = new TIntHashSet(); private final TIntObjectHashMap values = new TIntObjectHashMap(); private final int clusterId; public VirtualCluster(int clusterId) { this.clusterId = clusterId; } public int clusterId() { return clusterId; } public void trim() { } private TIntArrayList getPredicateMap(int subject) { int rId = ClusterTraitsBase.getResourceIndexFromResourceKeyNoThrow(subject); if(rId >= statements.size()) return null; return statements.get(rId); } public boolean isPending(int subject) { return lazy.contains(subject); } boolean containsPredicate(TIntArrayList statements, int predicate) { for(int i=0;i>>1); if((clusterId & 1) == 0) // Virtual subjects return clusterBits | 0x80000000; // We are assuming that MSB means virtual resource. else // Database subjects return clusterBits; } public int getTransientId(int subject) { if((clusterId & 1) == 0) // Virtual subjects return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject) | 0x80000000; else // Database subjects return ClusterTraitsBase.createResourceKeyNoThrow(clusterId>>1, subject); } /* * Creates a persistent identifier for given transient graph cluster identifier. * * * For persistent clusters this obtains the random access id for (invalid) * resource at index 0 of given cluster. LSB in given id indicates persistence * * */ public static long getClusterIdentifier(SerialisationSupport ss, int clusterKey) { if((clusterKey & 1) == 0)// Virtual subjects return clusterKey; else { // Database subjects int rk = ClusterTraitsBase.createResourceKeyNoThrow(clusterKey>>1, 1); try { // Assuming that cluster's first resource is created first and not deleted. // Assuming that resource index is in LSB bits. return ss.getRandomAccessId(rk); } catch (DatabaseException e) { Logger.defaultLogError("Could not get cluster id for virtual cluster key=" + clusterKey, e); return -1; } } } public void saveImpl(final File file, SerialisationSupport ss) throws IOException { if (DEBUG) System.out.println("DEBUG: Saving virtual cluster " + clusterId + " to " + file.getAbsolutePath()); final ByteFileWriter writer = new ByteFileWriter(file); try { int stms = 0; for(TIntArrayList list : statements) { if(list != null) stms += list.size(); } writer.write((int)(1.5*stms)); @SuppressWarnings("unused") int count = 0; for(int i=0;i() { @Override public boolean execute(int a, byte[] b) { writer.write(a); writer.write(b.length); writer.write(b); return true; } }); if (DEBUG) System.out.println("TransientGraph[" + file.getAbsolutePath() + "] wrote " + count + " statements and " + values.size() + " values to disk."); } finally { writer.commit(); // FileUtils.uncheckedClose(_os); } } public void load(File file, SerialisationSupport serialization, VirtualGraphServerSupport vgss) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: Loading virtual cluster " + clusterId + " from " + file.getAbsolutePath() + " " + file.length()); ByteFileReader reader = null; try { if (!file.exists()) return; reader = new ByteFileReader(file); int clusterInt = ClusterTraitsBase.getClusterBits(clusterId()>>1); int stms = reader.readInt(); for (int i = 0; i < stms; i += 3) { // int rId = reader.readShort(); // if(vgss != null) vgss.addVirtual(clusterInt + rId); // claim(rId, // serialization.getTransientId(reader.readLong()), // serialization.getTransientId(reader.readLong())); int rId = reader.readShort(); long sId = reader.readLong(); long oId = reader.readLong(); int sTransientId = serialization.getTransientId(sId); int oTransientId = serialization.getTransientId(oId); if(vgss != null) vgss.addVirtual(clusterInt + rId); claim(rId, sTransientId, oTransientId); } int values = reader.readInt(); for (int i = 0; i < values; i++) { int subject = reader.readInt(); int length = reader.readInt(); setValue(subject, reader.readBytes(length), length); } if (DEBUG) System.out.println("DEBUG: TransientGraph[" + file.getAbsolutePath() + "] loaded " + stms / 3 + " statements and " + values + " values from disk."); } catch (IOException e) { throw new DatabaseException(e); } finally { if (reader != null) reader.close(); } } void listStatements(SerialisationSupport ss, ArrayList result) { int clusterKey = getTransientClusterKey(); try { for(int i=0;i result) { values.forEachKey(new TIntProcedure() { @Override public boolean execute(int value) { try { result.add(ss.getResource(getTransientId(value))); } catch (DatabaseException e) { e.printStackTrace(); } return true; } }); } } public class TransientGraph implements VirtualGraphImpl, VirtualGraphContext { private static final boolean DEBUG = VirtualCluster.DEBUG; final private static int SWAP_LIMIT = 30; // final private static byte[] NO_VALUE = new byte[0]; final private static VirtualCluster NO_CLUSTER = new VirtualCluster(-1); final private Persistency persistency; final private SerialisationSupport serialization; final private ResourceSupport resourceSupport; final private VirtualGraphServerSupport virtualGraphServerSupport; final private AsyncRequestProcessor sessionRequestProcessor; /* * Cluster array by index. * -NO_CLUSTER value means that there is no such virtual cluster * -null value means that such virtual cluster could be on disk */ final private ArrayList clusters = new ArrayList(); /* * A list of resident clusters */ final private LinkedList memoryClusters = new LinkedList(); private final HashSet sources = new HashSet(); final String identifier; final String databaseId; TIntObjectHashMap NO_STATEMENTS = new TIntObjectHashMap(); int[] EMPTY = new int[0]; public static TransientGraph workspacePersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) throws DatabaseException { TransientGraph graph = new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.WORKSPACE); graph.load(); return graph; } public static TransientGraph memoryPersistent(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier) { return new TransientGraph(ss, vgss, rs, srp, databaseId, identifier, Persistency.MEMORY); } private TransientGraph(SerialisationSupport ss, VirtualGraphServerSupport vgss, ResourceSupport rs, AsyncRequestProcessor srp, String databaseId, String identifier, Persistency persistency) { this.serialization = ss; this.virtualGraphServerSupport = vgss; this.sessionRequestProcessor = srp; this.resourceSupport = rs; this.identifier = identifier; this.databaseId = databaseId; this.persistency = persistency; } public String getIdentifier() { return identifier; } private int transientClusterId(long id) throws DatabaseException { if (DEBUG) System.out.println("DEBUG: transientClusterId=" + id); if ((id & 1) == 0) // Cluster of virtual subjects return (int)id; // Corresponds to persistent cluster int rk = serialization.getTransientId(id); return ClusterTraitsBase.getClusterKeyFromResourceKey(rk) << 1 | 1; } private String getPrefix() { return identifier + "." + persistency.identifier() + "." + databaseId; } private void load() throws DatabaseException { String prefix = getPrefix(); for(String file : virtualGraphServerSupport.storagePath().list()) { try { if(file.startsWith(prefix)) { long clusterLong = Long.parseLong(file.substring(prefix.length()+4)); int clusterId = transientClusterId(clusterLong); VirtualCluster cluster = new VirtualCluster(clusterId); cluster.load(new File(virtualGraphServerSupport.storagePath(), file), serialization, (clusterId & 1) > 0 ? virtualGraphServerSupport : null); clusters.ensureCapacity(clusterId+1); for(int i=clusters.size(); i 0) { int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(subject); return (ck << 1) | 1; } int ck = ClusterTraitsBase.getClusterKeyFromResourceKeyNoThrow(~subject); return ck << 1; } private VirtualCluster getOrLoad(int virtualClusterKey) { if(virtualClusterKey < clusters.size()) { VirtualCluster cluster = clusters.get(virtualClusterKey); if(NO_CLUSTER == cluster) return null; if(cluster != null) return cluster; } if (DEBUG) System.out.println("Loading virtual cluster " + virtualClusterKey + " for " + identifier); clusters.ensureCapacity(virtualClusterKey+1); for(int i=clusters.size(); i 0 ? virtualGraphServerSupport : null); clusters.set(virtualClusterKey, cluster); memoryClusters.addFirst(cluster); swap(); return cluster; } catch (DatabaseException e) { e.printStackTrace(); // File must be corrupt, lets delete it so we wont load it next time in future file.delete(); clusters.set(virtualClusterKey, NO_CLUSTER); return null; } } else { clusters.set(virtualClusterKey, NO_CLUSTER); return null; } } private void swap() { trimClusters(); while(memoryClusters.size() > SWAP_LIMIT) { VirtualCluster remo = memoryClusters.removeLast(); File file = new File(virtualGraphServerSupport.storagePath(), getPrefix() + ".vg." + VirtualCluster.getClusterIdentifier(serialization, remo.clusterId())); try { remo.saveImpl(file, serialization); } catch (IOException e) { e.printStackTrace(); } clusters.set(remo.clusterId(), null); } } private synchronized VirtualCluster getCluster(int subject, boolean create) { int clusterId = getVirtualClusterKey(subject); VirtualCluster cluster = getOrLoad(clusterId); if(cluster != null) return cluster; if(create) { clusters.ensureCapacity(clusterId+1); for(int i=clusters.size(); i 0) virtualGraphServerSupport.addVirtual(subject); } private synchronized void produceAllStatements(ReadGraphImpl graph, final int subject, final AsyncProcedure procedure) throws DatabaseException { VirtualCluster cluster = getCluster(subject, true); // This resource becomes a normal resource, all data is requeried cluster.resetLazy(subject); for(VirtualGraphSource source : sources) { source.getStatements(graph, this, subject); } if(subject > 0) virtualGraphServerSupport.addVirtual(subject); } private synchronized void producePartialStatements(ReadGraphImpl graph, final int subject, final int predicate, final AsyncProcedure procedure) throws DatabaseException { for(VirtualGraphSource source : sources) { source.getStatements(graph, this, subject, predicate); } if(subject > 0) virtualGraphServerSupport.addVirtual(subject); } @Override public int getIndex(Resource resource) { try { return serialization.getTransientId(resource); } catch (DatabaseException e) { e.printStackTrace(); } return 0; } @Override public Resource getResource(int index) { return new ResourceImpl(resourceSupport, index); } @Override public void register(final VirtualGraphSource source) { if(sources.add(source)) { source.attach(TransientGraph.this); } } @Override public void claim(int subject, int predicate, int object) { VirtualCluster cluster = getCluster(subject, true); cluster.claim(subject, predicate, object); if(subject > 0) virtualGraphServerSupport.addVirtual(subject); } @Override public synchronized int[] getObjects(int subject, int predicate) { VirtualCluster cluster = getCluster(subject, false); if(cluster == null) return EMPTY; return cluster.getObjects(subject, predicate); } @Override public synchronized int[] getPredicates(int subject) { VirtualCluster cluster = getCluster(subject, false); if(cluster == null) return EMPTY; return cluster.getPredicates(subject); } @Override public synchronized byte[] getValue(int subject) { VirtualCluster cluster = getCluster(subject, false); if(cluster == null) return null; return cluster.getValue(subject); } @Override public int newResource(boolean isLazy) { int id = virtualGraphServerSupport.createVirtual(); VirtualCluster cluster = getCluster(id, true); if(isLazy) cluster.setLazy(id); return id; } @Override public void finish(int subject) { VirtualCluster cluster = getCluster(subject, false); cluster.finish(subject); } @Override public void deny(int subject, int predicate, int object) { VirtualCluster cluster = getCluster(subject, true); cluster.deny(subject, predicate, object); } @Override public void claimValue(int subject, byte[] data, int length) { VirtualCluster cluster = getCluster(subject, true); cluster.setValue(subject, data, length); if(subject > 0) virtualGraphServerSupport.addVirtual(subject); } @Override public void denyValue(int subject) { // FIXME: this implementation is probably not proper, Antti needs to work on this. VirtualCluster cluster = getCluster(subject, true); cluster.denyValue(subject); if(subject > 0) virtualGraphServerSupport.removeVirtual(subject); } @Override public void initialise(final Write write) { try { sessionRequestProcessor.syncRequest(new WriteRequest(this) { @Override public void perform(WriteGraph graph) throws DatabaseException { write.perform(graph); } }); } catch (DatabaseException e) { e.printStackTrace(); } } @Override public void postModification(AsyncRequestProcessor processor, final WriteOnly request) { if(processor == null) processor = sessionRequestProcessor; processor.asyncRequest(new WriteOnlyRequest(this) { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { request.perform(graph); } }); } @Override public void updateStatements(int resource, int[] statements) { applyStatements(resource, statements); } @Override public void updateValue(int resource, Object value, Binding binding) { applyValue(resource, value, binding); } @Override public boolean isPending(int subject) { VirtualCluster cluster = getCluster(subject, false); if(cluster == null) return false; else return cluster.isPending(subject); } @Override public boolean isPending(int subject, int predicate) { VirtualCluster cluster = getCluster(subject, false); if(cluster == null) return false; else return cluster.isPending(subject, predicate); } @Override public void load(ReadGraphImpl graph, int resource, int predicate, final Consumer callback) throws DatabaseException { producePartialStatements(graph, resource, predicate, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, Object result) { callback.accept((ReadGraphImpl)graph); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { callback.accept((ReadGraphImpl)graph); } }); } @Override public void load(ReadGraphImpl graph, int resource, final Consumer callback) throws DatabaseException { produceAllStatements(graph, resource, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, Object result) { callback.accept((ReadGraphImpl)graph); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { callback.accept((ReadGraphImpl)graph); } }); } public Collection listStatements() { ArrayList result = new ArrayList(); for(int i=0;i listValues() { ArrayList result = new ArrayList(); for(int i=0;i