X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fgraph%2FDelayedWriteGraph.java;h=44e23d017c99864bdc16131cd620723cc1c40cdb;hp=f56a2eede50cd7a143f7fd49a6b1fd6931904522;hb=61033f112b0a2e643bf8530b99bcf90c64464f30;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java index f56a2eede..44e23d017 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java @@ -1,1426 +1,1272 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.impl.graph; - - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; - -import org.eclipse.core.runtime.Platform; -import org.simantics.databoard.Bindings; -import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.reference.ChildReference; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.impl.BindingPrintContext; -import org.simantics.databoard.serialization.SerializationException; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.IdentityPair; -import org.simantics.databoard.util.binary.RandomAccessBinary; -import org.simantics.db.Metadata; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.Statement; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteGraph; -import org.simantics.db.WriteOnlyGraph; -import org.simantics.db.common.MetadataUtils; -import org.simantics.db.common.request.WriteOnlyRequest; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.BindingException; -import org.simantics.db.exception.ClusterSetExistException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; -import org.simantics.db.exception.NoSingleResultException; -import org.simantics.db.exception.ResourceNotFoundException; -import org.simantics.db.exception.ServiceException; -import org.simantics.db.impl.DebugPolicy; -import org.simantics.db.impl.ResourceImpl; -import org.simantics.db.request.DelayedWrite; -import org.simantics.db.request.WriteOnly; -import org.simantics.db.request.WriteResult; -import org.simantics.db.request.WriteTraits; -import org.simantics.db.service.ByteReader; -import org.simantics.db.service.ClusteringSupport; -import org.simantics.db.service.TransferableGraphSupport; -import org.simantics.db.service.XSupport; -import org.simantics.layer0.Layer0; -import org.simantics.utils.datastructures.MapList; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TObjectIntHashMap; - -/** - * Write graph implementation that does not modify the database - * immediately but with an explicit commit method. All read operations - * return results based on the old graph. - */ -public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader { - - private static final boolean DEBUG = false; - public static int BUFFER = 65536; - - static class Haxx extends Binding { - - static final Serializer serializer = new Serializer() { - - public byte[] serialize(Object obj) throws SerializationException { - return (byte[])obj; - } - - @Override - public void serialize(DataOutput out, - TObjectIntHashMap identities, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void serialize(DataOutput out, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, List identities, - Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public Integer getConstantSize() { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj, TObjectIntHashMap identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getMinSize() { - throw new Error("Not supported."); - } - - }; - - @Override - public Serializer serializer() { - return serializer; - } - - @Override - public void accept(Visitor1 v, Object obj) { - throw new Error("Not supported."); - } - - @Override - public T accept(Visitor v) { - throw new Error("Not supported."); - } - - @Override - public boolean isInstance(Object obj) { - throw new Error("Not supported."); - } - - @Override - public void assertInstaceIsValid(Object obj, Set validInstances) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepHashValue(Object value, - IdentityHashMap hashedObjects) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepCompare(Object o1, Object o2, - Set> compareHistory) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public void readFrom(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public Object readFromTry(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int getComponentCount() { - throw new Error("Not supported."); - } - - @Override - public Binding getComponentBinding(int index) { - throw new Error("Not supported."); - } - - @Override - public Binding getComponentBinding(ChildReference path) { - throw new Error("Not supported."); - } - - } - - private static final Haxx haxx = new Haxx(); - - private final int TERM = 0; - private final int CLAIM = 1; - private final int CLAIM_NOINVERSE = 2; - private final int CLAIM_VALUE_B = 4; - private final int DENY = 5; - private final int DENY_VALUE = 6; - private final int COMMIT_AND_CONTINUE = 7; - - static class ClusterSet { - public Resource resource; - ClusterSet() { - ids = new TIntArrayList(); - old = false; - } - ClusterSet(boolean old, Resource r) { - ids = new TIntArrayList(); - this.old = old; - this.resource = r; - } - void add(int id) { - ids.add(id); - } - private TIntArrayList ids; - private final boolean old; // true if - boolean isNew() { - return !old; - } - } - public class State { - public File tempFile; - public FileOutputStream out; - public ArrayList idToResource = new ArrayList(); - public TIntIntHashMap externalToId = new TIntIntHashMap(); - public ArrayList idToBinding = new ArrayList(); - public TObjectIntHashMap bindingToId = new TObjectIntHashMap(); - public Set clusterSets = new HashSet(); - public Set clusterSetsForExistingResources = new HashSet(); - public int clusterCount = 0; - public long defaultCluster; - public Resource defaultClusterSet; - public int statementCount = 0; - public int valueCount = 0; - public int fileCount = 0; - } - - public State writeState; - public TreeMap metadata = new TreeMap(); - - Layer0 b; - Session session; - public static Resource convertDelayedResource(Resource r) { - if (r instanceof InternalResource) { - InternalResource ri = (InternalResource)r; - return ri.resource; - } - return r; - } - private static class InternalResource implements Resource { - - int id; - long clusterId = 0; - Resource resource = null; - Resource clusterSet = null; - - public InternalResource(int id, long clusterId) { - this.id = id; - this.clusterId = clusterId; - } - - public InternalResource(int id, Resource clusterSet) { - this.id = id; - this.clusterSet = clusterSet; - } - - @Override - public long getResourceId() { - throw new UnsupportedOperationException(); - } - - @Override - public Resource get() { - return this; - } - - @Override - public boolean isPersistent() { - return false; - } - - @Override - public int compareTo(Resource o) { - if(o instanceof InternalResource) { - return Integer.compare(id, ((InternalResource)o).id); - } else { - return -1; - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + id; - return result; - } - - @Override - public int getThreadHash() { - return hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (!(obj instanceof InternalResource)) - return false; - InternalResource other = (InternalResource) obj; - if (id != other.id) - return false; - return true; - } - - - @Override - public boolean equalsResource(Resource other) { - return equals(other); - } - @Override - public String toString() { - StringBuilder sb = new StringBuilder(32); - if(DebugPolicy.VERBOSE) { - sb.append("[delayed id="); - sb.append(id); - sb.append("]"); - } else { - sb.append("[did="); - sb.append(id); - sb.append("]"); - } - return sb.toString(); - } - } - - private int getId(Resource resource) { - if(resource instanceof InternalResource) - return ((InternalResource)resource).id; - else { - ResourceImpl r = (ResourceImpl)resource; - int id = writeState.externalToId.get(r.id); - if(id != 0) { - return id; - } else { - id = writeState.idToResource.size(); - writeState.idToResource.add(resource); - writeState.externalToId.put(r.id, id); - return id; - } - } - } - - private Resource getResource(int id) { - return writeState.idToResource.get(id); - } - - private int getBindingId(Binding binding) { - if(writeState.bindingToId.contains(binding)) - return writeState.bindingToId.get(binding); - else { - int id = writeState.idToBinding.size(); - writeState.idToBinding.add(binding); - writeState.bindingToId.put(binding, id); - return id; - } - } - - public DelayedWriteGraph(ReadGraph g) throws IOException { - super((ReadGraphImpl)g); - writeState = new State(); - session = g.getSession(); - b = Layer0.getInstance(g); - writeState.defaultCluster = newCluster(); - } - - public DelayedWriteGraph(ReadGraph g, State state) { - super((ReadGraphImpl)g); - session = g.getSession(); - b = Layer0.getInstance(g); - this.writeState = state; - } - - public DelayedWriteGraph newSync() { - return new DelayedWriteGraph(this, writeState); - } - - @Override - public void claim(Resource subject, Resource predicate, Resource object) - throws ServiceException { - assert(subject != null); - assert(predicate != null); - assert(object != null); - - Resource inverse = getPossibleInverse(predicate); - claim(subject, predicate, inverse, object); - } - - @Override - public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value, - Binding binding) throws BindingException, - ManyObjectsForFunctionalRelationException, ServiceException { - Resource valueResource = newResource(); - claimValue(valueResource, value, binding); - claim(valueResource, b.InstanceOf, null, type); - claim(resource, predicate, inverse, valueResource); - } - - @Override - public void addLiteral(Resource resource, Resource predicate, - Resource inverse, Object value, Binding binding) - throws BindingException, ManyObjectsForFunctionalRelationException, - ServiceException { - - Resource type = getType(value); - if(type == null) { - Resource literal = newResource(); - type = b.Literal; - Resource dataType = newResource(); - claim(dataType, b.InstanceOf, null, b.DataType); - claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); - claim(literal, b.HasDataType, null, dataType); - claim(literal, b.InstanceOf, null, type); - claimValue(literal, value, binding); - claim(resource, predicate, inverse, literal); - } else { - addLiteral(resource, predicate, inverse, type, value, binding); - } - - } - - @Override - public T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue) - throws DatabaseException { - - throw new UnsupportedOperationException(); - - } - - @Override - public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException { - - try { - Binding b = Bindings.getBinding(value.getClass()); - claimLiteral(resource, predicate, value, b); - } catch (BindingConstructionException e) { - throw new IllegalArgumentException(e); - } catch (BindingException e) { - throw new IllegalArgumentException(e); - } - - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException, - ManyObjectsForFunctionalRelationException, ServiceException { - - Statement valueStatement = null; - if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate); - - if(valueStatement != null && resource.equals(valueStatement.getSubject())) { - - claimValue(valueStatement.getObject(), value, binding); - - } else { - - Resource type = getType(value); - Resource literal = newResource(); - if (type == null) { - type = b.Literal; - Resource dataType = newResource(); - claim(dataType, b.InstanceOf, null, b.DataType); - claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); - claim(literal, b.HasDataType, dataType); - } - claim(literal, b.InstanceOf, null, type); - claimValue(literal, value, binding); - claim(resource, predicate, literal); - - } - - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, - Resource inverse, Resource type, Object value) - throws BindingException, ManyObjectsForFunctionalRelationException, - ServiceException { - - try { - Binding b = Bindings.getBinding(value.getClass()); - claimLiteral(resource, predicate, inverse, type, value, b); - } catch (BindingConstructionException e) { - throw new IllegalArgumentException(e); - } catch (BindingException e) { - throw new IllegalArgumentException(e); - } - - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, - Resource inverse, Resource type, Object value, Binding binding) - throws BindingException, ManyObjectsForFunctionalRelationException, - ServiceException { - - Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate); - - if(valueStatement != null && resource.equals(valueStatement.getSubject())) { - - claimValue(valueStatement.getObject(), value, binding); - - } else { - - Resource valueResource = newResource(); - claim(valueResource, b.InstanceOf, null, type); - claim(resource, predicate, inverse, valueResource); - claimValue(valueResource, value, binding); - - } - - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, - Resource type, Object value) throws BindingException, - ManyObjectsForFunctionalRelationException, ServiceException { - - try { - Binding b = Bindings.getBinding(value.getClass()); - claimLiteral(resource, predicate, type, value, b); - } catch (BindingConstructionException e) { - throw new IllegalArgumentException(e); - } catch (BindingException e) { - throw new IllegalArgumentException(e); - } - - } - - @Override - public void claimLiteral(Resource resource, Resource predicate, - Resource type, Object value, Binding binding) - throws BindingException, ManyObjectsForFunctionalRelationException, - ServiceException { - - try { - Resource inverse = getSingleObject(predicate, b.InverseOf); - claimLiteral(resource, predicate, inverse, type, value, binding); - } catch (NoSingleResultException e) { - throw new ServiceException(e); - } - - } - - @Override - public void deny(Resource subject) throws ServiceException { - assert(subject != null); - if(!(subject instanceof InternalResource)) { - try { - for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) { - deny(statement); - } - } catch (ManyObjectsForFunctionalRelationException e) { - throw new ServiceException(e); - } - } - } - - @Override - public void deny(Resource subject, Resource predicate) - throws ServiceException { - assert(subject != null); - if(!(subject instanceof InternalResource)) { - for (Resource object : getObjects(subject, predicate)) { - deny(subject, predicate, object); - } - } - } - - @Override - public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException { - denyStatement(subject, predicate, object); - } - - @Override - public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException { - deny(subject, predicate, getPossibleInverse(predicate), object); - } - - @Override - public void deny(Statement statement) throws ServiceException { - Resource predicate = statement.getPredicate(); - deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject()); - } - - @Override - public void denyValue(Resource resource, Resource predicate) - throws ManyObjectsForFunctionalRelationException, ServiceException { - assert(resource != null); - assert(predicate != null); - - if(!(resource instanceof InternalResource)) { - Statement valueStatement = getPossibleStatement(resource, predicate); - - if (valueStatement != null && !valueStatement.isAsserted(resource)) { - Resource value = valueStatement.getObject(); - denyValue(value); - } - } - } - - @Override - public Resource newResource() throws ServiceException { - if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet); - else return newResource(writeState.defaultCluster); - } - - @Override - public Resource newResource(long clusterId) throws ServiceException { - int id = writeState.idToResource.size(); - InternalResource ret = new InternalResource(id, clusterId); - writeState.idToResource.add(ret); - return ret; - } - - @Override - public Resource newResource(Resource clusterSet) throws ServiceException { - - if ((clusterSet instanceof InternalResource)) { - if(!writeState.clusterSets.contains(clusterSet)) - throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); - } else { - WriteSupport ws = session.getService(WriteSupport.class); - if (!ws.hasClusterSet(null, clusterSet)) - if(!writeState.clusterSetsForExistingResources.contains(clusterSet)) - throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); - } - - int id = writeState.idToResource.size(); - InternalResource ret = new InternalResource(id, clusterSet); - writeState.idToResource.add(ret); - return ret; - } - - @Override - public void newClusterSet(Resource clusterSet) throws ServiceException { - if (DEBUG) - System.out.println("new cluster set=" + clusterSet); - boolean existingResource = !(clusterSet instanceof InternalResource); - if (existingResource) { - WriteSupport ws = session.getService(WriteSupport.class); - if (ws.hasClusterSet(null, clusterSet)) - throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); - writeState.clusterSetsForExistingResources.add(clusterSet); - } else { - if(!writeState.clusterSets.add(clusterSet)) - throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); - } - } - @Override - public Resource setClusterSet4NewResource(Resource clusterSet) - throws ServiceException { - Resource existing = writeState.defaultClusterSet; - writeState.defaultClusterSet = clusterSet; - return existing; - } - - @Override - public void claim(Resource subject, Resource predicate, Resource inverse, - Resource object) throws ServiceException { - assert(subject != null); - assert(predicate != null); - assert(object != null); - try { - if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) { - writeByte(CLAIM); - writeInt(getId(subject)); - writeInt(getId(predicate)); - writeInt(getId(inverse)); - writeInt(getId(object)); - } else { - writeByte(CLAIM_NOINVERSE); - writeInt(getId(subject)); - writeInt(getId(predicate)); - writeInt(getId(object)); - } - } catch(Exception e) { - throw new ServiceException(e); - } - } - - @Override - public void deny(Resource subject, Resource predicate, Resource inverse, - Resource object) throws ServiceException { - assert(subject != null); - assert(predicate != null); - assert(object != null); - try { - writeByte(DENY); - writeInt(getId(subject)); - writeInt(getId(predicate)); - if(inverse != null) writeInt(getId(inverse)); - else writeInt(0); - writeInt(getId(object)); - } catch(Exception e) { - throw new ServiceException(e); - } - } - - @Override - public void deny(Resource subject, Resource predicate, Resource inverse, - Resource object, VirtualGraph graph) throws ServiceException { - throw new UnsupportedOperationException(); - } - - @Override - public void claimValue(Resource resource, Object value) - throws ServiceException { - try { - Binding binding = Bindings.getBinding(value.getClass()); - claimValue(resource, value, binding); - } catch (BindingConstructionException e) { - throw new ServiceException(e); - } - } - - @Override - public void claimValue(Resource resource, Object value, Binding binding) - throws ServiceException { - try { - - writeByte(CLAIM_VALUE_B); - writeInt(getId(resource)); - Serializer serializer = binding.serializer(); - int size = serializer.getSize(value); - writeInt(size); - serializer.serialize(new OutputStream() { - - @Override - public void write(int b) throws IOException { - writeByte(b); - } - - }, value); - - } catch(IOException e) { - Logger.defaultLogError(e); - throw new ServiceException(e); - } - } - - @Override - public void denyValue(Resource resource) throws ServiceException { - writeByte(DENY_VALUE); - writeInt(getId(resource)); - } - - @Override - public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException { - throw new UnsupportedOperationException(); - } - - @Override - public void flushCluster() throws ServiceException { - writeState.defaultCluster = newCluster(); - } - - @Override - public void flushCluster(Resource r) throws ServiceException { - throw new ServiceException("Operation flushCluster(" + r + " not implemented."); - } - - private FileChannel channel; - byte[] bytes = new byte[BUFFER]; - byte[] buffer = new byte[BUFFER]; - ByteBuffer bb = ByteBuffer.wrap(bytes); - int byteIndex = 0; - - private void writeReset(int size) { - - byteIndex = 0; - bb.position(0); - bb.limit(size); - try { - - if(writeState.tempFile == null) { - - File workspace = Platform.getLocation().toFile(); - File temp = new File(workspace, "tempFiles"); - temp.mkdirs(); - - File base = new File(temp, "delayed"); - base.mkdirs(); - writeState.tempFile = new File(base, UUID.randomUUID().toString()); - writeState.out = new FileOutputStream(writeState.tempFile); - channel = writeState.out.getChannel(); - - } - - for (int got=0;got < size;) { - int n = channel.write(bb); - if (n <= 0) { - new Exception().printStackTrace(); - return; - } - got += n; - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private void reset() { - byteIndex = 0; - try { - bb.clear(); - for(int got=0; got < BUFFER;) { - int n = channel.read(bb); - if (n <= 0) - return; - got += n; - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private void writeInt(int i) { - if(byteIndex < (BUFFER-4)) { - bytes[byteIndex++] = (byte)(i&0xff); - bytes[byteIndex++] = (byte)((i>>>8)&0xff); - bytes[byteIndex++] = (byte)((i>>>16)&0xff); - bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if (byteIndex == BUFFER) - writeReset(BUFFER); - } else { - int has = BUFFER-byteIndex; - if(has == 0) writeReset(BUFFER); - bytes[byteIndex++] = (byte)(i&0xff); - if(has == 1) writeReset(BUFFER); - bytes[byteIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) writeReset(BUFFER); - bytes[byteIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) writeReset(BUFFER); - bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) writeReset(BUFFER); - } - } - - private int readInt() { - if(byteIndex < (BUFFER-4)) { - int result = (int) - ((bytes[byteIndex++] & 0xff) | - ((bytes[byteIndex++] & 0xff)<<8) | - ((bytes[byteIndex++] & 0xff)<<16) | - ((bytes[byteIndex++] & 0xff)<<24)); - return result; - } else { - int has = BUFFER-byteIndex; - int result = 0; - if(has == 0) reset(); - result = (int)(bytes[byteIndex++] & 0xff); - if(has == 1) reset(); - result |= (int)((bytes[byteIndex++] & 0xff) <<8); - if(has == 2) reset(); - result |= (int)((bytes[byteIndex++] & 0xff) <<16); - if(has == 3) reset(); - result |= (int)((bytes[byteIndex++] & 0xff) <<24); - if(has == 4) reset(); - return result; - } - } - - private byte readByte() { - byte result = bytes[byteIndex++]; - if(byteIndex == BUFFER) reset(); - return result; - } - - private void writeByte(int b) { - bytes[byteIndex++] = (byte)b; - if(byteIndex == BUFFER) writeReset(BUFFER); - } - - private void writeBytes(byte[] data) { - int has = BUFFER-byteIndex; - int amount = data.length; - if(has > amount) { - System.arraycopy(data, 0, bytes, byteIndex, amount); - byteIndex += amount; - } else { - System.arraycopy(data, 0, bytes, byteIndex, has); - writeReset(BUFFER); - ByteBuffer bb2 = ByteBuffer.wrap(data); - bb2.position(has); - try { - channel.write(bb2); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public byte[] readBytes(int amount) { - return readBytes(buffer, amount); - } - - public byte[] readBytes(byte[] result, int amount) { - if(result == null) result = new byte[amount]; - int has = BUFFER-byteIndex; - if(has > amount) { - System.arraycopy(bytes, byteIndex, result, 0, amount); - byteIndex += amount; - } else { - System.arraycopy(bytes, byteIndex, result, 0, has); - ByteBuffer bb2 = ByteBuffer.wrap(result); - bb2.position(has); - for(int got=has;got 0) { - writeReset(byteIndex); - } - channel.force(false); - writeState.out.close(); - - FileInputStream fs = new FileInputStream(writeState.tempFile); - channel = fs.getChannel(); - - } catch (IOException e) { - throw new ServiceException(e); - } - - w.getMetadata().putAll(metadata); - - TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class); - - // First create all resources defined by clusterId - MapList clusterAssignment = new MapList(); - for(Resource r : writeState.idToResource) { - if(r instanceof InternalResource) { - InternalResource ir = (InternalResource)r; - if(ir.clusterId < 0) { - if (DEBUG) - System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId); - clusterAssignment.add(ir.clusterId, ir); - } else if(ir.clusterId > 0) { - if (DEBUG) - System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir); - ir.resource = w.newResource(ir.clusterId); - writeState.idToResource.set(ir.id, ir.resource); - if (writeState.clusterSets.contains(ir)) { - w.newClusterSet(ir.resource); - if (DEBUG) - System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); - } - - } - } - } - - for(Long clusterKey : clusterAssignment.getKeys()) { - if (DEBUG) - System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey); - w.flushCluster(); - for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) { - if (DEBUG) - System.out.println("-CREATED RESOURCE: " + ir); - ir.resource = w.newResource(); - writeState.idToResource.set(ir.id, ir.resource); - if (writeState.clusterSets.contains(ir)) { - w.newClusterSet(ir.resource); - if (DEBUG) - System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); - } - } - } - - // Create cluster sets for all existing resources (not InternalResource) - // before proceeding to create resources. - for(Resource existingResource : writeState.clusterSetsForExistingResources) { - w.newClusterSet(existingResource); - if (DEBUG) - System.out.println("CREATED NEW CLUSTER SET: " + existingResource); - } - - // Then create all resources defined by cluster set - for(Resource r : writeState.idToResource) { - if(r instanceof InternalResource) { - InternalResource ir = (InternalResource)r; - Resource clusterSet = ir.clusterSet; - - if (clusterSet != null) { - if (DEBUG) - System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet); - if(clusterSet instanceof InternalResource) { - ir.resource = w.newResource(((InternalResource)clusterSet).resource); - } else { - ir.resource = w.newResource(clusterSet); - } - if (DEBUG) - System.out.println(" => " + ir.resource); - writeState.idToResource.set(ir.id, ir.resource); - if(writeState.clusterSets.contains(ir)) { - if (DEBUG) - System.out.println(" ==> NEW CLUSTER SET"); - w.newClusterSet(ir.resource); - } - } - - } - } - - reset(); - bb.limit(BUFFER); - - try { - while(true) { - byte method = readByte(); - switch(method) { - case TERM: { - return; - } - case CLAIM: { - writeState.statementCount += 2; - Resource subject = getResource(readInt()); - Resource predicate = getResource(readInt()); - Resource inverse = getResource(readInt()); - Resource object = getResource(readInt()); - w.claim(subject, predicate, inverse, object); - } break; - case CLAIM_NOINVERSE: { - ++writeState.statementCount; - Resource subject = getResource(readInt()); - Resource predicate = getResource(readInt()); - Resource object = getResource(readInt()); - w.claim(subject, predicate, null, object); - } break; - case DENY: { - Resource subject = getResource(readInt()); - Resource predicate = getResource(readInt()); - int inv = readInt(); - Resource inverse = null; - if(inv > 0) inverse = getResource(inv); - Resource object = getResource(readInt()); - if(!subject.isPersistent() || !object.isPersistent()) { - VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object); - if(inv > 0) { - VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject); - if(statementProvider2 != null) - w.deny(object, inverse, null, subject, statementProvider2); - } - if(statementProvider1 != null) - w.deny(subject, predicate, null, object, statementProvider1); - } else { - w.deny(subject, predicate, inverse, object, null); - } - } break; - case DENY_VALUE: { - Resource subject = getResource(readInt()); - if(!subject.isPersistent()) { - VirtualGraph provider = processor.getValueProvider(subject); - if(provider != null) - w.denyValue(subject, provider); - } else { - w.denyValue(subject); - } - } break; - case CLAIM_VALUE_B: { - ++writeState.valueCount; - Resource resource = getResource(readInt()); - int len = readInt(); - tgs.setValue(w, resource, null, this, len); -// byte[] bytes = readBytes(len); -// tgs.setValue(resource, null, bytes); - } break; - case COMMIT_AND_CONTINUE: { - XSupport xs = w.getService(XSupport.class); - xs.commitAndContinue(w, traits); - } break; - } - } - } catch(Exception e) { - if(e instanceof ServiceException) - throw (ServiceException)e; - else - throw new ServiceException(e); - } finally { - try { - channel.close(); - channel = null; - } catch (IOException e) { - throw new ServiceException(e); - } - } -// System.out.println("Resources: " + state.resourceCount); -// System.out.println("Statements: " + state.statementCount); -// System.out.println("Values: " + state.valueCount); - } - - private Resource getType(Object value) { - Class clazz = value.getClass(); - Resource dataType = - clazz == Float.class ? b.Float - : clazz == Double.class ? b.Double - : clazz == Integer.class ? b.Integer - : clazz == String.class ? b.String - : clazz == Boolean.class ? b.Boolean - : clazz == Byte.class ? b.Byte - : clazz == Long.class ? b.Long - : clazz == float[].class ? b.FloatArray - : clazz == double[].class ? b.DoubleArray - : clazz == int[].class ? b.IntegerArray - : clazz == String[].class ? b.StringArray - : clazz == boolean[].class ? b.BooleanArray - : clazz == byte[].class ? b.ByteArray - : clazz == long[].class ? b.LongArray - : null - ; - return dataType; - } - - public long newCluster() { - return -1 - (++writeState.clusterCount); - } - - public long getDefaultCluster() { - return writeState.defaultCluster; - } - - public void setDefaultCluster(long cluster) { - writeState.defaultCluster = cluster; - } - - @Override - public void syncRequest(final DelayedWrite request) throws DatabaseException { - - try { - - final DelayedWriteGraph dwg = new DelayedWriteGraph(this); - request.perform(dwg); - - syncRequest(new WriteOnlyRequest() { - - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - dwg.commit(graph, request); - } - - }); - - } catch (DatabaseException e) { - - throw e; - - } catch (Throwable e) { - - throw new DatabaseException(e); - - } finally { - - } - - } - - @Override - public void syncRequest(WriteOnly request) throws DatabaseException { - - Resource defaultClusterSet = setClusterSet4NewResource(null); - - try { - WriteSupport ws = session.getService(WriteSupport.class); - ws.performWriteRequest(this, request); - } catch (DatabaseException e) { - throw e; - } catch (Throwable t) { - throw new DatabaseException(t); - } finally { - setClusterSet4NewResource(defaultClusterSet); - } - - } - - @SuppressWarnings("unchecked") - @Override - public T getService(Class api) { - - if(ClusteringSupport.class == api) { - - final ClusteringSupport support = (ClusteringSupport)super.getService(api); - - return (T)new ClusteringSupport() { - - @Override - public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId) - throws DatabaseException, ResourceNotFoundException { - return support.getResourceByIndexAndCluster(resourceIndex, clusterId); - } - - @Override - public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException { - return support.getResourceByKey(resourceKey); - } - - @Override - public int getNumberOfResources(long clusterId) - throws DatabaseException { - return support.getNumberOfResources(clusterId); - } - - @Override - public long getCluster(Resource r) { - return support.getCluster(r); - } - - @Override - public long createCluster() { - return newCluster(); - } - - @Override - public boolean isClusterSet(Resource r) throws DatabaseException { - return support.isClusterSet(r); - } - - @Override - public Resource getClusterSetOfCluster(Resource r) throws DatabaseException { - return support.getClusterSetOfCluster(r); - } - - @Override - public Resource getClusterSetOfCluster(long cluster) throws DatabaseException { - return support.getClusterSetOfCluster(cluster); - } - - }; - - } else if (TransferableGraphSupport.class == api) { - - final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class); - - return (T)new TransferableGraphSupport() { - - @Override - public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) { - writeByte(CLAIM_VALUE_B); - writeInt(getId(resource)); - writeInt(raw.length); - writeBytes(raw); - writeInt(getBindingId(haxx)); - } - - @Override - public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount) - throws DatabaseException { - writeByte(CLAIM_VALUE_B); - writeInt(getId(resource)); - writeInt(amount); - writeBytes(reader.readBytes(null, amount)); - writeInt(getBindingId(haxx)); - } - - @Override - public byte[] getValue(ReadGraph graph, Resource resource) { - return parentSupport.getValue(graph, resource); - } - - @Override - public InputStream getValueStream(ReadGraph graph, Resource resource) { - return parentSupport.getValueStream(graph, resource); - } - - }; - - } - - return super.getService(api); - - } - - @Override - public void addMetadata(Metadata data) throws ServiceException { - MetadataUtils.addMetadata(session, metadata, data); - } - - public void addCommitAndContinue() { - writeByte(COMMIT_AND_CONTINUE); - } - - @Override - public T getMetadata(Class clazz) throws ServiceException { - return MetadataUtils.getMetadata(session, metadata, clazz); - } - - @Override - public TreeMap getMetadata() { - return metadata; - } - @Override - public T syncRequest(WriteResult request) throws DatabaseException { - - return request.perform(this); - - } - - @Override - public VirtualGraph getProvider() { - return null; - } - - @Override - public void clearUndoList(WriteTraits writeTraits) { - WriteSupport ws = session.getService(WriteSupport.class); - if (null != ws) - ws.clearUndoList(writeTraits); - } - - @Override - public void markUndoPoint() { - // TODO Auto-generated method stub - - } - - @Override - public T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding) - throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException { - if(!(subject instanceof InternalResource)) { - return super.getPossibleRelatedValue(subject, relation, binding); - } else { - return null; - } - } - - @Override - final public Resource getPossibleObject(final Resource subject, final Resource relation) - throws ManyObjectsForFunctionalRelationException, ServiceException { - if(!(subject instanceof InternalResource)) { - return super.getPossibleObject(subject, relation); - } else { - return null; - } - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.impl.graph; + + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +import org.eclipse.core.runtime.Platform; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.accessor.Accessor; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.error.BindingConstructionException; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.util.binary.RandomAccessBinary; +import org.simantics.db.Metadata; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.Statement; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteGraph; +import org.simantics.db.WriteOnlyGraph; +import org.simantics.db.common.MetadataUtils; +import org.simantics.db.common.request.WriteOnlyRequest; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.BindingException; +import org.simantics.db.exception.ClusterSetExistException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; +import org.simantics.db.exception.NoSingleResultException; +import org.simantics.db.exception.ResourceNotFoundException; +import org.simantics.db.exception.ServiceException; +import org.simantics.db.impl.DebugPolicy; +import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.request.DelayedWrite; +import org.simantics.db.request.WriteOnly; +import org.simantics.db.request.WriteResult; +import org.simantics.db.request.WriteTraits; +import org.simantics.db.service.ByteReader; +import org.simantics.db.service.ClusteringSupport; +import org.simantics.db.service.TransferableGraphSupport; +import org.simantics.db.service.XSupport; +import org.simantics.layer0.Layer0; +import org.simantics.utils.datastructures.MapList; + +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TObjectIntHashMap; + +/** + * Write graph implementation that does not modify the database + * immediately but with an explicit commit method. All read operations + * return results based on the old graph. + */ +public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable { + + private static final boolean DEBUG = false; + private static final int BUFFER_SIZE = 512*1024; + + private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding(); + + private final int TERM = 0; + private final int CLAIM = 1; + private final int CLAIM_NOINVERSE = 2; + private final int CLAIM_VALUE_B = 4; + private final int DENY = 5; + private final int DENY_VALUE = 6; + private final int COMMIT_AND_CONTINUE = 7; + + private static class State { + public File tempFile; + public FileOutputStream out; + public FileInputStream in; + public ArrayList idToResource = new ArrayList(); + public TIntIntHashMap externalToId = new TIntIntHashMap(); + public ArrayList idToBinding = new ArrayList(); + public TObjectIntHashMap bindingToId = new TObjectIntHashMap(); + public Set clusterSets = new HashSet(); + public Set clusterSetsForExistingResources = new HashSet(); + public int clusterCount = 0; + public long defaultCluster; + public Resource defaultClusterSet; + public int statementCount = 0; + public int valueCount = 0; + public int fileCount = 0; + } + + private State writeState; + private TreeMap metadata = new TreeMap(); + + private FileChannel channel; + private byte[] bytes = new byte[BUFFER_SIZE]; + private ByteBuffer bb = ByteBuffer.wrap(bytes); + private int byteIndex = 0; + + private Layer0 b; + private Session session; + + public static Resource convertDelayedResource(Resource r) { + if (r instanceof InternalResource) + return ((InternalResource) r).resource; + return r; + } + + private static class InternalResource implements Resource { + + int id; + long clusterId = 0; + Resource resource = null; + Resource clusterSet = null; + + public InternalResource(int id, long clusterId) { + this.id = id; + this.clusterId = clusterId; + } + + public InternalResource(int id, Resource clusterSet) { + this.id = id; + this.clusterSet = clusterSet; + } + + @Override + public long getResourceId() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource get() { + return this; + } + + @Override + public boolean isPersistent() { + return false; + } + + @Override + public int compareTo(Resource o) { + if(o instanceof InternalResource) + return Integer.compare(id, ((InternalResource)o).id); + return -1; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + id; + return result; + } + + @Override + public int getThreadHash() { + return hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof InternalResource)) + return false; + InternalResource other = (InternalResource) obj; + if (id != other.id) + return false; + return true; + } + + @Override + public boolean equalsResource(Resource other) { + return equals(other); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(32); + if(DebugPolicy.VERBOSE) { + sb.append("[delayed id="); + sb.append(id); + sb.append("]"); + } else { + sb.append("[did="); + sb.append(id); + sb.append("]"); + } + return sb.toString(); + } + } + + private int getId(Resource resource) { + if(resource instanceof InternalResource) + return ((InternalResource)resource).id; + else { + ResourceImpl r = (ResourceImpl)resource; + int id = writeState.externalToId.get(r.id); + if(id != 0) { + return id; + } else { + id = writeState.idToResource.size(); + writeState.idToResource.add(resource); + writeState.externalToId.put(r.id, id); + return id; + } + } + } + + private Resource getResource(int id) { + return writeState.idToResource.get(id); + } + + private int getBindingId(Binding binding) { + if(writeState.bindingToId.contains(binding)) + return writeState.bindingToId.get(binding); + else { + int id = writeState.idToBinding.size(); + writeState.idToBinding.add(binding); + writeState.bindingToId.put(binding, id); + return id; + } + } + + public DelayedWriteGraph(ReadGraph g) throws IOException { + super((ReadGraphImpl)g); + writeState = new State(); + session = g.getSession(); + b = Layer0.getInstance(g); + writeState.defaultCluster = newCluster(); + } + + public DelayedWriteGraph(ReadGraph g, State state) { + super((ReadGraphImpl)g); + session = g.getSession(); + b = Layer0.getInstance(g); + this.writeState = state; + } + + public DelayedWriteGraph newSync() { + return new DelayedWriteGraph(this, writeState); + } + + @Override + public void claim(Resource subject, Resource predicate, Resource object) + throws ServiceException { + assert(subject != null); + assert(predicate != null); + assert(object != null); + + Resource inverse = getPossibleInverse(predicate); + claim(subject, predicate, inverse, object); + } + + @Override + public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value, + Binding binding) throws BindingException, + ManyObjectsForFunctionalRelationException, ServiceException { + Resource valueResource = newResource(); + claimValue(valueResource, value, binding); + claim(valueResource, b.InstanceOf, null, type); + claim(resource, predicate, inverse, valueResource); + } + + @Override + public void addLiteral(Resource resource, Resource predicate, + Resource inverse, Object value, Binding binding) + throws BindingException, ManyObjectsForFunctionalRelationException, + ServiceException { + + Resource type = getType(value); + if(type == null) { + Resource literal = newResource(); + type = b.Literal; + Resource dataType = newResource(); + claim(dataType, b.InstanceOf, null, b.DataType); + claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); + claim(literal, b.HasDataType, null, dataType); + claim(literal, b.InstanceOf, null, type); + claimValue(literal, value, binding); + claim(resource, predicate, inverse, literal); + } else { + addLiteral(resource, predicate, inverse, type, value, binding); + } + + } + + @Override + public T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue) + throws DatabaseException { + + throw new UnsupportedOperationException(); + + } + + @Override + public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException { + + try { + Binding b = Bindings.getBinding(value.getClass()); + claimLiteral(resource, predicate, value, b); + } catch (BindingConstructionException e) { + throw new IllegalArgumentException(e); + } catch (BindingException e) { + throw new IllegalArgumentException(e); + } + + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException, + ManyObjectsForFunctionalRelationException, ServiceException { + + Statement valueStatement = null; + if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate); + + if(valueStatement != null && resource.equals(valueStatement.getSubject())) { + + claimValue(valueStatement.getObject(), value, binding); + + } else { + + Resource type = getType(value); + Resource literal = newResource(); + if (type == null) { + type = b.Literal; + Resource dataType = newResource(); + claim(dataType, b.InstanceOf, null, b.DataType); + claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); + claim(literal, b.HasDataType, dataType); + } + claim(literal, b.InstanceOf, null, type); + claimValue(literal, value, binding); + claim(resource, predicate, literal); + + } + + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, + Resource inverse, Resource type, Object value) + throws BindingException, ManyObjectsForFunctionalRelationException, + ServiceException { + + try { + Binding b = Bindings.getBinding(value.getClass()); + claimLiteral(resource, predicate, inverse, type, value, b); + } catch (BindingConstructionException e) { + throw new IllegalArgumentException(e); + } catch (BindingException e) { + throw new IllegalArgumentException(e); + } + + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, + Resource inverse, Resource type, Object value, Binding binding) + throws BindingException, ManyObjectsForFunctionalRelationException, + ServiceException { + + Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate); + + if(valueStatement != null && resource.equals(valueStatement.getSubject())) { + + claimValue(valueStatement.getObject(), value, binding); + + } else { + + Resource valueResource = newResource(); + claim(valueResource, b.InstanceOf, null, type); + claim(resource, predicate, inverse, valueResource); + claimValue(valueResource, value, binding); + + } + + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, + Resource type, Object value) throws BindingException, + ManyObjectsForFunctionalRelationException, ServiceException { + + try { + Binding b = Bindings.getBinding(value.getClass()); + claimLiteral(resource, predicate, type, value, b); + } catch (BindingConstructionException e) { + throw new IllegalArgumentException(e); + } catch (BindingException e) { + throw new IllegalArgumentException(e); + } + + } + + @Override + public void claimLiteral(Resource resource, Resource predicate, + Resource type, Object value, Binding binding) + throws BindingException, ManyObjectsForFunctionalRelationException, + ServiceException { + + try { + Resource inverse = getSingleObject(predicate, b.InverseOf); + claimLiteral(resource, predicate, inverse, type, value, binding); + } catch (NoSingleResultException e) { + throw new ServiceException(e); + } + + } + + @Override + public void deny(Resource subject) throws ServiceException { + assert(subject != null); + if(!(subject instanceof InternalResource)) { + try { + for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) { + deny(statement); + } + } catch (ManyObjectsForFunctionalRelationException e) { + throw new ServiceException(e); + } + } + } + + @Override + public void deny(Resource subject, Resource predicate) + throws ServiceException { + assert(subject != null); + if(!(subject instanceof InternalResource)) { + for (Resource object : getObjects(subject, predicate)) { + deny(subject, predicate, object); + } + } + } + + @Override + public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException { + denyStatement(subject, predicate, object); + } + + @Override + public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException { + deny(subject, predicate, getPossibleInverse(predicate), object); + } + + @Override + public void deny(Statement statement) throws ServiceException { + Resource predicate = statement.getPredicate(); + deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject()); + } + + @Override + public void denyValue(Resource resource, Resource predicate) + throws ManyObjectsForFunctionalRelationException, ServiceException { + assert(resource != null); + assert(predicate != null); + + if(!(resource instanceof InternalResource)) { + Statement valueStatement = getPossibleStatement(resource, predicate); + + if (valueStatement != null && !valueStatement.isAsserted(resource)) { + Resource value = valueStatement.getObject(); + denyValue(value); + } + } + } + + @Override + public Resource newResource() throws ServiceException { + if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet); + else return newResource(writeState.defaultCluster); + } + + @Override + public Resource newResource(long clusterId) throws ServiceException { + int id = writeState.idToResource.size(); + InternalResource ret = new InternalResource(id, clusterId); + writeState.idToResource.add(ret); + return ret; + } + + @Override + public Resource newResource(Resource clusterSet) throws ServiceException { + + if ((clusterSet instanceof InternalResource)) { + if(!writeState.clusterSets.contains(clusterSet)) + throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); + } else { + WriteSupport ws = session.getService(WriteSupport.class); + if (!ws.hasClusterSet(null, clusterSet)) + if(!writeState.clusterSetsForExistingResources.contains(clusterSet)) + throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); + } + + int id = writeState.idToResource.size(); + InternalResource ret = new InternalResource(id, clusterSet); + writeState.idToResource.add(ret); + return ret; + } + + @Override + public void newClusterSet(Resource clusterSet) throws ServiceException { + if (DEBUG) + System.out.println("new cluster set=" + clusterSet); + boolean existingResource = !(clusterSet instanceof InternalResource); + if (existingResource) { + WriteSupport ws = session.getService(WriteSupport.class); + if (ws.hasClusterSet(null, clusterSet)) + throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); + writeState.clusterSetsForExistingResources.add(clusterSet); + } else { + if(!writeState.clusterSets.add(clusterSet)) + throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); + } + } + @Override + public Resource setClusterSet4NewResource(Resource clusterSet) + throws ServiceException { + Resource existing = writeState.defaultClusterSet; + writeState.defaultClusterSet = clusterSet; + return existing; + } + + @Override + public void claim(Resource subject, Resource predicate, Resource inverse, + Resource object) throws ServiceException { + assert(subject != null); + assert(predicate != null); + assert(object != null); + try { + if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) { + writeByte(CLAIM); + writeInt(getId(subject)); + writeInt(getId(predicate)); + writeInt(getId(inverse)); + writeInt(getId(object)); + } else { + writeByte(CLAIM_NOINVERSE); + writeInt(getId(subject)); + writeInt(getId(predicate)); + writeInt(getId(object)); + } + } catch(Exception e) { + throw new ServiceException(e); + } + } + + @Override + public void deny(Resource subject, Resource predicate, Resource inverse, + Resource object) throws ServiceException { + assert(subject != null); + assert(predicate != null); + assert(object != null); + try { + writeByte(DENY); + writeInt(getId(subject)); + writeInt(getId(predicate)); + if(inverse != null) writeInt(getId(inverse)); + else writeInt(0); + writeInt(getId(object)); + } catch(Exception e) { + throw new ServiceException(e); + } + } + + @Override + public void deny(Resource subject, Resource predicate, Resource inverse, + Resource object, VirtualGraph graph) throws ServiceException { + throw new UnsupportedOperationException(); + } + + @Override + public void claimValue(Resource resource, Object value) + throws ServiceException { + try { + Binding binding = Bindings.getBinding(value.getClass()); + claimValue(resource, value, binding); + } catch (BindingConstructionException e) { + throw new ServiceException(e); + } + } + + private OutputStream valueWriter = new OutputStream() { + @Override + public void write(int b) throws IOException { + writeByte(b); + } + }; + + @Override + public void claimValue(Resource resource, Object value, Binding binding) + throws ServiceException { + try { + + writeByte(CLAIM_VALUE_B); + writeInt(getId(resource)); + Serializer serializer = binding.serializer(); + int size = serializer.getSize(value); + writeInt(size); + serializer.serialize(valueWriter, value); + + } catch(IOException e) { + Logger.defaultLogError(e); + throw new ServiceException(e); + } + } + + @Override + public void denyValue(Resource resource) throws ServiceException { + writeByte(DENY_VALUE); + writeInt(getId(resource)); + } + + @Override + public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException { + throw new UnsupportedOperationException(); + } + + @Override + public void flushCluster() throws ServiceException { + writeState.defaultCluster = newCluster(); + } + + @Override + public void flushCluster(Resource r) throws ServiceException { + throw new ServiceException("Operation flushCluster(" + r + " not implemented."); + } + + private void writeReset(int size) { + byteIndex = 0; + bb.position(0); + bb.limit(size); + try { + if (writeState.tempFile == null) { + File workspace = Platform.getLocation().toFile(); + File temp = new File(workspace, "tempFiles"); + File base = new File(temp, "delayed"); + Files.createDirectories(base.toPath()); + writeState.tempFile = new File(base, UUID.randomUUID().toString()); + writeState.out = new FileOutputStream(writeState.tempFile); + channel = writeState.out.getChannel(); + } + + for (int got=0;got < size;) { + int n = channel.write(bb); + if (n <= 0) { + Logger.defaultLogError(new Exception("FileChannel.write returned " + n)); + return; + } + got += n; + } + } catch (IOException e) { + Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e); + } + } + + private void reset() { + byteIndex = 0; + bb.clear(); + if (channel != null) { + try { + for(int got=0; got < BUFFER_SIZE;) { + int n = channel.read(bb); + if (n <= 0) + return; + got += n; + } + } catch (IOException e) { + Logger.defaultLogError("FileChannel.read failed", e); + } + } + } + + private void writeInt(int i) { + if(byteIndex < (BUFFER_SIZE-4)) { + bytes[byteIndex++] = (byte)(i&0xff); + bytes[byteIndex++] = (byte)((i>>>8)&0xff); + bytes[byteIndex++] = (byte)((i>>>16)&0xff); + bytes[byteIndex++] = (byte)((i>>>24)&0xff); + if (byteIndex == BUFFER_SIZE) + writeReset(BUFFER_SIZE); + } else { + int has = BUFFER_SIZE-byteIndex; + if(has == 0) writeReset(BUFFER_SIZE); + bytes[byteIndex++] = (byte)(i&0xff); + if(has == 1) writeReset(BUFFER_SIZE); + bytes[byteIndex++] = (byte)((i>>>8)&0xff); + if(has == 2) writeReset(BUFFER_SIZE); + bytes[byteIndex++] = (byte)((i>>>16)&0xff); + if(has == 3) writeReset(BUFFER_SIZE); + bytes[byteIndex++] = (byte)((i>>>24)&0xff); + if(has == 4) writeReset(BUFFER_SIZE); + } + } + + private int readInt() { + if(byteIndex < (BUFFER_SIZE-4)) { + int result = (int) + ((bytes[byteIndex++] & 0xff) | + ((bytes[byteIndex++] & 0xff)<<8) | + ((bytes[byteIndex++] & 0xff)<<16) | + ((bytes[byteIndex++] & 0xff)<<24)); + return result; + } else { + int has = BUFFER_SIZE-byteIndex; + int result = 0; + if(has == 0) reset(); + result = (int)(bytes[byteIndex++] & 0xff); + if(has == 1) reset(); + result |= (int)((bytes[byteIndex++] & 0xff) <<8); + if(has == 2) reset(); + result |= (int)((bytes[byteIndex++] & 0xff) <<16); + if(has == 3) reset(); + result |= (int)((bytes[byteIndex++] & 0xff) <<24); + if(has == 4) reset(); + return result; + } + } + + private byte readByte() { + byte result = bytes[byteIndex++]; + if(byteIndex == BUFFER_SIZE) reset(); + return result; + } + + private void writeByte(int b) { + bytes[byteIndex++] = (byte)b; + if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE); + } + + private void writeBytes(byte[] data) { + int has = BUFFER_SIZE-byteIndex; + int amount = data.length; + if(has > amount) { + System.arraycopy(data, 0, bytes, byteIndex, amount); + byteIndex += amount; + } else { + System.arraycopy(data, 0, bytes, byteIndex, has); + writeReset(BUFFER_SIZE); + ByteBuffer bb2 = ByteBuffer.wrap(data); + bb2.position(has); + try { + channel.write(bb2); + } catch (IOException e) { + Logger.defaultLogError("FileChannel.write failed", e); + } + } + } + + public byte[] readBytes(byte[] result, int amount) { + if(result == null) result = new byte[amount]; + int has = BUFFER_SIZE-byteIndex; + if(has > amount) { + System.arraycopy(bytes, byteIndex, result, 0, amount); + byteIndex += amount; + } else { + System.arraycopy(bytes, byteIndex, result, 0, has); + ByteBuffer bb2 = ByteBuffer.wrap(result); + bb2.position(has); + for(int got=has;got 0) + writeReset(byteIndex); + + try (OutputStream out = writeState.out) { + channel.force(false); + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.out = null; + } + + try { + writeState.in = new FileInputStream(writeState.tempFile); + channel = writeState.in.getChannel(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + w.getMetadata().putAll(metadata); + + TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class); + + // First create all resources defined by clusterId + MapList clusterAssignment = new MapList(); + for(Resource r : writeState.idToResource) { + if(r instanceof InternalResource) { + InternalResource ir = (InternalResource)r; + if(ir.clusterId < 0) { + if (DEBUG) + System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId); + clusterAssignment.add(ir.clusterId, ir); + } else if(ir.clusterId > 0) { + if (DEBUG) + System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir); + ir.resource = w.newResource(ir.clusterId); + writeState.idToResource.set(ir.id, ir.resource); + if (writeState.clusterSets.contains(ir)) { + w.newClusterSet(ir.resource); + if (DEBUG) + System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); + } + + } + } + } + + for(Long clusterKey : clusterAssignment.getKeys()) { + if (DEBUG) + System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey); + w.flushCluster(); + for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) { + if (DEBUG) + System.out.println("-CREATED RESOURCE: " + ir); + ir.resource = w.newResource(); + writeState.idToResource.set(ir.id, ir.resource); + if (writeState.clusterSets.contains(ir)) { + w.newClusterSet(ir.resource); + if (DEBUG) + System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); + } + } + } + + // Create cluster sets for all existing resources (not InternalResource) + // before proceeding to create resources. + for(Resource existingResource : writeState.clusterSetsForExistingResources) { + w.newClusterSet(existingResource); + if (DEBUG) + System.out.println("CREATED NEW CLUSTER SET: " + existingResource); + } + + // Then create all resources defined by cluster set + for(Resource r : writeState.idToResource) { + if(r instanceof InternalResource) { + InternalResource ir = (InternalResource)r; + Resource clusterSet = ir.clusterSet; + + if (clusterSet != null) { + if (DEBUG) + System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet); + if(clusterSet instanceof InternalResource) { + ir.resource = w.newResource(((InternalResource)clusterSet).resource); + } else { + ir.resource = w.newResource(clusterSet); + } + if (DEBUG) + System.out.println(" => " + ir.resource); + writeState.idToResource.set(ir.id, ir.resource); + if(writeState.clusterSets.contains(ir)) { + if (DEBUG) + System.out.println(" ==> NEW CLUSTER SET"); + w.newClusterSet(ir.resource); + } + } + + } + } + + reset(); + bb.limit(BUFFER_SIZE); + + try { + while(true) { + byte method = readByte(); + switch(method) { + case TERM: { + if (DEBUG) { + System.out.println("Resources: " + writeState.idToResource.size()); + System.out.println("Statements: " + writeState.statementCount); + System.out.println("Values: " + writeState.valueCount); + System.out.println("Files: " + writeState.fileCount); + System.out.println("Clusters: " + writeState.clusterCount); + } + return; + } + case CLAIM: { + writeState.statementCount += 2; + Resource subject = getResource(readInt()); + Resource predicate = getResource(readInt()); + Resource inverse = getResource(readInt()); + Resource object = getResource(readInt()); + w.claim(subject, predicate, inverse, object); + } break; + case CLAIM_NOINVERSE: { + ++writeState.statementCount; + Resource subject = getResource(readInt()); + Resource predicate = getResource(readInt()); + Resource object = getResource(readInt()); + w.claim(subject, predicate, null, object); + } break; + case DENY: { + Resource subject = getResource(readInt()); + Resource predicate = getResource(readInt()); + int inv = readInt(); + Resource inverse = null; + if(inv > 0) inverse = getResource(inv); + Resource object = getResource(readInt()); + if(!subject.isPersistent() || !object.isPersistent()) { + VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object); + if(inv > 0) { + VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject); + if(statementProvider2 != null) + w.deny(object, inverse, null, subject, statementProvider2); + } + if(statementProvider1 != null) + w.deny(subject, predicate, null, object, statementProvider1); + } else { + w.deny(subject, predicate, inverse, object, null); + } + } break; + case DENY_VALUE: { + Resource subject = getResource(readInt()); + if(!subject.isPersistent()) { + VirtualGraph provider = processor.getValueProvider(subject); + if(provider != null) + w.denyValue(subject, provider); + } else { + w.denyValue(subject); + } + } break; + case CLAIM_VALUE_B: { + ++writeState.valueCount; + Resource resource = getResource(readInt()); + int len = readInt(); + tgs.setValue(w, resource, null, this, len); + } break; + case COMMIT_AND_CONTINUE: { + XSupport xs = w.getService(XSupport.class); + xs.commitAndContinue(w, traits); + } break; + } + } + } catch(Exception e) { + if(e instanceof ServiceException) + throw (ServiceException)e; + else + throw new ServiceException(e); + } finally { + channel = null; + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.in = null; + writeState.tempFile.delete(); + } + } + } + } + + private Resource getType(Object value) { + Class clazz = value.getClass(); + Resource dataType = + clazz == Float.class ? b.Float + : clazz == Double.class ? b.Double + : clazz == Integer.class ? b.Integer + : clazz == String.class ? b.String + : clazz == Boolean.class ? b.Boolean + : clazz == Byte.class ? b.Byte + : clazz == Long.class ? b.Long + : clazz == float[].class ? b.FloatArray + : clazz == double[].class ? b.DoubleArray + : clazz == int[].class ? b.IntegerArray + : clazz == String[].class ? b.StringArray + : clazz == boolean[].class ? b.BooleanArray + : clazz == byte[].class ? b.ByteArray + : clazz == long[].class ? b.LongArray + : null + ; + return dataType; + } + + public long newCluster() { + return -1 - (++writeState.clusterCount); + } + + public long getDefaultCluster() { + return writeState.defaultCluster; + } + + public void setDefaultCluster(long cluster) { + writeState.defaultCluster = cluster; + } + + @Override + public void syncRequest(final DelayedWrite request) throws DatabaseException { + + try { + + final DelayedWriteGraph dwg = new DelayedWriteGraph(this); + request.perform(dwg); + + syncRequest(new WriteOnlyRequest() { + + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + dwg.commit(graph, request); + } + + }); + + } catch (DatabaseException e) { + + throw e; + + } catch (Throwable e) { + + throw new DatabaseException(e); + + } finally { + } + + } + + @Override + public void syncRequest(WriteOnly request) throws DatabaseException { + + Resource defaultClusterSet = setClusterSet4NewResource(null); + + try { + WriteSupport ws = session.getService(WriteSupport.class); + ws.performWriteRequest(this, request); + } catch (DatabaseException e) { + throw e; + } catch (Throwable t) { + throw new DatabaseException(t); + } finally { + setClusterSet4NewResource(defaultClusterSet); + } + + } + + @SuppressWarnings("unchecked") + @Override + public T getService(Class api) { + + if(ClusteringSupport.class == api) { + + final ClusteringSupport support = (ClusteringSupport)super.getService(api); + + return (T)new ClusteringSupport() { + + @Override + public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId) + throws DatabaseException, ResourceNotFoundException { + return support.getResourceByIndexAndCluster(resourceIndex, clusterId); + } + + @Override + public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException { + return support.getResourceByKey(resourceKey); + } + + @Override + public int getNumberOfResources(long clusterId) + throws DatabaseException { + return support.getNumberOfResources(clusterId); + } + + @Override + public long getCluster(Resource r) { + return support.getCluster(r); + } + + @Override + public long createCluster() { + return newCluster(); + } + + @Override + public boolean isClusterSet(Resource r) throws DatabaseException { + return support.isClusterSet(r); + } + + @Override + public Resource getClusterSetOfCluster(Resource r) throws DatabaseException { + return support.getClusterSetOfCluster(r); + } + + @Override + public Resource getClusterSetOfCluster(long cluster) throws DatabaseException { + return support.getClusterSetOfCluster(cluster); + } + + }; + + } else if (TransferableGraphSupport.class == api) { + + final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class); + + return (T)new TransferableGraphSupport() { + + @Override + public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) { + writeByte(CLAIM_VALUE_B); + writeInt(getId(resource)); + writeInt(raw.length); + writeBytes(raw); + writeInt(getBindingId(PASSTHROUGH)); + } + + @Override + public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount) + throws DatabaseException { + writeByte(CLAIM_VALUE_B); + writeInt(getId(resource)); + writeInt(amount); + writeBytes(reader.readBytes(null, amount)); + writeInt(getBindingId(PASSTHROUGH)); + } + + @Override + public byte[] getValue(ReadGraph graph, Resource resource) { + return parentSupport.getValue(graph, resource); + } + + @Override + public InputStream getValueStream(ReadGraph graph, Resource resource) { + return parentSupport.getValueStream(graph, resource); + } + + }; + + } + + return super.getService(api); + + } + + @Override + public void addMetadata(Metadata data) throws ServiceException { + MetadataUtils.addMetadata(session, metadata, data); + } + + public void addCommitAndContinue() { + writeByte(COMMIT_AND_CONTINUE); + } + + @Override + public T getMetadata(Class clazz) throws ServiceException { + return MetadataUtils.getMetadata(session, metadata, clazz); + } + + @Override + public TreeMap getMetadata() { + return metadata; + } + @Override + public T syncRequest(WriteResult request) throws DatabaseException { + + return request.perform(this); + + } + + @Override + public VirtualGraph getProvider() { + return null; + } + + @Override + public void clearUndoList(WriteTraits writeTraits) { + WriteSupport ws = session.getService(WriteSupport.class); + if (null != ws) + ws.clearUndoList(writeTraits); + } + + @Override + public void markUndoPoint() { + // TODO Auto-generated method stub + + } + + @Override + public T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding) + throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException { + if(!(subject instanceof InternalResource)) { + return super.getPossibleRelatedValue(subject, relation, binding); + } else { + return null; + } + } + + @Override + final public Resource getPossibleObject(final Resource subject, final Resource relation) + throws ManyObjectsForFunctionalRelationException, ServiceException { + if(!(subject instanceof InternalResource)) { + return super.getPossibleObject(subject, relation); + } else { + return null; + } + } + + public void close() { + if (writeState.out != null) { + try (OutputStream out = writeState.out) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e); + } finally { + writeState.out = null; + } + } + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e); + } finally { + writeState.in = null; + } + } + if (writeState.tempFile != null) { + writeState.tempFile.delete(); + writeState.tempFile = null; + } + } + +}