1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.layer0.util;
14 import java.io.BufferedOutputStream;
15 import java.io.DataOutput;
16 import java.io.DataOutputStream;
18 import java.io.FileNotFoundException;
19 import java.io.FileOutputStream;
20 import java.io.IOException;
21 import java.lang.management.ManagementFactory;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.UUID;
30 import org.apache.commons.io.output.DeferredFileOutputStream;
31 import org.eclipse.core.runtime.IProgressMonitor;
32 import org.eclipse.core.runtime.OperationCanceledException;
33 import org.eclipse.core.runtime.SubMonitor;
34 import org.simantics.databoard.Bindings;
35 import org.simantics.databoard.Datatypes;
36 import org.simantics.databoard.accessor.error.AccessorException;
37 import org.simantics.databoard.binding.Binding;
38 import org.simantics.databoard.binding.error.BindingException;
39 import org.simantics.databoard.binding.mutable.Variant;
40 import org.simantics.databoard.parser.repository.DataTypeSyntaxError;
41 import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;
42 import org.simantics.databoard.type.Datatype;
43 import org.simantics.databoard.util.binary.BinaryFile;
44 import org.simantics.databoard.util.binary.BinaryMemory;
45 import org.simantics.databoard.util.binary.DeferredBinaryFile;
46 import org.simantics.databoard.util.binary.NullRandomAccessBinary;
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
48 import org.simantics.db.DirectStatements;
49 import org.simantics.db.ReadGraph;
50 import org.simantics.db.RequestProcessor;
51 import org.simantics.db.Resource;
52 import org.simantics.db.Statement;
53 import org.simantics.db.common.request.UniqueRead;
54 import org.simantics.db.common.utils.NameUtils;
55 import org.simantics.db.exception.CancelTransactionException;
56 import org.simantics.db.exception.DatabaseException;
57 import org.simantics.db.exception.ValidationException;
58 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
59 import org.simantics.db.layer0.internal.SimanticsInternal;
60 import org.simantics.db.service.ClusterControl;
61 import org.simantics.db.service.ClusterControl.ClusterState;
62 import org.simantics.db.service.ClusteringSupport;
63 import org.simantics.db.service.CollectionSupport;
64 import org.simantics.db.service.DirectQuerySupport;
65 import org.simantics.db.service.SerialisationSupport;
66 import org.simantics.graph.representation.Extensions;
67 import org.simantics.graph.utils.TGResourceUtil;
68 import org.simantics.graph.utils.TGResourceUtil.LongAdapter;
69 import org.simantics.layer0.Layer0;
70 import org.simantics.utils.threads.logger.ITask;
71 import org.simantics.utils.threads.logger.ThreadLogger;
73 import gnu.trove.list.array.TIntArrayList;
74 import gnu.trove.map.hash.TIntIntHashMap;
75 import gnu.trove.map.hash.TLongObjectHashMap;
76 import gnu.trove.procedure.TIntProcedure;
77 import gnu.trove.procedure.TLongObjectProcedure;
78 import gnu.trove.set.hash.TIntHashSet;
80 public class ModelTransferableGraphSourceRequest extends UniqueRead<ModelTransferableGraphSource> {
82 public static String LOG_FILE = "transferableGraph.log";
83 final static boolean LOG = false;
84 final static boolean PRINTDEBUG = false;
85 final static boolean DEBUG = LOG | PRINTDEBUG;
86 final static boolean PROFILE = false;
88 private TransferableGraphConfiguration2 configuration;
89 private SubMonitor monitor;
91 static DataOutput log;
97 FileOutputStream stream = new FileOutputStream(LOG_FILE);
98 log = new DataOutputStream(stream);
99 } catch (FileNotFoundException e) {
106 static void log(String line) {
107 if (PRINTDEBUG) System.err.println(line);
110 if(line.length() > 500) line = line.substring(0, 500);
111 log.writeUTF(line + "\n");
112 } catch (IOException e) {
118 public ModelTransferableGraphSourceRequest(TransferableGraphConfiguration2 conf) {
122 public ModelTransferableGraphSourceRequest(IProgressMonitor monitor, TransferableGraphConfiguration2 conf) {
123 this.monitor = SubMonitor.convert(monitor);
124 this.configuration = conf;
130 int statementIndex = 0;
132 TIntArrayList externalParents = new TIntArrayList();
133 ArrayList<String> externalNames = new ArrayList<String>();
138 private SerialisationSupport support;
140 private Resource getResource(int r) throws DatabaseException {
141 return support.getResource(r);
144 public int getInternalId(int r) {
148 public boolean validateExternal(Resource ext) {
149 ExtentStatus status = configuration.preStatus.get(ext);
151 if(ExtentStatus.INTERNAL.equals(status)) return false;
152 else if(ExtentStatus.EXCLUDED.equals(status)) return false;
159 * @return -2 if r is not really external and the statement should be excluded
162 public int getId(ReadGraph graph, int r) throws DatabaseException {
163 if(ids.containsKey(r)) {
164 int ret = ids.get(r);
166 for(int i=0;i<=indent;++i)
167 System.out.print(" ");
168 System.out.println("Cycle!!!"); // with " + GraphUtils.getReadableName(g, r));
173 if(!validateExternal(getResource(r))) return -2;
174 Collection<Resource> parents = graph.getObjects(getResource(r), L0.PartOf);
175 if(parents.size() != 1) {
176 throw new ValidationException("Reference to external resource "
177 + NameUtils.getSafeName(graph, getResource(r), true) + " without unique uri (" + parents.size() + " parents).");
179 for(Resource p : parents) {
181 int pid = getId(graph, support.getTransientId(p));
182 if(pid == -2) return -2;
183 externalParents.add(pid);
186 externalNames.add((String)graph.getRelatedValue(getResource(r), L0.HasName));
193 public ModelTransferableGraphSource perform(ReadGraph graph) throws DatabaseException {
195 support = graph.getService(SerialisationSupport.class);
197 this.L0 = Layer0.getInstance(graph);
199 long total = System.nanoTime();
200 long startupTime = System.nanoTime();
201 long startupTimeEnd = System.nanoTime();
202 long domainTime = System.nanoTime();
204 String otherStatements = "other" + UUID.randomUUID().toString();
205 String valueFileName = "value" + UUID.randomUUID().toString();
207 File base_ = SimanticsInternal.getTemporaryDirectory();
208 File base = new File(base_, "exports");
211 File otherStatementsFile = new File(base, otherStatements);
212 File valueFile = new File(base, valueFileName);
214 // System.err.println("f: " + otherStatementsFile.getAbsolutePath());
217 DeferredFileOutputStream otherStatementsStream = new DeferredFileOutputStream(1024*1024, otherStatementsFile);
219 DataOutputStream otherStatementsOutput = new DataOutputStream(new BufferedOutputStream(otherStatementsStream, 1024*1024));
220 DeferredBinaryFile valueOutput = new DeferredBinaryFile(valueFile, 1024*1024, 128*1024);
222 ClusterControl cc = graph.getService(ClusterControl.class);
223 ClusterState clusterState = cc.getClusterState();
225 TIntHashSet excludedShared = new TIntHashSet();
227 ids = new TIntIntHashMap(1000, 0.75f);
229 DomainProcessorState state = new DomainProcessorState();
230 state.extensions.putAll(configuration.baseExtensions);
232 state.statementsOutput = otherStatementsOutput;
233 state.valueOutput = valueOutput;
234 state.valueCount = 0;
235 state.excludedShared = excludedShared;
236 state.monitor = monitor;
237 state.valueModifier = composeTGValueModifier(configuration.valueModifiers);
239 getDomain2(graph, configuration, state, configuration.ignoreVirtualResources);
243 cc.restoreClusterState(clusterState);
245 otherStatementsOutput.flush();
246 otherStatementsOutput.close();
248 // Do not close valueOutput, just flush it and reuse it in
249 // ModelTransferableGraphSource for reading.
252 long domainDuration = System.nanoTime() - domainTime;
255 state.otherStatementsInput = toRandomAccessBinary(otherStatementsStream, 128*1024);
256 state.valueInput = toRandomAccessBinary(valueOutput);
257 state.statementsOutput = null;
258 state.valueOutput = null;
260 long totalEnd = System.nanoTime();
263 System.out.println("startup in " + 1e-9*(startupTimeEnd - startupTime) + "s.");
264 System.out.println("domain was found in " + 1e-9*(domainDuration) + "s.");
265 System.out.println("total time for building subgraph was " + 1e-9*(totalEnd-total) + "s.");
268 return getSource(graph, configuration, state, otherStatementsFile, valueFile);
270 } catch (DatabaseException e) {
272 } catch (IOException e) {
273 throw new DatabaseException(e.getMessage(), e);
274 } catch (Throwable e) {
275 dumpHeap("crash.hprof");
276 throw new DatabaseException(e.getMessage(), e);
281 protected ModelTransferableGraphSource getSource(ReadGraph graph, TransferableGraphConfiguration2 configuration, DomainProcessorState state, File otherStatementsFile, File valueFile) throws DatabaseException {
282 return new ModelTransferableGraphSource(graph, configuration, state, otherStatementsFile, valueFile);
285 private TGValueModifier composeTGValueModifier(Collection<TGValueModifier> configuredModifiers) {
286 List<TGValueModifier> valueModifiers = configuredModifiers == null ? new ArrayList<>(2) : new ArrayList<>(configuredModifiers.size() + 2);
287 valueModifiers.add(new ResourceTGValueModifier(support));
288 valueModifiers.add(RevisionTGValueModifier.INSTANCE);
289 return new ComposedTGValueModifier(valueModifiers.toArray(new TGValueModifier[valueModifiers.size()]));
292 private static RandomAccessBinary toRandomAccessBinary(DeferredFileOutputStream stream, int bufferSize) throws IOException {
293 if (stream.isInMemory())
294 return new BinaryMemory(stream.getData());
295 return new BinaryFile(stream.getFile(), bufferSize);
298 private static RandomAccessBinary toRandomAccessBinary(DeferredBinaryFile file) throws IOException {
299 RandomAccessBinary b = file.getBackend();
300 long size = b.position();
302 if (b instanceof BinaryMemory) {
308 public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, IProgressMonitor monitor, final Resource resource) throws DatabaseException {
309 return getDomainOnly(processor, monitor, Collections.singletonList(resource));
312 public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, final IProgressMonitor monitor, final Collection<Resource> resources) throws DatabaseException {
314 return processor.syncRequest(new UniqueRead<DomainOnlyProcessor>() {
317 public DomainOnlyProcessor perform(ReadGraph graph) throws DatabaseException {
321 TransferableGraphConfiguration2 conf = TransferableGraphConfiguration2.createWithResources(graph, resources, Collections.<Resource>emptyList());
323 DomainProcessorState state = new DomainProcessorState();
324 state.extensions.putAll(conf.baseExtensions);
325 state.ids = new TIntIntHashMap(1000, 0.75f);
326 state.statementsOutput = new DataOutputStream(new NullOutputStream());
327 state.valueOutput = new NullRandomAccessBinary();
328 state.valueCount = 0;
329 state.excludedShared = new TIntHashSet();
330 state.monitor = SubMonitor.convert(monitor);
332 return getDomainOnly(graph, conf, state, conf.ignoreVirtualResources);
334 } catch (OperationCanceledException e) {
346 public static class DomainOnlyProcessor extends DomainProcessor3 {
348 final Resource instanceOf;
349 final public List<Resource> internals;
350 final public List<Resource> internalTypes;
352 private int counter = 0;
354 public DomainOnlyProcessor(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
355 super(graph, conf, state, ignoreVirtual);
356 CollectionSupport cs = graph.getService(CollectionSupport.class);
357 internals = cs.createList();
358 internalTypes = cs.createList();
359 instanceOf = Layer0.getInstance(graph).InstanceOf;
363 final public void addToStream(Resource predicate, Resource object) throws DatabaseException {
367 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
371 public void processValue(ReadGraph graph, Resource subject, int sId, DomainProcessorState state) throws DatabaseException, IOException {
375 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
377 if((counter++ & 1023) == 0)
378 if(state.monitor != null)
379 if(state.monitor.isCanceled())
380 throw new CancelTransactionException();
382 super.processInternal(graph, subject, stms, state);
384 internals.add(subject);
386 Resource singleType = null;
387 for(Statement s : stms) {
388 if(instanceOf.equals(s.getPredicate())) {
389 if(singleType != null) {
390 internalTypes.add(null);
393 singleType = s.getObject();
398 internalTypes.add(singleType);
405 public static DomainOnlyProcessor getDomainOnly(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
407 final DomainOnlyProcessor processor = new DomainOnlyProcessor(graph, conf, state, ignoreVirtual);
408 getDomain2(graph, state, processor);
413 public static DomainProcessor3 getDomain2(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
415 ITask task = ThreadLogger.getInstance().begin("getDomain2");
417 final DomainProcessor3 processor = new DomainProcessor3(graph, conf, state, ignoreVirtual);
419 getDomain2(graph, state, processor);
421 final SerialisationSupport support = graph.getService(SerialisationSupport.class);
422 final ClusteringSupport cls = graph.getService(ClusteringSupport.class);
423 final Resource indexRoot = processor.conf.indexRoot;
425 if (state.monitor.isCanceled())
426 throw new CancelTransactionException();
428 TLongObjectHashMap<TIntArrayList> clusterMap = new TLongObjectHashMap<TIntArrayList>();
429 for(Resource r : processor.status.keySet()) {
430 ExtentStatus status = processor.status.get(r);
431 int transientId = support.getTransientId(r);
432 if(ExtentStatus.INTERNAL == status) {
433 long cluster = cls.getCluster(r);
434 TIntArrayList list = clusterMap.get(cluster);
436 list = new TIntArrayList();
437 clusterMap.put(cluster, list);
439 list.add(transientId);
440 } else if(ExtentStatus.EXTERNAL == status) {
441 state.externals.add(transientId);
442 } else if(ExtentStatus.PENDING == status) {
443 String uri = graph.getPossibleURI(r);
445 // All internal resources with uri have been discovered already => this must then be external
446 //state.externals.add(transientId);
447 // Pending resources are found through weak links - if they are still pending at this stage do not add an external
448 processor.status.put(r, ExtentStatus.EXTERNAL);
451 state.pending.add(transientId);
452 System.err.println("Pending status in export: " + NameUtils.getSafeName(graph, r, true) + " (" + graph.getPossibleURI(r) + ")");
457 // Now that we know the status of the resources lets process weak statements
458 for(Statement stm : processor.unresolvedWeakLinks) {
459 ExtentStatus status = processor.status.get(stm.getObject());
460 if(ExtentStatus.INTERNAL == status) {
461 // Weak links between internals are exported
462 int transientId = support.getTransientId(stm.getSubject());
463 processor.addToStream(stm.getPredicate(), stm.getObject());
465 processor.flushStatementStream(transientId, state);
466 } catch (IOException e) {
467 throw new DatabaseException(e);
472 if (state.monitor.isCanceled())
473 throw new CancelTransactionException();
475 final TIntArrayList clustering = new TIntArrayList();
476 clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {
479 public boolean execute(long cluster, TIntArrayList b) {
480 clustering.add(b.size());
481 b.forEach(new TIntProcedure() {
484 public boolean execute(int rId) {
485 processor.ids.put(rId, processor.id++);
495 if (state.monitor.isCanceled())
496 throw new CancelTransactionException();
498 final TIntArrayList clusterSets = new TIntArrayList();
499 clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {
502 public boolean execute(long cluster, TIntArrayList b) {
504 Resource clusterSet = cls.getClusterSetOfCluster(cluster);
505 if(clusterSet != null) {
506 int transientId = support.getTransientId(clusterSet);
507 if(processor.ids.containsKey(transientId)) {
508 clusterSets.add(processor.ids.get(transientId));
511 if(graph.getRootLibrary().equals(clusterSet)) {
512 clusterSets.add(Extensions.ROOT_LIBRARY_CLUSTER_SET);
514 } else if (clusterSet.equals(indexRoot)) {
515 clusterSets.add(Extensions.INDEX_ROOT_CLUSTER_SET);
520 } catch (DatabaseException e) {
522 clusterSets.add(Extensions.NO_CLUSTER_SET);
529 state.extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray()));
530 state.extensions.put(Extensions.CLUSTER_SETS, new Variant(Bindings.INT_ARRAY, clusterSets.toArray()));
532 long total = processor.startupTime + processor.expandTime
533 + processor.classifyPredicateTime
534 + processor.processFringeTime + processor.extentSeedTime
535 + processor.fullResolveTime + processor.fastResolveTime +
536 + processor.parentResolveTime + processor.otherStatementTime;
539 System.out.println("startup took " + 1e-9 * processor.startupTime + "s.");
540 System.out.println("expand took " + 1e-9 * processor.expandTime + "s.");
541 System.out.println("classifyPredicates took " + 1e-9 * processor.classifyPredicateTime + "s.");
542 System.out.println("processFringe took " + 1e-9 * processor.processFringeTime + "s.");
543 System.out.println("extentSeeding took " + 1e-9 * processor.extentSeedTime + "s.");
544 System.out.println("fullResolve took " + 1e-9 * processor.fullResolveTime + "s.");
545 System.out.println("fastResolve took " + 1e-9 * processor.fastResolveTime + "s.");
546 System.out.println("parentResolve took " + 1e-9 * processor.parentResolveTime + "s.");
547 System.out.println("otherStatements took " + 1e-9 * processor.otherStatementTime + "s.");
548 System.out.println("value output took " + 1e-9 * processor.valueOutputTime + "s.");
549 System.out.println("statement output took " + 1e-9 * processor.statementOutputTime + "s.");
550 System.out.println("total " + 1e-9 * total + "s.");
559 public static DomainProcessor3 getDomain2(final ReadGraph graph , DomainProcessorState state, final DomainProcessor3 processor) throws DatabaseException {
560 processor.process(graph, state);
564 static class Expansion3 extends UniqueRead<Collection<DirectStatements>> {
566 final private Collection<Resource> roots;
567 final boolean ignoreVirtual;
569 public Expansion3(Collection<Resource> roots, boolean ignoreVirtual) {
571 this.ignoreVirtual = ignoreVirtual;
575 public Collection<DirectStatements> perform(ReadGraph graph) {
577 ArrayList<DirectStatements> result = new ArrayList<DirectStatements>();
579 final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);
581 final DomainStatementProcedure3 proc = new DomainStatementProcedure3(result);
584 for(Resource r : roots) {
585 dqs.forEachDirectPersistentStatement(graph, r, proc);
588 for(Resource r : roots) {
589 dqs.forEachDirectStatement(graph, r, proc);
598 private static void dumpHeap(String path) {
601 Object bean = getBean();
605 Method m = bean.getClass().getMethod("dumpHeap", String.class, boolean.class);
606 m.invoke(bean, path, true);
608 } catch (IllegalArgumentException e) {
609 } catch (IllegalAccessException e) {
610 } catch (SecurityException e) {
611 } catch (NoSuchMethodException e) {
612 } catch (InvocationTargetException e) {
618 private static Object getBean() {
619 Class<?> beanClass = getBeanClass();
620 if (beanClass == null)
623 Object bean = ManagementFactory.newPlatformMXBeanProxy(
624 ManagementFactory.getPlatformMBeanServer(),
625 "com.sun.management:type=HotSpotDiagnostic",
628 } catch (IOException e) {
633 private static Class<?> getBeanClass() {
635 Class<?> clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean");
637 } catch (ClassNotFoundException e) {
642 public static void main(String[] args) {
646 Datatype dt = Datatypes.translate("{ parts : ( | ResourceRVIPart { role : |CHILD|PROPERTY, resource : Long(unit=\"resource\") } | StringRVIPart { role : |CHILD|PROPERTY, string : String } ) [] }");
647 Binding b = Bindings.getBinding(dt);
648 Object value = b.createDefault();
649 Variant variant = new Variant(b, value);
650 TGResourceUtil util = new TGResourceUtil();
651 LongAdapter la = new LongAdapter() {
653 public long adapt(long in) {
657 util.adaptValue( variant.getBinding(), variant.getValue(), la );
659 } catch (DataTypeSyntaxError e) {
661 } catch (BindingException e) {
663 } catch (RuntimeSerializerConstructionException e) {
665 } catch (AccessorException e) {