-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.impl.graph;\r
-\r
-\r
-import java.io.DataInput;\r
-import java.io.DataOutput;\r
-import java.io.File;\r
-import java.io.FileInputStream;\r
-import java.io.FileOutputStream;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.io.OutputStream;\r
-import java.nio.ByteBuffer;\r
-import java.nio.channels.FileChannel;\r
-import java.util.ArrayList;\r
-import java.util.HashSet;\r
-import java.util.IdentityHashMap;\r
-import java.util.List;\r
-import java.util.Set;\r
-import java.util.TreeMap;\r
-import java.util.UUID;\r
-\r
-import org.eclipse.core.runtime.Platform;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.accessor.Accessor;\r
-import org.simantics.databoard.accessor.reference.ChildReference;\r
-import org.simantics.databoard.binding.Binding;\r
-import org.simantics.databoard.binding.error.BindingConstructionException;\r
-import org.simantics.databoard.binding.impl.BindingPrintContext;\r
-import org.simantics.databoard.serialization.SerializationException;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.databoard.type.Datatype;\r
-import org.simantics.databoard.util.IdentityPair;\r
-import org.simantics.databoard.util.binary.RandomAccessBinary;\r
-import org.simantics.db.Metadata;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.Statement;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.WriteOnlyGraph;\r
-import org.simantics.db.common.MetadataUtils;\r
-import org.simantics.db.common.request.WriteOnlyRequest;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.BindingException;\r
-import org.simantics.db.exception.ClusterSetExistException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;\r
-import org.simantics.db.exception.NoSingleResultException;\r
-import org.simantics.db.exception.ResourceNotFoundException;\r
-import org.simantics.db.exception.ServiceException;\r
-import org.simantics.db.impl.DebugPolicy;\r
-import org.simantics.db.impl.ResourceImpl;\r
-import org.simantics.db.request.DelayedWrite;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.request.WriteResult;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.db.service.ByteReader;\r
-import org.simantics.db.service.ClusteringSupport;\r
-import org.simantics.db.service.TransferableGraphSupport;\r
-import org.simantics.db.service.XSupport;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.utils.datastructures.MapList;\r
-\r
-import gnu.trove.list.array.TIntArrayList;\r
-import gnu.trove.map.hash.TIntIntHashMap;\r
-import gnu.trove.map.hash.TObjectIntHashMap;\r
-\r
-/**\r
- * Write graph implementation that does not modify the database\r
- * immediately but with an explicit commit method. All read operations\r
- * return results based on the old graph. \r
- */\r
-public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader {\r
- \r
- private static final boolean DEBUG = false;\r
- public static int BUFFER = 65536;\r
-\r
- static class Haxx extends Binding {\r
-\r
- static final Serializer serializer = new Serializer() {\r
- \r
- public byte[] serialize(Object obj) throws SerializationException {\r
- return (byte[])obj;\r
- }\r
-\r
- @Override\r
- public void serialize(DataOutput out,\r
- TObjectIntHashMap<Object> identities, Object obj)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void serialize(DataOutput out, Object obj)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public Object deserialize(DataInput in, List<Object> identities)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public Object deserialize(DataInput in) throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void deserializeTo(DataInput in, List<Object> identities,\r
- Object obj) throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void deserializeTo(DataInput in, Object obj)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void skip(DataInput in, List<Object> identities)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void skip(DataInput in) throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public Integer getConstantSize() {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int getSize(Object obj, TObjectIntHashMap<Object> identities)\r
- throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int getSize(Object obj) throws IOException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int getMinSize() {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- };\r
- \r
- @Override\r
- public Serializer serializer() {\r
- return serializer;\r
- }\r
- \r
- @Override\r
- public void accept(Visitor1 v, Object obj) {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public <T> T accept(Visitor<T> v) {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public boolean isInstance(Object obj) {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public void assertInstaceIsValid(Object obj, Set<Object> validInstances)\r
- throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int deepHashValue(Object value,\r
- IdentityHashMap<Object, Object> hashedObjects)\r
- throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int deepCompare(Object o1, Object o2,\r
- Set<IdentityPair<Object, Object>> compareHistory)\r
- throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
- \r
- @Override\r
- public void readFrom(Binding srcBinding, Object src, Object dst)\r
- throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
- \r
- @Override\r
- public Object readFromTry(Binding srcBinding, Object src, Object dst)\r
- throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
- \r
- @Override\r
- protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public int getComponentCount() {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public Binding getComponentBinding(int index) {\r
- throw new Error("Not supported.");\r
- }\r
-\r
- @Override\r
- public Binding getComponentBinding(ChildReference path) {\r
- throw new Error("Not supported.");\r
- }\r
- \r
- }\r
- \r
- private static final Haxx haxx = new Haxx();\r
-\r
- private final int TERM = 0;\r
- private final int CLAIM = 1;\r
- private final int CLAIM_NOINVERSE = 2;\r
- private final int CLAIM_VALUE_B = 4;\r
- private final int DENY = 5;\r
- private final int DENY_VALUE = 6;\r
- private final int COMMIT_AND_CONTINUE = 7;\r
- \r
- static class ClusterSet {\r
- public Resource resource;\r
- ClusterSet() {\r
- ids = new TIntArrayList();\r
- old = false;\r
- }\r
- ClusterSet(boolean old, Resource r) {\r
- ids = new TIntArrayList();\r
- this.old = old;\r
- this.resource = r;\r
- }\r
- void add(int id) {\r
- ids.add(id);\r
- }\r
- private TIntArrayList ids;\r
- private final boolean old; // true if\r
- boolean isNew() {\r
- return !old;\r
- }\r
- }\r
- public class State {\r
- public File tempFile;\r
- public FileOutputStream out;\r
- public ArrayList<Resource> idToResource = new ArrayList<Resource>();\r
- public TIntIntHashMap externalToId = new TIntIntHashMap(); \r
- public ArrayList<Binding> idToBinding = new ArrayList<Binding>();\r
- public TObjectIntHashMap<Binding> bindingToId = new TObjectIntHashMap<Binding>();\r
- public Set<Resource> clusterSets = new HashSet<Resource>();\r
- public Set<Resource> clusterSetsForExistingResources = new HashSet<Resource>();\r
- public int clusterCount = 0;\r
- public long defaultCluster;\r
- public Resource defaultClusterSet;\r
- public int statementCount = 0;\r
- public int valueCount = 0;\r
- public int fileCount = 0;\r
- }\r
- \r
- public State writeState;\r
- public TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();\r
- \r
- Layer0 b;\r
- Session session;\r
- public static Resource convertDelayedResource(Resource r) {\r
- if (r instanceof InternalResource) {\r
- InternalResource ri = (InternalResource)r;\r
- return ri.resource;\r
- }\r
- return r;\r
- }\r
- private static class InternalResource implements Resource {\r
-\r
- int id;\r
- long clusterId = 0;\r
- Resource resource = null;\r
- Resource clusterSet = null;\r
- \r
- public InternalResource(int id, long clusterId) {\r
- this.id = id;\r
- this.clusterId = clusterId;\r
- }\r
-\r
- public InternalResource(int id, Resource clusterSet) {\r
- this.id = id;\r
- this.clusterSet = clusterSet;\r
- }\r
- \r
- @Override\r
- public long getResourceId() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public Resource get() {\r
- return this;\r
- }\r
- \r
- @Override\r
- public boolean isPersistent() {\r
- return false;\r
- }\r
-\r
- @Override\r
- public int compareTo(Resource o) {\r
- if(o instanceof InternalResource) {\r
- return Integer.compare(id, ((InternalResource)o).id);\r
- } else {\r
- return -1;\r
- }\r
- }\r
-\r
- @Override\r
- public int hashCode() {\r
- final int prime = 31;\r
- int result = 1;\r
- result = prime * result + id;\r
- return result;\r
- }\r
- \r
- @Override\r
- public int getThreadHash() {\r
- return hashCode();\r
- }\r
-\r
- @Override\r
- public boolean equals(Object obj) {\r
- if (this == obj)\r
- return true;\r
- if (obj == null)\r
- return false;\r
- if (!(obj instanceof InternalResource))\r
- return false;\r
- InternalResource other = (InternalResource) obj;\r
- if (id != other.id)\r
- return false;\r
- return true;\r
- }\r
- \r
-\r
- @Override\r
- public boolean equalsResource(Resource other) {\r
- return equals(other);\r
- }\r
- @Override\r
- public String toString() {\r
- StringBuilder sb = new StringBuilder(32);\r
- if(DebugPolicy.VERBOSE) {\r
- sb.append("[delayed id=");\r
- sb.append(id);\r
- sb.append("]");\r
- } else {\r
- sb.append("[did=");\r
- sb.append(id);\r
- sb.append("]");\r
- }\r
- return sb.toString();\r
- }\r
- }\r
-\r
- private int getId(Resource resource) {\r
- if(resource instanceof InternalResource)\r
- return ((InternalResource)resource).id;\r
- else {\r
- ResourceImpl r = (ResourceImpl)resource;\r
- int id = writeState.externalToId.get(r.id);\r
- if(id != 0) {\r
- return id;\r
- } else {\r
- id = writeState.idToResource.size();\r
- writeState.idToResource.add(resource);\r
- writeState.externalToId.put(r.id, id);\r
- return id;\r
- }\r
- }\r
- }\r
- \r
- private Resource getResource(int id) {\r
- return writeState.idToResource.get(id);\r
- }\r
- \r
- private int getBindingId(Binding binding) {\r
- if(writeState.bindingToId.contains(binding))\r
- return writeState.bindingToId.get(binding);\r
- else {\r
- int id = writeState.idToBinding.size();\r
- writeState.idToBinding.add(binding);\r
- writeState.bindingToId.put(binding, id);\r
- return id;\r
- }\r
- }\r
-\r
- public DelayedWriteGraph(ReadGraph g) throws IOException {\r
- super((ReadGraphImpl)g);\r
- writeState = new State();\r
- session = g.getSession();\r
- b = Layer0.getInstance(g);\r
- writeState.defaultCluster = newCluster();\r
- } \r
- \r
- public DelayedWriteGraph(ReadGraph g, State state) {\r
- super((ReadGraphImpl)g);\r
- session = g.getSession();\r
- b = Layer0.getInstance(g);\r
- this.writeState = state;\r
- } \r
-\r
- public DelayedWriteGraph newSync() {\r
- return new DelayedWriteGraph(this, writeState);\r
- }\r
- \r
- @Override\r
- public void claim(Resource subject, Resource predicate, Resource object)\r
- throws ServiceException {\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(object != null);\r
- \r
- Resource inverse = getPossibleInverse(predicate);\r
- claim(subject, predicate, inverse, object);\r
- }\r
-\r
- @Override\r
- public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value,\r
- Binding binding) throws BindingException,\r
- ManyObjectsForFunctionalRelationException, ServiceException {\r
- Resource valueResource = newResource();\r
- claimValue(valueResource, value, binding);\r
- claim(valueResource, b.InstanceOf, null, type);\r
- claim(resource, predicate, inverse, valueResource); \r
- }\r
- \r
- @Override\r
- public void addLiteral(Resource resource, Resource predicate,\r
- Resource inverse, Object value, Binding binding)\r
- throws BindingException, ManyObjectsForFunctionalRelationException,\r
- ServiceException {\r
- \r
- Resource type = getType(value);\r
- if(type == null) {\r
- Resource literal = newResource();\r
- type = b.Literal;\r
- Resource dataType = newResource();\r
- claim(dataType, b.InstanceOf, null, b.DataType);\r
- claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);\r
- claim(literal, b.HasDataType, null, dataType);\r
- claim(literal, b.InstanceOf, null, type);\r
- claimValue(literal, value, binding); \r
- claim(resource, predicate, inverse, literal);\r
- } else {\r
- addLiteral(resource, predicate, inverse, type, value, binding);\r
- }\r
- \r
- }\r
-\r
- @Override\r
- public <T extends Accessor> T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue)\r
- throws DatabaseException {\r
-\r
- throw new UnsupportedOperationException();\r
-\r
- }\r
-\r
- @Override\r
- public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException {\r
-\r
- try {\r
- Binding b = Bindings.getBinding(value.getClass());\r
- claimLiteral(resource, predicate, value, b);\r
- } catch (BindingConstructionException e) {\r
- throw new IllegalArgumentException(e);\r
- } catch (BindingException e) {\r
- throw new IllegalArgumentException(e);\r
- }\r
-\r
- }\r
- \r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException,\r
- ManyObjectsForFunctionalRelationException, ServiceException {\r
-\r
- Statement valueStatement = null;\r
- if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate);\r
-\r
- if(valueStatement != null && resource.equals(valueStatement.getSubject())) {\r
-\r
- claimValue(valueStatement.getObject(), value, binding);\r
-\r
- } else {\r
-\r
- Resource type = getType(value);\r
- Resource literal = newResource();\r
- if (type == null) {\r
- type = b.Literal;\r
- Resource dataType = newResource();\r
- claim(dataType, b.InstanceOf, null, b.DataType);\r
- claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);\r
- claim(literal, b.HasDataType, dataType);\r
- }\r
- claim(literal, b.InstanceOf, null, type);\r
- claimValue(literal, value, binding); \r
- claim(resource, predicate, literal);\r
- \r
- }\r
-\r
- }\r
- \r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate,\r
- Resource inverse, Resource type, Object value)\r
- throws BindingException, ManyObjectsForFunctionalRelationException,\r
- ServiceException {\r
-\r
- try {\r
- Binding b = Bindings.getBinding(value.getClass());\r
- claimLiteral(resource, predicate, inverse, type, value, b);\r
- } catch (BindingConstructionException e) {\r
- throw new IllegalArgumentException(e);\r
- } catch (BindingException e) {\r
- throw new IllegalArgumentException(e);\r
- }\r
- \r
- }\r
- \r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate,\r
- Resource inverse, Resource type, Object value, Binding binding)\r
- throws BindingException, ManyObjectsForFunctionalRelationException,\r
- ServiceException {\r
-\r
- Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate);\r
-\r
- if(valueStatement != null && resource.equals(valueStatement.getSubject())) {\r
-\r
- claimValue(valueStatement.getObject(), value, binding);\r
-\r
- } else {\r
-\r
- Resource valueResource = newResource();\r
- claim(valueResource, b.InstanceOf, null, type);\r
- claim(resource, predicate, inverse, valueResource);\r
- claimValue(valueResource, value, binding);\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate,\r
- Resource type, Object value) throws BindingException,\r
- ManyObjectsForFunctionalRelationException, ServiceException {\r
-\r
- try {\r
- Binding b = Bindings.getBinding(value.getClass());\r
- claimLiteral(resource, predicate, type, value, b);\r
- } catch (BindingConstructionException e) {\r
- throw new IllegalArgumentException(e);\r
- } catch (BindingException e) {\r
- throw new IllegalArgumentException(e);\r
- }\r
- \r
- }\r
- \r
- @Override\r
- public void claimLiteral(Resource resource, Resource predicate,\r
- Resource type, Object value, Binding binding)\r
- throws BindingException, ManyObjectsForFunctionalRelationException,\r
- ServiceException {\r
-\r
- try {\r
- Resource inverse = getSingleObject(predicate, b.InverseOf);\r
- claimLiteral(resource, predicate, inverse, type, value, binding);\r
- } catch (NoSingleResultException e) {\r
- throw new ServiceException(e);\r
- }\r
- \r
- }\r
- \r
- @Override\r
- public void deny(Resource subject) throws ServiceException {\r
- assert(subject != null);\r
- if(!(subject instanceof InternalResource)) {\r
- try {\r
- for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) {\r
- deny(statement);\r
- }\r
- } catch (ManyObjectsForFunctionalRelationException e) {\r
- throw new ServiceException(e);\r
- }\r
- }\r
- }\r
-\r
- @Override\r
- public void deny(Resource subject, Resource predicate)\r
- throws ServiceException {\r
- assert(subject != null);\r
- if(!(subject instanceof InternalResource)) {\r
- for (Resource object : getObjects(subject, predicate)) {\r
- deny(subject, predicate, object);\r
- }\r
- }\r
- }\r
-\r
- @Override\r
- public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException {\r
- denyStatement(subject, predicate, object);\r
- }\r
-\r
- @Override\r
- public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException {\r
- deny(subject, predicate, getPossibleInverse(predicate), object);\r
- }\r
-\r
- @Override\r
- public void deny(Statement statement) throws ServiceException {\r
- Resource predicate = statement.getPredicate();\r
- deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject());\r
- }\r
-\r
- @Override\r
- public void denyValue(Resource resource, Resource predicate)\r
- throws ManyObjectsForFunctionalRelationException, ServiceException {\r
- assert(resource != null);\r
- assert(predicate != null);\r
-\r
- if(!(resource instanceof InternalResource)) {\r
- Statement valueStatement = getPossibleStatement(resource, predicate);\r
-\r
- if (valueStatement != null && !valueStatement.isAsserted(resource)) {\r
- Resource value = valueStatement.getObject();\r
- denyValue(value);\r
- }\r
- }\r
- }\r
-\r
- @Override\r
- public Resource newResource() throws ServiceException {\r
- if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet);\r
- else return newResource(writeState.defaultCluster);\r
- }\r
-\r
- @Override\r
- public Resource newResource(long clusterId) throws ServiceException {\r
- int id = writeState.idToResource.size();\r
- InternalResource ret = new InternalResource(id, clusterId);\r
- writeState.idToResource.add(ret);\r
- return ret;\r
- }\r
-\r
- @Override\r
- public Resource newResource(Resource clusterSet) throws ServiceException {\r
- \r
- if ((clusterSet instanceof InternalResource)) {\r
- if(!writeState.clusterSets.contains(clusterSet))\r
- throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);\r
- } else {\r
- WriteSupport ws = session.getService(WriteSupport.class);\r
- if (!ws.hasClusterSet(null, clusterSet))\r
- if(!writeState.clusterSetsForExistingResources.contains(clusterSet))\r
- throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);\r
- }\r
- \r
- int id = writeState.idToResource.size();\r
- InternalResource ret = new InternalResource(id, clusterSet);\r
- writeState.idToResource.add(ret);\r
- return ret;\r
- }\r
-\r
- @Override\r
- public void newClusterSet(Resource clusterSet) throws ServiceException {\r
- if (DEBUG)\r
- System.out.println("new cluster set=" + clusterSet);\r
- boolean existingResource = !(clusterSet instanceof InternalResource);\r
- if (existingResource) {\r
- WriteSupport ws = session.getService(WriteSupport.class);\r
- if (ws.hasClusterSet(null, clusterSet))\r
- throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);\r
- writeState.clusterSetsForExistingResources.add(clusterSet); \r
- } else { \r
- if(!writeState.clusterSets.add(clusterSet))\r
- throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);\r
- }\r
- }\r
- @Override\r
- public Resource setClusterSet4NewResource(Resource clusterSet)\r
- throws ServiceException {\r
- Resource existing = writeState.defaultClusterSet; \r
- writeState.defaultClusterSet = clusterSet;\r
- return existing;\r
- }\r
- \r
- @Override\r
- public void claim(Resource subject, Resource predicate, Resource inverse,\r
- Resource object) throws ServiceException {\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(object != null);\r
- try {\r
- if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) {\r
- writeByte(CLAIM);\r
- writeInt(getId(subject));\r
- writeInt(getId(predicate));\r
- writeInt(getId(inverse));\r
- writeInt(getId(object));\r
- } else {\r
- writeByte(CLAIM_NOINVERSE);\r
- writeInt(getId(subject));\r
- writeInt(getId(predicate));\r
- writeInt(getId(object));\r
- }\r
- } catch(Exception e) {\r
- throw new ServiceException(e);\r
- }\r
- }\r
-\r
- @Override\r
- public void deny(Resource subject, Resource predicate, Resource inverse,\r
- Resource object) throws ServiceException {\r
- assert(subject != null);\r
- assert(predicate != null);\r
- assert(object != null);\r
- try {\r
- writeByte(DENY);\r
- writeInt(getId(subject));\r
- writeInt(getId(predicate));\r
- if(inverse != null) writeInt(getId(inverse));\r
- else writeInt(0);\r
- writeInt(getId(object));\r
- } catch(Exception e) {\r
- throw new ServiceException(e);\r
- }\r
- }\r
-\r
- @Override\r
- public void deny(Resource subject, Resource predicate, Resource inverse,\r
- Resource object, VirtualGraph graph) throws ServiceException {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public void claimValue(Resource resource, Object value)\r
- throws ServiceException {\r
- try {\r
- Binding binding = Bindings.getBinding(value.getClass());\r
- claimValue(resource, value, binding);\r
- } catch (BindingConstructionException e) {\r
- throw new ServiceException(e);\r
- }\r
- }\r
-\r
- @Override\r
- public void claimValue(Resource resource, Object value, Binding binding)\r
- throws ServiceException {\r
- try {\r
- \r
- writeByte(CLAIM_VALUE_B);\r
- writeInt(getId(resource));\r
- Serializer serializer = binding.serializer();\r
- int size = serializer.getSize(value);\r
- writeInt(size);\r
- serializer.serialize(new OutputStream() {\r
- \r
- @Override\r
- public void write(int b) throws IOException {\r
- writeByte(b);\r
- }\r
- \r
- }, value);\r
- \r
- } catch(IOException e) {\r
- Logger.defaultLogError(e);\r
- throw new ServiceException(e);\r
- }\r
- }\r
-\r
- @Override\r
- public void denyValue(Resource resource) throws ServiceException {\r
- writeByte(DENY_VALUE);\r
- writeInt(getId(resource));\r
- }\r
-\r
- @Override\r
- public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void flushCluster() throws ServiceException {\r
- writeState.defaultCluster = newCluster();\r
- }\r
-\r
- @Override\r
- public void flushCluster(Resource r) throws ServiceException {\r
- throw new ServiceException("Operation flushCluster(" + r + " not implemented.");\r
- }\r
- \r
- private FileChannel channel;\r
- byte[] bytes = new byte[BUFFER];\r
- byte[] buffer = new byte[BUFFER];\r
- ByteBuffer bb = ByteBuffer.wrap(bytes);\r
- int byteIndex = 0;\r
- \r
- private void writeReset(int size) {\r
-\r
- byteIndex = 0;\r
- bb.position(0);\r
- bb.limit(size);\r
- try {\r
-\r
- if(writeState.tempFile == null) {\r
-\r
- File workspace = Platform.getLocation().toFile();\r
- File temp = new File(workspace, "tempFiles");\r
- temp.mkdirs();\r
-\r
- File base = new File(temp, "delayed");\r
- base.mkdirs();\r
- writeState.tempFile = new File(base, UUID.randomUUID().toString());\r
- writeState.out = new FileOutputStream(writeState.tempFile);\r
- channel = writeState.out.getChannel();\r
- \r
- }\r
-\r
- for (int got=0;got < size;) {\r
- int n = channel.write(bb); \r
- if (n <= 0) {\r
- new Exception().printStackTrace();\r
- return;\r
- }\r
- got += n;\r
- }\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- private void reset() {\r
- byteIndex = 0;\r
- try {\r
- bb.clear();\r
- for(int got=0; got < BUFFER;) {\r
- int n = channel.read(bb); \r
- if (n <= 0)\r
- return;\r
- got += n;\r
- }\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
-\r
- private void writeInt(int i) {\r
- if(byteIndex < (BUFFER-4)) {\r
- bytes[byteIndex++] = (byte)(i&0xff);\r
- bytes[byteIndex++] = (byte)((i>>>8)&0xff);\r
- bytes[byteIndex++] = (byte)((i>>>16)&0xff);\r
- bytes[byteIndex++] = (byte)((i>>>24)&0xff);\r
- if (byteIndex == BUFFER)\r
- writeReset(BUFFER);\r
- } else {\r
- int has = BUFFER-byteIndex;\r
- if(has == 0) writeReset(BUFFER);\r
- bytes[byteIndex++] = (byte)(i&0xff);\r
- if(has == 1) writeReset(BUFFER);\r
- bytes[byteIndex++] = (byte)((i>>>8)&0xff);\r
- if(has == 2) writeReset(BUFFER);\r
- bytes[byteIndex++] = (byte)((i>>>16)&0xff);\r
- if(has == 3) writeReset(BUFFER);\r
- bytes[byteIndex++] = (byte)((i>>>24)&0xff);\r
- if(has == 4) writeReset(BUFFER);\r
- }\r
- }\r
- \r
- private int readInt() {\r
- if(byteIndex < (BUFFER-4)) {\r
- int result = (int) \r
- ((bytes[byteIndex++] & 0xff) | \r
- ((bytes[byteIndex++] & 0xff)<<8) | \r
- ((bytes[byteIndex++] & 0xff)<<16) | \r
- ((bytes[byteIndex++] & 0xff)<<24)); \r
- return result;\r
- } else {\r
- int has = BUFFER-byteIndex;\r
- int result = 0;\r
- if(has == 0) reset();\r
- result = (int)(bytes[byteIndex++] & 0xff);\r
- if(has == 1) reset();\r
- result |= (int)((bytes[byteIndex++] & 0xff) <<8);\r
- if(has == 2) reset();\r
- result |= (int)((bytes[byteIndex++] & 0xff) <<16);\r
- if(has == 3) reset();\r
- result |= (int)((bytes[byteIndex++] & 0xff) <<24);\r
- if(has == 4) reset();\r
- return result;\r
- }\r
- }\r
- \r
- private byte readByte() {\r
- byte result = bytes[byteIndex++]; \r
- if(byteIndex == BUFFER) reset();\r
- return result;\r
- }\r
-\r
- private void writeByte(int b) {\r
- bytes[byteIndex++] = (byte)b;\r
- if(byteIndex == BUFFER) writeReset(BUFFER);\r
- }\r
- \r
- private void writeBytes(byte[] data) {\r
- int has = BUFFER-byteIndex;\r
- int amount = data.length;\r
- if(has > amount) {\r
- System.arraycopy(data, 0, bytes, byteIndex, amount);\r
- byteIndex += amount;\r
- } else {\r
- System.arraycopy(data, 0, bytes, byteIndex, has);\r
- writeReset(BUFFER);\r
- ByteBuffer bb2 = ByteBuffer.wrap(data);\r
- bb2.position(has);\r
- try {\r
- channel.write(bb2);\r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
- }\r
- \r
- public byte[] readBytes(int amount) {\r
- return readBytes(buffer, amount);\r
- }\r
- \r
- public byte[] readBytes(byte[] result, int amount) {\r
- if(result == null) result = new byte[amount];\r
- int has = BUFFER-byteIndex;\r
- if(has > amount) {\r
- System.arraycopy(bytes, byteIndex, result, 0, amount);\r
- byteIndex += amount;\r
- } else {\r
- System.arraycopy(bytes, byteIndex, result, 0, has);\r
- ByteBuffer bb2 = ByteBuffer.wrap(result);\r
- bb2.position(has);\r
- for(int got=has;got<amount;)\r
- try {\r
- got += channel.read(bb2);\r
- if(got == -1) {\r
- new Exception().printStackTrace();\r
- return result;\r
- }\r
- \r
- } catch (IOException e) {\r
- e.printStackTrace();\r
- }\r
- reset();\r
- }\r
- return result;\r
- }\r
- \r
- public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException {\r
- writeState.bindingToId = null;\r
- writeState.externalToId = null;\r
- try {\r
- \r
- writeByte(TERM);\r
- if(byteIndex > 0) {\r
- writeReset(byteIndex);\r
- }\r
- channel.force(false);\r
- writeState.out.close();\r
-\r
- FileInputStream fs = new FileInputStream(writeState.tempFile);\r
- channel = fs.getChannel();\r
- \r
- } catch (IOException e) {\r
- throw new ServiceException(e);\r
- }\r
- \r
- w.getMetadata().putAll(metadata);\r
- \r
- TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class);\r
-\r
- // First create all resources defined by clusterId\r
- MapList<Long,InternalResource> clusterAssignment = new MapList<Long,InternalResource>();\r
- for(Resource r : writeState.idToResource) {\r
- if(r instanceof InternalResource) {\r
- InternalResource ir = (InternalResource)r;\r
- if(ir.clusterId < 0) {\r
- if (DEBUG)\r
- System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId);\r
- clusterAssignment.add(ir.clusterId, ir);\r
- } else if(ir.clusterId > 0) {\r
- if (DEBUG)\r
- System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir);\r
- ir.resource = w.newResource(ir.clusterId);\r
- writeState.idToResource.set(ir.id, ir.resource);\r
- if (writeState.clusterSets.contains(ir)) {\r
- w.newClusterSet(ir.resource);\r
- if (DEBUG)\r
- System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);\r
- }\r
- \r
- }\r
- }\r
- }\r
- \r
- for(Long clusterKey : clusterAssignment.getKeys()) {\r
- if (DEBUG)\r
- System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey);\r
- w.flushCluster();\r
- for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) {\r
- if (DEBUG)\r
- System.out.println("-CREATED RESOURCE: " + ir);\r
- ir.resource = w.newResource();\r
- writeState.idToResource.set(ir.id, ir.resource);\r
- if (writeState.clusterSets.contains(ir)) {\r
- w.newClusterSet(ir.resource);\r
- if (DEBUG)\r
- System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);\r
- }\r
- }\r
- }\r
- \r
- // Create cluster sets for all existing resources (not InternalResource)\r
- // before proceeding to create resources.\r
- for(Resource existingResource : writeState.clusterSetsForExistingResources) {\r
- w.newClusterSet(existingResource);\r
- if (DEBUG)\r
- System.out.println("CREATED NEW CLUSTER SET: " + existingResource);\r
- }\r
- \r
- // Then create all resources defined by cluster set\r
- for(Resource r : writeState.idToResource) {\r
- if(r instanceof InternalResource) {\r
- InternalResource ir = (InternalResource)r;\r
- Resource clusterSet = ir.clusterSet;\r
- \r
- if (clusterSet != null) {\r
- if (DEBUG)\r
- System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet);\r
- if(clusterSet instanceof InternalResource) {\r
- ir.resource = w.newResource(((InternalResource)clusterSet).resource);\r
- } else {\r
- ir.resource = w.newResource(clusterSet);\r
- }\r
- if (DEBUG)\r
- System.out.println(" => " + ir.resource);\r
- writeState.idToResource.set(ir.id, ir.resource);\r
- if(writeState.clusterSets.contains(ir)) {\r
- if (DEBUG)\r
- System.out.println(" ==> NEW CLUSTER SET");\r
- w.newClusterSet(ir.resource);\r
- }\r
- }\r
- \r
- }\r
- }\r
- \r
- reset();\r
- bb.limit(BUFFER);\r
- \r
- try {\r
- while(true) {\r
- byte method = readByte();\r
- switch(method) { \r
- case TERM: {\r
- return;\r
- }\r
- case CLAIM: {\r
- writeState.statementCount += 2;\r
- Resource subject = getResource(readInt());\r
- Resource predicate = getResource(readInt());\r
- Resource inverse = getResource(readInt());\r
- Resource object = getResource(readInt());\r
- w.claim(subject, predicate, inverse, object);\r
- } break;\r
- case CLAIM_NOINVERSE: {\r
- ++writeState.statementCount;\r
- Resource subject = getResource(readInt());\r
- Resource predicate = getResource(readInt());\r
- Resource object = getResource(readInt());\r
- w.claim(subject, predicate, null, object);\r
- } break;\r
- case DENY: {\r
- Resource subject = getResource(readInt());\r
- Resource predicate = getResource(readInt());\r
- int inv = readInt();\r
- Resource inverse = null;\r
- if(inv > 0) inverse = getResource(inv);\r
- Resource object = getResource(readInt());\r
- if(!subject.isPersistent() || !object.isPersistent()) {\r
- VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object);\r
- if(inv > 0) {\r
- VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject);\r
- if(statementProvider2 != null)\r
- w.deny(object, inverse, null, subject, statementProvider2);\r
- }\r
- if(statementProvider1 != null)\r
- w.deny(subject, predicate, null, object, statementProvider1);\r
- } else {\r
- w.deny(subject, predicate, inverse, object, null);\r
- }\r
- } break;\r
- case DENY_VALUE: {\r
- Resource subject = getResource(readInt());\r
- if(!subject.isPersistent()) {\r
- VirtualGraph provider = processor.getValueProvider(subject);\r
- if(provider != null)\r
- w.denyValue(subject, provider);\r
- } else {\r
- w.denyValue(subject);\r
- }\r
- } break;\r
- case CLAIM_VALUE_B: {\r
- ++writeState.valueCount;\r
- Resource resource = getResource(readInt());\r
- int len = readInt();\r
- tgs.setValue(w, resource, null, this, len);\r
-// byte[] bytes = readBytes(len);\r
-// tgs.setValue(resource, null, bytes);\r
- } break;\r
- case COMMIT_AND_CONTINUE: {\r
- XSupport xs = w.getService(XSupport.class);\r
- xs.commitAndContinue(w, traits);\r
- } break;\r
- }\r
- }\r
- } catch(Exception e) {\r
- if(e instanceof ServiceException)\r
- throw (ServiceException)e;\r
- else\r
- throw new ServiceException(e);\r
- } finally {\r
- try {\r
- channel.close();\r
- channel = null;\r
- } catch (IOException e) {\r
- throw new ServiceException(e);\r
- }\r
- }\r
-// System.out.println("Resources: " + state.resourceCount);\r
-// System.out.println("Statements: " + state.statementCount);\r
-// System.out.println("Values: " + state.valueCount);\r
- }\r
- \r
- private Resource getType(Object value) {\r
- Class<?> clazz = value.getClass();\r
- Resource dataType = \r
- clazz == Float.class ? b.Float\r
- : clazz == Double.class ? b.Double\r
- : clazz == Integer.class ? b.Integer\r
- : clazz == String.class ? b.String\r
- : clazz == Boolean.class ? b.Boolean\r
- : clazz == Byte.class ? b.Byte\r
- : clazz == Long.class ? b.Long\r
- : clazz == float[].class ? b.FloatArray\r
- : clazz == double[].class ? b.DoubleArray\r
- : clazz == int[].class ? b.IntegerArray\r
- : clazz == String[].class ? b.StringArray\r
- : clazz == boolean[].class ? b.BooleanArray\r
- : clazz == byte[].class ? b.ByteArray\r
- : clazz == long[].class ? b.LongArray \r
- : null\r
- ;\r
- return dataType;\r
- }\r
- \r
- public long newCluster() {\r
- return -1 - (++writeState.clusterCount);\r
- }\r
-\r
- public long getDefaultCluster() {\r
- return writeState.defaultCluster;\r
- }\r
- \r
- public void setDefaultCluster(long cluster) {\r
- writeState.defaultCluster = cluster;\r
- }\r
- \r
- @Override\r
- public void syncRequest(final DelayedWrite request) throws DatabaseException {\r
- \r
- try {\r
-\r
- final DelayedWriteGraph dwg = new DelayedWriteGraph(this);\r
- request.perform(dwg);\r
-\r
- syncRequest(new WriteOnlyRequest() {\r
-\r
- @Override\r
- public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
- dwg.commit(graph, request);\r
- }\r
-\r
- });\r
-\r
- } catch (DatabaseException e) {\r
- \r
- throw e;\r
- \r
- } catch (Throwable e) {\r
- \r
- throw new DatabaseException(e);\r
- \r
- } finally {\r
- \r
- }\r
- \r
- }\r
- \r
- @Override\r
- public void syncRequest(WriteOnly request) throws DatabaseException {\r
- \r
- Resource defaultClusterSet = setClusterSet4NewResource(null);\r
- \r
- try {\r
- WriteSupport ws = session.getService(WriteSupport.class);\r
- ws.performWriteRequest(this, request);\r
- } catch (DatabaseException e) {\r
- throw e;\r
- } catch (Throwable t) {\r
- throw new DatabaseException(t);\r
- } finally {\r
- setClusterSet4NewResource(defaultClusterSet);\r
- }\r
- \r
- }\r
-\r
- @SuppressWarnings("unchecked")\r
- @Override\r
- public <T> T getService(Class<T> api) {\r
- \r
- if(ClusteringSupport.class == api) {\r
-\r
- final ClusteringSupport support = (ClusteringSupport)super.getService(api); \r
-\r
- return (T)new ClusteringSupport() {\r
- \r
- @Override\r
- public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId)\r
- throws DatabaseException, ResourceNotFoundException {\r
- return support.getResourceByIndexAndCluster(resourceIndex, clusterId);\r
- }\r
- \r
- @Override\r
- public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException {\r
- return support.getResourceByKey(resourceKey);\r
- }\r
- \r
- @Override\r
- public int getNumberOfResources(long clusterId)\r
- throws DatabaseException {\r
- return support.getNumberOfResources(clusterId);\r
- }\r
- \r
- @Override\r
- public long getCluster(Resource r) {\r
- return support.getCluster(r);\r
- }\r
- \r
- @Override\r
- public long createCluster() {\r
- return newCluster();\r
- }\r
- \r
- @Override\r
- public boolean isClusterSet(Resource r) throws DatabaseException {\r
- return support.isClusterSet(r);\r
- }\r
-\r
- @Override\r
- public Resource getClusterSetOfCluster(Resource r) throws DatabaseException {\r
- return support.getClusterSetOfCluster(r);\r
- }\r
- \r
- @Override\r
- public Resource getClusterSetOfCluster(long cluster) throws DatabaseException {\r
- return support.getClusterSetOfCluster(cluster);\r
- }\r
-\r
- };\r
- \r
- } else if (TransferableGraphSupport.class == api) {\r
- \r
- final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class);\r
- \r
- return (T)new TransferableGraphSupport() {\r
-\r
- @Override\r
- public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) {\r
- writeByte(CLAIM_VALUE_B);\r
- writeInt(getId(resource));\r
- writeInt(raw.length);\r
- writeBytes(raw);\r
- writeInt(getBindingId(haxx));\r
- }\r
- \r
- @Override\r
- public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount)\r
- throws DatabaseException {\r
- writeByte(CLAIM_VALUE_B);\r
- writeInt(getId(resource));\r
- writeInt(amount);\r
- writeBytes(reader.readBytes(null, amount));\r
- writeInt(getBindingId(haxx));\r
- }\r
- \r
- @Override\r
- public byte[] getValue(ReadGraph graph, Resource resource) {\r
- return parentSupport.getValue(graph, resource);\r
- }\r
- \r
- @Override\r
- public InputStream getValueStream(ReadGraph graph, Resource resource) {\r
- return parentSupport.getValueStream(graph, resource);\r
- }\r
- \r
- };\r
-\r
- }\r
- \r
- return super.getService(api);\r
- \r
- }\r
-\r
- @Override\r
- public <T> void addMetadata(Metadata data) throws ServiceException {\r
- MetadataUtils.addMetadata(session, metadata, data);\r
- }\r
- \r
- public void addCommitAndContinue() {\r
- writeByte(COMMIT_AND_CONTINUE);\r
- }\r
-\r
- @Override\r
- public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
- return MetadataUtils.getMetadata(session, metadata, clazz);\r
- }\r
-\r
- @Override\r
- public TreeMap<String, byte[]> getMetadata() {\r
- return metadata;\r
- }\r
- @Override\r
- public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {\r
- \r
- return request.perform(this);\r
- \r
- }\r
-\r
- @Override\r
- public VirtualGraph getProvider() {\r
- return null;\r
- }\r
-\r
- @Override\r
- public void clearUndoList(WriteTraits writeTraits) {\r
- WriteSupport ws = session.getService(WriteSupport.class);\r
- if (null != ws)\r
- ws.clearUndoList(writeTraits);\r
- }\r
-\r
- @Override\r
- public void markUndoPoint() {\r
- // TODO Auto-generated method stub\r
- \r
- }\r
-\r
- @Override\r
- public <T> T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding)\r
- throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException {\r
- if(!(subject instanceof InternalResource)) {\r
- return super.getPossibleRelatedValue(subject, relation, binding);\r
- } else {\r
- return null;\r
- }\r
- }\r
- \r
- @Override\r
- final public Resource getPossibleObject(final Resource subject, final Resource relation)\r
- throws ManyObjectsForFunctionalRelationException, ServiceException {\r
- if(!(subject instanceof InternalResource)) {\r
- return super.getPossibleObject(subject, relation);\r
- } else {\r
- return null;\r
- }\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.impl.graph;
+
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.eclipse.core.runtime.Platform;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.accessor.Accessor;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.error.BindingConstructionException;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.type.Datatype;
+import org.simantics.databoard.util.binary.RandomAccessBinary;
+import org.simantics.db.Metadata;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.Statement;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.WriteOnlyGraph;
+import org.simantics.db.common.MetadataUtils;
+import org.simantics.db.common.request.WriteOnlyRequest;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.BindingException;
+import org.simantics.db.exception.ClusterSetExistException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
+import org.simantics.db.exception.NoSingleResultException;
+import org.simantics.db.exception.ResourceNotFoundException;
+import org.simantics.db.exception.ServiceException;
+import org.simantics.db.impl.DebugPolicy;
+import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.request.DelayedWrite;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteResult;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.db.service.ByteReader;
+import org.simantics.db.service.ClusteringSupport;
+import org.simantics.db.service.TransferableGraphSupport;
+import org.simantics.db.service.XSupport;
+import org.simantics.layer0.Layer0;
+import org.simantics.utils.datastructures.MapList;
+
+import gnu.trove.map.hash.TIntIntHashMap;
+import gnu.trove.map.hash.TObjectIntHashMap;
+
+/**
+ * Write graph implementation that does not modify the database
+ * immediately but with an explicit commit method. All read operations
+ * return results based on the old graph.
+ */
+public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable {
+
+ private static final boolean DEBUG = false;
+ private static final int BUFFER_SIZE = 512*1024;
+
+ private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding();
+
+ private final int TERM = 0;
+ private final int CLAIM = 1;
+ private final int CLAIM_NOINVERSE = 2;
+ private final int CLAIM_VALUE_B = 4;
+ private final int DENY = 5;
+ private final int DENY_VALUE = 6;
+ private final int COMMIT_AND_CONTINUE = 7;
+
+ private static class State {
+ public File tempFile;
+ public FileOutputStream out;
+ public FileInputStream in;
+ public ArrayList<Resource> idToResource = new ArrayList<Resource>();
+ public TIntIntHashMap externalToId = new TIntIntHashMap();
+ public ArrayList<Binding> idToBinding = new ArrayList<Binding>();
+ public TObjectIntHashMap<Binding> bindingToId = new TObjectIntHashMap<Binding>();
+ public Set<Resource> clusterSets = new HashSet<Resource>();
+ public Set<Resource> clusterSetsForExistingResources = new HashSet<Resource>();
+ public int clusterCount = 0;
+ public long defaultCluster;
+ public Resource defaultClusterSet;
+ public int statementCount = 0;
+ public int valueCount = 0;
+ public int fileCount = 0;
+ }
+
+ private State writeState;
+ private TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();
+
+ private FileChannel channel;
+ private byte[] bytes = new byte[BUFFER_SIZE];
+ private ByteBuffer bb = ByteBuffer.wrap(bytes);
+ private int byteIndex = 0;
+
+ private Layer0 b;
+ private Session session;
+
+ public static Resource convertDelayedResource(Resource r) {
+ if (r instanceof InternalResource)
+ return ((InternalResource) r).resource;
+ return r;
+ }
+
+ private static class InternalResource implements Resource {
+
+ int id;
+ long clusterId = 0;
+ Resource resource = null;
+ Resource clusterSet = null;
+
+ public InternalResource(int id, long clusterId) {
+ this.id = id;
+ this.clusterId = clusterId;
+ }
+
+ public InternalResource(int id, Resource clusterSet) {
+ this.id = id;
+ this.clusterSet = clusterSet;
+ }
+
+ @Override
+ public long getResourceId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Resource get() {
+ return this;
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ public int compareTo(Resource o) {
+ if(o instanceof InternalResource)
+ return Integer.compare(id, ((InternalResource)o).id);
+ return -1;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + id;
+ return result;
+ }
+
+ @Override
+ public int getThreadHash() {
+ return hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof InternalResource))
+ return false;
+ InternalResource other = (InternalResource) obj;
+ if (id != other.id)
+ return false;
+ return true;
+ }
+
+ @Override
+ public boolean equalsResource(Resource other) {
+ return equals(other);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(32);
+ if(DebugPolicy.VERBOSE) {
+ sb.append("[delayed id=");
+ sb.append(id);
+ sb.append("]");
+ } else {
+ sb.append("[did=");
+ sb.append(id);
+ sb.append("]");
+ }
+ return sb.toString();
+ }
+ }
+
+ private int getId(Resource resource) {
+ if(resource instanceof InternalResource)
+ return ((InternalResource)resource).id;
+ else {
+ ResourceImpl r = (ResourceImpl)resource;
+ int id = writeState.externalToId.get(r.id);
+ if(id != 0) {
+ return id;
+ } else {
+ id = writeState.idToResource.size();
+ writeState.idToResource.add(resource);
+ writeState.externalToId.put(r.id, id);
+ return id;
+ }
+ }
+ }
+
+ private Resource getResource(int id) {
+ return writeState.idToResource.get(id);
+ }
+
+ private int getBindingId(Binding binding) {
+ if(writeState.bindingToId.contains(binding))
+ return writeState.bindingToId.get(binding);
+ else {
+ int id = writeState.idToBinding.size();
+ writeState.idToBinding.add(binding);
+ writeState.bindingToId.put(binding, id);
+ return id;
+ }
+ }
+
+ public DelayedWriteGraph(ReadGraph g) throws IOException {
+ super((ReadGraphImpl)g);
+ writeState = new State();
+ session = g.getSession();
+ b = Layer0.getInstance(g);
+ writeState.defaultCluster = newCluster();
+ }
+
+ public DelayedWriteGraph(ReadGraph g, State state) {
+ super((ReadGraphImpl)g);
+ session = g.getSession();
+ b = Layer0.getInstance(g);
+ this.writeState = state;
+ }
+
+ public DelayedWriteGraph newSync() {
+ return new DelayedWriteGraph(this, writeState);
+ }
+
+ @Override
+ public void claim(Resource subject, Resource predicate, Resource object)
+ throws ServiceException {
+ assert(subject != null);
+ assert(predicate != null);
+ assert(object != null);
+
+ Resource inverse = getPossibleInverse(predicate);
+ claim(subject, predicate, inverse, object);
+ }
+
+ @Override
+ public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value,
+ Binding binding) throws BindingException,
+ ManyObjectsForFunctionalRelationException, ServiceException {
+ Resource valueResource = newResource();
+ claimValue(valueResource, value, binding);
+ claim(valueResource, b.InstanceOf, null, type);
+ claim(resource, predicate, inverse, valueResource);
+ }
+
+ @Override
+ public void addLiteral(Resource resource, Resource predicate,
+ Resource inverse, Object value, Binding binding)
+ throws BindingException, ManyObjectsForFunctionalRelationException,
+ ServiceException {
+
+ Resource type = getType(value);
+ if(type == null) {
+ Resource literal = newResource();
+ type = b.Literal;
+ Resource dataType = newResource();
+ claim(dataType, b.InstanceOf, null, b.DataType);
+ claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
+ claim(literal, b.HasDataType, null, dataType);
+ claim(literal, b.InstanceOf, null, type);
+ claimValue(literal, value, binding);
+ claim(resource, predicate, inverse, literal);
+ } else {
+ addLiteral(resource, predicate, inverse, type, value, binding);
+ }
+
+ }
+
+ @Override
+ public <T extends Accessor> T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue)
+ throws DatabaseException {
+
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException {
+
+ try {
+ Binding b = Bindings.getBinding(value.getClass());
+ claimLiteral(resource, predicate, value, b);
+ } catch (BindingConstructionException e) {
+ throw new IllegalArgumentException(e);
+ } catch (BindingException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException,
+ ManyObjectsForFunctionalRelationException, ServiceException {
+
+ Statement valueStatement = null;
+ if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate);
+
+ if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
+
+ claimValue(valueStatement.getObject(), value, binding);
+
+ } else {
+
+ Resource type = getType(value);
+ Resource literal = newResource();
+ if (type == null) {
+ type = b.Literal;
+ Resource dataType = newResource();
+ claim(dataType, b.InstanceOf, null, b.DataType);
+ claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
+ claim(literal, b.HasDataType, dataType);
+ }
+ claim(literal, b.InstanceOf, null, type);
+ claimValue(literal, value, binding);
+ claim(resource, predicate, literal);
+
+ }
+
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate,
+ Resource inverse, Resource type, Object value)
+ throws BindingException, ManyObjectsForFunctionalRelationException,
+ ServiceException {
+
+ try {
+ Binding b = Bindings.getBinding(value.getClass());
+ claimLiteral(resource, predicate, inverse, type, value, b);
+ } catch (BindingConstructionException e) {
+ throw new IllegalArgumentException(e);
+ } catch (BindingException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate,
+ Resource inverse, Resource type, Object value, Binding binding)
+ throws BindingException, ManyObjectsForFunctionalRelationException,
+ ServiceException {
+
+ Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate);
+
+ if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
+
+ claimValue(valueStatement.getObject(), value, binding);
+
+ } else {
+
+ Resource valueResource = newResource();
+ claim(valueResource, b.InstanceOf, null, type);
+ claim(resource, predicate, inverse, valueResource);
+ claimValue(valueResource, value, binding);
+
+ }
+
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate,
+ Resource type, Object value) throws BindingException,
+ ManyObjectsForFunctionalRelationException, ServiceException {
+
+ try {
+ Binding b = Bindings.getBinding(value.getClass());
+ claimLiteral(resource, predicate, type, value, b);
+ } catch (BindingConstructionException e) {
+ throw new IllegalArgumentException(e);
+ } catch (BindingException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ @Override
+ public void claimLiteral(Resource resource, Resource predicate,
+ Resource type, Object value, Binding binding)
+ throws BindingException, ManyObjectsForFunctionalRelationException,
+ ServiceException {
+
+ try {
+ Resource inverse = getSingleObject(predicate, b.InverseOf);
+ claimLiteral(resource, predicate, inverse, type, value, binding);
+ } catch (NoSingleResultException e) {
+ throw new ServiceException(e);
+ }
+
+ }
+
+ @Override
+ public void deny(Resource subject) throws ServiceException {
+ assert(subject != null);
+ if(!(subject instanceof InternalResource)) {
+ try {
+ for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) {
+ deny(statement);
+ }
+ } catch (ManyObjectsForFunctionalRelationException e) {
+ throw new ServiceException(e);
+ }
+ }
+ }
+
+ @Override
+ public void deny(Resource subject, Resource predicate)
+ throws ServiceException {
+ assert(subject != null);
+ if(!(subject instanceof InternalResource)) {
+ for (Resource object : getObjects(subject, predicate)) {
+ deny(subject, predicate, object);
+ }
+ }
+ }
+
+ @Override
+ public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException {
+ denyStatement(subject, predicate, object);
+ }
+
+ @Override
+ public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException {
+ deny(subject, predicate, getPossibleInverse(predicate), object);
+ }
+
+ @Override
+ public void deny(Statement statement) throws ServiceException {
+ Resource predicate = statement.getPredicate();
+ deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject());
+ }
+
+ @Override
+ public void denyValue(Resource resource, Resource predicate)
+ throws ManyObjectsForFunctionalRelationException, ServiceException {
+ assert(resource != null);
+ assert(predicate != null);
+
+ if(!(resource instanceof InternalResource)) {
+ Statement valueStatement = getPossibleStatement(resource, predicate);
+
+ if (valueStatement != null && !valueStatement.isAsserted(resource)) {
+ Resource value = valueStatement.getObject();
+ deny(valueStatement);
+ denyValue(value);
+ }
+ }
+ }
+
+ @Override
+ public Resource newResource() throws ServiceException {
+ if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet);
+ else return newResource(writeState.defaultCluster);
+ }
+
+ @Override
+ public Resource newResource(long clusterId) throws ServiceException {
+ int id = writeState.idToResource.size();
+ InternalResource ret = new InternalResource(id, clusterId);
+ writeState.idToResource.add(ret);
+ return ret;
+ }
+
+ @Override
+ public Resource newResource(Resource clusterSet) throws ServiceException {
+
+ if ((clusterSet instanceof InternalResource)) {
+ if(!writeState.clusterSets.contains(clusterSet))
+ throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
+ } else {
+ WriteSupport ws = session.getService(WriteSupport.class);
+ if (!ws.hasClusterSet(null, clusterSet))
+ if(!writeState.clusterSetsForExistingResources.contains(clusterSet))
+ throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
+ }
+
+ int id = writeState.idToResource.size();
+ InternalResource ret = new InternalResource(id, clusterSet);
+ writeState.idToResource.add(ret);
+ return ret;
+ }
+
+ @Override
+ public void newClusterSet(Resource clusterSet) throws ServiceException {
+ if (DEBUG)
+ System.out.println("new cluster set=" + clusterSet);
+ boolean existingResource = !(clusterSet instanceof InternalResource);
+ if (existingResource) {
+ WriteSupport ws = session.getService(WriteSupport.class);
+ if (ws.hasClusterSet(null, clusterSet))
+ throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
+ writeState.clusterSetsForExistingResources.add(clusterSet);
+ } else {
+ if(!writeState.clusterSets.add(clusterSet))
+ throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
+ }
+ }
+ @Override
+ public Resource setClusterSet4NewResource(Resource clusterSet)
+ throws ServiceException {
+ Resource existing = writeState.defaultClusterSet;
+ writeState.defaultClusterSet = clusterSet;
+ return existing;
+ }
+
+ @Override
+ public void claim(Resource subject, Resource predicate, Resource inverse,
+ Resource object) throws ServiceException {
+ assert(subject != null);
+ assert(predicate != null);
+ assert(object != null);
+ try {
+ if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) {
+ writeByte(CLAIM);
+ writeInt(getId(subject));
+ writeInt(getId(predicate));
+ writeInt(getId(inverse));
+ writeInt(getId(object));
+ } else {
+ writeByte(CLAIM_NOINVERSE);
+ writeInt(getId(subject));
+ writeInt(getId(predicate));
+ writeInt(getId(object));
+ }
+ } catch(Exception e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public void deny(Resource subject, Resource predicate, Resource inverse,
+ Resource object) throws ServiceException {
+ assert(subject != null);
+ assert(predicate != null);
+ assert(object != null);
+ try {
+ writeByte(DENY);
+ writeInt(getId(subject));
+ writeInt(getId(predicate));
+ if(inverse != null) writeInt(getId(inverse));
+ else writeInt(0);
+ writeInt(getId(object));
+ } catch(Exception e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public void deny(Resource subject, Resource predicate, Resource inverse,
+ Resource object, VirtualGraph graph) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void claimValue(Resource resource, Object value)
+ throws ServiceException {
+ try {
+ Binding binding = Bindings.getBinding(value.getClass());
+ claimValue(resource, value, binding);
+ } catch (BindingConstructionException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ private OutputStream valueWriter = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ writeByte(b);
+ }
+ };
+
+ @Override
+ public void claimValue(Resource resource, Object value, Binding binding)
+ throws ServiceException {
+ try {
+
+ writeByte(CLAIM_VALUE_B);
+ writeInt(getId(resource));
+ Serializer serializer = binding.serializer();
+ int size = serializer.getSize(value);
+ writeInt(size);
+ serializer.serialize(valueWriter, value);
+
+ } catch(IOException e) {
+ Logger.defaultLogError(e);
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public void denyValue(Resource resource) throws ServiceException {
+ writeByte(DENY_VALUE);
+ writeInt(getId(resource));
+ }
+
+ @Override
+ public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flushCluster() throws ServiceException {
+ writeState.defaultCluster = newCluster();
+ }
+
+ @Override
+ public void flushCluster(Resource r) throws ServiceException {
+ throw new ServiceException("Operation flushCluster(" + r + " not implemented.");
+ }
+
+ private void writeReset(int size) {
+ byteIndex = 0;
+ bb.position(0);
+ bb.limit(size);
+ try {
+ if (writeState.tempFile == null) {
+ File workspace = Platform.getLocation().toFile();
+ File temp = new File(workspace, "tempFiles");
+ File base = new File(temp, "delayed");
+ Files.createDirectories(base.toPath());
+ writeState.tempFile = new File(base, UUID.randomUUID().toString());
+ writeState.out = new FileOutputStream(writeState.tempFile);
+ channel = writeState.out.getChannel();
+ }
+
+ for (int got=0;got < size;) {
+ int n = channel.write(bb);
+ if (n <= 0) {
+ Logger.defaultLogError(new Exception("FileChannel.write returned " + n));
+ return;
+ }
+ got += n;
+ }
+ } catch (IOException e) {
+ Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e);
+ }
+ }
+
+ private void reset() {
+ byteIndex = 0;
+ bb.clear();
+ if (channel != null) {
+ try {
+ for(int got=0; got < BUFFER_SIZE;) {
+ int n = channel.read(bb);
+ if (n <= 0)
+ return;
+ got += n;
+ }
+ } catch (IOException e) {
+ Logger.defaultLogError("FileChannel.read failed", e);
+ }
+ }
+ }
+
+ private void writeInt(int i) {
+ if(byteIndex < (BUFFER_SIZE-4)) {
+ bytes[byteIndex++] = (byte)(i&0xff);
+ bytes[byteIndex++] = (byte)((i>>>8)&0xff);
+ bytes[byteIndex++] = (byte)((i>>>16)&0xff);
+ bytes[byteIndex++] = (byte)((i>>>24)&0xff);
+ if (byteIndex == BUFFER_SIZE)
+ writeReset(BUFFER_SIZE);
+ } else {
+ int has = BUFFER_SIZE-byteIndex;
+ if(has == 0) writeReset(BUFFER_SIZE);
+ bytes[byteIndex++] = (byte)(i&0xff);
+ if(has == 1) writeReset(BUFFER_SIZE);
+ bytes[byteIndex++] = (byte)((i>>>8)&0xff);
+ if(has == 2) writeReset(BUFFER_SIZE);
+ bytes[byteIndex++] = (byte)((i>>>16)&0xff);
+ if(has == 3) writeReset(BUFFER_SIZE);
+ bytes[byteIndex++] = (byte)((i>>>24)&0xff);
+ if(has == 4) writeReset(BUFFER_SIZE);
+ }
+ }
+
+ private int readInt() {
+ if(byteIndex < (BUFFER_SIZE-4)) {
+ int result = (int)
+ ((bytes[byteIndex++] & 0xff) |
+ ((bytes[byteIndex++] & 0xff)<<8) |
+ ((bytes[byteIndex++] & 0xff)<<16) |
+ ((bytes[byteIndex++] & 0xff)<<24));
+ return result;
+ } else {
+ int has = BUFFER_SIZE-byteIndex;
+ int result = 0;
+ if(has == 0) reset();
+ result = (int)(bytes[byteIndex++] & 0xff);
+ if(has == 1) reset();
+ result |= (int)((bytes[byteIndex++] & 0xff) <<8);
+ if(has == 2) reset();
+ result |= (int)((bytes[byteIndex++] & 0xff) <<16);
+ if(has == 3) reset();
+ result |= (int)((bytes[byteIndex++] & 0xff) <<24);
+ if(has == 4) reset();
+ return result;
+ }
+ }
+
+ private byte readByte() {
+ byte result = bytes[byteIndex++];
+ if(byteIndex == BUFFER_SIZE) reset();
+ return result;
+ }
+
+ private void writeByte(int b) {
+ bytes[byteIndex++] = (byte)b;
+ if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE);
+ }
+
+ private void writeBytes(byte[] data) {
+ int has = BUFFER_SIZE-byteIndex;
+ int amount = data.length;
+ if(has > amount) {
+ System.arraycopy(data, 0, bytes, byteIndex, amount);
+ byteIndex += amount;
+ } else {
+ System.arraycopy(data, 0, bytes, byteIndex, has);
+ writeReset(BUFFER_SIZE);
+ ByteBuffer bb2 = ByteBuffer.wrap(data);
+ bb2.position(has);
+ try {
+ channel.write(bb2);
+ } catch (IOException e) {
+ Logger.defaultLogError("FileChannel.write failed", e);
+ }
+ }
+ }
+
+ public byte[] readBytes(byte[] result, int amount) {
+ if(result == null) result = new byte[amount];
+ int has = BUFFER_SIZE-byteIndex;
+ if(has > amount) {
+ System.arraycopy(bytes, byteIndex, result, 0, amount);
+ byteIndex += amount;
+ } else {
+ System.arraycopy(bytes, byteIndex, result, 0, has);
+ ByteBuffer bb2 = ByteBuffer.wrap(result);
+ bb2.position(has);
+ for(int got=has;got<amount;)
+ try {
+ got += channel.read(bb2);
+ if(got == -1) {
+ // End-of-stream, why log this?
+ return result;
+ }
+ } catch (IOException e) {
+ Logger.defaultLogError("FileChannel.read failed", e);
+ }
+ reset();
+ }
+ return result;
+ }
+
+ public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException {
+ writeState.bindingToId = null;
+ writeState.externalToId = null;
+ writeByte(TERM);
+
+ if (writeState.out != null) {
+ // Flush current buffer to file only if backing file has already
+ // been taken into use.
+ if (byteIndex > 0)
+ writeReset(byteIndex);
+
+ try (OutputStream out = writeState.out) {
+ channel.force(false);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ } finally {
+ writeState.out = null;
+ }
+
+ try {
+ writeState.in = new FileInputStream(writeState.tempFile);
+ channel = writeState.in.getChannel();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ w.getMetadata().putAll(metadata);
+
+ TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class);
+
+ // First create all resources defined by clusterId
+ MapList<Long,InternalResource> clusterAssignment = new MapList<Long,InternalResource>();
+ for(Resource r : writeState.idToResource) {
+ if(r instanceof InternalResource) {
+ InternalResource ir = (InternalResource)r;
+ if(ir.clusterId < 0) {
+ if (DEBUG)
+ System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId);
+ clusterAssignment.add(ir.clusterId, ir);
+ } else if(ir.clusterId > 0) {
+ if (DEBUG)
+ System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir);
+ ir.resource = w.newResource(ir.clusterId);
+ writeState.idToResource.set(ir.id, ir.resource);
+ if (writeState.clusterSets.contains(ir)) {
+ w.newClusterSet(ir.resource);
+ if (DEBUG)
+ System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
+ }
+
+ }
+ }
+ }
+
+ for(Long clusterKey : clusterAssignment.getKeys()) {
+ if (DEBUG)
+ System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey);
+ w.flushCluster();
+ for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) {
+ if (DEBUG)
+ System.out.println("-CREATED RESOURCE: " + ir);
+ ir.resource = w.newResource();
+ writeState.idToResource.set(ir.id, ir.resource);
+ if (writeState.clusterSets.contains(ir)) {
+ w.newClusterSet(ir.resource);
+ if (DEBUG)
+ System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
+ }
+ }
+ }
+
+ // Create cluster sets for all existing resources (not InternalResource)
+ // before proceeding to create resources.
+ for(Resource existingResource : writeState.clusterSetsForExistingResources) {
+ w.newClusterSet(existingResource);
+ if (DEBUG)
+ System.out.println("CREATED NEW CLUSTER SET: " + existingResource);
+ }
+
+ // Then create all resources defined by cluster set
+ for(Resource r : writeState.idToResource) {
+ if(r instanceof InternalResource) {
+ InternalResource ir = (InternalResource)r;
+ Resource clusterSet = ir.clusterSet;
+
+ if (clusterSet != null) {
+ if (DEBUG)
+ System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet);
+ if(clusterSet instanceof InternalResource) {
+ ir.resource = w.newResource(((InternalResource)clusterSet).resource);
+ } else {
+ ir.resource = w.newResource(clusterSet);
+ }
+ if (DEBUG)
+ System.out.println(" => " + ir.resource);
+ writeState.idToResource.set(ir.id, ir.resource);
+ if(writeState.clusterSets.contains(ir)) {
+ if (DEBUG)
+ System.out.println(" ==> NEW CLUSTER SET");
+ w.newClusterSet(ir.resource);
+ }
+ }
+
+ }
+ }
+
+ reset();
+ bb.limit(BUFFER_SIZE);
+
+ try {
+ while(true) {
+ byte method = readByte();
+ switch(method) {
+ case TERM: {
+ if (DEBUG) {
+ System.out.println("Resources: " + writeState.idToResource.size());
+ System.out.println("Statements: " + writeState.statementCount);
+ System.out.println("Values: " + writeState.valueCount);
+ System.out.println("Files: " + writeState.fileCount);
+ System.out.println("Clusters: " + writeState.clusterCount);
+ }
+ return;
+ }
+ case CLAIM: {
+ writeState.statementCount += 2;
+ Resource subject = getResource(readInt());
+ Resource predicate = getResource(readInt());
+ Resource inverse = getResource(readInt());
+ Resource object = getResource(readInt());
+ w.claim(subject, predicate, inverse, object);
+ } break;
+ case CLAIM_NOINVERSE: {
+ ++writeState.statementCount;
+ Resource subject = getResource(readInt());
+ Resource predicate = getResource(readInt());
+ Resource object = getResource(readInt());
+ w.claim(subject, predicate, null, object);
+ } break;
+ case DENY: {
+ Resource subject = getResource(readInt());
+ Resource predicate = getResource(readInt());
+ int inv = readInt();
+ Resource inverse = null;
+ if(inv > 0) inverse = getResource(inv);
+ Resource object = getResource(readInt());
+ if(!subject.isPersistent() || !object.isPersistent()) {
+ VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object);
+ if(inv > 0) {
+ VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject);
+ if(statementProvider2 != null)
+ w.deny(object, inverse, null, subject, statementProvider2);
+ }
+ if(statementProvider1 != null)
+ w.deny(subject, predicate, null, object, statementProvider1);
+ } else {
+ w.deny(subject, predicate, inverse, object, null);
+ }
+ } break;
+ case DENY_VALUE: {
+ Resource subject = getResource(readInt());
+ if(!subject.isPersistent()) {
+ VirtualGraph provider = processor.getValueProvider(subject);
+ if(provider != null)
+ w.denyValue(subject, provider);
+ } else {
+ w.denyValue(subject);
+ }
+ } break;
+ case CLAIM_VALUE_B: {
+ ++writeState.valueCount;
+ Resource resource = getResource(readInt());
+ int len = readInt();
+ tgs.setValue(w, resource, null, this, len);
+ } break;
+ case COMMIT_AND_CONTINUE: {
+ XSupport xs = w.getService(XSupport.class);
+ xs.commitAndContinue(w, traits);
+ } break;
+ }
+ }
+ } catch(Exception e) {
+ if(e instanceof ServiceException)
+ throw (ServiceException)e;
+ else
+ throw new ServiceException(e);
+ } finally {
+ channel = null;
+ if (writeState.in != null) {
+ try (InputStream in = writeState.in) {
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ } finally {
+ writeState.in = null;
+ writeState.tempFile.delete();
+ }
+ }
+ }
+ }
+
+ private Resource getType(Object value) {
+ Class<?> clazz = value.getClass();
+ Resource dataType =
+ clazz == Float.class ? b.Float
+ : clazz == Double.class ? b.Double
+ : clazz == Integer.class ? b.Integer
+ : clazz == String.class ? b.String
+ : clazz == Boolean.class ? b.Boolean
+ : clazz == Byte.class ? b.Byte
+ : clazz == Long.class ? b.Long
+ : clazz == float[].class ? b.FloatArray
+ : clazz == double[].class ? b.DoubleArray
+ : clazz == int[].class ? b.IntegerArray
+ : clazz == String[].class ? b.StringArray
+ : clazz == boolean[].class ? b.BooleanArray
+ : clazz == byte[].class ? b.ByteArray
+ : clazz == long[].class ? b.LongArray
+ : null
+ ;
+ return dataType;
+ }
+
+ public long newCluster() {
+ return -1 - (++writeState.clusterCount);
+ }
+
+ public long getDefaultCluster() {
+ return writeState.defaultCluster;
+ }
+
+ public void setDefaultCluster(long cluster) {
+ writeState.defaultCluster = cluster;
+ }
+
+ @Override
+ public void syncRequest(final DelayedWrite request) throws DatabaseException {
+
+ try {
+
+ final DelayedWriteGraph dwg = new DelayedWriteGraph(this);
+ request.perform(dwg);
+
+ syncRequest(new WriteOnlyRequest() {
+
+ @Override
+ public void perform(WriteOnlyGraph graph) throws DatabaseException {
+ dwg.commit(graph, request);
+ }
+
+ });
+
+ } catch (DatabaseException e) {
+
+ throw e;
+
+ } catch (Throwable e) {
+
+ throw new DatabaseException(e);
+
+ } finally {
+ }
+
+ }
+
+ @Override
+ public void syncRequest(WriteOnly request) throws DatabaseException {
+
+ Resource defaultClusterSet = setClusterSet4NewResource(null);
+
+ try {
+ WriteSupport ws = session.getService(WriteSupport.class);
+ ws.performWriteRequest(this, request);
+ } catch (DatabaseException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new DatabaseException(t);
+ } finally {
+ setClusterSet4NewResource(defaultClusterSet);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getService(Class<T> api) {
+
+ if(ClusteringSupport.class == api) {
+
+ final ClusteringSupport support = (ClusteringSupport)super.getService(api);
+
+ return (T)new ClusteringSupport() {
+
+ @Override
+ public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId)
+ throws DatabaseException, ResourceNotFoundException {
+ return support.getResourceByIndexAndCluster(resourceIndex, clusterId);
+ }
+
+ @Override
+ public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException {
+ return support.getResourceByKey(resourceKey);
+ }
+
+ @Override
+ public int getNumberOfResources(long clusterId)
+ throws DatabaseException {
+ return support.getNumberOfResources(clusterId);
+ }
+
+ @Override
+ public long getCluster(Resource r) {
+ return support.getCluster(r);
+ }
+
+ @Override
+ public long createCluster() {
+ return newCluster();
+ }
+
+ @Override
+ public boolean isClusterSet(Resource r) throws DatabaseException {
+ return support.isClusterSet(r);
+ }
+
+ @Override
+ public Resource getClusterSetOfCluster(Resource r) throws DatabaseException {
+ return support.getClusterSetOfCluster(r);
+ }
+
+ @Override
+ public Resource getClusterSetOfCluster(long cluster) throws DatabaseException {
+ return support.getClusterSetOfCluster(cluster);
+ }
+
+ };
+
+ } else if (TransferableGraphSupport.class == api) {
+
+ final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class);
+
+ return (T)new TransferableGraphSupport() {
+
+ @Override
+ public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) {
+ writeByte(CLAIM_VALUE_B);
+ writeInt(getId(resource));
+ writeInt(raw.length);
+ writeBytes(raw);
+ writeInt(getBindingId(PASSTHROUGH));
+ }
+
+ @Override
+ public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount)
+ throws DatabaseException {
+ writeByte(CLAIM_VALUE_B);
+ writeInt(getId(resource));
+ writeInt(amount);
+ writeBytes(reader.readBytes(null, amount));
+ writeInt(getBindingId(PASSTHROUGH));
+ }
+
+ @Override
+ public byte[] getValue(ReadGraph graph, Resource resource) {
+ return parentSupport.getValue(graph, resource);
+ }
+
+ @Override
+ public InputStream getValueStream(ReadGraph graph, Resource resource) {
+ return parentSupport.getValueStream(graph, resource);
+ }
+
+ };
+
+ }
+
+ return super.getService(api);
+
+ }
+
+ @Override
+ public <T> void addMetadata(Metadata data) throws ServiceException {
+ MetadataUtils.addMetadata(session, metadata, data);
+ }
+
+ public void addCommitAndContinue() {
+ writeByte(COMMIT_AND_CONTINUE);
+ }
+
+ @Override
+ public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
+ return MetadataUtils.getMetadata(session, metadata, clazz);
+ }
+
+ @Override
+ public TreeMap<String, byte[]> getMetadata() {
+ return metadata;
+ }
+ @Override
+ public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
+
+ return request.perform(this);
+
+ }
+
+ @Override
+ public VirtualGraph getProvider() {
+ return null;
+ }
+
+ @Override
+ public void clearUndoList(WriteTraits writeTraits) {
+ WriteSupport ws = session.getService(WriteSupport.class);
+ if (null != ws)
+ ws.clearUndoList(writeTraits);
+ }
+
+ @Override
+ public void markUndoPoint() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public <T> T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding)
+ throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException {
+ if(!(subject instanceof InternalResource)) {
+ return super.getPossibleRelatedValue(subject, relation, binding);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ final public Resource getPossibleObject(final Resource subject, final Resource relation)
+ throws ManyObjectsForFunctionalRelationException, ServiceException {
+ if(!(subject instanceof InternalResource)) {
+ return super.getPossibleObject(subject, relation);
+ } else {
+ return null;
+ }
+ }
+
+ public void close() {
+ if (writeState.out != null) {
+ try (OutputStream out = writeState.out) {
+ } catch (IOException e) {
+ Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e);
+ } finally {
+ writeState.out = null;
+ }
+ }
+ if (writeState.in != null) {
+ try (InputStream in = writeState.in) {
+ } catch (IOException e) {
+ Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e);
+ } finally {
+ writeState.in = null;
+ }
+ }
+ if (writeState.tempFile != null) {
+ writeState.tempFile.delete();
+ writeState.tempFile = null;
+ }
+ }
+
+}