X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.layer0.utils%2Fsrc%2Forg%2Fsimantics%2Flayer0%2Futils%2Fwriter%2FDelayedGraphWriter.java;fp=bundles%2Forg.simantics.layer0.utils%2Fsrc%2Forg%2Fsimantics%2Flayer0%2Futils%2Fwriter%2FDelayedGraphWriter.java;h=804535337c80310f655902814c3c4c94fca45b6b;hp=0000000000000000000000000000000000000000;hb=969bd23cab98a79ca9101af33334000879fb60c5;hpb=866dba5cd5a3929bbeae85991796acb212338a08 diff --git a/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java new file mode 100644 index 000000000..804535337 --- /dev/null +++ b/bundles/org.simantics.layer0.utils/src/org/simantics/layer0/utils/writer/DelayedGraphWriter.java @@ -0,0 +1,444 @@ +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.layer0.utils.writer; + + +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TIntLongHashMap; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.channels.FileChannel; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.WriteOnlyGraph; +import org.simantics.db.exception.DatabaseException; + +public class DelayedGraphWriter extends AbstractDelayedGraphWriter { + + protected File file; + protected ObjectOutputStream s; + TIntIntHashMap clusterHints = new TIntIntHashMap(); + TIntLongHashMap clusterIds = new TIntLongHashMap(); + + public DelayedGraphWriter(ReadGraph graph) { + super(graph); + try { + file = File.createTempFile("graph", ".tmp"); + System.out.println("Temp file: " + file.getAbsolutePath()); + s = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024*1024)); +// s = new ObjectOutputStream(backup); + + file.deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException("Opening output stream to temporary file failed", e); + } + } + + private void writeRef(int ref) throws IOException { + if(ref > 0) + timestamps.set(ref-1, time++); + s.writeInt(ref); + } + + @Override + public GraphWriter let(Resource p, Resource o) throws DatabaseException { + assert(p != null); + assert(o != null); + try { + s.writeByte(0); + writeRef(current); + writeRef(getPredicateId(p)); + writeRef(getId(o)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter let(Resource p, Object value, Resource dataType) throws DatabaseException { + assert(p != null); + assert(value != null); + assert(dataType != null); + try { + ++internalCount; + timestamps.add(0); + + s.writeByte(1); + writeRef(internalCount); + s.writeUnshared(value); + writeRef(getId(dataType)); + + s.writeByte(0); + writeRef(current); + writeRef(getPredicateId(p)); + writeRef(internalCount); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter let(int clusterHint, Resource p, Object value, + Resource dataType) throws DatabaseException { + let(p, value, dataType); + clusterHints.put(internalCount, clusterHint); + return this; + } + + @Override + public GraphWriter createLiteral(Object value, Resource dataType) { + assert(value != null); + assert(dataType != null); + try { + ++internalCount; + timestamps.add(0); + + s.writeByte(1); + writeRef(internalCount); + s.writeUnshared(value); + writeRef(getId(dataType)); + + current = internalCount; + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter createLiteral(int clusterHint, Object value, + Resource dataType) { + createLiteral(value, dataType); + + clusterHints.put(current, clusterHint); + return this; + } + + @Override + public GraphWriter createInverse(int cluster, Resource r) { + assert(r != null); + create(cluster); + try { + s.writeByte(5); + writeRef(current); + writeRef(getId(r)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + @Override + public GraphWriter createInverse(Resource r) { + assert(r != null); + create(); + try { + s.writeByte(5); + writeRef(current); + writeRef(getId(r)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + return this; + } + + protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) { + if(id > 0) { + Resource ret = internals[id-1]; + if(ret == null) { + throw new Error("Error"); +// ret = wg.newResource(types.get(id)); +// internals[id-1] = ret; +// resourceIds[id-1] = ret.getResourceId(); + } + if(timestamps.getQuick(id-1)==time) + internals[id-1] = null; + ++time; + return ret; + } + else if(id < 0) { + return externals.get(-1-id); + } + else + return null; + } + + @Override + public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException { + + try { + if (s != null) { + s.close(); + s = null; + } + externalsInv = null; + + monitor.beginTask("", 100); + + int lastPercentageDone = 0; + long fileLength = file.length(); + +// System.out.println("size of commit file: " + fileLength); + + Resource[] internals = new Resource[internalCount]; + resourceIds = new long[internalCount]; + + + FileInputStream fis = new FileInputStream(file); + FileChannel fc = fis.getChannel(); + ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, 1024*1024)); + int resourceCounter = 0; + int statementCounter = 0; + int valueCounter = 0; + time = 0; + loop: while(true) { + switch(is.read()) { + case -1: + break loop; + case 0: { + int s = is.readInt(); + int p = is.readInt(); + int o = is.readInt(); + Resource rs = getResource(wg, internals, resourceIds, s); + Resource rp = getResource(wg, internals, resourceIds, p); + Resource ro = getResource(wg, internals, resourceIds, o); + Resource rpInv = inverses.get(p); + wg.claim(rs, rp, rpInv, ro); + statementCounter += 2; + } break; + case 1: { + int id = is.readInt(); + Object value = is.readUnshared(); + int type = is.readInt(); + + Resource r = newResource(wg, internals, id); + wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, type)); + wg.claimValue(r, value); + statementCounter ++; + resourceCounter ++; + valueCounter ++; + + } break; + case 2: { + wg.flushCluster(); + } break; + case 3: { + + int s = is.readInt(); + int t = is.readInt(); + + Resource type = getResource(wg, internals, resourceIds, t); + wg.claim(newResource(wg, internals, s), l0.InstanceOf, null, type); + statementCounter ++; + resourceCounter ++; + + } break; + case 4: { + + int s = is.readInt(); + newResource(wg, internals, s); + resourceCounter ++; + + } break; + case 5: { // InverseOf + + int r1 = is.readInt(); + int r2 = is.readInt(); + + Resource rr1 = getResource(wg, internals, resourceIds, r1); + Resource rr2 = getResource(wg, internals, resourceIds, r2); + wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2); + statementCounter += 2; + + inverses.put(r1, rr2); + inverses.put(r2, rr1); + + } break; + } + +// if((counter % 200000) == 0) { +// System.out.println("Written " + counter + " statements."); +// } + + double percentageDone = 100.0 * (double) fc.position() / (double) fileLength; + int newPercentageDone = (int) Math.round(percentageDone); + if(newPercentageDone > lastPercentageDone) { + monitor.setTaskName("Writing database (" + newPercentageDone + "%)"); + monitor.worked(newPercentageDone - lastPercentageDone); + lastPercentageDone = newPercentageDone; + } + } + + System.out.println("clusterIds.size() = " + clusterIds.size()); + + System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter + " statements and " + valueCounter + " values."); + + } catch (IOException e) { + throw new RuntimeException("Commiting delayed graph writings failed.", e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Commiting delayed graph writings failed.", e); + } finally { + file.delete(); + monitor.done(); + } + } + + private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int s) throws DatabaseException { + int clusterHint = clusterHints.get(s); + + Resource r; + if(clusterHint == 0) + r = wg.newResource(); + else { + long clusterId = clusterIds.get(clusterHint); + if(clusterId == 0) { + wg.flushCluster(); + r = wg.newResource(); + clusterIds.put(clusterHint, clustering.getCluster(r)); + } + else + r = wg.newResource(clusterId); + } + internals[s-1] = r; + resourceIds[s-1] = r.getResourceId(); + return r; + } + +// public ValueDescriptor createDescriptor(Object obj) { +// +// if(obj instanceof String[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (String[])obj); +// else if(obj instanceof int[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (int[])obj); +// else if(obj instanceof double[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (double[])obj); +// else if(obj instanceof float[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (float[])obj); +// else if(obj instanceof boolean[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (boolean[])obj); +// else if(obj instanceof long[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (long[])obj); +// else if(obj instanceof byte[]) +// return new ValueDescriptor(ValueTrait.StaticValue, (byte[])obj); +// +// else if(obj instanceof String) +// return new ValueDescriptor(ValueTrait.StaticValue, new String[] {(String)obj}); +// else if(obj instanceof Double) +// return new ValueDescriptor(ValueTrait.StaticValue, new double[] {(Double)obj}); +// else if(obj instanceof Float) +// return new ValueDescriptor(ValueTrait.StaticValue, new float[] {(Float)obj}); +// else if(obj instanceof Integer) +// return new ValueDescriptor(ValueTrait.StaticValue, new int[] {(Integer)obj}); +// else if(obj instanceof Boolean) +// return new ValueDescriptor(ValueTrait.StaticValue, new boolean[] {(Boolean)obj}); +// else if(obj instanceof Byte) +// return new ValueDescriptor(ValueTrait.StaticValue, new byte[] {(Byte)obj}); +// else if(obj instanceof Long) +// return new ValueDescriptor(ValueTrait.StaticValue, new long[] {(Long)obj}); +// +// throw new Error("Wrong type!"); +// +// } + + @Override + public GraphWriter create() { + + current = ++internalCount; + timestamps.add(0); + + try { + s.writeByte(4); + s.writeInt(current); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + + return this; + + } + + @Override + public GraphWriter create(int clusterHint) { + + create(); + clusterHints.put(current, clusterHint); + return this; + + } + + @Override + public GraphWriter create(Resource type) { + assert(type != null); + + current = ++internalCount; + timestamps.add(0); + + try { + s.writeByte(3); + s.writeInt(current); + s.writeInt(getId(type)); + } catch (IOException e) { + throw new RuntimeException("Writing statement failed.", e); + } + + return this; + + } + + @Override + public GraphWriter create(int clusterHint, Resource type) { + + create(type); + clusterHints.put(current, clusterHint); + return this; + + } + + @Override + public Resource get() { + if(current > 0) + return new InternalResource(current); + else if(current < 0) + return externals.get(-1-current); + else + return null; + } + + @Override + public GraphWriter handle(Resource s) { + assert(s != null); + + current = getId(s); + return this; + } + + public GraphWriter flush() { + try { + s.writeByte(2); + } catch(IOException e) { + throw new RuntimeException("Writing flush failed.", e); + } + return this; + } + +}