X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.layer0%2Fsrc%2Forg%2Fsimantics%2Fdb%2Flayer0%2Futil%2FModelTransferableGraphSourceRequest.java;h=9832f9d08be6815f2aaa6c24f6e0d64910a54e22;hb=refs%2Fchanges%2F02%2F1402%2F2;hp=d7fe5c690ddb64760b4f351ec9917e9bb028991f;hpb=d77257f045bacb676d276fe3a702e43e3a4eada3;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/ModelTransferableGraphSourceRequest.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/ModelTransferableGraphSourceRequest.java index d7fe5c690..9832f9d08 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/ModelTransferableGraphSourceRequest.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/util/ModelTransferableGraphSourceRequest.java @@ -1,650 +1,672 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.layer0.util; - -import java.io.BufferedOutputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.io.output.DeferredFileOutputStream; -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.OperationCanceledException; -import org.eclipse.core.runtime.SubMonitor; -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Datatypes; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.error.BindingException; -import org.simantics.databoard.binding.mutable.Variant; -import org.simantics.databoard.parser.repository.DataTypeSyntaxError; -import org.simantics.databoard.serialization.RuntimeSerializerConstructionException; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.binary.BinaryFile; -import org.simantics.databoard.util.binary.BinaryMemory; -import org.simantics.databoard.util.binary.DeferredBinaryFile; -import org.simantics.databoard.util.binary.NullRandomAccessBinary; -import org.simantics.databoard.util.binary.RandomAccessBinary; -import org.simantics.db.DirectStatements; -import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Statement; -import org.simantics.db.common.request.UniqueRead; -import org.simantics.db.common.utils.NameUtils; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.ValidationException; -import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; -import org.simantics.db.layer0.internal.SimanticsInternal; -import org.simantics.db.service.ClusterControl; -import org.simantics.db.service.ClusterControl.ClusterState; -import org.simantics.db.service.ClusteringSupport; -import org.simantics.db.service.CollectionSupport; -import org.simantics.db.service.DirectQuerySupport; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.graph.representation.Extensions; -import org.simantics.graph.utils.TGResourceUtil; -import org.simantics.graph.utils.TGResourceUtil.LongAdapter; -import org.simantics.layer0.Layer0; -import org.simantics.utils.threads.logger.ITask; -import org.simantics.utils.threads.logger.ThreadLogger; - -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TLongObjectHashMap; -import gnu.trove.procedure.TIntProcedure; -import gnu.trove.procedure.TLongObjectProcedure; -import gnu.trove.set.hash.TIntHashSet; - -public class ModelTransferableGraphSourceRequest extends UniqueRead { - - public static String LOG_FILE = "transferableGraph.log"; - final static boolean LOG = false; - final static private boolean DEBUG = false; - final static boolean PROFILE = false; - - private TransferableGraphConfiguration2 configuration; - private SubMonitor monitor; - - static DataOutput log; - - static { - - if (LOG) { - try { - FileOutputStream stream = new FileOutputStream(LOG_FILE); - log = new DataOutputStream(stream); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - } - - } - - static void log(String line) { - if (LOG) { - try { - log.writeUTF(line + "\n"); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public ModelTransferableGraphSourceRequest(TransferableGraphConfiguration2 conf) { - this(null, conf); - } - - public ModelTransferableGraphSourceRequest(IProgressMonitor monitor, TransferableGraphConfiguration2 conf) { - this.monitor = SubMonitor.convert(monitor); - this.configuration = conf; - } - - Layer0 L0; - - int statements[]; - int statementIndex = 0; - TIntIntHashMap ids; - TIntArrayList externalParents = new TIntArrayList(); - ArrayList externalNames = new ArrayList(); - - int id = 0; - int indent = 0; - - private SerialisationSupport support; - - private Resource getResource(int r) throws DatabaseException { - return support.getResource(r); - } - - public int getInternalId(int r) { - return ids.get(r); - } - - public boolean validateExternal(Resource ext) { - ExtentStatus status = configuration.preStatus.get(ext); - if(status != null) { - if(ExtentStatus.INTERNAL.equals(status)) return false; - else if(ExtentStatus.EXCLUDED.equals(status)) return false; - } - return true; - } - - /* - * - * @return -2 if r is not really external and the statement should be excluded - * - */ - public int getId(ReadGraph graph, int r) throws DatabaseException { - if(ids.containsKey(r)) { - int ret = ids.get(r); - if(ret == -1) { - for(int i=0;i<=indent;++i) - System.out.print(" "); - System.out.println("Cycle!!!"); // with " + GraphUtils.getReadableName(g, r)); - } - return ret; - } - else { - if(!validateExternal(getResource(r))) return -2; - Collection parents = graph.getObjects(getResource(r), L0.PartOf); - if(parents.size() != 1) { - throw new ValidationException("Reference to external resource " - + NameUtils.getSafeName(graph, getResource(r), true) + " without unique uri (" + parents.size() + " parents)."); - } - for(Resource p : parents) { - ++indent; - int pid = getId(graph, support.getTransientId(p)); - if(pid == -2) return -2; - externalParents.add(pid); - --indent; - } - externalNames.add((String)graph.getRelatedValue(getResource(r), L0.HasName)); - ids.put(r, id); - return id++; - } - } - - @Override - public ModelTransferableGraphSource perform(ReadGraph graph) throws DatabaseException { - - support = graph.getService(SerialisationSupport.class); - - this.L0 = Layer0.getInstance(graph); - - long total = System.nanoTime(); - long startupTime = System.nanoTime(); - long startupTimeEnd = System.nanoTime(); - long domainTime = System.nanoTime(); - - String otherStatements = "other" + UUID.randomUUID().toString(); - String valueFileName = "value" + UUID.randomUUID().toString(); - - File base_ = SimanticsInternal.getTemporaryDirectory(); - File base = new File(base_, "exports"); - base.mkdirs(); - - File otherStatementsFile = new File(base, otherStatements); - File valueFile = new File(base, valueFileName); - -// System.err.println("f: " + otherStatementsFile.getAbsolutePath()); - - try { - DeferredFileOutputStream otherStatementsStream = new DeferredFileOutputStream(1024*1024, otherStatementsFile); - - DataOutputStream otherStatementsOutput = new DataOutputStream(new BufferedOutputStream(otherStatementsStream, 1024*1024)); - DeferredBinaryFile valueOutput = new DeferredBinaryFile(valueFile, 1024*1024, 128*1024); - - ClusterControl cc = graph.getService(ClusterControl.class); - ClusterState clusterState = cc.getClusterState(); - - TIntHashSet excludedShared = new TIntHashSet(); - - ids = new TIntIntHashMap(1000, 0.75f); - - DomainProcessorState state = new DomainProcessorState(); - state.extensions.putAll(configuration.baseExtensions); - state.ids = ids; - state.statementsOutput = otherStatementsOutput; - state.valueOutput = valueOutput; - state.valueCount = 0; - state.excludedShared = excludedShared; - state.monitor = monitor; - state.valueModifier = composeTGValueModifier(configuration.valueModifiers); - - getDomain2(graph, configuration, state, configuration.ignoreVirtualResources); - - id = ids.size(); - - cc.restoreClusterState(clusterState); - - otherStatementsOutput.flush(); - otherStatementsOutput.close(); - - // Do not close valueOutput, just flush it and reuse it in - // ModelTransferableGraphSource for reading. - valueOutput.flush(); - - long domainDuration = System.nanoTime() - domainTime; - - state.id = id; - state.otherStatementsInput = toRandomAccessBinary(otherStatementsStream, 128*1024); - state.valueInput = toRandomAccessBinary(valueOutput); - state.statementsOutput = null; - state.valueOutput = null; - - long totalEnd = System.nanoTime(); - - if(PROFILE) { - System.out.println("startup in " + 1e-9*(startupTimeEnd - startupTime) + "s."); - System.out.println("domain was found in " + 1e-9*(domainDuration) + "s."); - System.out.println("total time for building subgraph was " + 1e-9*(totalEnd-total) + "s."); - } - - return getSource(graph, configuration, state, otherStatementsFile, valueFile); - - } catch (DatabaseException e) { - throw e; - } catch (IOException e) { - throw new DatabaseException(e.getMessage(), e); - } catch (Throwable e) { - dumpHeap("crash.hprof"); - throw new DatabaseException(e.getMessage(), e); - } - - } - - protected ModelTransferableGraphSource getSource(ReadGraph graph, TransferableGraphConfiguration2 configuration, DomainProcessorState state, File otherStatementsFile, File valueFile) throws DatabaseException { - return new ModelTransferableGraphSource(graph, configuration, state, otherStatementsFile, valueFile); - } - - private TGValueModifier composeTGValueModifier(Collection configuredModifiers) { - List valueModifiers = configuredModifiers == null ? new ArrayList<>(2) : new ArrayList<>(configuredModifiers.size() + 2); - valueModifiers.add(new ResourceTGValueModifier(support)); - valueModifiers.add(RevisionTGValueModifier.INSTANCE); - return new ComposedTGValueModifier(valueModifiers.toArray(new TGValueModifier[valueModifiers.size()])); - } - - private static RandomAccessBinary toRandomAccessBinary(DeferredFileOutputStream stream, int bufferSize) throws IOException { - if (stream.isInMemory()) - return new BinaryMemory(stream.getData()); - return new BinaryFile(stream.getFile(), bufferSize); - } - - private static RandomAccessBinary toRandomAccessBinary(DeferredBinaryFile file) throws IOException { - RandomAccessBinary b = file.getBackend(); - long size = b.position(); - b.position(0); - if (b instanceof BinaryMemory) { - b.setLength(size); - } - return b; - } - - public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, IProgressMonitor monitor, final Resource resource) throws DatabaseException { - return getDomainOnly(processor, monitor, Collections.singletonList(resource)); - } - - public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, final IProgressMonitor monitor, final Collection resources) throws DatabaseException { - - return processor.syncRequest(new UniqueRead() { - - @Override - public DomainOnlyProcessor perform(ReadGraph graph) throws DatabaseException { - - try { - - TransferableGraphConfiguration2 conf = TransferableGraphConfiguration2.createWithResources(graph, resources, Collections.emptyList()); - - DomainProcessorState state = new DomainProcessorState(); - state.extensions.putAll(conf.baseExtensions); - state.ids = new TIntIntHashMap(1000, 0.75f); - state.statementsOutput = new DataOutputStream(new NullOutputStream()); - state.valueOutput = new NullRandomAccessBinary(); - state.valueCount = 0; - state.excludedShared = new TIntHashSet(); - state.monitor = SubMonitor.convert(monitor); - - return getDomainOnly(graph, conf, state, conf.ignoreVirtualResources); - - } catch (OperationCanceledException e) { - - return null; - - } - - } - - }); - - } - - public static class DomainOnlyProcessor extends DomainProcessor3 { - - final Resource instanceOf; - final public List internals; - final public List internalTypes; - - private int counter = 0; - - public DomainOnlyProcessor(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { - super(graph, conf, state, ignoreVirtual); - CollectionSupport cs = graph.getService(CollectionSupport.class); - internals = cs.createList(); - internalTypes = cs.createList(); - instanceOf = Layer0.getInstance(graph).InstanceOf; - } - - @Override - final public void addToStream(Resource predicate, Resource object) throws DatabaseException { - } - - @Override - public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { - } - - @Override - public void processValue(ReadGraph graph, Resource subject, int sId, DomainProcessorState state) throws DatabaseException, IOException { - } - - @Override - public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { - - if((counter++ & 1023) == 0) - if(state.monitor != null) - if(state.monitor.isCanceled()) - throw new CancelTransactionException(); - - super.processInternal(graph, subject, stms, state); - - internals.add(subject); - - Resource singleType = null; - for(Statement s : stms) { - if(instanceOf.equals(s.getPredicate())) { - if(singleType != null) { - internalTypes.add(null); - return; - } else { - singleType = s.getObject(); - } - } - } - - internalTypes.add(singleType); - - } - - - } - - public static DomainOnlyProcessor getDomainOnly(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { - - final DomainOnlyProcessor processor = new DomainOnlyProcessor(graph, conf, state, ignoreVirtual); - getDomain2(graph, state, processor); - return processor; - - } - - public static DomainProcessor3 getDomain2(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { - - ITask task = ThreadLogger.getInstance().begin("getDomain2"); - - final DomainProcessor3 processor = new DomainProcessor3(graph, conf, state, ignoreVirtual); - - getDomain2(graph, state, processor); - - final SerialisationSupport support = graph.getService(SerialisationSupport.class); - final ClusteringSupport cls = graph.getService(ClusteringSupport.class); - final Resource indexRoot = processor.conf.indexRoot; - - if (state.monitor.isCanceled()) - throw new CancelTransactionException(); - - TLongObjectHashMap clusterMap = new TLongObjectHashMap(); - for(Resource r : processor.status.keySet()) { - ExtentStatus status = processor.status.get(r); - int transientId = support.getTransientId(r); - if(ExtentStatus.INTERNAL == status) { - long cluster = cls.getCluster(r); - TIntArrayList list = clusterMap.get(cluster); - if(list == null) { - list = new TIntArrayList(); - clusterMap.put(cluster, list); - } - list.add(transientId); - } else if(ExtentStatus.EXTERNAL == status) { - state.externals.add(transientId); - } else if(ExtentStatus.PENDING == status) { - String uri = graph.getPossibleURI(r); - if(uri != null) - state.externals.add(transientId); - else { - state.pending.add(transientId); - System.err.println("Pending status in export: " + NameUtils.getSafeName(graph, r, true) + " (" + graph.getPossibleURI(r) + ")"); - } - } - } - - if (state.monitor.isCanceled()) - throw new CancelTransactionException(); - - final TIntArrayList clustering = new TIntArrayList(); - clusterMap.forEachEntry(new TLongObjectProcedure() { - - @Override - public boolean execute(long cluster, TIntArrayList b) { - clustering.add(b.size()); - b.forEach(new TIntProcedure() { - - @Override - public boolean execute(int rId) { - processor.ids.put(rId, processor.id++); - return true; - } - - }); - return true; - } - - }); - - if (state.monitor.isCanceled()) - throw new CancelTransactionException(); - - final TIntArrayList clusterSets = new TIntArrayList(); - clusterMap.forEachEntry(new TLongObjectProcedure() { - - @Override - public boolean execute(long cluster, TIntArrayList b) { - try { - Resource clusterSet = cls.getClusterSetOfCluster(cluster); - if(clusterSet != null) { - int transientId = support.getTransientId(clusterSet); - if(processor.ids.containsKey(transientId)) { - clusterSets.add(processor.ids.get(transientId)); - return true; - } else { - if(graph.getRootLibrary().equals(clusterSet)) { - clusterSets.add(Extensions.ROOT_LIBRARY_CLUSTER_SET); - return true; - } else if (clusterSet.equals(indexRoot)) { - clusterSets.add(Extensions.INDEX_ROOT_CLUSTER_SET); - return true; - } - } - } - } catch (DatabaseException e) { - } - clusterSets.add(Extensions.NO_CLUSTER_SET); - return true; - - } - - }); - - state.extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray())); - state.extensions.put(Extensions.CLUSTER_SETS, new Variant(Bindings.INT_ARRAY, clusterSets.toArray())); - - long total = processor.startupTime + processor.expandTime - + processor.classifyPredicateTime - + processor.processFringeTime + processor.extentSeedTime - + processor.fullResolveTime + processor.fastResolveTime + - + processor.parentResolveTime + processor.otherStatementTime; - - if (PROFILE) { - System.out.println("startup took " + 1e-9 * processor.startupTime + "s."); - System.out.println("expand took " + 1e-9 * processor.expandTime + "s."); - System.out.println("classifyPredicates took " + 1e-9 * processor.classifyPredicateTime + "s."); - System.out.println("processFringe took " + 1e-9 * processor.processFringeTime + "s."); - System.out.println("extentSeeding took " + 1e-9 * processor.extentSeedTime + "s."); - System.out.println("fullResolve took " + 1e-9 * processor.fullResolveTime + "s."); - System.out.println("fastResolve took " + 1e-9 * processor.fastResolveTime + "s."); - System.out.println("parentResolve took " + 1e-9 * processor.parentResolveTime + "s."); - System.out.println("otherStatements took " + 1e-9 * processor.otherStatementTime + "s."); - System.out.println("value output took " + 1e-9 * processor.valueOutputTime + "s."); - System.out.println("statement output took " + 1e-9 * processor.statementOutputTime + "s."); - System.out.println("total " + 1e-9 * total + "s."); - } - - task.finish(); - - return processor; - - } - - public static DomainProcessor3 getDomain2(final ReadGraph graph , DomainProcessorState state, final DomainProcessor3 processor) throws DatabaseException { - processor.process(graph, state); - return processor; - } - - static class Expansion3 extends UniqueRead> { - - final private Collection roots; - final boolean ignoreVirtual; - - public Expansion3(Collection roots, boolean ignoreVirtual) { - this.roots = roots; - this.ignoreVirtual = ignoreVirtual; - } - - @Override - public Collection perform(ReadGraph graph) { - - ArrayList result = new ArrayList(); - - final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); - - final DomainStatementProcedure3 proc = new DomainStatementProcedure3(result); - - if (ignoreVirtual) { - for(Resource r : roots) { - dqs.forEachDirectPersistentStatement(graph, r, proc); - } - } else { - for(Resource r : roots) { - dqs.forEachDirectStatement(graph, r, proc); - } - } - - return result; - - } - - } - private static void dumpHeap(String path) { - - try { - Object bean = getBean(); - if (bean == null) - return; - - Method m = bean.getClass().getMethod("dumpHeap", String.class, boolean.class); - m.invoke(bean, path, true); - - } catch (IllegalArgumentException e) { - } catch (IllegalAccessException e) { - } catch (SecurityException e) { - } catch (NoSuchMethodException e) { - } catch (InvocationTargetException e) { - } finally { - } - - } - - private static Object getBean() { - Class beanClass = getBeanClass(); - if (beanClass == null) - return null; - try { - Object bean = ManagementFactory.newPlatformMXBeanProxy( - ManagementFactory.getPlatformMBeanServer(), - "com.sun.management:type=HotSpotDiagnostic", - beanClass); - return bean; - } catch (IOException e) { - return null; - } - } - - private static Class getBeanClass() { - try { - Class clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean"); - return clazz; - } catch (ClassNotFoundException e) { - return null; - } - } - - public static void main(String[] args) { - - try { - - Datatype dt = Datatypes.translate("{ parts : ( | ResourceRVIPart { role : |CHILD|PROPERTY, resource : Long(unit=\"resource\") } | StringRVIPart { role : |CHILD|PROPERTY, string : String } ) [] }"); - Binding b = Bindings.getBinding(dt); - Object value = b.createDefault(); - Variant variant = new Variant(b, value); - TGResourceUtil util = new TGResourceUtil(); - LongAdapter la = new LongAdapter() { - @Override - public long adapt(long in) { - return in; - } - }; - util.adaptValue( variant.getBinding(), variant.getValue(), la ); - - } catch (DataTypeSyntaxError e) { - e.printStackTrace(); - } catch (BindingException e) { - e.printStackTrace(); - } catch (RuntimeSerializerConstructionException e) { - e.printStackTrace(); - } catch (AccessorException e) { - e.printStackTrace(); - } - - } - - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.layer0.util; + +import java.io.BufferedOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.io.output.DeferredFileOutputStream; +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.OperationCanceledException; +import org.eclipse.core.runtime.SubMonitor; +import org.simantics.databoard.Bindings; +import org.simantics.databoard.Datatypes; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.error.BindingException; +import org.simantics.databoard.binding.mutable.Variant; +import org.simantics.databoard.parser.repository.DataTypeSyntaxError; +import org.simantics.databoard.serialization.RuntimeSerializerConstructionException; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.util.binary.BinaryFile; +import org.simantics.databoard.util.binary.BinaryMemory; +import org.simantics.databoard.util.binary.DeferredBinaryFile; +import org.simantics.databoard.util.binary.NullRandomAccessBinary; +import org.simantics.databoard.util.binary.RandomAccessBinary; +import org.simantics.db.DirectStatements; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Statement; +import org.simantics.db.common.request.UniqueRead; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.ValidationException; +import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus; +import org.simantics.db.layer0.internal.SimanticsInternal; +import org.simantics.db.service.ClusterControl; +import org.simantics.db.service.ClusterControl.ClusterState; +import org.simantics.db.service.ClusteringSupport; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.graph.representation.Extensions; +import org.simantics.graph.utils.TGResourceUtil; +import org.simantics.graph.utils.TGResourceUtil.LongAdapter; +import org.simantics.layer0.Layer0; +import org.simantics.utils.threads.logger.ITask; +import org.simantics.utils.threads.logger.ThreadLogger; + +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.map.hash.TIntIntHashMap; +import gnu.trove.map.hash.TLongObjectHashMap; +import gnu.trove.procedure.TIntProcedure; +import gnu.trove.procedure.TLongObjectProcedure; +import gnu.trove.set.hash.TIntHashSet; + +public class ModelTransferableGraphSourceRequest extends UniqueRead { + + public static String LOG_FILE = "transferableGraph.log"; + final static boolean LOG = false; + final static boolean PRINTDEBUG = false; + final static boolean DEBUG = LOG | PRINTDEBUG; + final static boolean PROFILE = false; + + private TransferableGraphConfiguration2 configuration; + private SubMonitor monitor; + + static DataOutput log; + + static { + + if (LOG) { + try { + FileOutputStream stream = new FileOutputStream(LOG_FILE); + log = new DataOutputStream(stream); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } + + } + + static void log(String line) { + if (PRINTDEBUG) System.err.println(line); + if (LOG) { + try { + if(line.length() > 500) line = line.substring(0, 500); + log.writeUTF(line + "\n"); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public ModelTransferableGraphSourceRequest(TransferableGraphConfiguration2 conf) { + this(null, conf); + } + + public ModelTransferableGraphSourceRequest(IProgressMonitor monitor, TransferableGraphConfiguration2 conf) { + this.monitor = SubMonitor.convert(monitor); + this.configuration = conf; + } + + Layer0 L0; + + int statements[]; + int statementIndex = 0; + TIntIntHashMap ids; + TIntArrayList externalParents = new TIntArrayList(); + ArrayList externalNames = new ArrayList(); + + int id = 0; + int indent = 0; + + private SerialisationSupport support; + + private Resource getResource(int r) throws DatabaseException { + return support.getResource(r); + } + + public int getInternalId(int r) { + return ids.get(r); + } + + public boolean validateExternal(Resource ext) { + ExtentStatus status = configuration.preStatus.get(ext); + if(status != null) { + if(ExtentStatus.INTERNAL.equals(status)) return false; + else if(ExtentStatus.EXCLUDED.equals(status)) return false; + } + return true; + } + + /* + * + * @return -2 if r is not really external and the statement should be excluded + * + */ + public int getId(ReadGraph graph, int r) throws DatabaseException { + if(ids.containsKey(r)) { + int ret = ids.get(r); + if(ret == -1) { + for(int i=0;i<=indent;++i) + System.out.print(" "); + System.out.println("Cycle!!!"); // with " + GraphUtils.getReadableName(g, r)); + } + return ret; + } + else { + if(!validateExternal(getResource(r))) return -2; + Collection parents = graph.getObjects(getResource(r), L0.PartOf); + if(parents.size() != 1) { + throw new ValidationException("Reference to external resource " + + NameUtils.getSafeName(graph, getResource(r), true) + " without unique uri (" + parents.size() + " parents)."); + } + for(Resource p : parents) { + ++indent; + int pid = getId(graph, support.getTransientId(p)); + if(pid == -2) return -2; + externalParents.add(pid); + --indent; + } + externalNames.add((String)graph.getRelatedValue(getResource(r), L0.HasName)); + ids.put(r, id); + return id++; + } + } + + @Override + public ModelTransferableGraphSource perform(ReadGraph graph) throws DatabaseException { + + support = graph.getService(SerialisationSupport.class); + + this.L0 = Layer0.getInstance(graph); + + long total = System.nanoTime(); + long startupTime = System.nanoTime(); + long startupTimeEnd = System.nanoTime(); + long domainTime = System.nanoTime(); + + String otherStatements = "other" + UUID.randomUUID().toString(); + String valueFileName = "value" + UUID.randomUUID().toString(); + + File base_ = SimanticsInternal.getTemporaryDirectory(); + File base = new File(base_, "exports"); + base.mkdirs(); + + File otherStatementsFile = new File(base, otherStatements); + File valueFile = new File(base, valueFileName); + +// System.err.println("f: " + otherStatementsFile.getAbsolutePath()); + + try { + DeferredFileOutputStream otherStatementsStream = new DeferredFileOutputStream(1024*1024, otherStatementsFile); + + DataOutputStream otherStatementsOutput = new DataOutputStream(new BufferedOutputStream(otherStatementsStream, 1024*1024)); + DeferredBinaryFile valueOutput = new DeferredBinaryFile(valueFile, 1024*1024, 128*1024); + + ClusterControl cc = graph.getService(ClusterControl.class); + ClusterState clusterState = cc.getClusterState(); + + TIntHashSet excludedShared = new TIntHashSet(); + + ids = new TIntIntHashMap(1000, 0.75f); + + DomainProcessorState state = new DomainProcessorState(); + state.extensions.putAll(configuration.baseExtensions); + state.ids = ids; + state.statementsOutput = otherStatementsOutput; + state.valueOutput = valueOutput; + state.valueCount = 0; + state.excludedShared = excludedShared; + state.monitor = monitor; + state.valueModifier = composeTGValueModifier(configuration.valueModifiers); + + getDomain2(graph, configuration, state, configuration.ignoreVirtualResources); + + id = ids.size(); + + cc.restoreClusterState(clusterState); + + otherStatementsOutput.flush(); + otherStatementsOutput.close(); + + // Do not close valueOutput, just flush it and reuse it in + // ModelTransferableGraphSource for reading. + valueOutput.flush(); + + long domainDuration = System.nanoTime() - domainTime; + + state.id = id; + state.otherStatementsInput = toRandomAccessBinary(otherStatementsStream, 128*1024); + state.valueInput = toRandomAccessBinary(valueOutput); + state.statementsOutput = null; + state.valueOutput = null; + + long totalEnd = System.nanoTime(); + + if(PROFILE) { + System.out.println("startup in " + 1e-9*(startupTimeEnd - startupTime) + "s."); + System.out.println("domain was found in " + 1e-9*(domainDuration) + "s."); + System.out.println("total time for building subgraph was " + 1e-9*(totalEnd-total) + "s."); + } + + return getSource(graph, configuration, state, otherStatementsFile, valueFile); + + } catch (DatabaseException e) { + throw e; + } catch (IOException e) { + throw new DatabaseException(e.getMessage(), e); + } catch (Throwable e) { + dumpHeap("crash.hprof"); + throw new DatabaseException(e.getMessage(), e); + } + + } + + protected ModelTransferableGraphSource getSource(ReadGraph graph, TransferableGraphConfiguration2 configuration, DomainProcessorState state, File otherStatementsFile, File valueFile) throws DatabaseException { + return new ModelTransferableGraphSource(graph, configuration, state, otherStatementsFile, valueFile); + } + + private TGValueModifier composeTGValueModifier(Collection configuredModifiers) { + List valueModifiers = configuredModifiers == null ? new ArrayList<>(2) : new ArrayList<>(configuredModifiers.size() + 2); + valueModifiers.add(new ResourceTGValueModifier(support)); + valueModifiers.add(RevisionTGValueModifier.INSTANCE); + return new ComposedTGValueModifier(valueModifiers.toArray(new TGValueModifier[valueModifiers.size()])); + } + + private static RandomAccessBinary toRandomAccessBinary(DeferredFileOutputStream stream, int bufferSize) throws IOException { + if (stream.isInMemory()) + return new BinaryMemory(stream.getData()); + return new BinaryFile(stream.getFile(), bufferSize); + } + + private static RandomAccessBinary toRandomAccessBinary(DeferredBinaryFile file) throws IOException { + RandomAccessBinary b = file.getBackend(); + long size = b.position(); + b.position(0); + if (b instanceof BinaryMemory) { + b.setLength(size); + } + return b; + } + + public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, IProgressMonitor monitor, final Resource resource) throws DatabaseException { + return getDomainOnly(processor, monitor, Collections.singletonList(resource)); + } + + public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, final IProgressMonitor monitor, final Collection resources) throws DatabaseException { + + return processor.syncRequest(new UniqueRead() { + + @Override + public DomainOnlyProcessor perform(ReadGraph graph) throws DatabaseException { + + try { + + TransferableGraphConfiguration2 conf = TransferableGraphConfiguration2.createWithResources(graph, resources, Collections.emptyList()); + + DomainProcessorState state = new DomainProcessorState(); + state.extensions.putAll(conf.baseExtensions); + state.ids = new TIntIntHashMap(1000, 0.75f); + state.statementsOutput = new DataOutputStream(new NullOutputStream()); + state.valueOutput = new NullRandomAccessBinary(); + state.valueCount = 0; + state.excludedShared = new TIntHashSet(); + state.monitor = SubMonitor.convert(monitor); + + return getDomainOnly(graph, conf, state, conf.ignoreVirtualResources); + + } catch (OperationCanceledException e) { + + return null; + + } + + } + + }); + + } + + public static class DomainOnlyProcessor extends DomainProcessor3 { + + final Resource instanceOf; + final public List internals; + final public List internalTypes; + + private int counter = 0; + + public DomainOnlyProcessor(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { + super(graph, conf, state, ignoreVirtual); + CollectionSupport cs = graph.getService(CollectionSupport.class); + internals = cs.createList(); + internalTypes = cs.createList(); + instanceOf = Layer0.getInstance(graph).InstanceOf; + } + + @Override + final public void addToStream(Resource predicate, Resource object) throws DatabaseException { + } + + @Override + public void flushStatementStream(int sId, DomainProcessorState state) throws IOException { + } + + @Override + public void processValue(ReadGraph graph, Resource subject, int sId, DomainProcessorState state) throws DatabaseException, IOException { + } + + @Override + public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException { + + if((counter++ & 1023) == 0) + if(state.monitor != null) + if(state.monitor.isCanceled()) + throw new CancelTransactionException(); + + super.processInternal(graph, subject, stms, state); + + internals.add(subject); + + Resource singleType = null; + for(Statement s : stms) { + if(instanceOf.equals(s.getPredicate())) { + if(singleType != null) { + internalTypes.add(null); + return; + } else { + singleType = s.getObject(); + } + } + } + + internalTypes.add(singleType); + + } + + + } + + public static DomainOnlyProcessor getDomainOnly(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { + + final DomainOnlyProcessor processor = new DomainOnlyProcessor(graph, conf, state, ignoreVirtual); + getDomain2(graph, state, processor); + return processor; + + } + + public static DomainProcessor3 getDomain2(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException { + + ITask task = ThreadLogger.getInstance().begin("getDomain2"); + + final DomainProcessor3 processor = new DomainProcessor3(graph, conf, state, ignoreVirtual); + + getDomain2(graph, state, processor); + + final SerialisationSupport support = graph.getService(SerialisationSupport.class); + final ClusteringSupport cls = graph.getService(ClusteringSupport.class); + final Resource indexRoot = processor.conf.indexRoot; + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + + TLongObjectHashMap clusterMap = new TLongObjectHashMap(); + for(Resource r : processor.status.keySet()) { + ExtentStatus status = processor.status.get(r); + int transientId = support.getTransientId(r); + if(ExtentStatus.INTERNAL == status) { + long cluster = cls.getCluster(r); + TIntArrayList list = clusterMap.get(cluster); + if(list == null) { + list = new TIntArrayList(); + clusterMap.put(cluster, list); + } + list.add(transientId); + } else if(ExtentStatus.EXTERNAL == status) { + state.externals.add(transientId); + } else if(ExtentStatus.PENDING == status) { + String uri = graph.getPossibleURI(r); + if(uri != null) { + // All internal resources with uri have been discovered already => this must then be external + //state.externals.add(transientId); + // Pending resources are found through weak links - if they are still pending at this stage do not add an external + processor.status.put(r, ExtentStatus.EXTERNAL); + } + else { + state.pending.add(transientId); + System.err.println("Pending status in export: " + NameUtils.getSafeName(graph, r, true) + " (" + graph.getPossibleURI(r) + ")"); + } + } + } + + // Now that we know the status of the resources lets process weak statements + for(Statement stm : processor.unresolvedWeakLinks) { + ExtentStatus status = processor.status.get(stm.getObject()); + if(ExtentStatus.INTERNAL == status) { + // Weak links between internals are exported + int transientId = support.getTransientId(stm.getSubject()); + processor.addToStream(stm.getPredicate(), stm.getObject()); + try { + processor.flushStatementStream(transientId, state); + } catch (IOException e) { + throw new DatabaseException(e); + } + } + } + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + + final TIntArrayList clustering = new TIntArrayList(); + clusterMap.forEachEntry(new TLongObjectProcedure() { + + @Override + public boolean execute(long cluster, TIntArrayList b) { + clustering.add(b.size()); + b.forEach(new TIntProcedure() { + + @Override + public boolean execute(int rId) { + processor.ids.put(rId, processor.id++); + return true; + } + + }); + return true; + } + + }); + + if (state.monitor.isCanceled()) + throw new CancelTransactionException(); + + final TIntArrayList clusterSets = new TIntArrayList(); + clusterMap.forEachEntry(new TLongObjectProcedure() { + + @Override + public boolean execute(long cluster, TIntArrayList b) { + try { + Resource clusterSet = cls.getClusterSetOfCluster(cluster); + if(clusterSet != null) { + int transientId = support.getTransientId(clusterSet); + if(processor.ids.containsKey(transientId)) { + clusterSets.add(processor.ids.get(transientId)); + return true; + } else { + if(graph.getRootLibrary().equals(clusterSet)) { + clusterSets.add(Extensions.ROOT_LIBRARY_CLUSTER_SET); + return true; + } else if (clusterSet.equals(indexRoot)) { + clusterSets.add(Extensions.INDEX_ROOT_CLUSTER_SET); + return true; + } + } + } + } catch (DatabaseException e) { + } + clusterSets.add(Extensions.NO_CLUSTER_SET); + return true; + + } + + }); + + state.extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray())); + state.extensions.put(Extensions.CLUSTER_SETS, new Variant(Bindings.INT_ARRAY, clusterSets.toArray())); + + long total = processor.startupTime + processor.expandTime + + processor.classifyPredicateTime + + processor.processFringeTime + processor.extentSeedTime + + processor.fullResolveTime + processor.fastResolveTime + + + processor.parentResolveTime + processor.otherStatementTime; + + if (PROFILE) { + System.out.println("startup took " + 1e-9 * processor.startupTime + "s."); + System.out.println("expand took " + 1e-9 * processor.expandTime + "s."); + System.out.println("classifyPredicates took " + 1e-9 * processor.classifyPredicateTime + "s."); + System.out.println("processFringe took " + 1e-9 * processor.processFringeTime + "s."); + System.out.println("extentSeeding took " + 1e-9 * processor.extentSeedTime + "s."); + System.out.println("fullResolve took " + 1e-9 * processor.fullResolveTime + "s."); + System.out.println("fastResolve took " + 1e-9 * processor.fastResolveTime + "s."); + System.out.println("parentResolve took " + 1e-9 * processor.parentResolveTime + "s."); + System.out.println("otherStatements took " + 1e-9 * processor.otherStatementTime + "s."); + System.out.println("value output took " + 1e-9 * processor.valueOutputTime + "s."); + System.out.println("statement output took " + 1e-9 * processor.statementOutputTime + "s."); + System.out.println("total " + 1e-9 * total + "s."); + } + + task.finish(); + + return processor; + + } + + public static DomainProcessor3 getDomain2(final ReadGraph graph , DomainProcessorState state, final DomainProcessor3 processor) throws DatabaseException { + processor.process(graph, state); + return processor; + } + + static class Expansion3 extends UniqueRead> { + + final private Collection roots; + final boolean ignoreVirtual; + + public Expansion3(Collection roots, boolean ignoreVirtual) { + this.roots = roots; + this.ignoreVirtual = ignoreVirtual; + } + + @Override + public Collection perform(ReadGraph graph) { + + ArrayList result = new ArrayList(); + + final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class); + + final DomainStatementProcedure3 proc = new DomainStatementProcedure3(result); + + if (ignoreVirtual) { + for(Resource r : roots) { + dqs.forEachDirectPersistentStatement(graph, r, proc); + } + } else { + for(Resource r : roots) { + dqs.forEachDirectStatement(graph, r, proc); + } + } + + return result; + + } + + } + private static void dumpHeap(String path) { + + try { + Object bean = getBean(); + if (bean == null) + return; + + Method m = bean.getClass().getMethod("dumpHeap", String.class, boolean.class); + m.invoke(bean, path, true); + + } catch (IllegalArgumentException e) { + } catch (IllegalAccessException e) { + } catch (SecurityException e) { + } catch (NoSuchMethodException e) { + } catch (InvocationTargetException e) { + } finally { + } + + } + + private static Object getBean() { + Class beanClass = getBeanClass(); + if (beanClass == null) + return null; + try { + Object bean = ManagementFactory.newPlatformMXBeanProxy( + ManagementFactory.getPlatformMBeanServer(), + "com.sun.management:type=HotSpotDiagnostic", + beanClass); + return bean; + } catch (IOException e) { + return null; + } + } + + private static Class getBeanClass() { + try { + Class clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean"); + return clazz; + } catch (ClassNotFoundException e) { + return null; + } + } + + public static void main(String[] args) { + + try { + + Datatype dt = Datatypes.translate("{ parts : ( | ResourceRVIPart { role : |CHILD|PROPERTY, resource : Long(unit=\"resource\") } | StringRVIPart { role : |CHILD|PROPERTY, string : String } ) [] }"); + Binding b = Bindings.getBinding(dt); + Object value = b.createDefault(); + Variant variant = new Variant(b, value); + TGResourceUtil util = new TGResourceUtil(); + LongAdapter la = new LongAdapter() { + @Override + public long adapt(long in) { + return in; + } + }; + util.adaptValue( variant.getBinding(), variant.getValue(), la ); + + } catch (DataTypeSyntaxError e) { + e.printStackTrace(); + } catch (BindingException e) { + e.printStackTrace(); + } catch (RuntimeSerializerConstructionException e) { + e.printStackTrace(); + } catch (AccessorException e) { + e.printStackTrace(); + } + + } + + +}