From 7127b0a35075e69e484a1ec9791e594025354645 Mon Sep 17 00:00:00 2001 From: Tuukka Lehtonen Date: Tue, 4 Jul 2017 00:09:51 +0300 Subject: [PATCH] Delete temporary files after use in delayed writes and model TG export Also, delayed writes no longer create temporary files for commits that create under 512KB of committed data. refs #7091 refs #7092 Change-Id: Iff75738b9cc41a52eb88df32b51ddc7cd8af812d (cherry picked from commit 61033f112b0a2e643bf8530b99bcf90c64464f30) --- .../db/impl/graph/DelayedWriteGraph.java | 514 ++++++------------ .../graph/PassthroughSerializerBinding.java | 174 ++++++ .../db/layer0/adapter/impl/TGRemover.java | 5 +- .../db/layer0/util/TGRepresentation.java | 9 +- .../procore/internal/SessionImplSocket.java | 1 + .../utils/writer/DelayedGraphWriter.java | 284 +++++----- .../org/simantics/modeling/ModelingUtils.java | 4 +- 7 files changed, 485 insertions(+), 506 deletions(-) create mode 100644 bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java index dec1f5396..44e23d017 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/DelayedWriteGraph.java @@ -12,8 +12,7 @@ package org.simantics.db.impl.graph; -import java.io.DataInput; -import java.io.DataOutput; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -22,10 +21,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -33,14 +31,10 @@ import java.util.UUID; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.reference.ChildReference; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.impl.BindingPrintContext; -import org.simantics.databoard.serialization.SerializationException; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.IdentityPair; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.Metadata; import org.simantics.db.ReadGraph; @@ -73,7 +67,6 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.datastructures.MapList; -import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntIntHashMap; import gnu.trove.map.hash.TObjectIntHashMap; @@ -82,164 +75,12 @@ import gnu.trove.map.hash.TObjectIntHashMap; * immediately but with an explicit commit method. All read operations * return results based on the old graph. */ -public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader { - - private static final boolean DEBUG = false; - public static int BUFFER = 65536; - - static class Haxx extends Binding { - - static final Serializer serializer = new Serializer() { - - public byte[] serialize(Object obj) throws SerializationException { - return (byte[])obj; - } - - @Override - public void serialize(DataOutput out, - TObjectIntHashMap identities, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void serialize(DataOutput out, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public Object deserialize(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, List identities, - Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public void deserializeTo(DataInput in, Object obj) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in, List identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public void skip(DataInput in) throws IOException { - throw new Error("Not supported."); - } - - @Override - public Integer getConstantSize() { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj, TObjectIntHashMap identities) - throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getSize(Object obj) throws IOException { - throw new Error("Not supported."); - } - - @Override - public int getMinSize() { - throw new Error("Not supported."); - } - - }; - - @Override - public Serializer serializer() { - return serializer; - } - - @Override - public void accept(Visitor1 v, Object obj) { - throw new Error("Not supported."); - } - - @Override - public T accept(Visitor v) { - throw new Error("Not supported."); - } - - @Override - public boolean isInstance(Object obj) { - throw new Error("Not supported."); - } - - @Override - public void assertInstaceIsValid(Object obj, Set validInstances) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepHashValue(Object value, - IdentityHashMap hashedObjects) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int deepCompare(Object o1, Object o2, - Set> compareHistory) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public void readFrom(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public Object readFromTry(Binding srcBinding, Object src, Object dst) - throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { - throw new Error("Not supported."); - } - - @Override - public int getComponentCount() { - throw new Error("Not supported."); - } +public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable { - @Override - public Binding getComponentBinding(int index) { - throw new Error("Not supported."); - } + private static final boolean DEBUG = false; + private static final int BUFFER_SIZE = 512*1024; - @Override - public Binding getComponentBinding(ChildReference path) { - throw new Error("Not supported."); - } - - } - - private static final Haxx haxx = new Haxx(); + private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding(); private final int TERM = 0; private final int CLAIM = 1; @@ -248,30 +89,11 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte private final int DENY = 5; private final int DENY_VALUE = 6; private final int COMMIT_AND_CONTINUE = 7; - - static class ClusterSet { - public Resource resource; - ClusterSet() { - ids = new TIntArrayList(); - old = false; - } - ClusterSet(boolean old, Resource r) { - ids = new TIntArrayList(); - this.old = old; - this.resource = r; - } - void add(int id) { - ids.add(id); - } - private TIntArrayList ids; - private final boolean old; // true if - boolean isNew() { - return !old; - } - } - public class State { + + private static class State { public File tempFile; public FileOutputStream out; + public FileInputStream in; public ArrayList idToResource = new ArrayList(); public TIntIntHashMap externalToId = new TIntIntHashMap(); public ArrayList idToBinding = new ArrayList(); @@ -285,26 +107,31 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public int valueCount = 0; public int fileCount = 0; } - - public State writeState; - public TreeMap metadata = new TreeMap(); - - Layer0 b; - Session session; + + private State writeState; + private TreeMap metadata = new TreeMap(); + + private FileChannel channel; + private byte[] bytes = new byte[BUFFER_SIZE]; + private ByteBuffer bb = ByteBuffer.wrap(bytes); + private int byteIndex = 0; + + private Layer0 b; + private Session session; + public static Resource convertDelayedResource(Resource r) { - if (r instanceof InternalResource) { - InternalResource ri = (InternalResource)r; - return ri.resource; - } + if (r instanceof InternalResource) + return ((InternalResource) r).resource; return r; } + private static class InternalResource implements Resource { int id; long clusterId = 0; Resource resource = null; Resource clusterSet = null; - + public InternalResource(int id, long clusterId) { this.id = id; this.clusterId = clusterId; @@ -314,7 +141,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte this.id = id; this.clusterSet = clusterSet; } - + @Override public long getResourceId() { throw new UnsupportedOperationException(); @@ -324,7 +151,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public Resource get() { return this; } - + @Override public boolean isPersistent() { return false; @@ -332,11 +159,9 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte @Override public int compareTo(Resource o) { - if(o instanceof InternalResource) { - return Integer.compare(id, ((InternalResource)o).id); - } else { - return -1; - } + if(o instanceof InternalResource) + return Integer.compare(id, ((InternalResource)o).id); + return -1; } @Override @@ -346,7 +171,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte result = prime * result + id; return result; } - + @Override public int getThreadHash() { return hashCode(); @@ -365,12 +190,12 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte return false; return true; } - @Override public boolean equalsResource(Resource other) { return equals(other); } + @Override public String toString() { StringBuilder sb = new StringBuilder(32); @@ -432,7 +257,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte session = g.getSession(); b = Layer0.getInstance(g); this.writeState = state; - } + } public DelayedWriteGraph newSync() { return new DelayedWriteGraph(this, writeState); @@ -715,7 +540,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte if (ws.hasClusterSet(null, clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); writeState.clusterSetsForExistingResources.add(clusterSet); - } else { + } else { if(!writeState.clusterSets.add(clusterSet)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); } @@ -787,25 +612,25 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } } + private OutputStream valueWriter = new OutputStream() { + @Override + public void write(int b) throws IOException { + writeByte(b); + } + }; + @Override public void claimValue(Resource resource, Object value, Binding binding) throws ServiceException { try { - + writeByte(CLAIM_VALUE_B); writeInt(getId(resource)); Serializer serializer = binding.serializer(); int size = serializer.getSize(value); writeInt(size); - serializer.serialize(new OutputStream() { - - @Override - public void write(int b) throws IOException { - writeByte(b); - } - - }, value); - + serializer.serialize(valueWriter, value); + } catch(IOException e) { Logger.defaultLogError(e); throw new ServiceException(e); @@ -832,86 +657,76 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public void flushCluster(Resource r) throws ServiceException { throw new ServiceException("Operation flushCluster(" + r + " not implemented."); } - - private FileChannel channel; - byte[] bytes = new byte[BUFFER]; - byte[] buffer = new byte[BUFFER]; - ByteBuffer bb = ByteBuffer.wrap(bytes); - int byteIndex = 0; - - private void writeReset(int size) { + private void writeReset(int size) { byteIndex = 0; bb.position(0); bb.limit(size); try { - - if(writeState.tempFile == null) { - + if (writeState.tempFile == null) { File workspace = Platform.getLocation().toFile(); File temp = new File(workspace, "tempFiles"); - temp.mkdirs(); - File base = new File(temp, "delayed"); - base.mkdirs(); + Files.createDirectories(base.toPath()); writeState.tempFile = new File(base, UUID.randomUUID().toString()); writeState.out = new FileOutputStream(writeState.tempFile); channel = writeState.out.getChannel(); - } for (int got=0;got < size;) { int n = channel.write(bb); if (n <= 0) { - new Exception().printStackTrace(); + Logger.defaultLogError(new Exception("FileChannel.write returned " + n)); return; } got += n; } } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e); } } private void reset() { byteIndex = 0; - try { - bb.clear(); - for(int got=0; got < BUFFER;) { - int n = channel.read(bb); - if (n <= 0) - return; - got += n; + bb.clear(); + if (channel != null) { + try { + for(int got=0; got < BUFFER_SIZE;) { + int n = channel.read(bb); + if (n <= 0) + return; + got += n; + } + } catch (IOException e) { + Logger.defaultLogError("FileChannel.read failed", e); } - } catch (IOException e) { - e.printStackTrace(); } } private void writeInt(int i) { - if(byteIndex < (BUFFER-4)) { + if(byteIndex < (BUFFER_SIZE-4)) { bytes[byteIndex++] = (byte)(i&0xff); bytes[byteIndex++] = (byte)((i>>>8)&0xff); bytes[byteIndex++] = (byte)((i>>>16)&0xff); bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if (byteIndex == BUFFER) - writeReset(BUFFER); + if (byteIndex == BUFFER_SIZE) + writeReset(BUFFER_SIZE); } else { - int has = BUFFER-byteIndex; - if(has == 0) writeReset(BUFFER); + int has = BUFFER_SIZE-byteIndex; + if(has == 0) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)(i&0xff); - if(has == 1) writeReset(BUFFER); + if(has == 1) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>8)&0xff); - if(has == 2) writeReset(BUFFER); + if(has == 2) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>16)&0xff); - if(has == 3) writeReset(BUFFER); + if(has == 3) writeReset(BUFFER_SIZE); bytes[byteIndex++] = (byte)((i>>>24)&0xff); - if(has == 4) writeReset(BUFFER); + if(has == 4) writeReset(BUFFER_SIZE); } } - + private int readInt() { - if(byteIndex < (BUFFER-4)) { + if(byteIndex < (BUFFER_SIZE-4)) { int result = (int) ((bytes[byteIndex++] & 0xff) | ((bytes[byteIndex++] & 0xff)<<8) | @@ -919,7 +734,7 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte ((bytes[byteIndex++] & 0xff)<<24)); return result; } else { - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; int result = 0; if(has == 0) reset(); result = (int)(bytes[byteIndex++] & 0xff); @@ -933,44 +748,40 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte return result; } } - + private byte readByte() { byte result = bytes[byteIndex++]; - if(byteIndex == BUFFER) reset(); + if(byteIndex == BUFFER_SIZE) reset(); return result; } private void writeByte(int b) { bytes[byteIndex++] = (byte)b; - if(byteIndex == BUFFER) writeReset(BUFFER); + if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE); } - + private void writeBytes(byte[] data) { - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; int amount = data.length; if(has > amount) { System.arraycopy(data, 0, bytes, byteIndex, amount); byteIndex += amount; } else { System.arraycopy(data, 0, bytes, byteIndex, has); - writeReset(BUFFER); + writeReset(BUFFER_SIZE); ByteBuffer bb2 = ByteBuffer.wrap(data); bb2.position(has); try { channel.write(bb2); } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("FileChannel.write failed", e); } } } - - public byte[] readBytes(int amount) { - return readBytes(buffer, amount); - } - + public byte[] readBytes(byte[] result, int amount) { if(result == null) result = new byte[amount]; - int has = BUFFER-byteIndex; + int has = BUFFER_SIZE-byteIndex; if(has > amount) { System.arraycopy(bytes, byteIndex, result, 0, amount); byteIndex += amount; @@ -982,39 +793,46 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte try { got += channel.read(bb2); if(got == -1) { - new Exception().printStackTrace(); + // End-of-stream, why log this? return result; } - } catch (IOException e) { - e.printStackTrace(); + Logger.defaultLogError("FileChannel.read failed", e); } reset(); } return result; } - + public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException { writeState.bindingToId = null; writeState.externalToId = null; - try { - - writeByte(TERM); - if(byteIndex > 0) { + writeByte(TERM); + + if (writeState.out != null) { + // Flush current buffer to file only if backing file has already + // been taken into use. + if (byteIndex > 0) writeReset(byteIndex); + + try (OutputStream out = writeState.out) { + channel.force(false); + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.out = null; } - channel.force(false); - writeState.out.close(); - FileInputStream fs = new FileInputStream(writeState.tempFile); - channel = fs.getChannel(); - - } catch (IOException e) { - throw new ServiceException(e); + try { + writeState.in = new FileInputStream(writeState.tempFile); + channel = writeState.in.getChannel(); + } catch (IOException e) { + throw new ServiceException(e); + } } - + w.getMetadata().putAll(metadata); - + TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class); // First create all resources defined by clusterId @@ -1094,13 +912,20 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } reset(); - bb.limit(BUFFER); + bb.limit(BUFFER_SIZE); try { while(true) { byte method = readByte(); - switch(method) { + switch(method) { case TERM: { + if (DEBUG) { + System.out.println("Resources: " + writeState.idToResource.size()); + System.out.println("Statements: " + writeState.statementCount); + System.out.println("Values: " + writeState.valueCount); + System.out.println("Files: " + writeState.fileCount); + System.out.println("Clusters: " + writeState.clusterCount); + } return; } case CLAIM: { @@ -1153,8 +978,6 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte Resource resource = getResource(readInt()); int len = readInt(); tgs.setValue(w, resource, null, this, len); -// byte[] bytes = readBytes(len); -// tgs.setValue(resource, null, bytes); } break; case COMMIT_AND_CONTINUE: { XSupport xs = w.getService(XSupport.class); @@ -1168,18 +991,19 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte else throw new ServiceException(e); } finally { - try { - channel.close(); - channel = null; - } catch (IOException e) { - throw new ServiceException(e); + channel = null; + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + throw new ServiceException(e); + } finally { + writeState.in = null; + writeState.tempFile.delete(); + } } } -// System.out.println("Resources: " + state.resourceCount); -// System.out.println("Statements: " + state.statementCount); -// System.out.println("Values: " + state.valueCount); } - + private Resource getType(Object value) { Class clazz = value.getClass(); Resource dataType = @@ -1196,12 +1020,12 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte : clazz == String[].class ? b.StringArray : clazz == boolean[].class ? b.BooleanArray : clazz == byte[].class ? b.ByteArray - : clazz == long[].class ? b.LongArray + : clazz == long[].class ? b.LongArray : null ; return dataType; } - + public long newCluster() { return -1 - (++writeState.clusterCount); } @@ -1209,14 +1033,14 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public long getDefaultCluster() { return writeState.defaultCluster; } - + public void setDefaultCluster(long cluster) { writeState.defaultCluster = cluster; } - + @Override public void syncRequest(final DelayedWrite request) throws DatabaseException { - + try { final DelayedWriteGraph dwg = new DelayedWriteGraph(this); @@ -1232,24 +1056,23 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte }); } catch (DatabaseException e) { - + throw e; - + } catch (Throwable e) { - + throw new DatabaseException(e); - + } finally { - } - + } - + @Override public void syncRequest(WriteOnly request) throws DatabaseException { - + Resource defaultClusterSet = setClusterSet4NewResource(null); - + try { WriteSupport ws = session.getService(WriteSupport.class); ws.performWriteRequest(this, request); @@ -1260,46 +1083,46 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } finally { setClusterSet4NewResource(defaultClusterSet); } - + } @SuppressWarnings("unchecked") @Override public T getService(Class api) { - + if(ClusteringSupport.class == api) { final ClusteringSupport support = (ClusteringSupport)super.getService(api); return (T)new ClusteringSupport() { - + @Override public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId) throws DatabaseException, ResourceNotFoundException { return support.getResourceByIndexAndCluster(resourceIndex, clusterId); } - + @Override public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException { return support.getResourceByKey(resourceKey); } - + @Override public int getNumberOfResources(long clusterId) throws DatabaseException { return support.getNumberOfResources(clusterId); } - + @Override public long getCluster(Resource r) { return support.getCluster(r); } - + @Override public long createCluster() { return newCluster(); } - + @Override public boolean isClusterSet(Resource r) throws DatabaseException { return support.isClusterSet(r); @@ -1309,18 +1132,18 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte public Resource getClusterSetOfCluster(Resource r) throws DatabaseException { return support.getClusterSetOfCluster(r); } - + @Override public Resource getClusterSetOfCluster(long cluster) throws DatabaseException { return support.getClusterSetOfCluster(cluster); } }; - + } else if (TransferableGraphSupport.class == api) { - + final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class); - + return (T)new TransferableGraphSupport() { @Override @@ -1329,9 +1152,9 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte writeInt(getId(resource)); writeInt(raw.length); writeBytes(raw); - writeInt(getBindingId(haxx)); + writeInt(getBindingId(PASSTHROUGH)); } - + @Override public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount) throws DatabaseException { @@ -1339,25 +1162,25 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte writeInt(getId(resource)); writeInt(amount); writeBytes(reader.readBytes(null, amount)); - writeInt(getBindingId(haxx)); + writeInt(getBindingId(PASSTHROUGH)); } - + @Override public byte[] getValue(ReadGraph graph, Resource resource) { return parentSupport.getValue(graph, resource); } - + @Override public InputStream getValueStream(ReadGraph graph, Resource resource) { return parentSupport.getValueStream(graph, resource); } - + }; } - + return super.getService(api); - + } @Override @@ -1423,4 +1246,27 @@ public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, Byte } } + public void close() { + if (writeState.out != null) { + try (OutputStream out = writeState.out) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e); + } finally { + writeState.out = null; + } + } + if (writeState.in != null) { + try (InputStream in = writeState.in) { + } catch (IOException e) { + Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e); + } finally { + writeState.in = null; + } + } + if (writeState.tempFile != null) { + writeState.tempFile.delete(); + writeState.tempFile = null; + } + } + } diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java new file mode 100644 index 000000000..8c95a5151 --- /dev/null +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/PassthroughSerializerBinding.java @@ -0,0 +1,174 @@ +package org.simantics.db.impl.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; + +import org.simantics.databoard.accessor.reference.ChildReference; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.impl.BindingPrintContext; +import org.simantics.databoard.serialization.SerializationException; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.util.IdentityPair; + +import gnu.trove.map.hash.TObjectIntHashMap; + +/** + * Originally within DelayedWriteGraph, put in separate file in 1.30.0. + * + * @author Antti Villberg + */ +class PassthroughSerializerBinding extends Binding { + + static final Serializer serializer = new Serializer() { + + public byte[] serialize(Object obj) throws SerializationException { + return (byte[])obj; + } + + @Override + public void serialize(DataOutput out, + TObjectIntHashMap identities, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void serialize(DataOutput out, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public Object deserialize(DataInput in, List identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public Object deserialize(DataInput in) throws IOException { + throw new Error("Not supported."); + } + + @Override + public void deserializeTo(DataInput in, List identities, + Object obj) throws IOException { + throw new Error("Not supported."); + } + + @Override + public void deserializeTo(DataInput in, Object obj) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void skip(DataInput in, List identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public void skip(DataInput in) throws IOException { + throw new Error("Not supported."); + } + + @Override + public Integer getConstantSize() { + throw new Error("Not supported."); + } + + @Override + public int getSize(Object obj, TObjectIntHashMap identities) + throws IOException { + throw new Error("Not supported."); + } + + @Override + public int getSize(Object obj) throws IOException { + throw new Error("Not supported."); + } + + @Override + public int getMinSize() { + throw new Error("Not supported."); + } + + }; + + @Override + public Serializer serializer() { + return serializer; + } + + @Override + public void accept(Visitor1 v, Object obj) { + throw new Error("Not supported."); + } + + @Override + public T accept(Visitor v) { + throw new Error("Not supported."); + } + + @Override + public boolean isInstance(Object obj) { + throw new Error("Not supported."); + } + + @Override + public void assertInstaceIsValid(Object obj, Set validInstances) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int deepHashValue(Object value, + IdentityHashMap hashedObjects) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int deepCompare(Object o1, Object o2, + Set> compareHistory) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public void readFrom(Binding srcBinding, Object src, Object dst) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public Object readFromTry(Binding srcBinding, Object src, Object dst) + throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + protected void toString(Object value, BindingPrintContext ctx) throws org.simantics.databoard.binding.error.BindingException { + throw new Error("Not supported."); + } + + @Override + public int getComponentCount() { + throw new Error("Not supported."); + } + + @Override + public Binding getComponentBinding(int index) { + throw new Error("Not supported."); + } + + @Override + public Binding getComponentBinding(ChildReference path) { + throw new Error("Not supported."); + } + +} \ No newline at end of file diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java index aae4b581b..885948834 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/adapter/impl/TGRemover.java @@ -43,10 +43,9 @@ public class TGRemover extends AbstractRemover { TransferableGraphConfiguration2 conf = new TransferableGraphConfiguration2(graph, resource); conf.values = false; - ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(conf)); final SerialisationSupport ss = graph.getService(SerialisationSupport.class); - try { + try (ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(conf))) { source.forResourceStatements(graph, new TransferableGraphSourceProcedure() { @Override @@ -75,8 +74,6 @@ public class TGRemover extends AbstractRemover { }); - source.closeStreams(); - } catch (Exception e) { throw new DatabaseException(e); } diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java index 0f9ffa0a7..ed7ae5193 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/TGRepresentation.java @@ -1,5 +1,6 @@ package org.simantics.db.layer0.util; +import java.io.IOException; import java.util.Arrays; import java.util.Map; @@ -49,9 +50,11 @@ public class TGRepresentation implements Representation { configuration.exclusionFunction = TGRepresentationUtils.computeExclusionFunction(graph, resources, hints); } - ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(configuration)); - return TransferableGraphs.create(graph, source); - + try (ModelTransferableGraphSource source = graph.syncRequest(new ModelTransferableGraphSourceRequest(configuration))) { + return TransferableGraphs.create(graph, source); + } catch (IOException e) { + throw new DatabaseException(e); + } } @SuppressWarnings("unchecked") diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 6c3497f15..2f555d079 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -675,6 +675,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } catch (Throwable e) { delayedWriteState.except(e); total.finish(); + dwg.close(); return; } finally { // newGraph.state.barrier.dec(); diff --git a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java index 576bc0b4b..66ffa77de 100644 --- a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java +++ b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java @@ -12,9 +12,6 @@ package org.simantics.layer0.utils.writer; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TIntLongHashMap; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -31,6 +28,9 @@ import org.simantics.db.Resource; import org.simantics.db.WriteOnlyGraph; import org.simantics.db.exception.DatabaseException; +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TIntLongHashMap; + public class DelayedGraphWriter extends AbstractDelayedGraphWriter { protected File file; @@ -45,7 +45,6 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { System.out.println("Temp file: " + file.getAbsolutePath()); s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024)); // s = new ObjectOutputStream(backup); - file.deleteOnExit(); } catch (IOException e) { throw new RuntimeException("Opening output stream to temporary file failed", e); @@ -79,13 +78,13 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { assert(value != null); assert(dataType != null); try { - ++internalCount; + ++internalCount; timestamps.add(0); - s.writeByte(1); - writeRef(internalCount); + s.writeByte(1); + writeRef(internalCount); s.writeUnshared(value); - writeRef(getId(dataType)); + writeRef(getId(dataType)); s.writeByte(0); writeRef(current); @@ -110,14 +109,14 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { assert(value != null); assert(dataType != null); try { - ++internalCount; + ++internalCount; timestamps.add(0); - s.writeByte(1); - writeRef(internalCount); + s.writeByte(1); + writeRef(internalCount); s.writeUnshared(value); - writeRef(getId(dataType)); - + writeRef(getId(dataType)); + current = internalCount; } catch (IOException e) { throw new RuntimeException("Writing statement failed.", e); @@ -133,7 +132,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { clusterHints.put(current, clusterHint); return this; } - + @Override public GraphWriter createInverse(int cluster, Resource r) { assert(r != null); @@ -147,7 +146,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { } return this; } - + @Override public GraphWriter createInverse(Resource r) { assert(r != null); @@ -160,7 +159,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { throw new RuntimeException("Writing statement failed.", e); } return this; - } + } protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) { if(id > 0) { @@ -185,124 +184,122 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { @Override public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException { - try { - if (s != null) { - s.close(); - s = null; - } + if (s != null) { + s.close(); + s = null; + } externalsInv = null; - - monitor.beginTask("", 100); - - int lastPercentageDone = 0; - long fileLength = file.length(); - -// System.out.println("size of commit file: " + fileLength); - - Resource[] internals = new Resource[internalCount]; - resourceIds = new long[internalCount]; - - + monitor.beginTask("Writing database", 100); FileInputStream fis = new FileInputStream(file); FileChannel fc = fis.getChannel(); - ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024)); - int resourceCounter = 0; - int statementCounter = 0; - int valueCounter = 0; - time = 0; - loop: while(true) { - switch(is.read()) { - case -1: - break loop; - case 0: { - int s = is.readInt(); - int p = is.readInt(); - int o = is.readInt(); - Resource rs = getResource(wg, internals, resourceIds, s); - Resource rp = getResource(wg, internals, resourceIds, p); - Resource ro = getResource(wg, internals, resourceIds, o); - Resource rpInv = inverses.get(p); - wg.claim(rs, rp, rpInv, ro); - statementCounter += 2; - } break; - case 1: { - int id = is.readInt(); - Object value = is.readUnshared(); - int type = is.readInt(); - - Resource r = newResource(wg, internals, id); - wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); - wg.claimValue(r, value); - statementCounter ++; - resourceCounter ++; - valueCounter ++; - - } break; - case 2: { - wg.flushCluster(); - } break; - case 3: { - - int s = is.readInt(); - int t = is.readInt(); - - Resource type = getResource(wg, internals, resourceIds, t); - wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); - statementCounter ++; - resourceCounter ++; - - } break; - case 4: { - - int s = is.readInt(); - newResource(wg, internals, s); - resourceCounter ++; - - } break; - case 5: { // InverseOf - - int r1 = is.readInt(); - int r2 = is.readInt(); - - Resource rr1 = getResource(wg, internals, resourceIds, r1); - Resource rr2 = getResource(wg, internals, resourceIds, r2); - wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); - statementCounter += 2; - - inverses.put(r1, rr2); - inverses.put(r2, rr1); - - } break; - } - -// if((counter % 200000) == 0) { -// System.out.println("Written " + counter + " statements."); -// } - - double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; - int newPercentageDone = (int) Math.round(percentageDone); - if(newPercentageDone > lastPercentageDone) { - monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); - monitor.worked(newPercentageDone - lastPercentageDone); - lastPercentageDone = newPercentageDone; - } + try (ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024))) { + commit(monitor, wg, is, fc); } - - System.out.println("clusterIds.size() = " + clusterIds.size()); - - System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); - } catch (IOException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); + throw new DatabaseException("Commiting delayed graph writings failed.", e); } catch (ClassNotFoundException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); - } finally { + throw new DatabaseException("Commiting delayed graph writings failed.", e); + } finally { file.delete(); monitor.done(); } } - + + private void commit(IProgressMonitor monitor, WriteOnlyGraph wg, ObjectInputStream is, FileChannel fc) throws DatabaseException, IOException, ClassNotFoundException { + int lastPercentageDone = 0; + long fileLength = file.length(); + //System.out.println("size of commit file: " + fileLength); + + Resource[] internals = new Resource[internalCount]; + resourceIds = new long[internalCount]; + + int resourceCounter = 0; + int statementCounter = 0; + int valueCounter = 0; + time = 0; + while(true) { + switch(is.read()) { + case -1: + System.out.println("clusterIds.size() = " + clusterIds.size()); + System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); + return; + + case 0: { + int s = is.readInt(); + int p = is.readInt(); + int o = is.readInt(); + Resource rs = getResource(wg, internals, resourceIds, s); + Resource rp = getResource(wg, internals, resourceIds, p); + Resource ro = getResource(wg, internals, resourceIds, o); + Resource rpInv = inverses.get(p); + wg.claim(rs, rp, rpInv, ro); + statementCounter += 2; + } break; + case 1: { + int id = is.readInt(); + Object value = is.readUnshared(); + int type = is.readInt(); + + Resource r = newResource(wg, internals, id); + wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); + wg.claimValue(r, value); + statementCounter ++; + resourceCounter ++; + valueCounter ++; + + } break; + case 2: { + wg.flushCluster(); + } break; + case 3: { + + int s = is.readInt(); + int t = is.readInt(); + + Resource type = getResource(wg, internals, resourceIds, t); + wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); + statementCounter ++; + resourceCounter ++; + + } break; + case 4: { + + int s = is.readInt(); + newResource(wg, internals, s); + resourceCounter ++; + + } break; + case 5: { // InverseOf + + int r1 = is.readInt(); + int r2 = is.readInt(); + + Resource rr1 = getResource(wg, internals, resourceIds, r1); + Resource rr2 = getResource(wg, internals, resourceIds, r2); + wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); + statementCounter += 2; + + inverses.put(r1, rr2); + inverses.put(r2, rr1); + + } break; + } + +// if((counter % 200000) == 0) { +// System.out.println("Written " + counter + " statements."); +// } + + double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; + int newPercentageDone = (int) Math.round(percentageDone); + if(newPercentageDone > lastPercentageDone) { + monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); + monitor.worked(newPercentageDone - lastPercentageDone); + lastPercentageDone = newPercentageDone; + } + } + } + private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException { int clusterHint = clusterHints.get(s); @@ -323,42 +320,6 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { resourceIds[s-1] = r.getResourceId(); return r; } - -// public ValueDescriptor createDescriptor(Object obj) { -// -// if(obj instanceof String[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (String[])obj); -// else if(obj instanceof int[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (int[])obj); -// else if(obj instanceof double[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (double[])obj); -// else if(obj instanceof float[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (float[])obj); -// else if(obj instanceof boolean[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (boolean[])obj); -// else if(obj instanceof long[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (long[])obj); -// else if(obj instanceof byte[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (byte[])obj); -// -// else if(obj instanceof String) -// return new ValueDescriptor(ValueTrait.StaticValue, new String[] {(String)obj}); -// else if(obj instanceof Double) -// return new ValueDescriptor(ValueTrait.StaticValue, new double[] {(Double)obj}); -// else if(obj instanceof Float) -// return new ValueDescriptor(ValueTrait.StaticValue, new float[] {(Float)obj}); -// else if(obj instanceof Integer) -// return new ValueDescriptor(ValueTrait.StaticValue, new int[] {(Integer)obj}); -// else if(obj instanceof Boolean) -// return new ValueDescriptor(ValueTrait.StaticValue, new boolean[] {(Boolean)obj}); -// else if(obj instanceof Byte) -// return new ValueDescriptor(ValueTrait.StaticValue, new byte[] {(Byte)obj}); -// else if(obj instanceof Long) -// return new ValueDescriptor(ValueTrait.StaticValue, new long[] {(Long)obj}); -// -// throw new Error("Wrong type!"); -// -// } @Override public GraphWriter create() { @@ -407,8 +368,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { @Override public GraphWriter create(int clusterHint, Resource type) { - - create(type); + create(type); clusterHints.put(current, clusterHint); return this; @@ -432,7 +392,7 @@ public class DelayedGraphWriter extends AbstractDelayedGraphWriter { return this; } - public GraphWriter flush() { + public GraphWriter flush() { try { s.writeByte(2); } catch(IOException e) { diff --git a/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java b/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java index 3b4806c7c..e43640775 100644 --- a/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java +++ b/bundles/org.simantics.modeling/src/org/simantics/modeling/ModelingUtils.java @@ -400,9 +400,7 @@ public class ModelingUtils { Resource project = SimanticsUI.getProject().get(); - try { - - StreamingTransferableGraphFileReader importer = new StreamingTransferableGraphFileReader(new File(fileName)); + try (StreamingTransferableGraphFileReader importer = new StreamingTransferableGraphFileReader(new File(fileName))) { TransferableGraphSource tg = importer.readTG(); final DefaultPasteImportAdvisor advisor = new DefaultPasteImportAdvisor(project) { -- 2.47.1