/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.layer0.util; import java.io.BufferedOutputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; import org.apache.commons.io.output.DeferredFileOutputStream; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.OperationCanceledException; import org.eclipse.core.runtime.SubMonitor; import org.simantics.databoard.Bindings; import org.simantics.databoard.Datatypes; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingException; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.parser.repository.DataTypeSyntaxError; import org.simantics.databoard.serialization.RuntimeSerializerConstructionException; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.util.binary.BinaryFile; import org.simantics.databoard.util.binary.BinaryMemory; import org.simantics.databoard.util.binary.DeferredBinaryFile; import org.simantics.databoard.util.binary.NullRandomAccessBinary; import org.simantics.databoard.util.binary.RandomAccessBinary; import org.simantics.db.DirectStatements; import org.simantics.db.ReadGraph; import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Statement; import org.simantics.db.common.request.UniqueRead; import org.simantics.db.common.utils.NameUtils; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ValidationException; import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; import org.simantics.db.layer0.internal.SimanticsInternal; import org.simantics.db.service.ClusterControl; import org.simantics.db.service.ClusterControl.ClusterState; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DirectQuerySupport; import org.simantics.db.service.SerialisationSupport; import org.simantics.graph.representation.Extensions; import org.simantics.graph.utils.TGResourceUtil; import org.simantics.graph.utils.TGResourceUtil.LongAdapter; import org.simantics.layer0.Layer0; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntIntHashMap; import gnu.trove.map.hash.TLongObjectHashMap; import gnu.trove.procedure.TIntProcedure; import gnu.trove.procedure.TLongObjectProcedure; import gnu.trove.set.hash.TIntHashSet; public class ModelTransferableGraphSourceRequest extends UniqueRead { public static String LOG_FILE = "transferableGraph.log"; final static boolean LOG = false; final static boolean PRINTDEBUG = false; final static boolean DEBUG = LOG | PRINTDEBUG; final static boolean PROFILE = false; private TransferableGraphConfiguration2 configuration; private SubMonitor monitor; static DataOutput log; static { if (LOG) { try { FileOutputStream stream = new FileOutputStream(LOG_FILE); log = new DataOutputStream(stream); } catch (FileNotFoundException e) { e.printStackTrace(); } } } static void log(String line) { if (PRINTDEBUG) System.err.println(line); if (LOG) { try { if(line.length() > 500) line = line.substring(0, 500); log.writeUTF(line + "\n"); } catch (IOException e) { e.printStackTrace(); } } } public ModelTransferableGraphSourceRequest(TransferableGraphConfiguration2 conf) { this(null, conf); } public ModelTransferableGraphSourceRequest(IProgressMonitor monitor, TransferableGraphConfiguration2 conf) { this.monitor = SubMonitor.convert(monitor); this.configuration = conf; } Layer0 L0; int statements[]; int statementIndex = 0; TIntIntHashMap ids; TIntArrayList externalParents = new TIntArrayList(); ArrayList externalNames = new ArrayList(); int id = 0; int indent = 0; private SerialisationSupport support; private Resource getResource(int r) throws DatabaseException { return support.getResource(r); } public int getInternalId(int r) { return ids.get(r); } public boolean validateExternal(Resource ext) { ExtentStatus status = configuration.preStatus.get(ext); if(status != null) { if(ExtentStatus.INTERNAL.equals(status)) return false; else if(ExtentStatus.EXCLUDED.equals(status)) return false; } return true; } /* * * @return -2 if r is not really external and the statement should be excluded * */ public int getId(ReadGraph graph, int r) throws DatabaseException { if(ids.containsKey(r)) { int ret = ids.get(r); if(ret == -1) { for(int i=0;i<=indent;++i) System.out.print(" "); System.out.println("Cycle!!!"); // with " + GraphUtils.getReadableName(g, r)); } return ret; } else { if(!validateExternal(getResource(r))) return -2; Collection parents = graph.getObjects(getResource(r), L0.PartOf); if(parents.size() != 1) { throw new ValidationException("Reference to external resource " + NameUtils.getSafeName(graph, getResource(r), true) + " without unique uri (" + parents.size() + " parents)."); } for(Resource p : parents) { ++indent; int pid = getId(graph, support.getTransientId(p)); if(pid == -2) return -2; externalParents.add(pid); --indent; } externalNames.add((String)graph.getRelatedValue(getResource(r), L0.HasName)); ids.put(r, id); return id++; } } @Override public ModelTransferableGraphSource perform(ReadGraph graph) throws DatabaseException { support = graph.getService(SerialisationSupport.class); this.L0 = Layer0.getInstance(graph); long total = System.nanoTime(); long startupTime = System.nanoTime(); long startupTimeEnd = System.nanoTime(); long domainTime = System.nanoTime(); String otherStatements = "other" + UUID.randomUUID().toString(); String valueFileName = "value" + UUID.randomUUID().toString(); File base_ = SimanticsInternal.getTemporaryDirectory(); File base = new File(base_, "exports"); base.mkdirs(); File otherStatementsFile = new File(base, otherStatements); File valueFile = new File(base, valueFileName); // System.err.println("f: " + otherStatementsFile.getAbsolutePath()); try { DeferredFileOutputStream otherStatementsStream = new DeferredFileOutputStream(1024*1024, otherStatementsFile); DataOutputStream otherStatementsOutput = new DataOutputStream(new BufferedOutputStream(otherStatementsStream, 1024*1024)); DeferredBinaryFile valueOutput = new DeferredBinaryFile(valueFile, 1024*1024, 128*1024); ClusterControl cc = graph.getService(ClusterControl.class); ClusterState clusterState = cc.getClusterState(); TIntHashSet excludedShared = new TIntHashSet(); ids = new TIntIntHashMap(1000, 0.75f); DomainProcessorState state = new DomainProcessorState(); state.extensions.putAll(configuration.baseExtensions); state.ids = ids; state.statementsOutput = otherStatementsOutput; state.valueOutput = valueOutput; state.valueCount = 0; state.excludedShared = excludedShared; state.monitor = monitor; state.valueModifier = composeTGValueModifier(configuration.valueModifiers); getDomain2(graph, configuration, state, configuration.ignoreVirtualResources); id = ids.size(); cc.restoreClusterState(clusterState); otherStatementsOutput.flush(); otherStatementsOutput.close(); // Do not close valueOutput, just flush it and reuse it in // ModelTransferableGraphSource for reading. valueOutput.flush(); long domainDuration = System.nanoTime() - domainTime; state.id = id; state.otherStatementsInput = toRandomAccessBinary(otherStatementsStream, 128*1024); state.valueInput = toRandomAccessBinary(valueOutput); state.statementsOutput = null; state.valueOutput = null; long totalEnd = System.nanoTime(); if(PROFILE) { System.out.println("startup in " + 1e-9*(startupTimeEnd - startupTime) + "s."); System.out.println("domain was found in " + 1e-9*(domainDuration) + "s."); System.out.println("total time for building subgraph was " + 1e-9*(totalEnd-total) + "s."); } return getSource(graph, configuration, state, otherStatementsFile, valueFile); } catch (DatabaseException e) { throw e; } catch (IOException e) { throw new DatabaseException(e.getMessage(), e); } catch (Throwable e) { dumpHeap("crash.hprof"); throw new DatabaseException(e.getMessage(), e); } } protected ModelTransferableGraphSource getSource(ReadGraph graph, TransferableGraphConfiguration2 configuration, DomainProcessorState state, File otherStatementsFile, File valueFile) throws DatabaseException { return new ModelTransferableGraphSource(graph, configuration, state, otherStatementsFile, valueFile); } private TGValueModifier composeTGValueModifier(Collection configuredModifiers) { List valueModifiers = configuredModifiers == null ? new ArrayList<>(2) : new ArrayList<>(configuredModifiers.size() + 2); valueModifiers.add(new ResourceTGValueModifier(support)); valueModifiers.add(RevisionTGValueModifier.INSTANCE); return new ComposedTGValueModifier(valueModifiers.toArray(new TGValueModifier[valueModifiers.size()])); } private static RandomAccessBinary toRandomAccessBinary(DeferredFileOutputStream stream, int bufferSize) throws IOException { if (stream.isInMemory()) return new BinaryMemory(stream.getData()); return new BinaryFile(stream.getFile(), bufferSize); } private static RandomAccessBinary toRandomAccessBinary(DeferredBinaryFile file) throws IOException { RandomAccessBinary b = file.getBackend(); long size = b.position(); b.position(0); if (b instanceof BinaryMemory) { b.setLength(size); } return b; } public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, IProgressMonitor monitor, final Resource resource) throws DatabaseException { return getDomainOnly(processor, monitor, Collections.singletonList(resource)); } public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, final IProgressMonitor monitor, final Collection resources) throws DatabaseException { return processor.syncRequest(new UniqueRead() { @Override public DomainOnlyProcessor perform(ReadGraph graph) throws DatabaseException { try { TransferableGraphConfiguration2 conf = TransferableGraphConfiguration2.createWithResources(graph, resources, Collections.emptyList()); DomainProcessorState state = new DomainProcessorState(); state.extensions.putAll(conf.baseExtensions); state.ids = new TIntIntHashMap(1000, 0.75f); state.statementsOutput = new DataOutputStream(new NullOutputStream()); state.valueOutput = new NullRandomAccessBinary(); state.valueCount = 0; state.excludedShared = new TIntHashSet(); state.monitor = SubMonitor.convert(monitor); return getDomainOnly(graph, conf, state, conf.ignoreVirtualResources); } catch (OperationCanceledException e) { return null; } } }); } public static class DomainOnlyProcessor extends DomainProcessor3 { final Resource instanceOf; final public List internals; final public List internalTypes; private int counter = 0; public DomainOnlyProcessor(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { super(graph, conf, state, ignoreVirtual); CollectionSupport cs = graph.getService(CollectionSupport.class); internals = cs.createList(); internalTypes = cs.createList(); instanceOf = Layer0.getInstance(graph).InstanceOf; } @Override final public void addToStream(Resource predicate, Resource object) throws DatabaseException { } @Override public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { } @Override public void processValue(ReadGraph graph, Resource subject, int sId, DomainProcessorState state) throws DatabaseException, IOException { } @Override public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { if((counter++ & 1023) == 0) if(state.monitor != null) if(state.monitor.isCanceled()) throw new CancelTransactionException(); super.processInternal(graph, subject, stms, state); internals.add(subject); Resource singleType = null; for(Statement s : stms) { if(instanceOf.equals(s.getPredicate())) { if(singleType != null) { internalTypes.add(null); return; } else { singleType = s.getObject(); } } } internalTypes.add(singleType); } } public static DomainOnlyProcessor getDomainOnly(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { final DomainOnlyProcessor processor = new DomainOnlyProcessor(graph, conf, state, ignoreVirtual); getDomain2(graph, state, processor); return processor; } public static DomainProcessor3 getDomain2(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { ITask task = ThreadLogger.getInstance().begin("getDomain2"); final DomainProcessor3 processor = new DomainProcessor3(graph, conf, state, ignoreVirtual); getDomain2(graph, state, processor); final SerialisationSupport support = graph.getService(SerialisationSupport.class); final ClusteringSupport cls = graph.getService(ClusteringSupport.class); final Resource indexRoot = processor.conf.indexRoot; if (state.monitor.isCanceled()) throw new CancelTransactionException(); TLongObjectHashMap clusterMap = new TLongObjectHashMap(); for(Resource r : processor.status.keySet()) { ExtentStatus status = processor.status.get(r); int transientId = support.getTransientId(r); if(ExtentStatus.INTERNAL == status) { long cluster = cls.getCluster(r); TIntArrayList list = clusterMap.get(cluster); if(list == null) { list = new TIntArrayList(); clusterMap.put(cluster, list); } list.add(transientId); } else if(ExtentStatus.EXTERNAL == status) { state.externals.add(transientId); } else if(ExtentStatus.PENDING == status) { String uri = graph.getPossibleURI(r); if(uri != null) { // All internal resources with uri have been discovered already => this must then be external //state.externals.add(transientId); // Pending resources are found through weak links - if they are still pending at this stage do not add an external processor.status.put(r, ExtentStatus.EXTERNAL); } else { state.pending.add(transientId); System.err.println("Pending status in export: " + NameUtils.getSafeName(graph, r, true) + " (" + graph.getPossibleURI(r) + ")"); } } } // Now that we know the status of the resources lets process weak statements for(Statement stm : processor.unresolvedWeakLinks) { ExtentStatus status = processor.status.get(stm.getObject()); if(ExtentStatus.INTERNAL == status) { // Weak links between internals are exported int transientId = support.getTransientId(stm.getSubject()); processor.addToStream(stm.getPredicate(), stm.getObject()); try { processor.flushStatementStream(transientId, state); } catch (IOException e) { throw new DatabaseException(e); } } } if (state.monitor.isCanceled()) throw new CancelTransactionException(); final TIntArrayList clustering = new TIntArrayList(); clusterMap.forEachEntry(new TLongObjectProcedure() { @Override public boolean execute(long cluster, TIntArrayList b) { clustering.add(b.size()); b.forEach(new TIntProcedure() { @Override public boolean execute(int rId) { processor.ids.put(rId, processor.id++); return true; } }); return true; } }); if (state.monitor.isCanceled()) throw new CancelTransactionException(); final TIntArrayList clusterSets = new TIntArrayList(); clusterMap.forEachEntry(new TLongObjectProcedure() { @Override public boolean execute(long cluster, TIntArrayList b) { try { Resource clusterSet = cls.getClusterSetOfCluster(cluster); if(clusterSet != null) { int transientId = support.getTransientId(clusterSet); if(processor.ids.containsKey(transientId)) { clusterSets.add(processor.ids.get(transientId)); return true; } else { if(graph.getRootLibrary().equals(clusterSet)) { clusterSets.add(Extensions.ROOT_LIBRARY_CLUSTER_SET); return true; } else if (clusterSet.equals(indexRoot)) { clusterSets.add(Extensions.INDEX_ROOT_CLUSTER_SET); return true; } } } } catch (DatabaseException e) { } clusterSets.add(Extensions.NO_CLUSTER_SET); return true; } }); state.extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray())); state.extensions.put(Extensions.CLUSTER_SETS, new Variant(Bindings.INT_ARRAY, clusterSets.toArray())); long total = processor.startupTime + processor.expandTime + processor.classifyPredicateTime + processor.processFringeTime + processor.extentSeedTime + processor.fullResolveTime + processor.fastResolveTime + + processor.parentResolveTime + processor.otherStatementTime; if (PROFILE) { System.out.println("startup took " + 1e-9 * processor.startupTime + "s."); System.out.println("expand took " + 1e-9 * processor.expandTime + "s."); System.out.println("classifyPredicates took " + 1e-9 * processor.classifyPredicateTime + "s."); System.out.println("processFringe took " + 1e-9 * processor.processFringeTime + "s."); System.out.println("extentSeeding took " + 1e-9 * processor.extentSeedTime + "s."); System.out.println("fullResolve took " + 1e-9 * processor.fullResolveTime + "s."); System.out.println("fastResolve took " + 1e-9 * processor.fastResolveTime + "s."); System.out.println("parentResolve took " + 1e-9 * processor.parentResolveTime + "s."); System.out.println("otherStatements took " + 1e-9 * processor.otherStatementTime + "s."); System.out.println("value output took " + 1e-9 * processor.valueOutputTime + "s."); System.out.println("statement output took " + 1e-9 * processor.statementOutputTime + "s."); System.out.println("total " + 1e-9 * total + "s."); } task.finish(); return processor; } public static DomainProcessor3 getDomain2(final ReadGraph graph , DomainProcessorState state, final DomainProcessor3 processor) throws DatabaseException { processor.process(graph, state); return processor; } static class Expansion3 extends UniqueRead> { final private Collection roots; final boolean ignoreVirtual; public Expansion3(Collection roots, boolean ignoreVirtual) { this.roots = roots; this.ignoreVirtual = ignoreVirtual; } @Override public Collection perform(ReadGraph graph) { ArrayList result = new ArrayList(); final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); // final DomainStatementProcedure3 proc = new DomainStatementProcedure3(result); if (ignoreVirtual) { for(Resource r : roots) { result.add(dqs.getDirectPersistentStatements(graph, r)); } } else { for(Resource r : roots) { result.add(dqs.getDirectStatements(graph, r)); } } return result; } } private static void dumpHeap(String path) { try { Object bean = getBean(); if (bean == null) return; Method m = bean.getClass().getMethod("dumpHeap", String.class, boolean.class); m.invoke(bean, path, true); } catch (IllegalArgumentException e) { } catch (IllegalAccessException e) { } catch (SecurityException e) { } catch (NoSuchMethodException e) { } catch (InvocationTargetException e) { } finally { } } private static Object getBean() { Class beanClass = getBeanClass(); if (beanClass == null) return null; try { Object bean = ManagementFactory.newPlatformMXBeanProxy( ManagementFactory.getPlatformMBeanServer(), "com.sun.management:type=HotSpotDiagnostic", beanClass); return bean; } catch (IOException e) { return null; } } private static Class getBeanClass() { try { Class clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean"); return clazz; } catch (ClassNotFoundException e) { return null; } } public static void main(String[] args) { try { Datatype dt = Datatypes.translate("{ parts : ( | ResourceRVIPart { role : |CHILD|PROPERTY, resource : Long(unit=\"resource\") } | StringRVIPart { role : |CHILD|PROPERTY, string : String } ) [] }"); Binding b = Bindings.getBinding(dt); Object value = b.createDefault(); Variant variant = new Variant(b, value); TGResourceUtil util = new TGResourceUtil(); LongAdapter la = new LongAdapter() { @Override public long adapt(long in) { return in; } }; util.adaptValue( variant.getBinding(), variant.getValue(), la ); } catch (DataTypeSyntaxError e) { e.printStackTrace(); } catch (BindingException e) { e.printStackTrace(); } catch (RuntimeSerializerConstructionException e) { e.printStackTrace(); } catch (AccessorException e) { e.printStackTrace(); } } }