1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.db.layer0.util;
\r
14 import java.io.BufferedOutputStream;
\r
15 import java.io.DataOutput;
\r
16 import java.io.DataOutputStream;
\r
17 import java.io.File;
\r
18 import java.io.FileNotFoundException;
\r
19 import java.io.FileOutputStream;
\r
20 import java.io.IOException;
\r
21 import java.lang.management.ManagementFactory;
\r
22 import java.lang.reflect.InvocationTargetException;
\r
23 import java.lang.reflect.Method;
\r
24 import java.util.ArrayList;
\r
25 import java.util.Collection;
\r
26 import java.util.Collections;
\r
27 import java.util.List;
\r
28 import java.util.UUID;
\r
30 import org.apache.commons.io.output.DeferredFileOutputStream;
\r
31 import org.eclipse.core.runtime.IProgressMonitor;
\r
32 import org.eclipse.core.runtime.OperationCanceledException;
\r
33 import org.eclipse.core.runtime.SubMonitor;
\r
34 import org.simantics.databoard.Bindings;
\r
35 import org.simantics.databoard.Datatypes;
\r
36 import org.simantics.databoard.accessor.error.AccessorException;
\r
37 import org.simantics.databoard.binding.Binding;
\r
38 import org.simantics.databoard.binding.error.BindingException;
\r
39 import org.simantics.databoard.binding.mutable.Variant;
\r
40 import org.simantics.databoard.parser.repository.DataTypeSyntaxError;
\r
41 import org.simantics.databoard.serialization.RuntimeSerializerConstructionException;
\r
42 import org.simantics.databoard.type.Datatype;
\r
43 import org.simantics.databoard.util.binary.BinaryFile;
\r
44 import org.simantics.databoard.util.binary.BinaryMemory;
\r
45 import org.simantics.databoard.util.binary.DeferredBinaryFile;
\r
46 import org.simantics.databoard.util.binary.NullRandomAccessBinary;
\r
47 import org.simantics.databoard.util.binary.RandomAccessBinary;
\r
48 import org.simantics.db.DirectStatements;
\r
49 import org.simantics.db.ReadGraph;
\r
50 import org.simantics.db.RequestProcessor;
\r
51 import org.simantics.db.Resource;
\r
52 import org.simantics.db.Statement;
\r
53 import org.simantics.db.common.request.UniqueRead;
\r
54 import org.simantics.db.common.utils.NameUtils;
\r
55 import org.simantics.db.exception.CancelTransactionException;
\r
56 import org.simantics.db.exception.DatabaseException;
\r
57 import org.simantics.db.exception.ValidationException;
\r
58 import org.simantics.db.layer0.adapter.SubgraphExtent.ExtentStatus;
\r
59 import org.simantics.db.layer0.internal.SimanticsInternal;
\r
60 import org.simantics.db.service.ClusterControl;
\r
61 import org.simantics.db.service.ClusterControl.ClusterState;
\r
62 import org.simantics.db.service.ClusteringSupport;
\r
63 import org.simantics.db.service.CollectionSupport;
\r
64 import org.simantics.db.service.DirectQuerySupport;
\r
65 import org.simantics.db.service.SerialisationSupport;
\r
66 import org.simantics.graph.representation.Extensions;
\r
67 import org.simantics.graph.utils.TGResourceUtil;
\r
68 import org.simantics.graph.utils.TGResourceUtil.LongAdapter;
\r
69 import org.simantics.layer0.Layer0;
\r
70 import org.simantics.utils.threads.logger.ITask;
\r
71 import org.simantics.utils.threads.logger.ThreadLogger;
\r
73 import gnu.trove.list.array.TIntArrayList;
\r
74 import gnu.trove.map.hash.TIntIntHashMap;
\r
75 import gnu.trove.map.hash.TLongObjectHashMap;
\r
76 import gnu.trove.procedure.TIntProcedure;
\r
77 import gnu.trove.procedure.TLongObjectProcedure;
\r
78 import gnu.trove.set.hash.TIntHashSet;
\r
80 public class ModelTransferableGraphSourceRequest extends UniqueRead<ModelTransferableGraphSource> {
\r
82 public static String LOG_FILE = "transferableGraph.log";
\r
83 final static boolean LOG = false;
\r
84 final static private boolean DEBUG = false;
\r
85 final static boolean PROFILE = false;
\r
87 private TransferableGraphConfiguration2 configuration;
\r
88 private SubMonitor monitor;
\r
90 static DataOutput log;
\r
96 FileOutputStream stream = new FileOutputStream(LOG_FILE);
\r
97 log = new DataOutputStream(stream);
\r
98 } catch (FileNotFoundException e) {
\r
99 e.printStackTrace();
\r
105 static void log(String line) {
\r
108 log.writeUTF(line + "\n");
\r
109 } catch (IOException e) {
\r
110 e.printStackTrace();
\r
115 public ModelTransferableGraphSourceRequest(TransferableGraphConfiguration2 conf) {
\r
119 public ModelTransferableGraphSourceRequest(IProgressMonitor monitor, TransferableGraphConfiguration2 conf) {
\r
120 this.monitor = SubMonitor.convert(monitor);
\r
121 this.configuration = conf;
\r
127 int statementIndex = 0;
\r
128 TIntIntHashMap ids;
\r
129 TIntArrayList externalParents = new TIntArrayList();
\r
130 ArrayList<String> externalNames = new ArrayList<String>();
\r
135 private SerialisationSupport support;
\r
137 private Resource getResource(int r) throws DatabaseException {
\r
138 return support.getResource(r);
\r
141 public int getInternalId(int r) {
\r
145 public boolean validateExternal(Resource ext) {
\r
146 ExtentStatus status = configuration.preStatus.get(ext);
\r
147 if(status != null) {
\r
148 if(ExtentStatus.INTERNAL.equals(status)) return false;
\r
149 else if(ExtentStatus.EXCLUDED.equals(status)) return false;
\r
156 * @return -2 if r is not really external and the statement should be excluded
\r
159 public int getId(ReadGraph graph, int r) throws DatabaseException {
\r
160 if(ids.containsKey(r)) {
\r
161 int ret = ids.get(r);
\r
163 for(int i=0;i<=indent;++i)
\r
164 System.out.print(" ");
\r
165 System.out.println("Cycle!!!"); // with " + GraphUtils.getReadableName(g, r));
\r
170 if(!validateExternal(getResource(r))) return -2;
\r
171 Collection<Resource> parents = graph.getObjects(getResource(r), L0.PartOf);
\r
172 if(parents.size() != 1) {
\r
173 throw new ValidationException("Reference to external resource "
\r
174 + NameUtils.getSafeName(graph, getResource(r), true) + " without unique uri (" + parents.size() + " parents).");
\r
176 for(Resource p : parents) {
\r
178 int pid = getId(graph, support.getTransientId(p));
\r
179 if(pid == -2) return -2;
\r
180 externalParents.add(pid);
\r
183 externalNames.add((String)graph.getRelatedValue(getResource(r), L0.HasName));
\r
190 public ModelTransferableGraphSource perform(ReadGraph graph) throws DatabaseException {
\r
192 support = graph.getService(SerialisationSupport.class);
\r
194 this.L0 = Layer0.getInstance(graph);
\r
196 long total = System.nanoTime();
\r
197 long startupTime = System.nanoTime();
\r
198 long startupTimeEnd = System.nanoTime();
\r
199 long domainTime = System.nanoTime();
\r
201 String otherStatements = "other" + UUID.randomUUID().toString();
\r
202 String valueFileName = "value" + UUID.randomUUID().toString();
\r
204 File base_ = SimanticsInternal.getTemporaryDirectory();
\r
205 File base = new File(base_, "exports");
\r
208 File otherStatementsFile = new File(base, otherStatements);
\r
209 File valueFile = new File(base, valueFileName);
\r
211 // System.err.println("f: " + otherStatementsFile.getAbsolutePath());
\r
214 DeferredFileOutputStream otherStatementsStream = new DeferredFileOutputStream(1024*1024, otherStatementsFile);
\r
216 DataOutputStream otherStatementsOutput = new DataOutputStream(new BufferedOutputStream(otherStatementsStream, 1024*1024));
\r
217 DeferredBinaryFile valueOutput = new DeferredBinaryFile(valueFile, 1024*1024, 128*1024);
\r
219 ClusterControl cc = graph.getService(ClusterControl.class);
\r
220 ClusterState clusterState = cc.getClusterState();
\r
222 TIntHashSet excludedShared = new TIntHashSet();
\r
224 ids = new TIntIntHashMap(1000, 0.75f);
\r
226 DomainProcessorState state = new DomainProcessorState();
\r
227 state.extensions.putAll(configuration.baseExtensions);
\r
229 state.statementsOutput = otherStatementsOutput;
\r
230 state.valueOutput = valueOutput;
\r
231 state.valueCount = 0;
\r
232 state.excludedShared = excludedShared;
\r
233 state.monitor = monitor;
\r
234 state.valueModifier = composeTGValueModifier(configuration.valueModifiers);
\r
236 getDomain2(graph, configuration, state, configuration.ignoreVirtualResources);
\r
240 cc.restoreClusterState(clusterState);
\r
242 otherStatementsOutput.flush();
\r
243 otherStatementsOutput.close();
\r
245 // Do not close valueOutput, just flush it and reuse it in
\r
246 // ModelTransferableGraphSource for reading.
\r
247 valueOutput.flush();
\r
249 long domainDuration = System.nanoTime() - domainTime;
\r
252 state.otherStatementsInput = toRandomAccessBinary(otherStatementsStream, 128*1024);
\r
253 state.valueInput = toRandomAccessBinary(valueOutput);
\r
254 state.statementsOutput = null;
\r
255 state.valueOutput = null;
\r
257 long totalEnd = System.nanoTime();
\r
260 System.out.println("startup in " + 1e-9*(startupTimeEnd - startupTime) + "s.");
\r
261 System.out.println("domain was found in " + 1e-9*(domainDuration) + "s.");
\r
262 System.out.println("total time for building subgraph was " + 1e-9*(totalEnd-total) + "s.");
\r
265 return getSource(graph, configuration, state, otherStatementsFile, valueFile);
\r
267 } catch (DatabaseException e) {
\r
269 } catch (IOException e) {
\r
270 throw new DatabaseException(e.getMessage(), e);
\r
271 } catch (Throwable e) {
\r
272 dumpHeap("crash.hprof");
\r
273 throw new DatabaseException(e.getMessage(), e);
\r
278 protected ModelTransferableGraphSource getSource(ReadGraph graph, TransferableGraphConfiguration2 configuration, DomainProcessorState state, File otherStatementsFile, File valueFile) throws DatabaseException {
\r
279 return new ModelTransferableGraphSource(graph, configuration, state, otherStatementsFile, valueFile);
\r
282 private TGValueModifier composeTGValueModifier(Collection<TGValueModifier> configuredModifiers) {
\r
283 List<TGValueModifier> valueModifiers = configuredModifiers == null ? new ArrayList<>(2) : new ArrayList<>(configuredModifiers.size() + 2);
\r
284 valueModifiers.add(new ResourceTGValueModifier(support));
\r
285 valueModifiers.add(RevisionTGValueModifier.INSTANCE);
\r
286 return new ComposedTGValueModifier(valueModifiers.toArray(new TGValueModifier[valueModifiers.size()]));
\r
289 private static RandomAccessBinary toRandomAccessBinary(DeferredFileOutputStream stream, int bufferSize) throws IOException {
\r
290 if (stream.isInMemory())
\r
291 return new BinaryMemory(stream.getData());
\r
292 return new BinaryFile(stream.getFile(), bufferSize);
\r
295 private static RandomAccessBinary toRandomAccessBinary(DeferredBinaryFile file) throws IOException {
\r
296 RandomAccessBinary b = file.getBackend();
\r
297 long size = b.position();
\r
299 if (b instanceof BinaryMemory) {
\r
305 public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, IProgressMonitor monitor, final Resource resource) throws DatabaseException {
\r
306 return getDomainOnly(processor, monitor, Collections.singletonList(resource));
\r
309 public static DomainOnlyProcessor getDomainOnly(RequestProcessor processor, final IProgressMonitor monitor, final Collection<Resource> resources) throws DatabaseException {
\r
311 return processor.syncRequest(new UniqueRead<DomainOnlyProcessor>() {
\r
314 public DomainOnlyProcessor perform(ReadGraph graph) throws DatabaseException {
\r
318 TransferableGraphConfiguration2 conf = TransferableGraphConfiguration2.createWithResources(graph, resources, Collections.<Resource>emptyList());
\r
320 DomainProcessorState state = new DomainProcessorState();
\r
321 state.extensions.putAll(conf.baseExtensions);
\r
322 state.ids = new TIntIntHashMap(1000, 0.75f);
\r
323 state.statementsOutput = new DataOutputStream(new NullOutputStream());
\r
324 state.valueOutput = new NullRandomAccessBinary();
\r
325 state.valueCount = 0;
\r
326 state.excludedShared = new TIntHashSet();
\r
327 state.monitor = SubMonitor.convert(monitor);
\r
329 return getDomainOnly(graph, conf, state, conf.ignoreVirtualResources);
\r
331 } catch (OperationCanceledException e) {
\r
343 public static class DomainOnlyProcessor extends DomainProcessor3 {
\r
345 final Resource instanceOf;
\r
346 final public List<Resource> internals;
\r
347 final public List<Resource> internalTypes;
\r
349 private int counter = 0;
\r
351 public DomainOnlyProcessor(ReadGraph graph, TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
\r
352 super(graph, conf, state, ignoreVirtual);
\r
353 CollectionSupport cs = graph.getService(CollectionSupport.class);
\r
354 internals = cs.createList();
\r
355 internalTypes = cs.createList();
\r
356 instanceOf = Layer0.getInstance(graph).InstanceOf;
\r
360 final public void addToStream(Resource predicate, Resource object) throws DatabaseException {
\r
364 public void flushStatementStream(int sId, DomainProcessorState state) throws IOException {
\r
368 public void processValue(ReadGraph graph, Resource subject, int sId, DomainProcessorState state) throws DatabaseException, IOException {
\r
372 public void processInternal(ReadGraph graph, Resource subject, DirectStatements stms, DomainProcessorState state) throws DatabaseException, IOException {
\r
374 if((counter++ & 1023) == 0)
\r
375 if(state.monitor != null)
\r
376 if(state.monitor.isCanceled())
\r
377 throw new CancelTransactionException();
\r
379 super.processInternal(graph, subject, stms, state);
\r
381 internals.add(subject);
\r
383 Resource singleType = null;
\r
384 for(Statement s : stms) {
\r
385 if(instanceOf.equals(s.getPredicate())) {
\r
386 if(singleType != null) {
\r
387 internalTypes.add(null);
\r
390 singleType = s.getObject();
\r
395 internalTypes.add(singleType);
\r
402 public static DomainOnlyProcessor getDomainOnly(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
\r
404 final DomainOnlyProcessor processor = new DomainOnlyProcessor(graph, conf, state, ignoreVirtual);
\r
405 getDomain2(graph, state, processor);
\r
410 public static DomainProcessor3 getDomain2(final ReadGraph graph , final TransferableGraphConfiguration2 conf, DomainProcessorState state, boolean ignoreVirtual) throws DatabaseException {
\r
412 ITask task = ThreadLogger.getInstance().begin("getDomain2");
\r
414 final DomainProcessor3 processor = new DomainProcessor3(graph, conf, state, ignoreVirtual);
\r
416 getDomain2(graph, state, processor);
\r
418 final SerialisationSupport support = graph.getService(SerialisationSupport.class);
\r
419 final ClusteringSupport cls = graph.getService(ClusteringSupport.class);
\r
420 final Resource indexRoot = processor.conf.indexRoot;
\r
422 if (state.monitor.isCanceled())
\r
423 throw new CancelTransactionException();
\r
425 TLongObjectHashMap<TIntArrayList> clusterMap = new TLongObjectHashMap<TIntArrayList>();
\r
426 for(Resource r : processor.status.keySet()) {
\r
427 ExtentStatus status = processor.status.get(r);
\r
428 int transientId = support.getTransientId(r);
\r
429 if(ExtentStatus.INTERNAL == status) {
\r
430 long cluster = cls.getCluster(r);
\r
431 TIntArrayList list = clusterMap.get(cluster);
\r
433 list = new TIntArrayList();
\r
434 clusterMap.put(cluster, list);
\r
436 list.add(transientId);
\r
437 } else if(ExtentStatus.EXTERNAL == status) {
\r
438 state.externals.add(transientId);
\r
439 } else if(ExtentStatus.PENDING == status) {
\r
440 String uri = graph.getPossibleURI(r);
\r
442 state.externals.add(transientId);
\r
444 System.err.println("Pending status in export: " + NameUtils.getSafeName(graph, r, true) + " (" + graph.getPossibleURI(r) + ")");
\r
448 if (state.monitor.isCanceled())
\r
449 throw new CancelTransactionException();
\r
451 final TIntArrayList clustering = new TIntArrayList();
\r
452 clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {
\r
455 public boolean execute(long cluster, TIntArrayList b) {
\r
456 clustering.add(b.size());
\r
457 b.forEach(new TIntProcedure() {
\r
460 public boolean execute(int rId) {
\r
461 processor.ids.put(rId, processor.id++);
\r
471 if (state.monitor.isCanceled())
\r
472 throw new CancelTransactionException();
\r
474 final TIntArrayList clusterSets = new TIntArrayList();
\r
475 clusterMap.forEachEntry(new TLongObjectProcedure<TIntArrayList>() {
\r
478 public boolean execute(long cluster, TIntArrayList b) {
\r
480 Resource clusterSet = cls.getClusterSetOfCluster(cluster);
\r
481 if(clusterSet != null) {
\r
482 int transientId = support.getTransientId(clusterSet);
\r
483 if(processor.ids.containsKey(transientId)) {
\r
484 clusterSets.add(processor.ids.get(transientId));
\r
487 if(graph.getRootLibrary().equals(clusterSet)) {
\r
488 clusterSets.add(Extensions.ROOT_LIBRARY_CLUSTER_SET);
\r
490 } else if (clusterSet.equals(indexRoot)) {
\r
491 clusterSets.add(Extensions.INDEX_ROOT_CLUSTER_SET);
\r
496 } catch (DatabaseException e) {
\r
498 clusterSets.add(Extensions.NO_CLUSTER_SET);
\r
505 state.extensions.put(Extensions.CLUSTERING, new Variant(Bindings.INT_ARRAY, clustering.toArray()));
\r
506 state.extensions.put(Extensions.CLUSTER_SETS, new Variant(Bindings.INT_ARRAY, clusterSets.toArray()));
\r
508 long total = processor.startupTime + processor.expandTime
\r
509 + processor.classifyPredicateTime
\r
510 + processor.processFringeTime + processor.extentSeedTime
\r
511 + processor.fullResolveTime + processor.fastResolveTime +
\r
512 + processor.parentResolveTime + processor.otherStatementTime;
\r
515 System.out.println("startup took " + 1e-9 * processor.startupTime + "s.");
\r
516 System.out.println("expand took " + 1e-9 * processor.expandTime + "s.");
\r
517 System.out.println("classifyPredicates took " + 1e-9 * processor.classifyPredicateTime + "s.");
\r
518 System.out.println("processFringe took " + 1e-9 * processor.processFringeTime + "s.");
\r
519 System.out.println("extentSeeding took " + 1e-9 * processor.extentSeedTime + "s.");
\r
520 System.out.println("fullResolve took " + 1e-9 * processor.fullResolveTime + "s.");
\r
521 System.out.println("fastResolve took " + 1e-9 * processor.fastResolveTime + "s.");
\r
522 System.out.println("parentResolve took " + 1e-9 * processor.parentResolveTime + "s.");
\r
523 System.out.println("otherStatements took " + 1e-9 * processor.otherStatementTime + "s.");
\r
524 System.out.println("value output took " + 1e-9 * processor.valueOutputTime + "s.");
\r
525 System.out.println("statement output took " + 1e-9 * processor.statementOutputTime + "s.");
\r
526 System.out.println("total " + 1e-9 * total + "s.");
\r
535 public static DomainProcessor3 getDomain2(final ReadGraph graph , DomainProcessorState state, final DomainProcessor3 processor) throws DatabaseException {
\r
536 processor.process(graph, state);
\r
540 static class Expansion3 extends UniqueRead<Collection<DirectStatements>> {
\r
542 final private Collection<Resource> roots;
\r
543 final boolean ignoreVirtual;
\r
545 public Expansion3(Collection<Resource> roots, boolean ignoreVirtual) {
\r
546 this.roots = roots;
\r
547 this.ignoreVirtual = ignoreVirtual;
\r
551 public Collection<DirectStatements> perform(ReadGraph graph) {
\r
553 ArrayList<DirectStatements> result = new ArrayList<DirectStatements>();
\r
555 final DirectQuerySupport dqs = graph.getService(DirectQuerySupport.class);
\r
557 final DomainStatementProcedure3 proc = new DomainStatementProcedure3(result);
\r
559 if (ignoreVirtual) {
\r
560 for(Resource r : roots) {
\r
561 dqs.forEachDirectPersistentStatement(graph, r, proc);
\r
564 for(Resource r : roots) {
\r
565 dqs.forEachDirectStatement(graph, r, proc);
\r
574 private static void dumpHeap(String path) {
\r
577 Object bean = getBean();
\r
581 Method m = bean.getClass().getMethod("dumpHeap", String.class, boolean.class);
\r
582 m.invoke(bean, path, true);
\r
584 } catch (IllegalArgumentException e) {
\r
585 } catch (IllegalAccessException e) {
\r
586 } catch (SecurityException e) {
\r
587 } catch (NoSuchMethodException e) {
\r
588 } catch (InvocationTargetException e) {
\r
594 private static Object getBean() {
\r
595 Class<?> beanClass = getBeanClass();
\r
596 if (beanClass == null)
\r
599 Object bean = ManagementFactory.newPlatformMXBeanProxy(
\r
600 ManagementFactory.getPlatformMBeanServer(),
\r
601 "com.sun.management:type=HotSpotDiagnostic",
\r
604 } catch (IOException e) {
\r
609 private static Class<?> getBeanClass() {
\r
611 Class<?> clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean");
\r
613 } catch (ClassNotFoundException e) {
\r
618 public static void main(String[] args) {
\r
622 Datatype dt = Datatypes.translate("{ parts : ( | ResourceRVIPart { role : |CHILD|PROPERTY, resource : Long(unit=\"resource\") } | StringRVIPart { role : |CHILD|PROPERTY, string : String } ) [] }");
\r
623 Binding b = Bindings.getBinding(dt);
\r
624 Object value = b.createDefault();
\r
625 Variant variant = new Variant(b, value);
\r
626 TGResourceUtil util = new TGResourceUtil();
\r
627 LongAdapter la = new LongAdapter() {
\r
629 public long adapt(long in) {
\r
633 util.adaptValue( variant.getBinding(), variant.getValue(), la );
\r
635 } catch (DataTypeSyntaxError e) {
\r
636 e.printStackTrace();
\r
637 } catch (BindingException e) {
\r
638 e.printStackTrace();
\r
639 } catch (RuntimeSerializerConstructionException e) {
\r
640 e.printStackTrace();
\r
641 } catch (AccessorException e) {
\r
642 e.printStackTrace();
\r