From df73f31702b7256c0f06a94cd553b637220d830a Mon Sep 17 00:00:00 2001 From: Tuukka Lehtonen Date: Wed, 19 Apr 2017 22:23:13 +0300 Subject: [PATCH] Less memory use for TG import when NamespaceMigrationStep is involved MigrationStateImpl.getProperty for root resource(s) keys now first exports CURRENT_TG back to disk if the TG has already been loaded and then imports it back using StreamingTransferableGraphImportProcess. This avoids the need to keep the whole TransferableGraph1 in memory while doing the import and also allows for the use of the leaner Streaming import process. refs #7153 Change-Id: I37e6a30ec8d31452a66f15378e446efbfb811ffa --- .../layer0/migration/MigrationStateImpl.java | 261 ++++++++++-------- ...reamingTransferableGraphImportProcess.java | 44 ++- .../graph/db/TransferableGraphs.java | 2 +- 3 files changed, 177 insertions(+), 130 deletions(-) diff --git a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/migration/MigrationStateImpl.java b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/migration/MigrationStateImpl.java index bf860e44d..92779b973 100644 --- a/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/migration/MigrationStateImpl.java +++ b/bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/migration/MigrationStateImpl.java @@ -11,6 +11,7 @@ *******************************************************************************/ package org.simantics.db.layer0.migration; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.text.DateFormat; @@ -24,6 +25,7 @@ import org.eclipse.core.runtime.IProgressMonitor; import org.simantics.databoard.Bindings; import org.simantics.databoard.binding.mutable.Variant; import org.simantics.databoard.container.DataContainer; +import org.simantics.databoard.container.DataContainers; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; @@ -37,6 +39,8 @@ import org.simantics.db.layer0.adapter.impl.DefaultPasteImportAdvisor; import org.simantics.db.layer0.internal.SimanticsInternal; import org.simantics.db.layer0.util.Layer0Utils; import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest; +import org.simantics.db.layer0.util.TGProgressMonitor; +import org.simantics.db.layer0.util.TGTransferableGraphSource; import org.simantics.db.layer0.util.TransferableGraphConfiguration2; import org.simantics.db.request.Read; import org.simantics.db.service.ManagementSupport; @@ -49,10 +53,10 @@ import org.simantics.graph.db.TransferableGraphImporter; import org.simantics.graph.db.TransferableGraphSource; import org.simantics.graph.db.TransferableGraphs; import org.simantics.graph.db.WrapperAdvisor; -import org.simantics.graph.representation.ByteFileReader; import org.simantics.graph.representation.TransferableGraph1; import org.simantics.graph.representation.TransferableGraphFileReader; import org.simantics.layer0.Layer0; +import org.simantics.utils.logging.TimeLogger; public class MigrationStateImpl implements MigrationState { @@ -104,7 +108,9 @@ public class MigrationStateImpl implements MigrationState { try { File modelFile = getProperty(MigrationStateKeys.MODEL_FILE); reader = new TransferableGraphFileReader(modelFile); + TimeLogger.log(MigrationStateImpl.class, "reading TG into memory from " + modelFile); TransferableGraph1 tg = reader.readTG(); + TimeLogger.log(MigrationStateImpl.class, "read TG into memory from " + modelFile); setProperty(MigrationStateKeys.CURRENT_TG, tg); return (T)tg; } catch (DatabaseException e) { @@ -118,20 +124,7 @@ public class MigrationStateImpl implements MigrationState { } else if (MigrationStateKeys.CURRENT_TGS.equals(key)) { File modelFile = getProperty(MigrationStateKeys.MODEL_FILE); - - try { - StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(modelFile); - TransferableGraphSource tgs = reader.readTG(); - setProperty(MigrationStateKeys.CURRENT_TGS_READER, reader); - setProperty(MigrationStateKeys.CURRENT_TGS, tgs); - return (T)tgs; - } catch (DatabaseException e) { - throw e; - } catch (IOException e) { - throw new DatabaseException("An I/O exception occurred during reading '" + modelFile.getAbsolutePath() + "'", e); - } catch (Throwable t) { - throw new DatabaseException(t); - } + return (T) initializeTransferableGraphSource(modelFile); } else if (MigrationStateKeys.CURRENT_DATA_CONTAINER.equals(key)) { @@ -164,118 +157,22 @@ public class MigrationStateImpl implements MigrationState { } } else if (MigrationStateKeys.CURRENT_RESOURCE.equals(key) || MigrationStateKeys.CURRENT_ROOT_RESOURCES.equals(key)) { - + final Session session = getProperty(MigrationStateKeys.SESSION); final IProgressMonitor monitor = probeProperty(MigrationStateKeys.PROGRESS_MONITOR); final boolean updateDependencies = MigrationUtils.getProperty(this, MigrationStateKeys.UPDATE_DEPENDENCIES, Boolean.TRUE); - - final TransferableGraph1 tg = probeProperty(MigrationStateKeys.CURRENT_TG); - if(tg != null) { - - final Resource indexRoot = session.syncRequest(new WriteResultRequest() { - @Override - public Resource perform(WriteGraph graph) throws DatabaseException { - if(!updateDependencies) - Layer0Utils.setDependenciesIndexingDisabled(graph, true); - return createTemporaryRoot(graph); - } - }); - - IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot)); - IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) { - @Override - public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { - super.beforeWrite(graph, process); - if(!updateDependencies) - Layer0Utils.setDependenciesIndexingDisabled(graph, true); - } - @Override - public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { - super.afterWrite(graph, process); - Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS); - if(storeResources != null && storeResources) { - long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class)); - setProperty(MigrationStateKeys.RESOURCE_IDS, ids); - } - } - }; - // Make sure that the supplied advisor is redirected to temp - advisor.redirect(indexRoot); - - TransferableGraphs.importGraph1WithMonitor(session, tg, advisor, new TGStatusMonitor() { - @Override - public void status(int percentage) { - monitor.subTask("Importing model from file (" + percentage + "%)"); - } - @Override - public boolean isCanceled() { - return monitor.isCanceled(); - } - }); - - setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot); - setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots())); - setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId()); - setProperty(MigrationStateKeys.CURRENT_TG, null); - return getProperty(key); - } - - final TransferableGraphSource tgs = getProperty(MigrationStateKeys.CURRENT_TGS); - if(tgs != null) { - - final Resource indexRoot = session.syncRequest(new WriteResultRequest() { - @Override - public Resource perform(WriteGraph graph) throws DatabaseException { - if(!updateDependencies) - Layer0Utils.setDependenciesIndexingDisabled(graph, true); - return createTemporaryRoot(graph); - } - }); - - IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot)); - IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) { - @Override - public Resource getTarget() { - return indexRoot; - } - @Override - public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { - super.beforeWrite(graph, process); - if(!updateDependencies) - Layer0Utils.setDependenciesIndexingDisabled(graph, true); - } - @Override - public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { - super.afterWrite(graph, process); - Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS); - if(storeResources != null && storeResources) { - long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class)); - setProperty(MigrationStateKeys.RESOURCE_IDS, ids); - } - } - }; - // Make sure that the supplied advisor is redirected to temp - advisor.redirect(indexRoot); - - TransferableGraphs.importGraph1(session, tgs, advisor, new TGStatusMonitor() { - @Override - public void status(int percentage) { - monitor.subTask("Importing model from file (" + percentage + "%)"); - } - @Override - public boolean isCanceled() { - return monitor.isCanceled(); - } - }); - - setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot); - setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots())); - setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId()); - setProperty(MigrationStateKeys.CURRENT_TG, null); + File temporaryTg = exportCurrentTgAsTemporaryFile(session, monitor); + if (temporaryTg != null) + setProperty(MigrationStateKeys.CURRENT_TGS, initializeTransferableGraphSource(temporaryTg)); + TransferableGraphSource tgs = getProperty(MigrationStateKeys.CURRENT_TGS); + if (tgs != null) { + importTransferableGraphSource(monitor, session, updateDependencies, tgs); + // Delete temporary file if necessary + if (temporaryTg != null) + temporaryTg.delete(); return getProperty(key); - } } else if (MigrationStateKeys.UPDATE_DEPENDENCIES.equals(key)) { @@ -304,7 +201,7 @@ public class MigrationStateImpl implements MigrationState { } } - private static void uncheckedClose(ByteFileReader closeable) { + private static void uncheckedClose(Closeable closeable) { try { if (closeable != null) closeable.close(); @@ -329,4 +226,124 @@ public class MigrationStateImpl implements MigrationState { return indexRoot; } + private File exportCurrentTgAsTemporaryFile(Session session, IProgressMonitor monitor) throws DatabaseException { + TransferableGraph1 tg = probeProperty(MigrationStateKeys.CURRENT_TG); + if (tg == null) + return null; + + try { + // Write TG back to disk and initialize CURRENT_TGS for the migrated TG. + File modelFile = getProperty(MigrationStateKeys.MODEL_FILE); + File tempFile = File.createTempFile("temporary-tgs", ".tg", SimanticsInternal.getTemporaryDirectory()); + TimeLogger.log(MigrationStateImpl.class, "export temporary TG " + tempFile); + + DataContainer dc = DataContainers.readHeader(modelFile); + TransferableGraphs.writeTransferableGraph(session, dc.format, dc.version, dc.metadata, + new TGTransferableGraphSource(tg), + tempFile, + new TGExportMonitor(monitor, "Exporting temporary transferable graph")); + + // Allow potentially large TG structure to be GC'ed. + setProperty(MigrationStateKeys.CURRENT_TG, null); + + TimeLogger.log(MigrationStateImpl.class, "export temporary TG done " + tempFile); + return tempFile; + } catch (Exception e) { + throw new DatabaseException(e); + } + } + + private TransferableGraphSource initializeTransferableGraphSource(File dataContainer) throws DatabaseException { + try { + StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(dataContainer); + TransferableGraphSource tgs = reader.readTG(); + setProperty(MigrationStateKeys.CURRENT_TGS_READER, reader); + setProperty(MigrationStateKeys.CURRENT_TGS, tgs); + return tgs; + } catch (DatabaseException e) { + throw e; + } catch (IOException e) { + throw new DatabaseException("An I/O exception occurred during reading '" + dataContainer.getAbsolutePath() + "'", e); + } catch (Throwable t) { + throw new DatabaseException(t); + } + } + + private void importTransferableGraphSource(IProgressMonitor monitor, Session session, boolean updateDependencies, TransferableGraphSource tgs) throws DatabaseException { + TimeLogger.log(MigrationStateImpl.class, "import TGS " + tgs); + final Resource indexRoot = session.syncRequest(new WriteResultRequest() { + @Override + public Resource perform(WriteGraph graph) throws DatabaseException { + if(!updateDependencies) + Layer0Utils.setDependenciesIndexingDisabled(graph, true); + return createTemporaryRoot(graph); + } + }); + + IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot)); + IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) { + @Override + public Resource getTarget() { + return indexRoot; + } + @Override + public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { + super.beforeWrite(graph, process); + if(!updateDependencies) + Layer0Utils.setDependenciesIndexingDisabled(graph, true); + } + @Override + public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException { + super.afterWrite(graph, process); + Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS); + if(storeResources != null && storeResources) { + long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class)); + setProperty(MigrationStateKeys.RESOURCE_IDS, ids); + } + } + }; + + // Make sure that the supplied advisor is redirected to temp + advisor.redirect(indexRoot); + + String task = "Importing model into database"; + monitor.subTask(task); + TransferableGraphs.importGraph1(session, tgs, advisor, new TGImportMonitor(monitor, task)); + + setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot); + setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots())); + setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId()); + TimeLogger.log(MigrationStateImpl.class, "imported TGS " + tgs); + } + + + static class TGImportMonitor implements TGStatusMonitor { + private final IProgressMonitor monitor; + private final String message; + public TGImportMonitor(IProgressMonitor monitor, String message) { + this.monitor = monitor; + this.message = message; + } + @Override + public void status(int percentage) { + monitor.subTask(message + " (" + percentage + "%)"); + } + @Override + public boolean isCanceled() { + return monitor.isCanceled(); + } + } + + static class TGExportMonitor extends TGProgressMonitor { + private final String message; + public TGExportMonitor(IProgressMonitor monitor, String message) { + super(monitor); + this.message = message; + } + @Override + protected void workDone(int percentage) { + monitor.subTask(message + " (" + percentage + "%)"); + } + } + } diff --git a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java index 48cebf342..c802f8b74 100644 --- a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java +++ b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java @@ -93,6 +93,7 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap TransferableGraphSource tg; VirtualGraph vg; IImportAdvisor2 advisor; + TGStatusMonitor monitor; ClusterBuilder2 builder; final TGResourceUtil resourceUtil = new TGResourceUtil(); @@ -114,13 +115,29 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap Resource PartOf; Resource HasName; Resource NameOf; - + public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) { + this(session, vg, tg, advisor, null); + } + + public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) { this.tg = tg; this.vg = vg; this.advisor = advisor; + this.monitor = monitor; } - + + private int updatePercentage(int percentage, int done, int total) { + if (monitor != null && (done & 63) == 0) { + int current = 100*done / total; + if (current > percentage) { + percentage = current; + monitor.status(percentage); + } + } + return percentage; + } + public void readIdentities(ReadGraph g) throws Exception { extensions = tg.getExtensions(); resourceCount = tg.getResourceCount(); @@ -465,8 +482,11 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap } } } - - tg.getStatementCount(); + + int[] done = { 0 }; + int[] percentage = { 0 }; + + int statementCount = tg.getStatementCount(); tg.forStatements(null, new TransferableGraphSourceProcedure() { @Override @@ -486,12 +506,16 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap int inverse = handles[inv]; builder.addStatement(graph, object, inverse, subject); } - + + // Count from 0% -> 50% => total = statementCount*2 + percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2); + } }); - tg.getValueCount(); + int valueCount = tg.getValueCount(); + done[0] = 0; class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure { @@ -532,7 +556,8 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap s.skip(this); } builder.endValue(); - + worked(); + } @Override @@ -552,8 +577,13 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap for (int i = 0; i < length; ++i) builder.appendValue(input.readUnsignedByte()); builder.endValue(); + worked(); } + private void worked() { + // Count from 50% -> 100% => [valueCount, valueCount*2) + percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2); + } }; tg.forValues2(null, new ValueProcedure()); diff --git a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java index cd9111ec1..4f9ea5904 100644 --- a/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java +++ b/bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java @@ -185,7 +185,7 @@ public class TransferableGraphs { final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_); - final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor); + final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor); session.syncRequest(new ReadRequest() { @Override public void run(ReadGraph graph) throws DatabaseException { -- 2.47.1