package org.simantics.db.impl.graph;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.eclipse.core.runtime.Platform;
import org.simantics.databoard.Bindings;
import org.simantics.databoard.accessor.Accessor;
-import org.simantics.databoard.accessor.reference.ChildReference;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.binding.error.BindingConstructionException;
-import org.simantics.databoard.binding.impl.BindingPrintContext;
-import org.simantics.databoard.serialization.SerializationException;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.type.Datatype;
-import org.simantics.databoard.util.IdentityPair;
import org.simantics.databoard.util.binary.RandomAccessBinary;
import org.simantics.db.Metadata;
import org.simantics.db.ReadGraph;
import org.simantics.layer0.Layer0;
import org.simantics.utils.datastructures.MapList;
-import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TObjectIntHashMap;
* immediately but with an explicit commit method. All read operations
* return results based on the old graph.
*/
-public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader {
-
- private static final boolean DEBUG = false;
- public static int BUFFER = 65536;
-
- static class Haxx extends Binding {
-
- static final Serializer serializer = new Serializer() {
-
- public byte[] serialize(Object obj) throws SerializationException {
- return (byte[])obj;
- }
-
- @Override
- public void serialize(DataOutput out,
- TObjectIntHashMap<Object> identities, Object obj)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void serialize(DataOutput out, Object obj)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public Object deserialize(DataInput in, List<Object> identities)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public Object deserialize(DataInput in) throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void deserializeTo(DataInput in, List<Object> identities,
- Object obj) throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void deserializeTo(DataInput in, Object obj)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void skip(DataInput in, List<Object> identities)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void skip(DataInput in) throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public Integer getConstantSize() {
- throw new Error("Not supported.");
- }
-
- @Override
- public int getSize(Object obj, TObjectIntHashMap<Object> identities)
- throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public int getSize(Object obj) throws IOException {
- throw new Error("Not supported.");
- }
-
- @Override
- public int getMinSize() {
- throw new Error("Not supported.");
- }
-
- };
-
- @Override
- public Serializer serializer() {
- return serializer;
- }
-
- @Override
- public void accept(Visitor1 v, Object obj) {
- throw new Error("Not supported.");
- }
-
- @Override
- public <T> T accept(Visitor<T> v) {
- throw new Error("Not supported.");
- }
-
- @Override
- public boolean isInstance(Object obj) {
- throw new Error("Not supported.");
- }
-
- @Override
- public void assertInstaceIsValid(Object obj, Set<Object> validInstances)
- throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- public int deepHashValue(Object value,
- IdentityHashMap<Object, Object> hashedObjects)
- throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- public int deepCompare(Object o1, Object o2,
- Set<IdentityPair<Object, Object>> compareHistory)
- throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- public void readFrom(Binding srcBinding, Object src, Object dst)
- throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- public Object readFromTry(Binding srcBinding, Object src, Object dst)
- throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException {
- throw new Error("Not supported.");
- }
-
- @Override
- public int getComponentCount() {
- throw new Error("Not supported.");
- }
+public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable {
- @Override
- public Binding getComponentBinding(int index) {
- throw new Error("Not supported.");
- }
+ private static final boolean DEBUG = false;
+ private static final int BUFFER_SIZE = 512*1024;
- @Override
- public Binding getComponentBinding(ChildReference path) {
- throw new Error("Not supported.");
- }
-
- }
-
- private static final Haxx haxx = new Haxx();
+ private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding();
private final int TERM = 0;
private final int CLAIM = 1;
private final int DENY = 5;
private final int DENY_VALUE = 6;
private final int COMMIT_AND_CONTINUE = 7;
-
- static class ClusterSet {
- public Resource resource;
- ClusterSet() {
- ids = new TIntArrayList();
- old = false;
- }
- ClusterSet(boolean old, Resource r) {
- ids = new TIntArrayList();
- this.old = old;
- this.resource = r;
- }
- void add(int id) {
- ids.add(id);
- }
- private TIntArrayList ids;
- private final boolean old; // true if
- boolean isNew() {
- return !old;
- }
- }
- public class State {
+
+ private static class State {
public File tempFile;
public FileOutputStream out;
+ public FileInputStream in;
public ArrayList<Resource> idToResource = new ArrayList<Resource>();
public TIntIntHashMap externalToId = new TIntIntHashMap();
public ArrayList<Binding> idToBinding = new ArrayList<Binding>();
public int valueCount = 0;
public int fileCount = 0;
}
-
- public State writeState;
- public TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();
-
- Layer0 b;
- Session session;
+
+ private State writeState;
+ private TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();
+
+ private FileChannel channel;
+ private byte[] bytes = new byte[BUFFER_SIZE];
+ private ByteBuffer bb = ByteBuffer.wrap(bytes);
+ private int byteIndex = 0;
+
+ private Layer0 b;
+ private Session session;
+
public static Resource convertDelayedResource(Resource r) {
- if (r instanceof InternalResource) {
- InternalResource ri = (InternalResource)r;
- return ri.resource;
- }
+ if (r instanceof InternalResource)
+ return ((InternalResource) r).resource;
return r;
}
+
private static class InternalResource implements Resource {
int id;
long clusterId = 0;
Resource resource = null;
Resource clusterSet = null;
-
+
public InternalResource(int id, long clusterId) {
this.id = id;
this.clusterId = clusterId;
this.id = id;
this.clusterSet = clusterSet;
}
-
+
@Override
public long getResourceId() {
throw new UnsupportedOperationException();
public Resource get() {
return this;
}
-
+
@Override
public boolean isPersistent() {
return false;
@Override
public int compareTo(Resource o) {
- if(o instanceof InternalResource) {
- return Integer.compare(id, ((InternalResource)o).id);
- } else {
- return -1;
- }
+ if(o instanceof InternalResource)
+ return Integer.compare(id, ((InternalResource)o).id);
+ return -1;
}
@Override
result = prime * result + id;
return result;
}
-
+
@Override
public int getThreadHash() {
return hashCode();
return false;
return true;
}
-
@Override
public boolean equalsResource(Resource other) {
return equals(other);
}
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder(32);
session = g.getSession();
b = Layer0.getInstance(g);
this.writeState = state;
- }
+ }
public DelayedWriteGraph newSync() {
return new DelayedWriteGraph(this, writeState);
if (ws.hasClusterSet(null, clusterSet))
throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
writeState.clusterSetsForExistingResources.add(clusterSet);
- } else {
+ } else {
if(!writeState.clusterSets.add(clusterSet))
throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
}
}
}
+ private OutputStream valueWriter = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ writeByte(b);
+ }
+ };
+
@Override
public void claimValue(Resource resource, Object value, Binding binding)
throws ServiceException {
try {
-
+
writeByte(CLAIM_VALUE_B);
writeInt(getId(resource));
Serializer serializer = binding.serializer();
int size = serializer.getSize(value);
writeInt(size);
- serializer.serialize(new OutputStream() {
-
- @Override
- public void write(int b) throws IOException {
- writeByte(b);
- }
-
- }, value);
-
+ serializer.serialize(valueWriter, value);
+
} catch(IOException e) {
Logger.defaultLogError(e);
throw new ServiceException(e);
public void flushCluster(Resource r) throws ServiceException {
throw new ServiceException("Operation flushCluster(" + r + " not implemented.");
}
-
- private FileChannel channel;
- byte[] bytes = new byte[BUFFER];
- byte[] buffer = new byte[BUFFER];
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- int byteIndex = 0;
-
- private void writeReset(int size) {
+ private void writeReset(int size) {
byteIndex = 0;
bb.position(0);
bb.limit(size);
try {
-
- if(writeState.tempFile == null) {
-
+ if (writeState.tempFile == null) {
File workspace = Platform.getLocation().toFile();
File temp = new File(workspace, "tempFiles");
- temp.mkdirs();
-
File base = new File(temp, "delayed");
- base.mkdirs();
+ Files.createDirectories(base.toPath());
writeState.tempFile = new File(base, UUID.randomUUID().toString());
writeState.out = new FileOutputStream(writeState.tempFile);
channel = writeState.out.getChannel();
-
}
for (int got=0;got < size;) {
int n = channel.write(bb);
if (n <= 0) {
- new Exception().printStackTrace();
+ Logger.defaultLogError(new Exception("FileChannel.write returned " + n));
return;
}
got += n;
}
} catch (IOException e) {
- e.printStackTrace();
+ Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e);
}
}
private void reset() {
byteIndex = 0;
- try {
- bb.clear();
- for(int got=0; got < BUFFER;) {
- int n = channel.read(bb);
- if (n <= 0)
- return;
- got += n;
+ bb.clear();
+ if (channel != null) {
+ try {
+ for(int got=0; got < BUFFER_SIZE;) {
+ int n = channel.read(bb);
+ if (n <= 0)
+ return;
+ got += n;
+ }
+ } catch (IOException e) {
+ Logger.defaultLogError("FileChannel.read failed", e);
}
- } catch (IOException e) {
- e.printStackTrace();
}
}
private void writeInt(int i) {
- if(byteIndex < (BUFFER-4)) {
+ if(byteIndex < (BUFFER_SIZE-4)) {
bytes[byteIndex++] = (byte)(i&0xff);
bytes[byteIndex++] = (byte)((i>>>8)&0xff);
bytes[byteIndex++] = (byte)((i>>>16)&0xff);
bytes[byteIndex++] = (byte)((i>>>24)&0xff);
- if (byteIndex == BUFFER)
- writeReset(BUFFER);
+ if (byteIndex == BUFFER_SIZE)
+ writeReset(BUFFER_SIZE);
} else {
- int has = BUFFER-byteIndex;
- if(has == 0) writeReset(BUFFER);
+ int has = BUFFER_SIZE-byteIndex;
+ if(has == 0) writeReset(BUFFER_SIZE);
bytes[byteIndex++] = (byte)(i&0xff);
- if(has == 1) writeReset(BUFFER);
+ if(has == 1) writeReset(BUFFER_SIZE);
bytes[byteIndex++] = (byte)((i>>>8)&0xff);
- if(has == 2) writeReset(BUFFER);
+ if(has == 2) writeReset(BUFFER_SIZE);
bytes[byteIndex++] = (byte)((i>>>16)&0xff);
- if(has == 3) writeReset(BUFFER);
+ if(has == 3) writeReset(BUFFER_SIZE);
bytes[byteIndex++] = (byte)((i>>>24)&0xff);
- if(has == 4) writeReset(BUFFER);
+ if(has == 4) writeReset(BUFFER_SIZE);
}
}
-
+
private int readInt() {
- if(byteIndex < (BUFFER-4)) {
+ if(byteIndex < (BUFFER_SIZE-4)) {
int result = (int)
((bytes[byteIndex++] & 0xff) |
((bytes[byteIndex++] & 0xff)<<8) |
((bytes[byteIndex++] & 0xff)<<24));
return result;
} else {
- int has = BUFFER-byteIndex;
+ int has = BUFFER_SIZE-byteIndex;
int result = 0;
if(has == 0) reset();
result = (int)(bytes[byteIndex++] & 0xff);
return result;
}
}
-
+
private byte readByte() {
byte result = bytes[byteIndex++];
- if(byteIndex == BUFFER) reset();
+ if(byteIndex == BUFFER_SIZE) reset();
return result;
}
private void writeByte(int b) {
bytes[byteIndex++] = (byte)b;
- if(byteIndex == BUFFER) writeReset(BUFFER);
+ if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE);
}
-
+
private void writeBytes(byte[] data) {
- int has = BUFFER-byteIndex;
+ int has = BUFFER_SIZE-byteIndex;
int amount = data.length;
if(has > amount) {
System.arraycopy(data, 0, bytes, byteIndex, amount);
byteIndex += amount;
} else {
System.arraycopy(data, 0, bytes, byteIndex, has);
- writeReset(BUFFER);
+ writeReset(BUFFER_SIZE);
ByteBuffer bb2 = ByteBuffer.wrap(data);
bb2.position(has);
try {
channel.write(bb2);
} catch (IOException e) {
- e.printStackTrace();
+ Logger.defaultLogError("FileChannel.write failed", e);
}
}
}
-
- public byte[] readBytes(int amount) {
- return readBytes(buffer, amount);
- }
-
+
public byte[] readBytes(byte[] result, int amount) {
if(result == null) result = new byte[amount];
- int has = BUFFER-byteIndex;
+ int has = BUFFER_SIZE-byteIndex;
if(has > amount) {
System.arraycopy(bytes, byteIndex, result, 0, amount);
byteIndex += amount;
try {
got += channel.read(bb2);
if(got == -1) {
- new Exception().printStackTrace();
+ // End-of-stream, why log this?
return result;
}
-
} catch (IOException e) {
- e.printStackTrace();
+ Logger.defaultLogError("FileChannel.read failed", e);
}
reset();
}
return result;
}
-
+
public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException {
writeState.bindingToId = null;
writeState.externalToId = null;
- try {
-
- writeByte(TERM);
- if(byteIndex > 0) {
+ writeByte(TERM);
+
+ if (writeState.out != null) {
+ // Flush current buffer to file only if backing file has already
+ // been taken into use.
+ if (byteIndex > 0)
writeReset(byteIndex);
+
+ try (OutputStream out = writeState.out) {
+ channel.force(false);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ } finally {
+ writeState.out = null;
}
- channel.force(false);
- writeState.out.close();
- FileInputStream fs = new FileInputStream(writeState.tempFile);
- channel = fs.getChannel();
-
- } catch (IOException e) {
- throw new ServiceException(e);
+ try {
+ writeState.in = new FileInputStream(writeState.tempFile);
+ channel = writeState.in.getChannel();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
-
+
w.getMetadata().putAll(metadata);
-
+
TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class);
// First create all resources defined by clusterId
}
reset();
- bb.limit(BUFFER);
+ bb.limit(BUFFER_SIZE);
try {
while(true) {
byte method = readByte();
- switch(method) {
+ switch(method) {
case TERM: {
+ if (DEBUG) {
+ System.out.println("Resources: " + writeState.idToResource.size());
+ System.out.println("Statements: " + writeState.statementCount);
+ System.out.println("Values: " + writeState.valueCount);
+ System.out.println("Files: " + writeState.fileCount);
+ System.out.println("Clusters: " + writeState.clusterCount);
+ }
return;
}
case CLAIM: {
Resource resource = getResource(readInt());
int len = readInt();
tgs.setValue(w, resource, null, this, len);
-// byte[] bytes = readBytes(len);
-// tgs.setValue(resource, null, bytes);
} break;
case COMMIT_AND_CONTINUE: {
XSupport xs = w.getService(XSupport.class);
else
throw new ServiceException(e);
} finally {
- try {
- channel.close();
- channel = null;
- } catch (IOException e) {
- throw new ServiceException(e);
+ channel = null;
+ if (writeState.in != null) {
+ try (InputStream in = writeState.in) {
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ } finally {
+ writeState.in = null;
+ writeState.tempFile.delete();
+ }
}
}
-// System.out.println("Resources: " + state.resourceCount);
-// System.out.println("Statements: " + state.statementCount);
-// System.out.println("Values: " + state.valueCount);
}
-
+
private Resource getType(Object value) {
Class<?> clazz = value.getClass();
Resource dataType =
: clazz == String[].class ? b.StringArray
: clazz == boolean[].class ? b.BooleanArray
: clazz == byte[].class ? b.ByteArray
- : clazz == long[].class ? b.LongArray
+ : clazz == long[].class ? b.LongArray
: null
;
return dataType;
}
-
+
public long newCluster() {
return -1 - (++writeState.clusterCount);
}
public long getDefaultCluster() {
return writeState.defaultCluster;
}
-
+
public void setDefaultCluster(long cluster) {
writeState.defaultCluster = cluster;
}
-
+
@Override
public void syncRequest(final DelayedWrite request) throws DatabaseException {
-
+
try {
final DelayedWriteGraph dwg = new DelayedWriteGraph(this);
});
} catch (DatabaseException e) {
-
+
throw e;
-
+
} catch (Throwable e) {
-
+
throw new DatabaseException(e);
-
+
} finally {
-
}
-
+
}
-
+
@Override
public void syncRequest(WriteOnly request) throws DatabaseException {
-
+
Resource defaultClusterSet = setClusterSet4NewResource(null);
-
+
try {
WriteSupport ws = session.getService(WriteSupport.class);
ws.performWriteRequest(this, request);
} finally {
setClusterSet4NewResource(defaultClusterSet);
}
-
+
}
@SuppressWarnings("unchecked")
@Override
public <T> T getService(Class<T> api) {
-
+
if(ClusteringSupport.class == api) {
final ClusteringSupport support = (ClusteringSupport)super.getService(api);
return (T)new ClusteringSupport() {
-
+
@Override
public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId)
throws DatabaseException, ResourceNotFoundException {
return support.getResourceByIndexAndCluster(resourceIndex, clusterId);
}
-
+
@Override
public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException {
return support.getResourceByKey(resourceKey);
}
-
+
@Override
public int getNumberOfResources(long clusterId)
throws DatabaseException {
return support.getNumberOfResources(clusterId);
}
-
+
@Override
public long getCluster(Resource r) {
return support.getCluster(r);
}
-
+
@Override
public long createCluster() {
return newCluster();
}
-
+
@Override
public boolean isClusterSet(Resource r) throws DatabaseException {
return support.isClusterSet(r);
public Resource getClusterSetOfCluster(Resource r) throws DatabaseException {
return support.getClusterSetOfCluster(r);
}
-
+
@Override
public Resource getClusterSetOfCluster(long cluster) throws DatabaseException {
return support.getClusterSetOfCluster(cluster);
}
};
-
+
} else if (TransferableGraphSupport.class == api) {
-
+
final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class);
-
+
return (T)new TransferableGraphSupport() {
@Override
writeInt(getId(resource));
writeInt(raw.length);
writeBytes(raw);
- writeInt(getBindingId(haxx));
+ writeInt(getBindingId(PASSTHROUGH));
}
-
+
@Override
public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount)
throws DatabaseException {
writeInt(getId(resource));
writeInt(amount);
writeBytes(reader.readBytes(null, amount));
- writeInt(getBindingId(haxx));
+ writeInt(getBindingId(PASSTHROUGH));
}
-
+
@Override
public byte[] getValue(ReadGraph graph, Resource resource) {
return parentSupport.getValue(graph, resource);
}
-
+
@Override
public InputStream getValueStream(ReadGraph graph, Resource resource) {
return parentSupport.getValueStream(graph, resource);
}
-
+
};
}
-
+
return super.getService(api);
-
+
}
@Override
}
}
+ public void close() {
+ if (writeState.out != null) {
+ try (OutputStream out = writeState.out) {
+ } catch (IOException e) {
+ Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e);
+ } finally {
+ writeState.out = null;
+ }
+ }
+ if (writeState.in != null) {
+ try (InputStream in = writeState.in) {
+ } catch (IOException e) {
+ Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e);
+ } finally {
+ writeState.in = null;
+ }
+ }
+ if (writeState.tempFile != null) {
+ writeState.tempFile.delete();
+ writeState.tempFile = null;
+ }
+ }
+
}
--- /dev/null
+package org.simantics.db.impl.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.simantics.databoard.accessor.reference.ChildReference;
+import org.simantics.databoard.binding.Binding;
+import org.simantics.databoard.binding.impl.BindingPrintContext;
+import org.simantics.databoard.serialization.SerializationException;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.util.IdentityPair;
+
+import gnu.trove.map.hash.TObjectIntHashMap;
+
+/**
+ * Originally within DelayedWriteGraph, put in separate file in 1.30.0.
+ *
+ * @author Antti Villberg
+ */
+class PassthroughSerializerBinding extends Binding {
+
+ static final Serializer serializer = new Serializer() {
+
+ public byte[] serialize(Object obj) throws SerializationException {
+ return (byte[])obj;
+ }
+
+ @Override
+ public void serialize(DataOutput out,
+ TObjectIntHashMap<Object> identities, Object obj)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void serialize(DataOutput out, Object obj)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Object deserialize(DataInput in, List<Object> identities)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Object deserialize(DataInput in) throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void deserializeTo(DataInput in, List<Object> identities,
+ Object obj) throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void deserializeTo(DataInput in, Object obj)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void skip(DataInput in, List<Object> identities)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void skip(DataInput in) throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Integer getConstantSize() {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int getSize(Object obj, TObjectIntHashMap<Object> identities)
+ throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int getSize(Object obj) throws IOException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int getMinSize() {
+ throw new Error("Not supported.");
+ }
+
+ };
+
+ @Override
+ public Serializer serializer() {
+ return serializer;
+ }
+
+ @Override
+ public void accept(Visitor1 v, Object obj) {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public <T> T accept(Visitor<T> v) {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public boolean isInstance(Object obj) {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void assertInstaceIsValid(Object obj, Set<Object> validInstances)
+ throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int deepHashValue(Object value,
+ IdentityHashMap<Object, Object> hashedObjects)
+ throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int deepCompare(Object o1, Object o2,
+ Set<IdentityPair<Object, Object>> compareHistory)
+ throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public void readFrom(Binding srcBinding, Object src, Object dst)
+ throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Object readFromTry(Binding srcBinding, Object src, Object dst)
+ throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public int getComponentCount() {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Binding getComponentBinding(int index) {
+ throw new Error("Not supported.");
+ }
+
+ @Override
+ public Binding getComponentBinding(ChildReference path) {
+ throw new Error("Not supported.");
+ }
+
+}
\ No newline at end of file
package org.simantics.layer0.utils.writer;
-import gnu.trove.map.hash.TIntIntHashMap;
-import gnu.trove.map.hash.TIntLongHashMap;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import org.simantics.db.WriteOnlyGraph;
import org.simantics.db.exception.DatabaseException;
+import gnu.trove.map.hash.TIntIntHashMap;
+import gnu.trove.map.hash.TIntLongHashMap;
+
public class DelayedGraphWriter extends AbstractDelayedGraphWriter {
protected File file;
System.out.println("Temp file: " + file.getAbsolutePath());
s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024));
// s = new ObjectOutputStream(backup);
-
file.deleteOnExit();
} catch (IOException e) {
throw new RuntimeException("Opening output stream to temporary file failed", e);
assert(value != null);
assert(dataType != null);
try {
- ++internalCount;
+ ++internalCount;
timestamps.add(0);
- s.writeByte(1);
- writeRef(internalCount);
+ s.writeByte(1);
+ writeRef(internalCount);
s.writeUnshared(value);
- writeRef(getId(dataType));
+ writeRef(getId(dataType));
s.writeByte(0);
writeRef(current);
assert(value != null);
assert(dataType != null);
try {
- ++internalCount;
+ ++internalCount;
timestamps.add(0);
- s.writeByte(1);
- writeRef(internalCount);
+ s.writeByte(1);
+ writeRef(internalCount);
s.writeUnshared(value);
- writeRef(getId(dataType));
-
+ writeRef(getId(dataType));
+
current = internalCount;
} catch (IOException e) {
throw new RuntimeException("Writing statement failed.", e);
clusterHints.put(current, clusterHint);
return this;
}
-
+
@Override
public GraphWriter createInverse(int cluster, Resource r) {
assert(r != null);
}
return this;
}
-
+
@Override
public GraphWriter createInverse(Resource r) {
assert(r != null);
throw new RuntimeException("Writing statement failed.", e);
}
return this;
- }
+ }
protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) {
if(id > 0) {
@Override
public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException {
-
try {
- if (s != null) {
- s.close();
- s = null;
- }
+ if (s != null) {
+ s.close();
+ s = null;
+ }
externalsInv = null;
-
- monitor.beginTask("", 100);
-
- int lastPercentageDone = 0;
- long fileLength = file.length();
-
-// System.out.println("size of commit file: " + fileLength);
-
- Resource[] internals = new Resource[internalCount];
- resourceIds = new long[internalCount];
-
-
+ monitor.beginTask("Writing database", 100);
FileInputStream fis = new FileInputStream(file);
FileChannel fc = fis.getChannel();
- ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024));
- int resourceCounter = 0;
- int statementCounter = 0;
- int valueCounter = 0;
- time = 0;
- loop: while(true) {
- switch(is.read()) {
- case -1:
- break loop;
- case 0: {
- int s = is.readInt();
- int p = is.readInt();
- int o = is.readInt();
- Resource rs = getResource(wg, internals, resourceIds, s);
- Resource rp = getResource(wg, internals, resourceIds, p);
- Resource ro = getResource(wg, internals, resourceIds, o);
- Resource rpInv = inverses.get(p);
- wg.claim(rs, rp, rpInv, ro);
- statementCounter += 2;
- } break;
- case 1: {
- int id = is.readInt();
- Object value = is.readUnshared();
- int type = is.readInt();
-
- Resource r = newResource(wg, internals, id);
- wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type));
- wg.claimValue(r, value);
- statementCounter ++;
- resourceCounter ++;
- valueCounter ++;
-
- } break;
- case 2: {
- wg.flushCluster();
- } break;
- case 3: {
-
- int s = is.readInt();
- int t = is.readInt();
-
- Resource type = getResource(wg, internals, resourceIds, t);
- wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type);
- statementCounter ++;
- resourceCounter ++;
-
- } break;
- case 4: {
-
- int s = is.readInt();
- newResource(wg, internals, s);
- resourceCounter ++;
-
- } break;
- case 5: { // InverseOf
-
- int r1 = is.readInt();
- int r2 = is.readInt();
-
- Resource rr1 = getResource(wg, internals, resourceIds, r1);
- Resource rr2 = getResource(wg, internals, resourceIds, r2);
- wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2);
- statementCounter += 2;
-
- inverses.put(r1, rr2);
- inverses.put(r2, rr1);
-
- } break;
- }
-
-// if((counter % 200000) == 0) {
-// System.out.println("Written " + counter + " statements.");
-// }
-
- double percentageDone = 100.0 * (double) fc.position() / (double) fileLength;
- int newPercentageDone = (int) Math.round(percentageDone);
- if(newPercentageDone > lastPercentageDone) {
- monitor.setTaskName("Writing database (" + newPercentageDone + "%)");
- monitor.worked(newPercentageDone - lastPercentageDone);
- lastPercentageDone = newPercentageDone;
- }
+ try (ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024))) {
+ commit(monitor, wg, is, fc);
}
-
- System.out.println("clusterIds.size() = " + clusterIds.size());
-
- System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values.");
-
} catch (IOException e) {
- throw new RuntimeException("Commiting delayed graph writings failed.", e);
+ throw new DatabaseException("Commiting delayed graph writings failed.", e);
} catch (ClassNotFoundException e) {
- throw new RuntimeException("Commiting delayed graph writings failed.", e);
- } finally {
+ throw new DatabaseException("Commiting delayed graph writings failed.", e);
+ } finally {
file.delete();
monitor.done();
}
}
-
+
+ private void commit(IProgressMonitor monitor, WriteOnlyGraph wg, ObjectInputStream is, FileChannel fc) throws DatabaseException, IOException, ClassNotFoundException {
+ int lastPercentageDone = 0;
+ long fileLength = file.length();
+ //System.out.println("size of commit file: " + fileLength);
+
+ Resource[] internals = new Resource[internalCount];
+ resourceIds = new long[internalCount];
+
+ int resourceCounter = 0;
+ int statementCounter = 0;
+ int valueCounter = 0;
+ time = 0;
+ while(true) {
+ switch(is.read()) {
+ case -1:
+ System.out.println("clusterIds.size() = " + clusterIds.size());
+ System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values.");
+ return;
+
+ case 0: {
+ int s = is.readInt();
+ int p = is.readInt();
+ int o = is.readInt();
+ Resource rs = getResource(wg, internals, resourceIds, s);
+ Resource rp = getResource(wg, internals, resourceIds, p);
+ Resource ro = getResource(wg, internals, resourceIds, o);
+ Resource rpInv = inverses.get(p);
+ wg.claim(rs, rp, rpInv, ro);
+ statementCounter += 2;
+ } break;
+ case 1: {
+ int id = is.readInt();
+ Object value = is.readUnshared();
+ int type = is.readInt();
+
+ Resource r = newResource(wg, internals, id);
+ wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type));
+ wg.claimValue(r, value);
+ statementCounter ++;
+ resourceCounter ++;
+ valueCounter ++;
+
+ } break;
+ case 2: {
+ wg.flushCluster();
+ } break;
+ case 3: {
+
+ int s = is.readInt();
+ int t = is.readInt();
+
+ Resource type = getResource(wg, internals, resourceIds, t);
+ wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type);
+ statementCounter ++;
+ resourceCounter ++;
+
+ } break;
+ case 4: {
+
+ int s = is.readInt();
+ newResource(wg, internals, s);
+ resourceCounter ++;
+
+ } break;
+ case 5: { // InverseOf
+
+ int r1 = is.readInt();
+ int r2 = is.readInt();
+
+ Resource rr1 = getResource(wg, internals, resourceIds, r1);
+ Resource rr2 = getResource(wg, internals, resourceIds, r2);
+ wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2);
+ statementCounter += 2;
+
+ inverses.put(r1, rr2);
+ inverses.put(r2, rr1);
+
+ } break;
+ }
+
+// if((counter % 200000) == 0) {
+// System.out.println("Written " + counter + " statements.");
+// }
+
+ double percentageDone = 100.0 * (double) fc.position() / (double) fileLength;
+ int newPercentageDone = (int) Math.round(percentageDone);
+ if(newPercentageDone > lastPercentageDone) {
+ monitor.setTaskName("Writing database (" + newPercentageDone + "%)");
+ monitor.worked(newPercentageDone - lastPercentageDone);
+ lastPercentageDone = newPercentageDone;
+ }
+ }
+ }
+
private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException {
int clusterHint = clusterHints.get(s);
resourceIds[s-1] = r.getResourceId();
return r;
}
-
-// public ValueDescriptor createDescriptor(Object obj) {
-//
-// if(obj instanceof String[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (String[])obj);
-// else if(obj instanceof int[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (int[])obj);
-// else if(obj instanceof double[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (double[])obj);
-// else if(obj instanceof float[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (float[])obj);
-// else if(obj instanceof boolean[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (boolean[])obj);
-// else if(obj instanceof long[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (long[])obj);
-// else if(obj instanceof byte[])
-// return new ValueDescriptor(ValueTrait.StaticValue, (byte[])obj);
-//
-// else if(obj instanceof String)
-// return new ValueDescriptor(ValueTrait.StaticValue, new String[] {(String)obj});
-// else if(obj instanceof Double)
-// return new ValueDescriptor(ValueTrait.StaticValue, new double[] {(Double)obj});
-// else if(obj instanceof Float)
-// return new ValueDescriptor(ValueTrait.StaticValue, new float[] {(Float)obj});
-// else if(obj instanceof Integer)
-// return new ValueDescriptor(ValueTrait.StaticValue, new int[] {(Integer)obj});
-// else if(obj instanceof Boolean)
-// return new ValueDescriptor(ValueTrait.StaticValue, new boolean[] {(Boolean)obj});
-// else if(obj instanceof Byte)
-// return new ValueDescriptor(ValueTrait.StaticValue, new byte[] {(Byte)obj});
-// else if(obj instanceof Long)
-// return new ValueDescriptor(ValueTrait.StaticValue, new long[] {(Long)obj});
-//
-// throw new Error("Wrong type!");
-//
-// }
@Override
public GraphWriter create() {
@Override
public GraphWriter create(int clusterHint, Resource type) {
-
- create(type);
+ create(type);
clusterHints.put(current, clusterHint);
return this;
return this;
}
- public GraphWriter flush() {
+ public GraphWriter flush() {
try {
s.writeByte(2);
} catch(IOException e) {