/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.layer0.utils.writer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.channels.FileChannel;

import org.eclipse.core.runtime.IProgressMonitor;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.WriteOnlyGraph;
import org.simantics.db.exception.DatabaseException;

import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TIntLongHashMap;

/**
 * Graph writer that spools every write operation into a temporary file as a
 * sequence of opcode-tagged records and replays them against a
 * {@link WriteOnlyGraph} when {@link #commit(IProgressMonitor, WriteOnlyGraph)}
 * is called.
 *
 * <p>Resource references in the file are plain ints: a positive value {@code n}
 * is the n-th resource created by this writer (1-based), a negative value
 * {@code n} indexes {@code externals} at {@code -1 - n}, and zero means "no
 * resource".
 *
 * <p>Every positive reference written through {@link #writeRef(int)} stamps the
 * referenced resource with the current value of {@code time} and advances it.
 * During replay {@link #getResource} advances {@code time} in the same way, so
 * the replay can drop its handle to an internal resource right after that
 * resource's last recorded use. For this to work every reference that is read
 * back through {@code getResource} must have been written with
 * {@code writeRef}, and ids consumed by {@code newResource} must be written
 * with a plain {@code writeInt}.
 */
public class DelayedGraphWriter extends AbstractDelayedGraphWriter {

    /** Buffer size used for both the spool output stream and the replay input stream. */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /** Record: statement (subject, predicate, object). */
    private static final int OP_STATEMENT = 0;
    /** Record: new literal resource (id, serialized value, data type). */
    private static final int OP_LITERAL = 1;
    /** Record: request a cluster flush. */
    private static final int OP_FLUSH_CLUSTER = 2;
    /** Record: new resource with a type (id, type). */
    private static final int OP_CREATE_TYPED = 3;
    /** Record: new resource without a type (id). */
    private static final int OP_CREATE = 4;
    /** Record: InverseOf relation between two resources. */
    private static final int OP_INVERSE = 5;

    protected File file;
    protected ObjectOutputStream s;

    /** Cluster hint per internal resource id; a missing entry reads as 0 (no hint). */
    TIntIntHashMap clusterHints = new TIntIntHashMap();
    /** Cluster id chosen during commit for each cluster hint. */
    TIntLongHashMap clusterIds = new TIntLongHashMap();

