/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.impl.graph; import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.databoard.accessor.Accessor; import org.simantics.databoard.accessor.reference.ChildReference; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingConstructionException; import org.simantics.databoard.binding.impl.BindingPrintContext; import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.util.IdentityPair; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.Metadata; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteGraph; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.common.MetadataUtils; import org.simantics.db.common.request.WriteOnlyRequest; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.BindingException; import org.simantics.db.exception.ClusterSetExistException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ManyObjectsForFunctionalRelationException; import org.simantics.db.exception.NoSingleResultException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.ServiceException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.request.DelayedWrite; import org.simantics.db.request.WriteOnly; import org.simantics.db.request.WriteResult; import org.simantics.db.request.WriteTraits; import org.simantics.db.service.ByteReader; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.TransferableGraphSupport; import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.datastructures.MapList; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntIntHashMap; import gnu.trove.map.hash.TObjectIntHashMap; /** * Write graph implementation that does not modify the database * immediately but with an explicit commit method. All read operations * return results based on the old graph. */ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader { private static final boolean DEBUG = false; public static int BUFFER = 65536; static class Haxx extends Binding { static final Serializer serializer = new Serializer() { public byte[] serialize(Object obj) throws SerializationException { return (byte[])obj; } @Override public void serialize(DataOutput out, TObjectIntHashMap identities, Object obj) throws IOException { throw new Error("Not supported."); } @Override public void serialize(DataOutput out, Object obj) throws IOException { throw new Error("Not supported."); } @Override public Object deserialize(DataInput in, List identities) throws IOException { throw new Error("Not supported."); } @Override public Object deserialize(DataInput in) throws IOException { throw new Error("Not supported."); } @Override public void deserializeTo(DataInput in, List identities, Object obj) throws IOException { throw new Error("Not supported."); } @Override public void deserializeTo(DataInput in, Object obj) throws IOException { throw new Error("Not supported."); } @Override public void skip(DataInput in, List identities) throws IOException { throw new Error("Not supported."); } @Override public void skip(DataInput in) throws IOException { throw new Error("Not supported."); } @Override public Integer getConstantSize() { throw new Error("Not supported."); } @Override public int getSize(Object obj, TObjectIntHashMap identities) throws IOException { throw new Error("Not supported."); } @Override public int getSize(Object obj) throws IOException { throw new Error("Not supported."); } @Override public int getMinSize() { throw new Error("Not supported."); } }; @Override public Serializer serializer() { return serializer; } @Override public void accept(Visitor1 v, Object obj) { throw new Error("Not supported."); } @Override public T accept(Visitor v) { throw new Error("Not supported."); } @Override public boolean isInstance(Object obj) { throw new Error("Not supported."); } @Override public void assertInstaceIsValid(Object obj, Set validInstances) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override public int deepHashValue(Object value, IdentityHashMap hashedObjects) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override public int deepCompare(Object o1, Object o2, Set> compareHistory) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override public void readFrom(Binding srcBinding, Object src, Object dst) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override public Object readFromTry(Binding srcBinding, Object src, Object dst) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { throw new Error("Not supported."); } @Override public int getComponentCount() { throw new Error("Not supported."); } @Override public Binding getComponentBinding(int index) { throw new Error("Not supported."); } @Override public Binding getComponentBinding(ChildReference path) { throw new Error("Not supported."); } } private static final Haxx haxx = new Haxx(); private final int TERM = 0; private final int CLAIM = 1; private final int CLAIM_NOINVERSE = 2; private final int CLAIM_VALUE_B = 4; private final int DENY = 5; private final int DENY_VALUE = 6; private final int COMMIT_AND_CONTINUE = 7; static class ClusterSet { public Resource resource; ClusterSet() { ids = new TIntArrayList(); old = false; } ClusterSet(boolean old, Resource r) { ids = new TIntArrayList(); this.old = old; this.resource = r; } void add(int id) { ids.add(id); } private TIntArrayList ids; private final boolean old; // true if boolean isNew() { return !old; } } public class State { public File tempFile; public FileOutputStream out; public ArrayList idToResource = new ArrayList(); public TIntIntHashMap externalToId = new TIntIntHashMap(); public ArrayList idToBinding = new ArrayList(); public TObjectIntHashMap bindingToId = new TObjectIntHashMap(); public Set clusterSets = new HashSet(); public Set clusterSetsForExistingResources = new HashSet(); public int clusterCount = 0; public long defaultCluster; public Resource defaultClusterSet; public int statementCount = 0; public int valueCount = 0; public int fileCount = 0; } public State writeState; public TreeMap metadata = new TreeMap(); Layer0 b; Session session; public static Resource convertDelayedResource(Resource r) { if (r instanceof InternalResource) { InternalResource ri = (InternalResource)r; return ri.resource; } return r; } private static class InternalResource implements Resource { int id; long clusterId = 0; Resource resource = null; Resource clusterSet = null; public InternalResource(int id, long clusterId) { this.id = id; this.clusterId = clusterId; } public InternalResource(int id, Resource clusterSet) { this.id = id; this.clusterSet = clusterSet; } @Override public long getResourceId() { throw new UnsupportedOperationException(); } @Override public Resource get() { return this; } @Override public boolean isPersistent() { return false; } @Override public int compareTo(Resource o) { if(o instanceof InternalResource) { return Integer.compare(id, ((InternalResource)o).id); } else { return -1; } } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + id; return result; } @Override public int getThreadHash() { return hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (!(obj instanceof InternalResource)) return false; InternalResource other = (InternalResource) obj; if (id != other.id) return false; return true; } @Override public boolean equalsResource(Resource other) { return equals(other); } @Override public String toString() { StringBuilder sb = new StringBuilder(32); if(DebugPolicy.VERBOSE) { sb.append("[delayed id="); sb.append(id); sb.append("]"); } else { sb.append("[did="); sb.append(id); sb.append("]"); } return sb.toString(); } } private int getId(Resource resource) { if(resource instanceof InternalResource) return ((InternalResource)resource).id; else { ResourceImpl r = (ResourceImpl)resource; int id = writeState.externalToId.get(r.id); if(id != 0) { return id; } else { id = writeState.idToResource.size(); writeState.idToResource.add(resource); writeState.externalToId.put(r.id, id); return id; } } } private Resource getResource(int id) { return writeState.idToResource.get(id); } private int getBindingId(Binding binding) { if(writeState.bindingToId.contains(binding)) return writeState.bindingToId.get(binding); else { int id = writeState.idToBinding.size(); writeState.idToBinding.add(binding); writeState.bindingToId.put(binding, id); return id; } } public DelayedWriteGraph(ReadGraph g) throws IOException { super((ReadGraphImpl)g); writeState = new State(); session = g.getSession(); b = Layer0.getInstance(g); writeState.defaultCluster = newCluster(); } public DelayedWriteGraph(ReadGraph g, State state) { super((ReadGraphImpl)g); session = g.getSession(); b = Layer0.getInstance(g); this.writeState = state; } public DelayedWriteGraph newSync() { return new DelayedWriteGraph(this, writeState); } @Override public void claim(Resource subject, Resource predicate, Resource object) throws ServiceException { assert(subject != null); assert(predicate != null); assert(object != null); Resource inverse = getPossibleInverse(predicate); claim(subject, predicate, inverse, object); } @Override public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value, Binding binding) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { Resource valueResource = newResource(); claimValue(valueResource, value, binding); claim(valueResource, b.InstanceOf, null, type); claim(resource, predicate, inverse, valueResource); } @Override public void addLiteral(Resource resource, Resource predicate, Resource inverse, Object value, Binding binding) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { Resource type = getType(value); if(type == null) { Resource literal = newResource(); type = b.Literal; Resource dataType = newResource(); claim(dataType, b.InstanceOf, null, b.DataType); claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); claim(literal, b.HasDataType, null, dataType); claim(literal, b.InstanceOf, null, type); claimValue(literal, value, binding); claim(resource, predicate, inverse, literal); } else { addLiteral(resource, predicate, inverse, type, value, binding); } } @Override public T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException { try { Binding b = Bindings.getBinding(value.getClass()); claimLiteral(resource, predicate, value, b); } catch (BindingConstructionException e) { throw new IllegalArgumentException(e); } catch (BindingException e) { throw new IllegalArgumentException(e); } } @Override public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { Statement valueStatement = null; if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate); if(valueStatement != null && resource.equals(valueStatement.getSubject())) { claimValue(valueStatement.getObject(), value, binding); } else { Resource type = getType(value); Resource literal = newResource(); if (type == null) { type = b.Literal; Resource dataType = newResource(); claim(dataType, b.InstanceOf, null, b.DataType); claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL); claim(literal, b.HasDataType, dataType); } claim(literal, b.InstanceOf, null, type); claimValue(literal, value, binding); claim(resource, predicate, literal); } } @Override public void claimLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { try { Binding b = Bindings.getBinding(value.getClass()); claimLiteral(resource, predicate, inverse, type, value, b); } catch (BindingConstructionException e) { throw new IllegalArgumentException(e); } catch (BindingException e) { throw new IllegalArgumentException(e); } } @Override public void claimLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value, Binding binding) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate); if(valueStatement != null && resource.equals(valueStatement.getSubject())) { claimValue(valueStatement.getObject(), value, binding); } else { Resource valueResource = newResource(); claim(valueResource, b.InstanceOf, null, type); claim(resource, predicate, inverse, valueResource); claimValue(valueResource, value, binding); } } @Override public void claimLiteral(Resource resource, Resource predicate, Resource type, Object value) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { try { Binding b = Bindings.getBinding(value.getClass()); claimLiteral(resource, predicate, type, value, b); } catch (BindingConstructionException e) { throw new IllegalArgumentException(e); } catch (BindingException e) { throw new IllegalArgumentException(e); } } @Override public void claimLiteral(Resource resource, Resource predicate, Resource type, Object value, Binding binding) throws BindingException, ManyObjectsForFunctionalRelationException, ServiceException { try { Resource inverse = getSingleObject(predicate, b.InverseOf); claimLiteral(resource, predicate, inverse, type, value, binding); } catch (NoSingleResultException e) { throw new ServiceException(e); } } @Override public void deny(Resource subject) throws ServiceException { assert(subject != null); if(!(subject instanceof InternalResource)) { try { for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) { deny(statement); } } catch (ManyObjectsForFunctionalRelationException e) { throw new ServiceException(e); } } } @Override public void deny(Resource subject, Resource predicate) throws ServiceException { assert(subject != null); if(!(subject instanceof InternalResource)) { for (Resource object : getObjects(subject, predicate)) { deny(subject, predicate, object); } } } @Override public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException { denyStatement(subject, predicate, object); } @Override public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException { deny(subject, predicate, getPossibleInverse(predicate), object); } @Override public void deny(Statement statement) throws ServiceException { Resource predicate = statement.getPredicate(); deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject()); } @Override public void denyValue(Resource resource, Resource predicate) throws ManyObjectsForFunctionalRelationException, ServiceException { assert(resource != null); assert(predicate != null); if(!(resource instanceof InternalResource)) { Statement valueStatement = getPossibleStatement(resource, predicate); if (valueStatement != null && !valueStatement.isAsserted(resource)) { Resource value = valueStatement.getObject(); denyValue(value); } } } @Override public Resource newResource() throws ServiceException { if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet); else return newResource(writeState.defaultCluster); } @Override public Resource newResource(long clusterId) throws ServiceException { int id = writeState.idToResource.size(); InternalResource ret = new InternalResource(id, clusterId); writeState.idToResource.add(ret); return ret; } @Override public Resource newResource(Resource clusterSet) throws ServiceException { if ((clusterSet instanceof InternalResource)) { if(!writeState.clusterSets.contains(clusterSet)) throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); } else { WriteSupport ws = session.getService(WriteSupport.class); if (!ws.hasClusterSet(null, clusterSet)) if(!writeState.clusterSetsForExistingResources.contains(clusterSet)) throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); } int id = writeState.idToResource.size(); InternalResource ret = new InternalResource(id, clusterSet); writeState.idToResource.add(ret); return ret; } @Override public void newClusterSet(Resource clusterSet) throws ServiceException { if (DEBUG) System.out.println("new cluster set=" + clusterSet); boolean existingResource = !(clusterSet instanceof InternalResource); if (existingResource) { WriteSupport ws = session.getService(WriteSupport.class); if (ws.hasClusterSet(null, clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); writeState.clusterSetsForExistingResources.add(clusterSet); } else { if(!writeState.clusterSets.add(clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); } } @Override public Resource setClusterSet4NewResource(Resource clusterSet) throws ServiceException { Resource existing = writeState.defaultClusterSet; writeState.defaultClusterSet = clusterSet; return existing; } @Override public void claim(Resource subject, Resource predicate, Resource inverse, Resource object) throws ServiceException { assert(subject != null); assert(predicate != null); assert(object != null); try { if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) { writeByte(CLAIM); writeInt(getId(subject)); writeInt(getId(predicate)); writeInt(getId(inverse)); writeInt(getId(object)); } else { writeByte(CLAIM_NOINVERSE); writeInt(getId(subject)); writeInt(getId(predicate)); writeInt(getId(object)); } } catch(Exception e) { throw new ServiceException(e); } } @Override public void deny(Resource subject, Resource predicate, Resource inverse, Resource object) throws ServiceException { assert(subject != null); assert(predicate != null); assert(object != null); try { writeByte(DENY); writeInt(getId(subject)); writeInt(getId(predicate)); if(inverse != null) writeInt(getId(inverse)); else writeInt(0); writeInt(getId(object)); } catch(Exception e) { throw new ServiceException(e); } } @Override public void deny(Resource subject, Resource predicate, Resource inverse, Resource object, VirtualGraph graph) throws ServiceException { throw new UnsupportedOperationException(); } @Override public void claimValue(Resource resource, Object value) throws ServiceException { try { Binding binding = Bindings.getBinding(value.getClass()); claimValue(resource, value, binding); } catch (BindingConstructionException e) { throw new ServiceException(e); } } @Override public void claimValue(Resource resource, Object value, Binding binding) throws ServiceException { try { writeByte(CLAIM_VALUE_B); writeInt(getId(resource)); Serializer serializer = binding.serializer(); int size = serializer.getSize(value); writeInt(size); serializer.serialize(new OutputStream() { @Override public void write(int b) throws IOException { writeByte(b); } }, value); } catch(IOException e) { Logger.defaultLogError(e); throw new ServiceException(e); } } @Override public void denyValue(Resource resource) throws ServiceException { writeByte(DENY_VALUE); writeInt(getId(resource)); } @Override public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException { throw new UnsupportedOperationException(); } @Override public void flushCluster() throws ServiceException { writeState.defaultCluster = newCluster(); } @Override public void flushCluster(Resource r) throws ServiceException { throw new ServiceException("Operation flushCluster(" + r + " not implemented."); } private FileChannel channel; byte[] bytes = new byte[BUFFER]; byte[] buffer = new byte[BUFFER]; ByteBuffer bb = ByteBuffer.wrap(bytes); int byteIndex = 0; private void writeReset(int size) { byteIndex = 0; bb.position(0); bb.limit(size); try { if(writeState.tempFile == null) { File workspace = Platform.getLocation().toFile(); File temp = new File(workspace, "tempFiles"); temp.mkdirs(); File base = new File(temp, "delayed"); base.mkdirs(); writeState.tempFile = new File(base, UUID.randomUUID().toString()); writeState.out = new FileOutputStream(writeState.tempFile); channel = writeState.out.getChannel(); } for (int got=0;got < size;) { int n = channel.write(bb); if (n <= 0) { new Exception().printStackTrace(); return; } got += n; } } catch (IOException e) { e.printStackTrace(); } } private void reset() { byteIndex = 0; try { bb.clear(); for(int got=0; got < BUFFER;) { int n = channel.read(bb); if (n <= 0) return; got += n; } } catch (IOException e) { e.printStackTrace(); } } private void writeInt(int i) { if(byteIndex < (BUFFER-4)) { bytes[byteIndex++] = (byte)(i&0xff); bytes[byteIndex++] = (byte)((i>>>8)&0xff); bytes[byteIndex++] = (byte)((i>>>16)&0xff); bytes[byteIndex++] = (byte)((i>>>24)&0xff); if (byteIndex == BUFFER) writeReset(BUFFER); } else { int has = BUFFER-byteIndex; if(has == 0) writeReset(BUFFER); bytes[byteIndex++] = (byte)(i&0xff); if(has == 1) writeReset(BUFFER); bytes[byteIndex++] = (byte)((i>>>8)&0xff); if(has == 2) writeReset(BUFFER); bytes[byteIndex++] = (byte)((i>>>16)&0xff); if(has == 3) writeReset(BUFFER); bytes[byteIndex++] = (byte)((i>>>24)&0xff); if(has == 4) writeReset(BUFFER); } } private int readInt() { if(byteIndex < (BUFFER-4)) { int result = (int) ((bytes[byteIndex++] & 0xff) | ((bytes[byteIndex++] & 0xff)<<8) | ((bytes[byteIndex++] & 0xff)<<16) | ((bytes[byteIndex++] & 0xff)<<24)); return result; } else { int has = BUFFER-byteIndex; int result = 0; if(has == 0) reset(); result = (int)(bytes[byteIndex++] & 0xff); if(has == 1) reset(); result |= (int)((bytes[byteIndex++] & 0xff) <<8); if(has == 2) reset(); result |= (int)((bytes[byteIndex++] & 0xff) <<16); if(has == 3) reset(); result |= (int)((bytes[byteIndex++] & 0xff) <<24); if(has == 4) reset(); return result; } } private byte readByte() { byte result = bytes[byteIndex++]; if(byteIndex == BUFFER) reset(); return result; } private void writeByte(int b) { bytes[byteIndex++] = (byte)b; if(byteIndex == BUFFER) writeReset(BUFFER); } private void writeBytes(byte[] data) { int has = BUFFER-byteIndex; int amount = data.length; if(has > amount) { System.arraycopy(data, 0, bytes, byteIndex, amount); byteIndex += amount; } else { System.arraycopy(data, 0, bytes, byteIndex, has); writeReset(BUFFER); ByteBuffer bb2 = ByteBuffer.wrap(data); bb2.position(has); try { channel.write(bb2); } catch (IOException e) { e.printStackTrace(); } } } public byte[] readBytes(int amount) { return readBytes(buffer, amount); } public byte[] readBytes(byte[] result, int amount) { if(result == null) result = new byte[amount]; int has = BUFFER-byteIndex; if(has > amount) { System.arraycopy(bytes, byteIndex, result, 0, amount); byteIndex += amount; } else { System.arraycopy(bytes, byteIndex, result, 0, has); ByteBuffer bb2 = ByteBuffer.wrap(result); bb2.position(has); for(int got=has;got 0) { writeReset(byteIndex); } channel.force(false); writeState.out.close(); FileInputStream fs = new FileInputStream(writeState.tempFile); channel = fs.getChannel(); } catch (IOException e) { throw new ServiceException(e); } w.getMetadata().putAll(metadata); TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class); // First create all resources defined by clusterId MapList clusterAssignment = new MapList(); for(Resource r : writeState.idToResource) { if(r instanceof InternalResource) { InternalResource ir = (InternalResource)r; if(ir.clusterId < 0) { if (DEBUG) System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId); clusterAssignment.add(ir.clusterId, ir); } else if(ir.clusterId > 0) { if (DEBUG) System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir); ir.resource = w.newResource(ir.clusterId); writeState.idToResource.set(ir.id, ir.resource); if (writeState.clusterSets.contains(ir)) { w.newClusterSet(ir.resource); if (DEBUG) System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); } } } } for(Long clusterKey : clusterAssignment.getKeys()) { if (DEBUG) System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey); w.flushCluster(); for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) { if (DEBUG) System.out.println("-CREATED RESOURCE: " + ir); ir.resource = w.newResource(); writeState.idToResource.set(ir.id, ir.resource); if (writeState.clusterSets.contains(ir)) { w.newClusterSet(ir.resource); if (DEBUG) System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource); } } } // Create cluster sets for all existing resources (not InternalResource) // before proceeding to create resources. for(Resource existingResource : writeState.clusterSetsForExistingResources) { w.newClusterSet(existingResource); if (DEBUG) System.out.println("CREATED NEW CLUSTER SET: " + existingResource); } // Then create all resources defined by cluster set for(Resource r : writeState.idToResource) { if(r instanceof InternalResource) { InternalResource ir = (InternalResource)r; Resource clusterSet = ir.clusterSet; if (clusterSet != null) { if (DEBUG) System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet); if(clusterSet instanceof InternalResource) { ir.resource = w.newResource(((InternalResource)clusterSet).resource); } else { ir.resource = w.newResource(clusterSet); } if (DEBUG) System.out.println(" => " + ir.resource); writeState.idToResource.set(ir.id, ir.resource); if(writeState.clusterSets.contains(ir)) { if (DEBUG) System.out.println(" ==> NEW CLUSTER SET"); w.newClusterSet(ir.resource); } } } } reset(); bb.limit(BUFFER); try { while(true) { byte method = readByte(); switch(method) { case TERM: { return; } case CLAIM: { writeState.statementCount += 2; Resource subject = getResource(readInt()); Resource predicate = getResource(readInt()); Resource inverse = getResource(readInt()); Resource object = getResource(readInt()); w.claim(subject, predicate, inverse, object); } break; case CLAIM_NOINVERSE: { ++writeState.statementCount; Resource subject = getResource(readInt()); Resource predicate = getResource(readInt()); Resource object = getResource(readInt()); w.claim(subject, predicate, null, object); } break; case DENY: { Resource subject = getResource(readInt()); Resource predicate = getResource(readInt()); int inv = readInt(); Resource inverse = null; if(inv > 0) inverse = getResource(inv); Resource object = getResource(readInt()); if(!subject.isPersistent() || !object.isPersistent()) { VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object); if(inv > 0) { VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject); if(statementProvider2 != null) w.deny(object, inverse, null, subject, statementProvider2); } if(statementProvider1 != null) w.deny(subject, predicate, null, object, statementProvider1); } else { w.deny(subject, predicate, inverse, object, null); } } break; case DENY_VALUE: { Resource subject = getResource(readInt()); if(!subject.isPersistent()) { VirtualGraph provider = processor.getValueProvider(subject); if(provider != null) w.denyValue(subject, provider); } else { w.denyValue(subject); } } break; case CLAIM_VALUE_B: { ++writeState.valueCount; Resource resource = getResource(readInt()); int len = readInt(); tgs.setValue(w, resource, null, this, len); // byte[] bytes = readBytes(len); // tgs.setValue(resource, null, bytes); } break; case COMMIT_AND_CONTINUE: { XSupport xs = w.getService(XSupport.class); xs.commitAndContinue(w, traits); } break; } } } catch(Exception e) { if(e instanceof ServiceException) throw (ServiceException)e; else throw new ServiceException(e); } finally { try { channel.close(); channel = null; } catch (IOException e) { throw new ServiceException(e); } } // System.out.println("Resources: " + state.resourceCount); // System.out.println("Statements: " + state.statementCount); // System.out.println("Values: " + state.valueCount); } private Resource getType(Object value) { Class clazz = value.getClass(); Resource dataType = clazz == Float.class ? b.Float : clazz == Double.class ? b.Double : clazz == Integer.class ? b.Integer : clazz == String.class ? b.String : clazz == Boolean.class ? b.Boolean : clazz == Byte.class ? b.Byte : clazz == Long.class ? b.Long : clazz == float[].class ? b.FloatArray : clazz == double[].class ? b.DoubleArray : clazz == int[].class ? b.IntegerArray : clazz == String[].class ? b.StringArray : clazz == boolean[].class ? b.BooleanArray : clazz == byte[].class ? b.ByteArray : clazz == long[].class ? b.LongArray : null ; return dataType; } public long newCluster() { return -1 - (++writeState.clusterCount); } public long getDefaultCluster() { return writeState.defaultCluster; } public void setDefaultCluster(long cluster) { writeState.defaultCluster = cluster; } @Override public void syncRequest(final DelayedWrite request) throws DatabaseException { try { final DelayedWriteGraph dwg = new DelayedWriteGraph(this); request.perform(dwg); syncRequest(new WriteOnlyRequest() { @Override public void perform(WriteOnlyGraph graph) throws DatabaseException { dwg.commit(graph, request); } }); } catch (DatabaseException e) { throw e; } catch (Throwable e) { throw new DatabaseException(e); } finally { } } @Override public void syncRequest(WriteOnly request) throws DatabaseException { Resource defaultClusterSet = setClusterSet4NewResource(null); try { WriteSupport ws = session.getService(WriteSupport.class); ws.performWriteRequest(this, request); } catch (DatabaseException e) { throw e; } catch (Throwable t) { throw new DatabaseException(t); } finally { setClusterSet4NewResource(defaultClusterSet); } } @SuppressWarnings("unchecked") @Override public T getService(Class api) { if(ClusteringSupport.class == api) { final ClusteringSupport support = (ClusteringSupport)super.getService(api); return (T)new ClusteringSupport() { @Override public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId) throws DatabaseException, ResourceNotFoundException { return support.getResourceByIndexAndCluster(resourceIndex, clusterId); } @Override public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException { return support.getResourceByKey(resourceKey); } @Override public int getNumberOfResources(long clusterId) throws DatabaseException { return support.getNumberOfResources(clusterId); } @Override public long getCluster(Resource r) { return support.getCluster(r); } @Override public long createCluster() { return newCluster(); } @Override public boolean isClusterSet(Resource r) throws DatabaseException { return support.isClusterSet(r); } @Override public Resource getClusterSetOfCluster(Resource r) throws DatabaseException { return support.getClusterSetOfCluster(r); } @Override public Resource getClusterSetOfCluster(long cluster) throws DatabaseException { return support.getClusterSetOfCluster(cluster); } }; } else if (TransferableGraphSupport.class == api) { final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class); return (T)new TransferableGraphSupport() { @Override public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) { writeByte(CLAIM_VALUE_B); writeInt(getId(resource)); writeInt(raw.length); writeBytes(raw); writeInt(getBindingId(haxx)); } @Override public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount) throws DatabaseException { writeByte(CLAIM_VALUE_B); writeInt(getId(resource)); writeInt(amount); writeBytes(reader.readBytes(null, amount)); writeInt(getBindingId(haxx)); } @Override public byte[] getValue(ReadGraph graph, Resource resource) { return parentSupport.getValue(graph, resource); } @Override public InputStream getValueStream(ReadGraph graph, Resource resource) { return parentSupport.getValueStream(graph, resource); } }; } return super.getService(api); } @Override public void addMetadata(Metadata data) throws ServiceException { MetadataUtils.addMetadata(session, metadata, data); } public void addCommitAndContinue() { writeByte(COMMIT_AND_CONTINUE); } @Override public T getMetadata(Class clazz) throws ServiceException { return MetadataUtils.getMetadata(session, metadata, clazz); } @Override public TreeMap getMetadata() { return metadata; } @Override public T syncRequest(WriteResult request) throws DatabaseException { return request.perform(this); } @Override public VirtualGraph getProvider() { return null; } @Override public void clearUndoList(WriteTraits writeTraits) { WriteSupport ws = session.getService(WriteSupport.class); if (null != ws) ws.clearUndoList(writeTraits); } @Override public void markUndoPoint() { // TODO Auto-generated method stub } @Override public T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding) throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException { if(!(subject instanceof InternalResource)) { return super.getPossibleRelatedValue(subject, relation, binding); } else { return null; } } @Override final public Resource getPossibleObject(final Resource subject, final Resource relation) throws ManyObjectsForFunctionalRelationException, ServiceException { if(!(subject instanceof InternalResource)) { return super.getPossibleObject(subject, relation); } else { return null; } } }