X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.layer0.utils%2Fsrc%2Forg%2Fsimantics%2Flayer0%2Futils%2Fwriter%2FDelayedGraphWriter.java;h=66ffa77de7def4253c8601babb7d978b4d2184bd;hp=804535337c80310f655902814c3c4c94fca45b6b;hb=61033f112b0a2e643bf8530b99bcf90c64464f30;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java index 804535337..66ffa77de 100644 --- a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java +++ b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java @@ -1,444 +1,404 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.layer0.utils.writer; - - -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TIntLongHashMap; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.nio.channels.FileChannel; - -import org.eclipse.core.runtime.IProgressMonitor; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.WriteOnlyGraph; -import org.simantics.db.exception.DatabaseException; - -public class DelayedGraphWriter extends AbstractDelayedGraphWriter { - - protected File file; - protected ObjectOutputStream s; - TIntIntHashMap clusterHints = new TIntIntHashMap(); - TIntLongHashMap clusterIds = new TIntLongHashMap(); - - public DelayedGraphWriter(ReadGraph graph) { - super(graph); - try { - file = File.createTempFile("graph", ".tmp"); - System.out.println("Temp file: " + file.getAbsolutePath()); - s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024)); -// s = new ObjectOutputStream(backup); - - file.deleteOnExit(); - } catch (IOException e) { - throw new RuntimeException("Opening output stream to temporary file failed", e); - } - } - - private void writeRef(int ref) throws IOException { - if(ref > 0) - timestamps.set(ref-1, time++); - s.writeInt(ref); - } - - @Override - public GraphWriter let(Resource p, Resource o) throws DatabaseException { - assert(p != null); - assert(o != null); - try { - s.writeByte(0); - writeRef(current); - writeRef(getPredicateId(p)); - writeRef(getId(o)); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - return this; - } - - @Override - public GraphWriter let(Resource p, Object value, Resource dataType) throws DatabaseException { - assert(p != null); - assert(value != null); - assert(dataType != null); - try { - ++internalCount; - timestamps.add(0); - - s.writeByte(1); - writeRef(internalCount); - s.writeUnshared(value); - writeRef(getId(dataType)); - - s.writeByte(0); - writeRef(current); - writeRef(getPredicateId(p)); - writeRef(internalCount); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - return this; - } - - @Override - public GraphWriter let(int clusterHint, Resource p, Object value, - Resource dataType) throws DatabaseException { - let(p, value, dataType); - clusterHints.put(internalCount, clusterHint); - return this; - } - - @Override - public GraphWriter createLiteral(Object value, Resource dataType) { - assert(value != null); - assert(dataType != null); - try { - ++internalCount; - timestamps.add(0); - - s.writeByte(1); - writeRef(internalCount); - s.writeUnshared(value); - writeRef(getId(dataType)); - - current = internalCount; - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - return this; - } - - @Override - public GraphWriter createLiteral(int clusterHint, Object value, - Resource dataType) { - createLiteral(value, dataType); - - clusterHints.put(current, clusterHint); - return this; - } - - @Override - public GraphWriter createInverse(int cluster, Resource r) { - assert(r != null); - create(cluster); - try { - s.writeByte(5); - writeRef(current); - writeRef(getId(r)); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - return this; - } - - @Override - public GraphWriter createInverse(Resource r) { - assert(r != null); - create(); - try { - s.writeByte(5); - writeRef(current); - writeRef(getId(r)); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - return this; - } - - protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) { - if(id > 0) { - Resource ret = internals[id-1]; - if(ret == null) { - throw new Error("Error"); -// ret = wg.newResource(types.get(id)); -// internals[id-1] = ret; -// resourceIds[id-1] = ret.getResourceId(); - } - if(timestamps.getQuick(id-1)==time) - internals[id-1] = null; - ++time; - return ret; - } - else if(id < 0) { - return externals.get(-1-id); - } - else - return null; - } - - @Override - public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException { - - try { - if (s != null) { - s.close(); - s = null; - } - externalsInv = null; - - monitor.beginTask("", 100); - - int lastPercentageDone = 0; - long fileLength = file.length(); - -// System.out.println("size of commit file: " + fileLength); - - Resource[] internals = new Resource[internalCount]; - resourceIds = new long[internalCount]; - - - FileInputStream fis = new FileInputStream(file); - FileChannel fc = fis.getChannel(); - ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024)); - int resourceCounter = 0; - int statementCounter = 0; - int valueCounter = 0; - time = 0; - loop: while(true) { - switch(is.read()) { - case -1: - break loop; - case 0: { - int s = is.readInt(); - int p = is.readInt(); - int o = is.readInt(); - Resource rs = getResource(wg, internals, resourceIds, s); - Resource rp = getResource(wg, internals, resourceIds, p); - Resource ro = getResource(wg, internals, resourceIds, o); - Resource rpInv = inverses.get(p); - wg.claim(rs, rp, rpInv, ro); - statementCounter += 2; - } break; - case 1: { - int id = is.readInt(); - Object value = is.readUnshared(); - int type = is.readInt(); - - Resource r = newResource(wg, internals, id); - wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); - wg.claimValue(r, value); - statementCounter ++; - resourceCounter ++; - valueCounter ++; - - } break; - case 2: { - wg.flushCluster(); - } break; - case 3: { - - int s = is.readInt(); - int t = is.readInt(); - - Resource type = getResource(wg, internals, resourceIds, t); - wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); - statementCounter ++; - resourceCounter ++; - - } break; - case 4: { - - int s = is.readInt(); - newResource(wg, internals, s); - resourceCounter ++; - - } break; - case 5: { // InverseOf - - int r1 = is.readInt(); - int r2 = is.readInt(); - - Resource rr1 = getResource(wg, internals, resourceIds, r1); - Resource rr2 = getResource(wg, internals, resourceIds, r2); - wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); - statementCounter += 2; - - inverses.put(r1, rr2); - inverses.put(r2, rr1); - - } break; - } - -// if((counter % 200000) == 0) { -// System.out.println("Written " + counter + " statements."); -// } - - double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; - int newPercentageDone = (int) Math.round(percentageDone); - if(newPercentageDone > lastPercentageDone) { - monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); - monitor.worked(newPercentageDone - lastPercentageDone); - lastPercentageDone = newPercentageDone; - } - } - - System.out.println("clusterIds.size() = " + clusterIds.size()); - - System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); - - } catch (IOException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Commiting delayed graph writings failed.", e); - } finally { - file.delete(); - monitor.done(); - } - } - - private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException { - int clusterHint = clusterHints.get(s); - - Resource r; - if(clusterHint == 0) - r = wg.newResource(); - else { - long clusterId = clusterIds.get(clusterHint); - if(clusterId == 0) { - wg.flushCluster(); - r = wg.newResource(); - clusterIds.put(clusterHint, clustering.getCluster(r)); - } - else - r = wg.newResource(clusterId); - } - internals[s-1] = r; - resourceIds[s-1] = r.getResourceId(); - return r; - } - -// public ValueDescriptor createDescriptor(Object obj) { -// -// if(obj instanceof String[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (String[])obj); -// else if(obj instanceof int[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (int[])obj); -// else if(obj instanceof double[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (double[])obj); -// else if(obj instanceof float[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (float[])obj); -// else if(obj instanceof boolean[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (boolean[])obj); -// else if(obj instanceof long[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (long[])obj); -// else if(obj instanceof byte[]) -// return new ValueDescriptor(ValueTrait.StaticValue, (byte[])obj); -// -// else if(obj instanceof String) -// return new ValueDescriptor(ValueTrait.StaticValue, new String[] {(String)obj}); -// else if(obj instanceof Double) -// return new ValueDescriptor(ValueTrait.StaticValue, new double[] {(Double)obj}); -// else if(obj instanceof Float) -// return new ValueDescriptor(ValueTrait.StaticValue, new float[] {(Float)obj}); -// else if(obj instanceof Integer) -// return new ValueDescriptor(ValueTrait.StaticValue, new int[] {(Integer)obj}); -// else if(obj instanceof Boolean) -// return new ValueDescriptor(ValueTrait.StaticValue, new boolean[] {(Boolean)obj}); -// else if(obj instanceof Byte) -// return new ValueDescriptor(ValueTrait.StaticValue, new byte[] {(Byte)obj}); -// else if(obj instanceof Long) -// return new ValueDescriptor(ValueTrait.StaticValue, new long[] {(Long)obj}); -// -// throw new Error("Wrong type!"); -// -// } - - @Override - public GraphWriter create() { - - current = ++internalCount; - timestamps.add(0); - - try { - s.writeByte(4); - s.writeInt(current); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - - return this; - - } - - @Override - public GraphWriter create(int clusterHint) { - - create(); - clusterHints.put(current, clusterHint); - return this; - - } - - @Override - public GraphWriter create(Resource type) { - assert(type != null); - - current = ++internalCount; - timestamps.add(0); - - try { - s.writeByte(3); - s.writeInt(current); - s.writeInt(getId(type)); - } catch (IOException e) { - throw new RuntimeException("Writing statement failed.", e); - } - - return this; - - } - - @Override - public GraphWriter create(int clusterHint, Resource type) { - - create(type); - clusterHints.put(current, clusterHint); - return this; - - } - - @Override - public Resource get() { - if(current > 0) - return new InternalResource(current); - else if(current < 0) - return externals.get(-1-current); - else - return null; - } - - @Override - public GraphWriter handle(Resource s) { - assert(s != null); - - current = getId(s); - return this; - } - - public GraphWriter flush() { - try { - s.writeByte(2); - } catch(IOException e) { - throw new RuntimeException("Writing flush failed.", e); - } - return this; - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.layer0.utils.writer; + + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.channels.FileChannel; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.WriteOnlyGraph; +import org.simantics.db.exception.DatabaseException; + +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TIntLongHashMap; + +public class DelayedGraphWriter extends AbstractDelayedGraphWriter { + + protected File file; + protected ObjectOutputStream s; + TIntIntHashMap clusterHints = new TIntIntHashMap(); + TIntLongHashMap clusterIds = new TIntLongHashMap(); + + public DelayedGraphWriter(ReadGraph graph) { + super(graph); + try { + file = File.createTempFile("graph", ".tmp"); + System.out.println("Temp file: " + file.getAbsolutePath()); + s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024)); +// s = new ObjectOutputStream(backup); + file.deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException("Opening output stream to temporary file failed", e); + } + } + + private void writeRef(int ref) throws IOException { + if(ref > 0) + timestamps.set(ref-1, time++); + s.writeInt(ref); + } + + @Override + public GraphWriter let(Resource p, Resource o) throws DatabaseException { + assert(p != null); + assert(o != null); + try { + s.writeByte(0); + writeRef(current); + writeRef(getPredicateId(p)); + writeRef(getId(o)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter let(Resource p, Object value, Resource dataType) throws DatabaseException { + assert(p != null); + assert(value != null); + assert(dataType != null); + try { + ++internalCount; + timestamps.add(0); + + s.writeByte(1); + writeRef(internalCount); + s.writeUnshared(value); + writeRef(getId(dataType)); + + s.writeByte(0); + writeRef(current); + writeRef(getPredicateId(p)); + writeRef(internalCount); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter let(int clusterHint, Resource p, Object value, + Resource dataType) throws DatabaseException { + let(p, value, dataType); + clusterHints.put(internalCount, clusterHint); + return this; + } + + @Override + public GraphWriter createLiteral(Object value, Resource dataType) { + assert(value != null); + assert(dataType != null); + try { + ++internalCount; + timestamps.add(0); + + s.writeByte(1); + writeRef(internalCount); + s.writeUnshared(value); + writeRef(getId(dataType)); + + current = internalCount; + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter createLiteral(int clusterHint, Object value, + Resource dataType) { + createLiteral(value, dataType); + + clusterHints.put(current, clusterHint); + return this; + } + + @Override + public GraphWriter createInverse(int cluster, Resource r) { + assert(r != null); + create(cluster); + try { + s.writeByte(5); + writeRef(current); + writeRef(getId(r)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter createInverse(Resource r) { + assert(r != null); + create(); + try { + s.writeByte(5); + writeRef(current); + writeRef(getId(r)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) { + if(id > 0) { + Resource ret = internals[id-1]; + if(ret == null) { + throw new Error("Error"); +// ret = wg.newResource(types.get(id)); +// internals[id-1] = ret; +// resourceIds[id-1] = ret.getResourceId(); + } + if(timestamps.getQuick(id-1)==time) + internals[id-1] = null; + ++time; + return ret; + } + else if(id < 0) { + return externals.get(-1-id); + } + else + return null; + } + + @Override + public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException { + try { + if (s != null) { + s.close(); + s = null; + } + externalsInv = null; + monitor.beginTask("Writing database", 100); + FileInputStream fis = new FileInputStream(file); + FileChannel fc = fis.getChannel(); + try (ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024))) { + commit(monitor, wg, is, fc); + } + } catch (IOException e) { + throw new DatabaseException("Commiting delayed graph writings failed.", e); + } catch (ClassNotFoundException e) { + throw new DatabaseException("Commiting delayed graph writings failed.", e); + } finally { + file.delete(); + monitor.done(); + } + } + + private void commit(IProgressMonitor monitor, WriteOnlyGraph wg, ObjectInputStream is, FileChannel fc) throws DatabaseException, IOException, ClassNotFoundException { + int lastPercentageDone = 0; + long fileLength = file.length(); + //System.out.println("size of commit file: " + fileLength); + + Resource[] internals = new Resource[internalCount]; + resourceIds = new long[internalCount]; + + int resourceCounter = 0; + int statementCounter = 0; + int valueCounter = 0; + time = 0; + while(true) { + switch(is.read()) { + case -1: + System.out.println("clusterIds.size() = " + clusterIds.size()); + System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); + return; + + case 0: { + int s = is.readInt(); + int p = is.readInt(); + int o = is.readInt(); + Resource rs = getResource(wg, internals, resourceIds, s); + Resource rp = getResource(wg, internals, resourceIds, p); + Resource ro = getResource(wg, internals, resourceIds, o); + Resource rpInv = inverses.get(p); + wg.claim(rs, rp, rpInv, ro); + statementCounter += 2; + } break; + case 1: { + int id = is.readInt(); + Object value = is.readUnshared(); + int type = is.readInt(); + + Resource r = newResource(wg, internals, id); + wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); + wg.claimValue(r, value); + statementCounter ++; + resourceCounter ++; + valueCounter ++; + + } break; + case 2: { + wg.flushCluster(); + } break; + case 3: { + + int s = is.readInt(); + int t = is.readInt(); + + Resource type = getResource(wg, internals, resourceIds, t); + wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); + statementCounter ++; + resourceCounter ++; + + } break; + case 4: { + + int s = is.readInt(); + newResource(wg, internals, s); + resourceCounter ++; + + } break; + case 5: { // InverseOf + + int r1 = is.readInt(); + int r2 = is.readInt(); + + Resource rr1 = getResource(wg, internals, resourceIds, r1); + Resource rr2 = getResource(wg, internals, resourceIds, r2); + wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); + statementCounter += 2; + + inverses.put(r1, rr2); + inverses.put(r2, rr1); + + } break; + } + +// if((counter % 200000) == 0) { +// System.out.println("Written " + counter + " statements."); +// } + + double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; + int newPercentageDone = (int) Math.round(percentageDone); + if(newPercentageDone > lastPercentageDone) { + monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); + monitor.worked(newPercentageDone - lastPercentageDone); + lastPercentageDone = newPercentageDone; + } + } + } + + private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException { + int clusterHint = clusterHints.get(s); + + Resource r; + if(clusterHint == 0) + r = wg.newResource(); + else { + long clusterId = clusterIds.get(clusterHint); + if(clusterId == 0) { + wg.flushCluster(); + r = wg.newResource(); + clusterIds.put(clusterHint, clustering.getCluster(r)); + } + else + r = wg.newResource(clusterId); + } + internals[s-1] = r; + resourceIds[s-1] = r.getResourceId(); + return r; + } + + @Override + public GraphWriter create() { + + current = ++internalCount; + timestamps.add(0); + + try { + s.writeByte(4); + s.writeInt(current); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + + return this; + + } + + @Override + public GraphWriter create(int clusterHint) { + + create(); + clusterHints.put(current, clusterHint); + return this; + + } + + @Override + public GraphWriter create(Resource type) { + assert(type != null); + + current = ++internalCount; + timestamps.add(0); + + try { + s.writeByte(3); + s.writeInt(current); + s.writeInt(getId(type)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + + return this; + + } + + @Override + public GraphWriter create(int clusterHint, Resource type) { + create(type); + clusterHints.put(current, clusterHint); + return this; + + } + + @Override + public Resource get() { + if(current > 0) + return new InternalResource(current); + else if(current < 0) + return externals.get(-1-current); + else + return null; + } + + @Override + public GraphWriter handle(Resource s) { + assert(s != null); + + current = getId(s); + return this; + } + + public GraphWriter flush() { + try { + s.writeByte(2); + } catch(IOException e) { + throw new RuntimeException("Writing flush failed.", e); + } + return this; + } + +}