From: Tuukka Lehtonen Date: Mon, 3 Jul 2017 21:09:51 +0000 (+0300) Subject: Delete temporary files after use in delayed writes and model TG export X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=commitdiff_plain;h=7127b0a35075e69e484a1ec9791e594025354645;p=simantics%2Fplatform.git Delete temporary files after use in delayed writes and model TG export Also, delayed writes no longer create temporary files for commits that create under 512KB of committed data. refs #7091 refs #7092 Change-Id: Iff75738b9cc41a52eb88df32b51ddc7cd8af812d (cherry picked from commit 61033f112b0a2e643bf8530b99bcf90c64464f30) --- diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java index dec1f5396..44e23d017 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java @@ -12,8 +12,7 @@ package org.simantics.db.impl.graph; -import java.io.DataInput; -import java.io.DataOutput; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -22,10 +21,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -33,14 +31,10 @@ import java.util.UUID; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.reference.ChildReference; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.impl.BindingPrintContext; -import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.IdentityPair; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.Metadata; import org.simantics.db.ReadGraph; @@ -73,7 +67,6 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.datastructures.MapList; -import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntIntHashMap; import gnu.trove.map.hash.TObjectIntHashMap; @@ -82,164 +75,12 @@ import gnu.trove.map.hash.TObjectIntHashMap; * immediately but with an explicit commit method. All read operations * return results based on the old graph. */ -public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader { - - private static final boolean DEBUG = false; - public static int BUFFER = 65536; - - static class Haxx extends Binding { - - static final Serializer serializer = new Serializer() { - - public byte[] serialize(Object obj) throws SerializationException { - return (byte[])obj; - } - - @Override - public void serialize(DataOutput out, - TObjectIntHashMap identities, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void serialize(DataOutput out, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, List identities, - Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public Integer getConstantSize() { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj, TObjectIntHashMap identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getMinSize() { - throw new Error("Not supported."); - } - - }; - - @Override - public Serializer serializer() { - return serializer; - } - - @Override - public void accept(Visitor1 v, Object obj) { - throw new Error("Not supported."); - } - - @Override - public T accept(Visitor v) { - throw new Error("Not supported."); - } - - @Override - public boolean isInstance(Object obj) { - throw new Error("Not supported."); - } - - @Override - public void assertInstaceIsValid(Object obj, Set validInstances) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepHashValue(Object value, - IdentityHashMap hashedObjects) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepCompare(Object o1, Object o2, - Set> compareHistory) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public void readFrom(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public Object readFromTry(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int getComponentCount() { - throw new Error("Not supported."); - } +public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable { - @Override - public Binding getComponentBinding(int index) { - throw new Error("Not supported."); - } + private static final boolean DEBUG = false; + private static final int BUFFER_SIZE = 512*1024; - @Override - public Binding getComponentBinding(ChildReference path) { - throw new Error("Not supported."); - } - - } - - private static final Haxx haxx = new Haxx(); + private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding(); private final int TERM = 0; private final int CLAIM = 1; @@ -248,30 +89,11 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte private final int DENY = 5; private final int DENY_VALUE = 6; private final int COMMIT_AND_CONTINUE = 7; - - static class ClusterSet { - public Resource resource; - ClusterSet() { - ids = new TIntArrayList(); - old = false; - } - ClusterSet(boolean old, Resource r) { - ids = new TIntArrayList(); - this.old = old; - this.resource = r; - } - void add(int id) { - ids.add(id); - } - private TIntArrayList ids; - private final boolean old; // true if - boolean isNew() { - return !old; - } - } - public class State { + + private static class State { public File tempFile; public FileOutputStream out; + public FileInputStream in; public ArrayList idToResource = new ArrayList(); public TIntIntHashMap externalToId = new TIntIntHashMap(); public ArrayList idToBinding = new ArrayList(); @@ -285,26 +107,31 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public int valueCount = 0; public int fileCount = 0; } - - public State writeState; - public TreeMap metadata = new TreeMap(); - - Layer0 b; - Session session; + + private State writeState; + private TreeMap metadata = new TreeMap(); + + private FileChannel channel; + private byte[] bytes = new byte[BUFFER_SIZE]; + private ByteBuffer bb = ByteBuffer.wrap(bytes); + private int byteIndex = 0; + + private Layer0 b; + private Session session; + public static Resource convertDelayedResource(Resource r) { - if (r instanceof InternalResource) { - InternalResource ri = (InternalResource)r; - return ri.resource; - } + if (r instanceof InternalResource) + return ((InternalResource) r).resource; return r; } + private static class InternalResource implements Resource { int id; long clusterId = 0; Resource resource = null; Resource clusterSet = null; - + public InternalResource(int id, long clusterId) { this.id = id; this.clusterId = clusterId; @@ -314,7 +141,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte this.id = id; this.clusterSet = clusterSet; } - + @Override public long getResourceId() { throw new UnsupportedOperationException(); @@ -324,7 +151,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public Resource get() { return this; } - + @Override public boolean isPersistent() { return false; @@ -332,11 +159,9 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte @Override public int compareTo(Resource o) { - if(o instanceof InternalResource) { - return Integer.compare(id, ((InternalResource)o).id); - } else { - return -1; - } + if(o instanceof InternalResource) + return Integer.compare(id, ((InternalResource)o).id); + return -1; } @Override @@ -346,7 +171,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte result = prime * result + id; return result; } - + @Override public int getThreadHash() { return hashCode(); @@ -365,12 +190,12 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte return false; return true; } - @Override public boolean equalsResource(Resource other) { return equals(other); } + @Override public String toString() { StringBuilder sb = new StringBuilder(32); @@ -432,7 +257,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte session = g.getSession(); b = Layer0.getInstance(g); this.writeState = state; - } + } public DelayedWriteGraph newSync() { return new DelayedWriteGraph(this, writeState); @@ -715,7 +540,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte if (ws.hasClusterSet(null, clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); writeState.clusterSetsForExistingResources.add(clusterSet); - } else { + } else { if(!writeState.clusterSets.add(clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); } @@ -787,25 +612,25 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } } + private OutputStream valueWriter = new OutputStream() { + @Override + public void write(int b) throws IOException { + writeByte(b); + } + }; + @Override public void claimValue(Resource resource, Object value, Binding binding) throws ServiceException { try { - + writeByte(CLAIM_VALUE_B); writeInt(getId(resource)); Serializer serializer = binding.serializer(); int size = serializer.getSize(value); writeInt(size); - serializer.serialize(new OutputStream() { - - @Override - public void write(int b) throws IOException { - writeByte(b); - } - - }, value); - + serializer.serialize(valueWriter, value); + } catch(IOException e) { Logger.defaultLogError(e); throw new ServiceException(e); @@ -832,86 +657,76 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public void flushCluster(Resource r) throws ServiceException { throw new ServiceException("Operation flushCluster(" + r + " not implemented."); } - - private FileChannel channel; - byte[] bytes = new byte[BUFFER]; - byte[] buffer = new byte[BUFFER]; - ByteBuffer bb = ByteBuffer.wrap(bytes); - int byteIndex = 0; - - private void writeReset(int size) { + private void writeReset(int size) { byteIndex = 0; bb.position(0); bb.limit(size); try { - - if(writeState.tempFile == null) { - + if (writeState.tempFile == null) { File workspace = Platform.getLocation().toFile(); File temp = new File(workspace, "tempFiles"); - temp.mkdirs(); - File base = new File(temp, "delayed"); - base.mkdirs(); + Files.createDirectories(base.toPath()); writeState.tempFile = new File(base, UUID.randomUUID().toString()); writeState.out = new FileOutputStream(writeState.tempFile); channel = writeState.out.getChannel(); - } for (int got=0;got < size;) { int n = channel.write(bb); if (n <= 0) { - new Exception().printStackTrace(); + Logger.defaultLogError(new Exception("FileChannel.write returned " + n)); return; } got += n; } } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e); } } private void reset() { byteIndex = 0; - try { - bb.clear(); - for(int got=0; got < BUFFER;) { - int n = channel.read(bb); - if (n <= 0) - return; - got += n; + bb.clear(); + if (channel != null) { + try { + for(int got=0; got < BUFFER_SIZE;) { + int n = channel.read(bb); + if (n <= 0) + return; + got += n; + } + } catch (IOException e) { + Logger.defaultLogError("FileChannel.read failed", e); } - } catch (IOException e) { - e.printStackTrace(); } } private void writeInt(int i) { - if(byteIndex < (BUFFER-4)) { + if(byteIndex < (BUFFER_SIZE-4)) { bytes[byteIndex++] = (byte)(i&0xff); bytes[byteIndex++] = (byte)((i>>>8)&0xff); bytes[byteIndex++] = (byte)((i>>>16)&0xff); bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if (byteIndex == BUFFER) - writeReset(BUFFER); + if (byteIndex == BUFFER_SIZE) + writeReset(BUFFER_SIZE); } else { - int has = BUFFER-byteIndex; - if(has == 0) writeReset(BUFFER); + int has = BUFFER_SIZE-byteIndex; + if(has == 0) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)(i&0xff); - if(has == 1) writeReset(BUFFER); + if(has == 1) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) writeReset(BUFFER); + if(has == 2) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) writeReset(BUFFER); + if(has == 3) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) writeReset(BUFFER); + if(has == 4) writeReset(BUFFER_SIZE); } } - + private int readInt() { - if(byteIndex < (BUFFER-4)) { + if(byteIndex < (BUFFER_SIZE-4)) { int result = (int) ((bytes[byteIndex++] & 0xff) | ((bytes[byteIndex++] & 0xff)<<8) | @@ -919,7 +734,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte ((bytes[byteIndex++] & 0xff)<<24)); return result; } else { - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; int result = 0; if(has == 0) reset(); result = (int)(bytes[byteIndex++] & 0xff); @@ -933,44 +748,40 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte return result; } } - + private byte readByte() { byte result = bytes[byteIndex++]; - if(byteIndex == BUFFER) reset(); + if(byteIndex == BUFFER_SIZE) reset(); return result; } private void writeByte(int b) { bytes[byteIndex++] = (byte)b; - if(byteIndex == BUFFER) writeReset(BUFFER); + if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE); } - + private void writeBytes(byte[] data) { - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; int amount = data.length; if(has > amount) { System.arraycopy(data, 0, bytes, byteIndex, amount); byteIndex += amount; } else { System.arraycopy(data, 0, bytes, byteIndex, has); - writeReset(BUFFER); + writeReset(BUFFER_SIZE); ByteBuffer bb2 = ByteBuffer.wrap(data); bb2.position(has); try { channel.write(bb2); } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("FileChannel.write failed", e); } } } - - public byte[] readBytes(int amount) { - return readBytes(buffer, amount); - } - + public byte[] readBytes(byte[] result, int amount) { if(result == null) result = new byte[amount]; - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; if(has > amount) { System.arraycopy(bytes, byteIndex, result, 0, amount); byteIndex += amount; @@ -982,39 +793,46 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte try { got += channel.read(bb2); if(got == -1) { - new Exception().printStackTrace(); + // End-of-stream, why log this? return result; } - } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("FileChannel.read failed", e); } reset(); } return result; } - + public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException { writeState.bindingToId = null; writeState.externalToId = null; - try { - - writeByte(TERM); - if(byteIndex > 0) { + writeByte(TERM); + + if (writeState.out != null) { + // Flush current buffer to file only if backing file has already + // been taken into use. + if (byteIndex > 0) writeReset(byteIndex); + + try (OutputStream out = writeState.out) { + channel.force(false); + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.out = null; } - channel.force(false); - writeState.out.close(); - FileInputStream fs = new FileInputStream(writeState.tempFile); - channel = fs.getChannel(); - - } catch (IOException e) { - throw new ServiceException(e); + try { + writeState.in = new FileInputStream(writeState.tempFile); + channel = writeState.in.getChannel(); + } catch (IOException e) { + throw new ServiceException(e); + } } - + w.getMetadata().putAll(metadata); - + TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class); // First create all resources defined by clusterId @@ -1094,13 +912,20 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } reset(); - bb.limit(BUFFER); + bb.limit(BUFFER_SIZE); try { while(true) { byte method = readByte(); - switch(method) { + switch(method) { case TERM: { + if (DEBUG) { + System.out.println("Resources: " + writeState.idToResource.size()); + System.out.println("Statements: " + writeState.statementCount); + System.out.println("Values: " + writeState.valueCount); + System.out.println("Files: " + writeState.fileCount); + System.out.println("Clusters: " + writeState.clusterCount); + } return; } case CLAIM: { @@ -1153,8 +978,6 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte Resource resource = getResource(readInt()); int len = readInt(); tgs.setValue(w, resource, null, this, len); -// byte[] bytes = readBytes(len); -// tgs.setValue(resource, null, bytes); } break; case COMMIT_AND_CONTINUE: { XSupport xs = w.getService(XSupport.class); @@ -1168,18 +991,19 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte else throw new ServiceException(e); } finally { - try { - channel.close(); - channel = null; - } catch (IOException e) { - throw new ServiceException(e); + channel = null; + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.in = null; + writeState.tempFile.delete(); + } } } -// System.out.println("Resources: " + state.resourceCount); -// System.out.println("Statements: " + state.statementCount); -// System.out.println("Values: " + state.valueCount); } - + private Resource getType(Object value) { Class clazz = value.getClass(); Resource dataType = @@ -1196,12 +1020,12 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte : clazz == String[].class ? b.StringArray : clazz == boolean[].class ? b.BooleanArray : clazz == byte[].class ? b.ByteArray - : clazz == long[].class ? b.LongArray + : clazz == long[].class ? b.LongArray : null ; return dataType; } - + public long newCluster() { return -1 - (++writeState.clusterCount); } @@ -1209,14 +1033,14 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public long getDefaultCluster() { return writeState.defaultCluster; } - + public void setDefaultCluster(long cluster) { writeState.defaultCluster = cluster; } - + @Override public void syncRequest(final DelayedWrite request) throws DatabaseException { - + try { final DelayedWriteGraph dwg = new DelayedWriteGraph(this); @@ -1232,24 +1056,23 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte }); } catch (DatabaseException e) { - + throw e; - + } catch (Throwable e) { - + throw new DatabaseException(e); - + } finally { - } - + } - + @Override public void syncRequest(WriteOnly request) throws DatabaseException { - + Resource defaultClusterSet = setClusterSet4NewResource(null); - + try { WriteSupport ws = session.getService(WriteSupport.class); ws.performWriteRequest(this, request); @@ -1260,46 +1083,46 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } finally { setClusterSet4NewResource(defaultClusterSet); } - + } @SuppressWarnings("unchecked") @Override public T getService(Class api) { - + if(ClusteringSupport.class == api) { final ClusteringSupport support = (ClusteringSupport)super.getService(api); return (T)new ClusteringSupport() { - + @Override public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId) throws DatabaseException, ResourceNotFoundException { return support.getResourceByIndexAndCluster(resourceIndex, clusterId); } - + @Override public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException { return support.getResourceByKey(resourceKey); } - + @Override public int getNumberOfResources(long clusterId) throws DatabaseException { return support.getNumberOfResources(clusterId); } - + @Override public long getCluster(Resource r) { return support.getCluster(r); } - + @Override public long createCluster() { return newCluster(); } - + @Override public boolean isClusterSet(Resource r) throws DatabaseException { return support.isClusterSet(r); @@ -1309,18 +1132,18 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public Resource getClusterSetOfCluster(Resource r) throws DatabaseException { return support.getClusterSetOfCluster(r); } - + @Override public Resource getClusterSetOfCluster(long cluster) throws DatabaseException { return support.getClusterSetOfCluster(cluster); } }; - + } else if (TransferableGraphSupport.class == api) { - + final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class); - + return (T)new TransferableGraphSupport() { @Override @@ -1329,9 +1152,9 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte writeInt(getId(resource)); writeInt(raw.length); writeBytes(raw); - writeInt(getBindingId(haxx)); + writeInt(getBindingId(PASSTHROUGH)); } - + @Override public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount) throws DatabaseException { @@ -1339,25 +1162,25 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte writeInt(getId(resource)); writeInt(amount); writeBytes(reader.readBytes(null, amount)); - writeInt(getBindingId(haxx)); + writeInt(getBindingId(PASSTHROUGH)); } - + @Override public byte[] getValue(ReadGraph graph, Resource resource) { return parentSupport.getValue(graph, resource); } - + @Override public InputStream getValueStream(ReadGraph graph, Resource resource) { return parentSupport.getValueStream(graph, resource); } - + }; } - + return super.getService(api); - + } @Override @@ -1423,4 +1246,27 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } } + public void close() { + if (writeState.out != null) { + try (OutputStream out = writeState.out) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e); + } finally { + writeState.out = null; + } + } + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e); + } finally { + writeState.in = null; + } + } + if (writeState.tempFile != null) { + writeState.tempFile.delete(); + writeState.tempFile = null; + } + } + } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java new file mode 100644 index 000000000..8c95a5151 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java @@ -0,0 +1,174 @@ +package org.simantics.db.impl.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; + +import org.simantics.databoard.accessor.reference.ChildReference; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.impl.BindingPrintContext; +import org.simantics.databoard.serialization.SerializationException; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.util.IdentityPair; + +import gnu.trove.map.hash.TObjectIntHashMap; + +/** + * Originally within DelayedWriteGraph, put in separate file in 1.30.0. + * + * @author Antti Villberg + */ +class PassthroughSerializerBinding extends Binding { + + static final Serializer serializer = new Serializer() { + + public byte[] serialize(Object obj) throws SerializationException { + return (byte[])obj; + } + + @Override + public void serialize(DataOutput out, + TObjectIntHashMap identities, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void serialize(DataOutput out, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public Object deserialize(DataInput in, List identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public Object deserialize(DataInput in) throws IOException { + throw new Error("Not supported."); + } + + @Override + public void deserializeTo(DataInput in, List identities, + Object obj) throws IOException { + throw new Error("Not supported."); + } + + @Override + public void deserializeTo(DataInput in, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void skip(DataInput in, List identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void skip(DataInput in) throws IOException { + throw new Error("Not supported."); + } + + @Override + public Integer getConstantSize() { + throw new Error("Not supported."); + } + + @Override + public int getSize(Object obj, TObjectIntHashMap identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public int getSize(Object obj) throws IOException { + throw new Error("Not supported."); + } + + @Override + public int getMinSize() { + throw new Error("Not supported."); + } + + }; + + @Override + public Serializer serializer() { + return serializer; + } + + @Override + public void accept(Visitor1 v, Object obj) { + throw new Error("Not supported."); + } + + @Override + public T accept(Visitor v) { + throw new Error("Not supported."); + } + + @Override + public boolean isInstance(Object obj) { + throw new Error("Not supported."); + } + + @Override + public void assertInstaceIsValid(Object obj, Set validInstances) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int deepHashValue(Object value, + IdentityHashMap hashedObjects) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int deepCompare(Object o1, Object o2, + Set> compareHistory) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public void readFrom(Binding srcBinding, Object src, Object dst) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public Object readFromTry(Binding srcBinding, Object src, Object dst) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int getComponentCount() { + throw new Error("Not supported."); + } + + @Override + public Binding getComponentBinding(int index) { + throw new Error("Not supported."); + } + + @Override + public Binding getComponentBinding(ChildReference path) { + throw new Error("Not supported."); + } + +} \ No newline at end of file diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java index aae4b581b..885948834 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java @@ -43,10 +43,9 @@ public class TGRemover extends AbstractRemover { TransferableGraphConfiguration2 conf = new TransferableGraphConfiguration2(graph, resource); conf.values = false; - ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(conf)); final SerialisationSupport ss = graph.getService(SerialisationSupport.class); - try { + try (ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(conf))) { source.forResourceStatements(graph, new TransferableGraphSourceProcedure() { @Override @@ -75,8 +74,6 @@ public class TGRemover extends AbstractRemover { }); - source.closeStreams(); - } catch (Exception e) { throw new DatabaseException(e); } diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java index 0f9ffa0a7..ed7ae5193 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java @@ -1,5 +1,6 @@ package org.simantics.db.layer0.util; +import java.io.IOException; import java.util.Arrays; import java.util.Map; @@ -49,9 +50,11 @@ public class TGRepresentation implements Representation { configuration.exclusionFunction = TGRepresentationUtils.computeExclusionFunction(graph, resources, hints); } - ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(configuration)); - return TransferableGraphs.create(graph, source); - + try (ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(configuration))) { + return TransferableGraphs.create(graph, source); + } catch (IOException e) { + throw new DatabaseException(e); + } } @SuppressWarnings("unchecked") diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 6c3497f15..2f555d079 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -675,6 +675,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } catch (Throwable e) { delayedWriteState.except(e); total.finish(); + dwg.close(); return; } finally { // newGraph.state.barrier.dec(); diff --git a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java index 576bc0b4b..66ffa77de 100644 --- a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java +++ b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java @@ -12,9 +12,6 @@ package org.simantics.layer0.utils.writer; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TIntLongHashMap; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -31,6 +28,9 @@ import org.simantics.db.Resource; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.exception.DatabaseException; +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TIntLongHashMap; + public class DelayedGraphWriter extends AbstractDelayedGraphWriter { protected File file; @@ -45,7 +45,6 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { System.out.println("Temp file: " + file.getAbsolutePath()); s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024)); // s = new ObjectOutputStream(backup); - file.deleteOnExit(); } catch (IOException e) { throw new RuntimeException("Opening output stream to temporary file failed", e); @@ -79,13 +78,13 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { assert(value != null); assert(dataType != null); try { - ++internalCount; + ++internalCount; timestamps.add(0); - s.writeByte(1); - writeRef(internalCount); + s.writeByte(1); + writeRef(internalCount); s.writeUnshared(value); - writeRef(getId(dataType)); + writeRef(getId(dataType)); s.writeByte(0); writeRef(current); @@ -110,14 +109,14 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { assert(value != null); assert(dataType != null); try { - ++internalCount; + ++internalCount; timestamps.add(0); - s.writeByte(1); - writeRef(internalCount); + s.writeByte(1); + writeRef(internalCount); s.writeUnshared(value); - writeRef(getId(dataType)); - + writeRef(getId(dataType)); + current = internalCount; } catch (IOException e) { throw new RuntimeException("Writing statement failed.", e); @@ -133,7 +132,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { clusterHints.put(current, clusterHint); return this; } - + @Override public GraphWriter createInverse(int cluster, Resource r) { assert(r != null); @@ -147,7 +146,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { } return this; } - + @Override public GraphWriter createInverse(Resource r) { assert(r != null); @@ -160,7 +159,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { throw new RuntimeException("Writing statement failed.", e); } return this; - } + } protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) { if(id > 0) { @@ -185,124 +184,122 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { @Override public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException { - try { - if (s != null) { - s.close(); - s = null; - } + if (s != null) { + s.close(); + s = null; + } externalsInv = null; - - monitor.beginTask("", 100); - - int lastPercentageDone = 0; - long fileLength = file.length(); - -// System.out.println("size of commit file: " + fileLength); - - Resource[] internals = new Resource[internalCount]; - resourceIds = new long[internalCount]; - - + monitor.beginTask("Writing database", 100); FileInputStream fis = new FileInputStream(file); FileChannel fc = fis.getChannel(); - ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024)); - int resourceCounter = 0; - int statementCounter = 0; - int valueCounter = 0; - time = 0; - loop: while(true) { - switch(is.read()) { - case -1: - break loop; - case 0: { - int s = is.readInt(); - int p = is.readInt(); - int o = is.readInt(); - Resource rs = getResource(wg, internals, resourceIds, s); - Resource rp = getResource(wg, internals, resourceIds, p); - Resource ro = getResource(wg, internals, resourceIds, o); - Resource rpInv = inverses.get(p); - wg.claim(rs, rp, rpInv, ro); - statementCounter += 2; - } break; - case 1: { - int id = is.readInt(); - Object value = is.readUnshared(); - int type = is.readInt(); - - Resource r = newResource(wg, internals, id); - wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); - wg.claimValue(r, value); - statementCounter ++; - resourceCounter ++; - valueCounter ++; - - } break; - case 2: { - wg.flushCluster(); - } break; - case 3: { - - int s = is.readInt(); - int t = is.readInt(); - - Resource type = getResource(wg, internals, resourceIds, t); - wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); - statementCounter ++; - resourceCounter ++; - - } break; - case 4: { - - int s = is.readInt(); - newResource(wg, internals, s); - resourceCounter ++; - - } break; - case 5: { // InverseOf - - int r1 = is.readInt(); - int r2 = is.readInt(); - - Resource rr1 = getResource(wg, internals, resourceIds, r1); - Resource rr2 = getResource(wg, internals, resourceIds, r2); - wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); - statementCounter += 2; - - inverses.put(r1, rr2); - inverses.put(r2, rr1); - - } break; - } - -// if((counter % 200000) == 0) { -// System.out.println("Written " + counter + " statements."); -// } - - double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; - int newPercentageDone = (int) Math.round(percentageDone); - if(newPercentageDone > lastPercentageDone) { - monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); - monitor.worked(newPercentageDone - lastPercentageDone); - lastPercentageDone = newPercentageDone; - } + try (ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024))) { + commit(monitor, wg, is, fc); } - - System.out.println("clusterIds.size() = " + clusterIds.size()); - - System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); - } catch (IOException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); + throw new DatabaseException("Commiting delayed graph writings failed.", e); } catch (ClassNotFoundException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); - } finally { + throw new DatabaseException("Commiting delayed graph writings failed.", e); + } finally { file.delete(); monitor.done(); } } - + + private void commit(IProgressMonitor monitor, WriteOnlyGraph wg, ObjectInputStream is, FileChannel fc) throws DatabaseException, IOException, ClassNotFoundException { + int lastPercentageDone = 0; + long fileLength = file.length(); + //System.out.println("size of commit file: " + fileLength); + + Resource[] internals = new Resource[internalCount]; + resourceIds = new long[internalCount]; + + int resourceCounter = 0; + int statementCounter = 0; + int valueCounter = 0; + time = 0; + while(true) { + switch(is.read()) { + case -1: + System.out.println("clusterIds.size() = " + clusterIds.size()); + System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); + return; + + case 0: { + int s = is.readInt(); + int p = is.readInt(); + int o = is.readInt(); + Resource rs = getResource(wg, internals, resourceIds, s); + Resource rp = getResource(wg, internals, resourceIds, p); + Resource ro = getResource(wg, internals, resourceIds, o); + Resource rpInv = inverses.get(p); + wg.claim(rs, rp, rpInv, ro); + statementCounter += 2; + } break; + case 1: { + int id = is.readInt(); + Object value = is.readUnshared(); + int type = is.readInt(); + + Resource r = newResource(wg, internals, id); + wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); + wg.claimValue(r, value); + statementCounter ++; + resourceCounter ++; + valueCounter ++; + + } break; + case 2: { + wg.flushCluster(); + } break; + case 3: { + + int s = is.readInt(); + int t = is.readInt(); + + Resource type = getResource(wg, internals, resourceIds, t); + wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); + statementCounter ++; + resourceCounter ++; + + } break; + case 4: { + + int s = is.readInt(); + newResource(wg, internals, s); + resourceCounter ++; + + } break; + case 5: { // InverseOf + + int r1 = is.readInt(); + int r2 = is.readInt(); + + Resource rr1 = getResource(wg, internals, resourceIds, r1); + Resource rr2 = getResource(wg, internals, resourceIds, r2); + wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); + statementCounter += 2; + + inverses.put(r1, rr2); + inverses.put(r2, rr1); + + } break; + } + +// if((counter % 200000) == 0) { +// System.out.println("Written " + counter + " statements."); +// } + + double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; + int newPercentageDone = (int) Math.round(percentageDone); + if(newPercentageDone > lastPercentageDone) { + monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); + monitor.worked(newPercentageDone - lastPercentageDone); + lastPercentageDone = newPercentageDone; + } + } + } + private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException { int clusterHint = clusterHints.get(s); @@ -323,42 +320,6 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { resourceIds[s-1] = r.getResourceId(); return r; } - -// public ValueDescriptor createDescriptor(Object obj) { -// -// if(obj instanceof String[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (String[])obj); -// else if(obj instanceof int[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (int[])obj); -// else if(obj instanceof double[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (double[])obj); -// else if(obj instanceof float[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (float[])obj); -// else if(obj instanceof boolean[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (boolean[])obj); -// else if(obj instanceof long[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (long[])obj); -// else if(obj instanceof byte[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (byte[])obj); -// -// else if(obj instanceof String) -// return new ValueDescriptor(ValueTrait.StaticValue, new String[] {(String)obj}); -// else if(obj instanceof Double) -// return new ValueDescriptor(ValueTrait.StaticValue, new double[] {(Double)obj}); -// else if(obj instanceof Float) -// return new ValueDescriptor(ValueTrait.StaticValue, new float[] {(Float)obj}); -// else if(obj instanceof Integer) -// return new ValueDescriptor(ValueTrait.StaticValue, new int[] {(Integer)obj}); -// else if(obj instanceof Boolean) -// return new ValueDescriptor(ValueTrait.StaticValue, new boolean[] {(Boolean)obj}); -// else if(obj instanceof Byte) -// return new ValueDescriptor(ValueTrait.StaticValue, new byte[] {(Byte)obj}); -// else if(obj instanceof Long) -// return new ValueDescriptor(ValueTrait.StaticValue, new long[] {(Long)obj}); -// -// throw new Error("Wrong type!"); -// -// } @Override public GraphWriter create() { @@ -407,8 +368,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { @Override public GraphWriter create(int clusterHint, Resource type) { - - create(type); + create(type); clusterHints.put(current, clusterHint); return this; @@ -432,7 +392,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { return this; } - public GraphWriter flush() { + public GraphWriter flush() { try { s.writeByte(2); } catch(IOException e) { diff --git a/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java b/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java index 3b4806c7c..e43640775 100644 --- a/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java +++ b/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java @@ -400,9 +400,7 @@ public class ModelingUtils { Resource project = SimanticsUI.getProject().get(); - try { - - StreamingTransferableGraphFileReader importer = new StreamingTransferableGraphFileReader(new File(fileName)); + try (StreamingTransferableGraphFileReader importer = new StreamingTransferableGraphFileReader(new File(fileName))) { TransferableGraphSource tg = importer.readTG(); final DefaultPasteImportAdvisor advisor = new DefaultPasteImportAdvisor(project) {