    /**
     * Creates the writer and opens the temporary spool file.
     *
     * @param graph passed to the superclass constructor
     * @throws RuntimeException if the temporary file cannot be created or opened
     */
    public DelayedGraphWriter(ReadGraph graph) {
        super(graph);
        FileOutputStream fos = null;
        try {
            file = File.createTempFile("graph", ".tmp");
            // Register for deletion immediately so the file is not left behind
            // if opening the stream below fails.
            file.deleteOnExit();
            System.out.println("Temp file: " + file.getAbsolutePath());
            fos = new FileOutputStream(file);
            s = new ObjectOutputStream(new BufferedOutputStream(fos, BUFFER_SIZE));
        } catch (IOException e) {
            if (fos != null) {
                try {
                    fos.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            if (file != null) {
                file.delete();
            }
            throw new RuntimeException("Opening output stream to temporary file failed", e);
        }
    }

    /**
     * Writes a resource reference. For a positive (internal) reference the
     * resource's last-use timestamp is set to the current {@code time}, which
     * is then advanced.
     */
    private void writeRef(int ref) throws IOException {
        if (ref > 0) {
            timestamps.set(ref - 1, time++);
        }
        s.writeInt(ref);
    }

    /**
     * Allocates the next internal id and writes a literal record for it.
     * The new id is written with {@code writeInt} because the replay consumes
     * it through {@code newResource}, which does not advance {@code time};
     * the data type is read back through {@code getResource} and therefore
     * goes through {@code writeRef}.
     */
    private void writeLiteralRecord(Object value, Resource dataType) throws IOException {
        ++internalCount;
        timestamps.add(0);
        s.writeByte(OP_LITERAL);
        s.writeInt(internalCount);
        s.writeUnshared(value);
        writeRef(getId(dataType));
    }

    /** Writes an InverseOf record between the current resource and {@code r}. */
    private GraphWriter writeInverseRecord(Resource r) {
        try {
            s.writeByte(OP_INVERSE);
            writeRef(current);
            writeRef(getId(r));
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /**
     * Records the statement (current, p, o).
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    @Override
    public GraphWriter let(Resource p, Resource o) throws DatabaseException {
        assert (p != null);
        assert (o != null);
        try {
            s.writeByte(OP_STATEMENT);
            writeRef(current);
            writeRef(getPredicateId(p));
            writeRef(getId(o));
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /**
     * Records a new literal with the given value and data type, followed by the
     * statement (current, p, literal). The current resource is not changed.
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    @Override
    public GraphWriter let(Resource p, Object value, Resource dataType) throws DatabaseException {
        assert (p != null);
        assert (value != null);
        assert (dataType != null);
        try {
            writeLiteralRecord(value, dataType);

            s.writeByte(OP_STATEMENT);
            writeRef(current);
            writeRef(getPredicateId(p));
            writeRef(internalCount);
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /** Same as {@link #let(Resource, Object, Resource)}, additionally storing a cluster hint for the literal. */
    @Override
    public GraphWriter let(int clusterHint, Resource p, Object value, Resource dataType) throws DatabaseException {
        let(p, value, dataType);
        clusterHints.put(internalCount, clusterHint);
        return this;
    }

    /**
     * Records a new literal and makes it the current resource.
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    @Override
    public GraphWriter createLiteral(Object value, Resource dataType) {
        assert (value != null);
        assert (dataType != null);
        try {
            writeLiteralRecord(value, dataType);
            current = internalCount;
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /** Same as {@link #createLiteral(Object, Resource)}, additionally storing a cluster hint for the literal. */
    @Override
    public GraphWriter createLiteral(int clusterHint, Object value, Resource dataType) {
        createLiteral(value, dataType);
        clusterHints.put(current, clusterHint);
        return this;
    }

    /** Creates a new resource with the given cluster hint and records it as the inverse of {@code r}. */
    @Override
    public GraphWriter createInverse(int cluster, Resource r) {
        assert (r != null);
        create(cluster);
        return writeInverseRecord(r);
    }

    /** Creates a new resource and records it as the inverse of {@code r}. */
    @Override
    public GraphWriter createInverse(Resource r) {
        assert (r != null);
        create();
        return writeInverseRecord(r);
    }

    /**
     * Resolves a reference read from the spool file during commit.
     *
     * <p>For a positive id the handle in {@code internals} is returned; if the
     * resource's recorded last-use timestamp equals the current {@code time}
     * the handle is released from {@code internals}. {@code time} is advanced
     * on every positive lookup, mirroring {@link #writeRef(int)}.
     *
     * @return the resource, or {@code null} when {@code id} is zero
     * @throws Error if a positive id has no handle in {@code internals}
     */
    protected Resource getResource(WriteOnlyGraph wg, Resource[] internals, long[] resourceIds, int id) {
        if (id > 0) {
            Resource ret = internals[id - 1];
            if (ret == null) {
                throw new Error("Internal resource " + id
                        + " is not available (never created or already released)");
            }
            if (timestamps.getQuick(id - 1) == time) {
                internals[id - 1] = null;
            }
            ++time;
            return ret;
        } else if (id < 0) {
            return externals.get(-1 - id);
        } else {
            return null;
        }
    }

    /**
     * Closes the spool file, replays its records into {@code wg} and deletes
     * the file. Progress is reported to {@code monitor} as a percentage of the
     * file consumed.
     *
     * @throws DatabaseException if reading the spool file fails or the graph rejects a write
     */
    @Override
    public void commit(IProgressMonitor monitor, WriteOnlyGraph wg) throws DatabaseException {
        try {
            if (s != null) {
                s.close();
                s = null;
            }
            externalsInv = null;

            monitor.beginTask("Writing database", 100);
            // Both streams are managed so the file handle is released even when
            // constructing the ObjectInputStream fails.
            try (FileInputStream fis = new FileInputStream(file);
                 ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(fis, BUFFER_SIZE))) {
                commit(monitor, wg, is, fis.getChannel());
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new DatabaseException("Commiting delayed graph writings failed.", e);
        } finally {
            file.delete();
            monitor.done();
        }
    }

    /**
     * Replays all records from {@code is} into {@code wg} until end of stream.
     * {@code fc} is only used to read the file position for progress reporting.
     *
     * @throws IOException if the stream cannot be read or contains an unknown record type
     */
    private void commit(IProgressMonitor monitor, WriteOnlyGraph wg, ObjectInputStream is, FileChannel fc)
            throws DatabaseException, IOException, ClassNotFoundException {
        int lastPercentageDone = 0;
        long fileLength = file.length();
        Resource[] internals = new Resource[internalCount];
        resourceIds = new long[internalCount];
        int resourceCounter = 0;
        int statementCounter = 0;
        int valueCounter = 0;
        // Restart the clock so lookups line up with the timestamps recorded while writing.
        time = 0;
        while (true) {
            int opcode = is.read();
            switch (opcode) {
            case -1:
                System.out.println("clusterIds.size() = " + clusterIds.size());
                System.out.println("Wrote " + resourceCounter + " resources, " + statementCounter
                        + " statements and " + valueCounter + " values.");
                return;
            case OP_STATEMENT: {
                int subjectId = is.readInt();
                int predicateId = is.readInt();
                int objectId = is.readInt();
                Resource rs = getResource(wg, internals, resourceIds, subjectId);
                Resource rp = getResource(wg, internals, resourceIds, predicateId);
                Resource ro = getResource(wg, internals, resourceIds, objectId);
                Resource rpInv = inverses.get(predicateId);
                wg.claim(rs, rp, rpInv, ro);
                statementCounter += 2;
                break;
            }
            case OP_LITERAL: {
                int id = is.readInt();
                Object value = is.readUnshared();
                int typeId = is.readInt();
                Resource r = newResource(wg, internals, id);
                wg.claim(r, l0.InstanceOf, null, getResource(wg, internals, resourceIds, typeId));
                wg.claimValue(r, value);
                statementCounter++;
                resourceCounter++;
                valueCounter++;
                break;
            }
            case OP_FLUSH_CLUSTER: {
                wg.flushCluster();
                break;
            }
            case OP_CREATE_TYPED: {
                int id = is.readInt();
                int typeId = is.readInt();
                Resource type = getResource(wg, internals, resourceIds, typeId);
                wg.claim(newResource(wg, internals, id), l0.InstanceOf, null, type);
                statementCounter++;
                resourceCounter++;
                break;
            }
            case OP_CREATE: {
                int id = is.readInt();
                newResource(wg, internals, id);
                resourceCounter++;
                break;
            }
            case OP_INVERSE: {
                int r1 = is.readInt();
                int r2 = is.readInt();
                Resource rr1 = getResource(wg, internals, resourceIds, r1);
                Resource rr2 = getResource(wg, internals, resourceIds, r2);
                wg.claim(rr1, l0.InverseOf, l0.InverseOf, rr2);
                statementCounter += 2;
                inverses.put(r1, rr2);
                inverses.put(r2, rr1);
                break;
            }
            default:
                // Continuing past an unknown record would misalign every following read.
                throw new IOException("Unknown record type " + opcode + " in delayed graph write file");
            }

            double percentageDone = 100.0 * (double) fc.position() / (double) fileLength;
            int newPercentageDone = (int) Math.round(percentageDone);
            if (newPercentageDone > lastPercentageDone) {
                monitor.setTaskName("Writing database (" + newPercentageDone + "%)");
                monitor.worked(newPercentageDone - lastPercentageDone);
                lastPercentageDone = newPercentageDone;
            }
        }
    }

    /**
     * Creates the database resource for internal id {@code id} and stores it in
     * {@code internals} and {@code resourceIds}. When the id has a non-zero
     * cluster hint, the first resource seen for that hint is created after a
     * cluster flush and its cluster is remembered; later resources with the
     * same hint are created in that cluster.
     */
    private Resource newResource(WriteOnlyGraph wg, Resource[] internals, int id) throws DatabaseException {
        int clusterHint = clusterHints.get(id);

        Resource r;
        if (clusterHint == 0) {
            r = wg.newResource();
        } else {
            long clusterId = clusterIds.get(clusterHint);
            if (clusterId == 0) {
                wg.flushCluster();
                r = wg.newResource();
                clusterIds.put(clusterHint, clustering.getCluster(r));
            } else {
                r = wg.newResource(clusterId);
            }
        }
        internals[id - 1] = r;
        resourceIds[id - 1] = r.getResourceId();
        return r;
    }

    /**
     * Records a new untyped resource and makes it the current resource.
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    @Override
    public GraphWriter create() {
        current = ++internalCount;
        timestamps.add(0);
        try {
            s.writeByte(OP_CREATE);
            // Plain writeInt: the replay consumes this id via newResource, not getResource.
            s.writeInt(current);
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /** Same as {@link #create()}, additionally storing a cluster hint for the new resource. */
    @Override
    public GraphWriter create(int clusterHint) {
        create();
        clusterHints.put(current, clusterHint);
        return this;
    }

    /**
     * Records a new resource that is an instance of {@code type} and makes it
     * the current resource.
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    @Override
    public GraphWriter create(Resource type) {
        assert (type != null);
        current = ++internalCount;
        timestamps.add(0);
        try {
            s.writeByte(OP_CREATE_TYPED);
            s.writeInt(current);
            // The replay resolves the type through getResource, which advances the
            // clock for internal ids, so the reference must be stamped here too.
            writeRef(getId(type));
        } catch (IOException e) {
            throw new RuntimeException("Writing statement failed.", e);
        }
        return this;
    }

    /** Same as {@link #create(Resource)}, additionally storing a cluster hint for the new resource. */
    @Override
    public GraphWriter create(int clusterHint, Resource type) {
        create(type);
        clusterHints.put(current, clusterHint);
        return this;
    }

    /**
     * Returns the current resource: an {@code InternalResource} wrapper for a
     * resource created by this writer, the external resource itself, or
     * {@code null} when there is no current resource.
     */
    @Override
    public Resource get() {
        if (current > 0) {
            return new InternalResource(current);
        } else if (current < 0) {
            return externals.get(-1 - current);
        } else {
            return null;
        }
    }

    /** Makes {@code s} the current resource. */
    @Override
    public GraphWriter handle(Resource s) {
        assert (s != null);
        current = getId(s);
        return this;
    }

    /**
     * Records a cluster flush request that is executed at the corresponding
     * point of the replay.
     *
     * @throws RuntimeException if writing to the spool file fails
     */
    public GraphWriter flush() {
        try {
            s.writeByte(OP_FLUSH_CLUSTER);
        } catch (IOException e) {
            throw new RuntimeException("Writing flush failed.", e);
        }
        return this;
    }
}