]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Less memory use for TG import when NamespaceMigrationStep is involved 33/433/1
authorTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Wed, 19 Apr 2017 20:08:10 +0000 (23:08 +0300)
committerTuukka Lehtonen <tuukka.lehtonen@semantum.fi>
Wed, 19 Apr 2017 20:12:07 +0000 (23:12 +0300)
MigrationStateImpl.getProperty for root resource(s) keys now first
exports CURRENT_TG back to disk if the TG has already been loaded and
then imports it back using StreamingTransferableGraphImportProcess. This
avoids the need to keep the whole TransferableGraph1 in memory while
doing the import and also allows for the use of the leaner Streaming
import process.

refs #7153

Change-Id: I2ef6ae055cd8a497263c3aa3e95c86ff29a96f3e

bundles/org.simantics.db.layer0/src/org/simantics/db/layer0/migration/MigrationStateImpl.java
bundles/org.simantics.graph.db/src/org/simantics/graph/db/StreamingTransferableGraphImportProcess.java
bundles/org.simantics.graph.db/src/org/simantics/graph/db/TransferableGraphs.java

index 2b19da58607dd66fa9368296ccf99a11100f30c2..936f423833718bd301e15208aabe945873a6fa47 100644 (file)
@@ -11,6 +11,7 @@
  *******************************************************************************/
 package org.simantics.db.layer0.migration;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.text.DateFormat;
@@ -24,6 +25,7 @@ import org.eclipse.core.runtime.IProgressMonitor;
 import org.simantics.databoard.Bindings;
 import org.simantics.databoard.binding.mutable.Variant;
 import org.simantics.databoard.container.DataContainer;
+import org.simantics.databoard.container.DataContainers;
 import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.Session;
@@ -37,6 +39,7 @@ import org.simantics.db.layer0.adapter.impl.DefaultPasteImportAdvisor;
 import org.simantics.db.layer0.internal.SimanticsInternal;
 import org.simantics.db.layer0.util.Layer0Utils;
 import org.simantics.db.layer0.util.ModelTransferableGraphSourceRequest;
+import org.simantics.db.layer0.util.TGProgressMonitor;
 import org.simantics.db.layer0.util.TGTransferableGraphSource;
 import org.simantics.db.layer0.util.TransferableGraphConfiguration2;
 import org.simantics.db.request.Read;
@@ -51,10 +54,10 @@ import org.simantics.graph.db.TransferableGraphImporter;
 import org.simantics.graph.db.TransferableGraphSource;
 import org.simantics.graph.db.TransferableGraphs;
 import org.simantics.graph.db.WrapperAdvisor;
-import org.simantics.graph.representation.ByteFileReader;
 import org.simantics.graph.representation.TransferableGraph1;
 import org.simantics.graph.representation.TransferableGraphFileReader;
 import org.simantics.layer0.Layer0;
+import org.simantics.utils.logging.TimeLogger;
 
 public class MigrationStateImpl implements MigrationState {
 
@@ -106,7 +109,9 @@ public class MigrationStateImpl implements MigrationState {
             try {
                 File modelFile = getProperty(MigrationStateKeys.MODEL_FILE);
                 reader = new TransferableGraphFileReader(modelFile);
+                TimeLogger.log(MigrationStateImpl.class, "reading TG into memory from " + modelFile);
                 TransferableGraph1 tg = reader.readTG();
+                TimeLogger.log(MigrationStateImpl.class, "read TG into memory from " + modelFile);
                 setProperty(MigrationStateKeys.CURRENT_TG, tg);
                 return (T)tg;
             } catch (DatabaseException e) {
@@ -120,20 +125,7 @@ public class MigrationStateImpl implements MigrationState {
         } else if (MigrationStateKeys.CURRENT_TGS.equals(key)) {
 
             File modelFile = getProperty(MigrationStateKeys.MODEL_FILE);
-            
-            try {
-                StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(modelFile);
-                TransferableGraphSource tgs = reader.readTG();
-                setProperty(MigrationStateKeys.CURRENT_TGS_READER, reader);
-                setProperty(MigrationStateKeys.CURRENT_TGS, tgs);
-                return (T)tgs;
-            } catch (DatabaseException e) {
-                throw e;
-            } catch (IOException e) {
-               throw new DatabaseException("An I/O exception occurred during reading '" + modelFile.getAbsolutePath() + "'", e);
-            } catch (Throwable t) {
-                throw new DatabaseException(t);
-            }
+            return (T) initializeTransferableGraphSource(modelFile);
 
         } else if (MigrationStateKeys.CURRENT_DATA_CONTAINER.equals(key)) {
             
@@ -166,120 +158,22 @@ public class MigrationStateImpl implements MigrationState {
             }
 
         } else if (MigrationStateKeys.CURRENT_RESOURCE.equals(key) || MigrationStateKeys.CURRENT_ROOT_RESOURCES.equals(key)) {
-            
+
             final Session session = getProperty(MigrationStateKeys.SESSION);
             final IProgressMonitor monitor = probeProperty(MigrationStateKeys.PROGRESS_MONITOR);
             final boolean updateDependencies = MigrationUtils.getProperty(this, MigrationStateKeys.UPDATE_DEPENDENCIES, Boolean.TRUE);
-            
-            final TransferableGraph1 tg = probeProperty(MigrationStateKeys.CURRENT_TG);
-            if(tg != null) {
-               
-                final Resource indexRoot = session.syncRequest(new WriteResultRequest<Resource>() {
-                    @Override
-                    public Resource perform(WriteGraph graph) throws DatabaseException {
-                        if(!updateDependencies)
-                            Layer0Utils.setDependenciesIndexingDisabled(graph, true);
-                        return createTemporaryRoot(graph);
-                    }
-                });
-
-                IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot));
-                IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) {
-                       @Override
-                    public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
-                       super.beforeWrite(graph, process);
-                        if(!updateDependencies)
-                            Layer0Utils.setDependenciesIndexingDisabled(graph, true);
-                    }
-                       @Override
-                    public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
-                       super.afterWrite(graph, process);
-                       Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS);
-                       if(storeResources != null && storeResources) {
-                               long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class));
-                               setProperty(MigrationStateKeys.RESOURCE_IDS, ids);
-                       }
-                    }
-                               };
-                               // Make sure that the supplied advisor is redirected to temp
-                               advisor.redirect(indexRoot);
-
-                ImportResult ir = TransferableGraphs.importGraph1(session, new TGTransferableGraphSource(tg), advisor, new TGStatusMonitor() {
-                    @Override
-                    public void status(int percentage) {
-                        monitor.subTask("Importing model from file (" + percentage + "%)");
-                    }
-                    @Override
-                    public boolean isCanceled() {
-                        return monitor.isCanceled();
-                    }
-                });
-
-                setProperty(MigrationStateKeys.IMPORT_RESULT, ir);
-                setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot);
-                setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots()));
-                setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId());
-                setProperty(MigrationStateKeys.CURRENT_TG, null);
-
-                return getProperty(key);
-            }
 
-            final TransferableGraphSource tgs = getProperty(MigrationStateKeys.CURRENT_TGS);
-            if(tgs != null) {
-
-                final Resource indexRoot = session.syncRequest(new WriteResultRequest<Resource>() {
-                    @Override
-                    public Resource perform(WriteGraph graph) throws DatabaseException {
-                        if(!updateDependencies)
-                            Layer0Utils.setDependenciesIndexingDisabled(graph, true);
-                        return createTemporaryRoot(graph);
-                    }
-                });
-
-                IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot));
-                IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) {
-                       @Override
-                       public Resource getTarget() {
-                               return indexRoot;
-                       }
-                       @Override
-                    public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
-                       super.beforeWrite(graph, process);
-                        if(!updateDependencies)
-                            Layer0Utils.setDependenciesIndexingDisabled(graph, true);
-                    }
-                       @Override
-                    public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
-                       super.afterWrite(graph, process);
-                       Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS);
-                       if(storeResources != null && storeResources) {
-                               long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class));
-                               setProperty(MigrationStateKeys.RESOURCE_IDS, ids);
-                       }
-                    }
-                               };
-                               // Make sure that the supplied advisor is redirected to temp
-                               advisor.redirect(indexRoot);
-                               
-                ImportResult ir = TransferableGraphs.importGraph1(session, tgs, advisor, new TGStatusMonitor() {
-                    @Override
-                    public void status(int percentage) {
-                        monitor.subTask("Importing model from file (" + percentage + "%)");
-                    }
-                    @Override
-                    public boolean isCanceled() {
-                        return monitor.isCanceled();
-                    }
-                });
-
-                setProperty(MigrationStateKeys.IMPORT_RESULT, ir);
-                setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot);
-                setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots()));
-                setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId());
-                setProperty(MigrationStateKeys.CURRENT_TG, null);
+            File temporaryTg = exportCurrentTgAsTemporaryFile(session, monitor);
+            if (temporaryTg != null)
+                setProperty(MigrationStateKeys.CURRENT_TGS, initializeTransferableGraphSource(temporaryTg));
 
+            TransferableGraphSource tgs = getProperty(MigrationStateKeys.CURRENT_TGS);
+            if (tgs != null) {
+                importTransferableGraphSource(monitor, session, updateDependencies, tgs);
+                // Delete temporary file if necessary
+                if (temporaryTg != null)
+                    temporaryTg.delete();
                 return getProperty(key);
-
             }
 
         } else if (MigrationStateKeys.UPDATE_DEPENDENCIES.equals(key)) {
@@ -308,7 +202,7 @@ public class MigrationStateImpl implements MigrationState {
                }
        }
 
-       private static void uncheckedClose(ByteFileReader closeable) {
+       private static void uncheckedClose(Closeable closeable) {
                try {
                        if (closeable != null)
                                closeable.close();
@@ -333,4 +227,125 @@ public class MigrationStateImpl implements MigrationState {
         return indexRoot;
     }
 
+       private File exportCurrentTgAsTemporaryFile(Session session, IProgressMonitor monitor) throws DatabaseException {
+               TransferableGraph1 tg = probeProperty(MigrationStateKeys.CURRENT_TG);
+               if (tg == null)
+                       return null;
+
+               try {
+                       // Write TG back to disk and initialize CURRENT_TGS for the migrated TG.
+                       File modelFile = getProperty(MigrationStateKeys.MODEL_FILE);
+                       File tempFile = File.createTempFile("temporary-tgs", ".tg", SimanticsInternal.getTemporaryDirectory());
+                       TimeLogger.log(MigrationStateImpl.class, "export temporary TG " + tempFile);
+
+                       DataContainer dc = DataContainers.readHeader(modelFile);
+                       TransferableGraphs.writeTransferableGraph(session, dc.format, dc.version, dc.metadata,
+                                       new TGTransferableGraphSource(tg),
+                                       tempFile,
+                                       new TGExportMonitor(monitor, "Exporting temporary transferable graph"));
+
+                       // Allow potentially large TG structure to be GC'ed.
+                       setProperty(MigrationStateKeys.CURRENT_TG, null);
+
+                       TimeLogger.log(MigrationStateImpl.class, "export temporary TG done " + tempFile);
+                       return tempFile;
+               } catch (Exception e) {
+                       throw new DatabaseException(e);
+               }
+       }
+
+       private TransferableGraphSource initializeTransferableGraphSource(File dataContainer) throws DatabaseException {
+               try {
+                       StreamingTransferableGraphFileReader reader = new StreamingTransferableGraphFileReader(dataContainer);
+                       TransferableGraphSource tgs = reader.readTG();
+                       setProperty(MigrationStateKeys.CURRENT_TGS_READER, reader);
+                       setProperty(MigrationStateKeys.CURRENT_TGS, tgs);
+                       return tgs;
+               } catch (DatabaseException e) {
+                       throw e;
+               } catch (IOException e) {
+                       throw new DatabaseException("An I/O exception occurred during reading '" + dataContainer.getAbsolutePath() + "'", e);
+               } catch (Throwable t) {
+                       throw new DatabaseException(t);
+               }
+       }
+
+       private void importTransferableGraphSource(IProgressMonitor monitor, Session session, boolean updateDependencies, TransferableGraphSource tgs) throws DatabaseException {
+               TimeLogger.log(MigrationStateImpl.class, "import TGS " + tgs);
+               final Resource indexRoot = session.syncRequest(new WriteResultRequest<Resource>() {
+                       @Override
+                       public Resource perform(WriteGraph graph) throws DatabaseException {
+                               if(!updateDependencies)
+                                       Layer0Utils.setDependenciesIndexingDisabled(graph, true);
+                               return createTemporaryRoot(graph);
+                       }
+               });
+
+               IImportAdvisor baseAdvisor = MigrationUtils.getProperty(this, MigrationStateKeys.IMPORT_ADVISOR, new DefaultPasteImportAdvisor(indexRoot));
+               IImportAdvisor2 advisor = new WrapperAdvisor(baseAdvisor) {
+                       @Override
+                       public Resource getTarget() {
+                               return indexRoot;
+                       }
+                       @Override
+                       public void beforeWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
+                               super.beforeWrite(graph, process);
+                               if(!updateDependencies)
+                                       Layer0Utils.setDependenciesIndexingDisabled(graph, true);
+                       }
+                       @Override
+                       public void afterWrite(WriteOnlyGraph graph, TransferableGraphImporter process) throws DatabaseException {
+                               super.afterWrite(graph, process);
+                               Boolean storeResources = probeProperty(MigrationStateKeys.GET_RESOURCE_IDS);
+                               if(storeResources != null && storeResources) {
+                                       long[] ids = process.getResourceIds(session.getService(SerialisationSupport.class));
+                                       setProperty(MigrationStateKeys.RESOURCE_IDS, ids);
+                               }
+                       }
+               };
+
+               // Make sure that the supplied advisor is redirected to temp
+               advisor.redirect(indexRoot);
+
+               String task = "Importing model into database";
+               monitor.subTask(task);
+               ImportResult ir = TransferableGraphs.importGraph1(session, tgs, advisor, new TGImportMonitor(monitor, task));
+
+               setProperty(MigrationStateKeys.IMPORT_RESULT, ir);
+               setProperty(MigrationStateKeys.CURRENT_RESOURCE, indexRoot);
+               setProperty(MigrationStateKeys.CURRENT_ROOT_RESOURCES, new ArrayList<>(advisor.getRoots()));
+               setProperty(MigrationStateKeys.DATABASE_REVISION_AFTER_TG_IMPORT, session.getService(ManagementSupport.class).getHeadRevisionId());
+               TimeLogger.log(MigrationStateImpl.class, "imported TGS " + tgs);
+       }
+
+       
+       static class TGImportMonitor implements TGStatusMonitor {
+               private final IProgressMonitor monitor;
+               private final String message;
+               public TGImportMonitor(IProgressMonitor monitor, String message) {
+                       this.monitor = monitor;
+                       this.message = message;
+               }
+               @Override
+               public void status(int percentage) {
+                       monitor.subTask(message + " (" + percentage + "%)");
+               }
+               @Override
+               public boolean isCanceled() {
+                       return monitor.isCanceled();
+               }
+       }
+
+       static class TGExportMonitor extends TGProgressMonitor {
+               private final String message;
+               public TGExportMonitor(IProgressMonitor monitor, String message) {
+                       super(monitor);
+                       this.message = message;
+               }
+               @Override
+               protected void workDone(int percentage) {
+                       monitor.subTask(message + " (" + percentage + "%)");
+               }
+       }
+
 }
index 3599eefc0ecc81c9cb6d012bf31415f48bcfc34f..524885f5d57a71ba291579c0942f1e0cdff3945f 100644 (file)
@@ -72,6 +72,7 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
        TransferableGraphSource tg;
        VirtualGraph vg;
        IImportAdvisor2 advisor;
+       TGStatusMonitor monitor;
        ClusterBuilder2 builder;
        final TGResourceUtil resourceUtil = new TGResourceUtil();
 
@@ -99,9 +100,25 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
        Resource NameOf;        
 
        public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor) {
+               this(session, vg, tg, advisor, null);
+       }
+
+       public StreamingTransferableGraphImportProcess(Session session, VirtualGraph vg, TransferableGraphSource tg, IImportAdvisor2 advisor, TGStatusMonitor monitor) {
                this.tg = tg;
                this.vg = vg;
                this.advisor = advisor;
+               this.monitor = monitor;
+       }
+
+       private int updatePercentage(int percentage, int done, int total) {
+               if (monitor != null && (done & 63) == 0) {
+                       int current = 100*done / total;
+                       if (current > percentage) {
+                               percentage = current;
+                               monitor.status(percentage);
+                       }
+               }
+               return percentage;
        }
 
        public void readIdentities(ReadGraph g) throws Exception {
@@ -459,8 +476,11 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
                                }
                        }
                }               
-               
-               tg.getStatementCount();
+
+               int[] done = { 0 };
+               int[] percentage = { 0 };
+
+               int statementCount = tg.getStatementCount();
                tg.forStatements(null, new TransferableGraphSourceProcedure<int[]>() {
 
                        @Override
@@ -480,13 +500,17 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
                                    int inverse = handles[inv];
                                    builder.addStatement(graph, object, inverse, subject);    
                                }
-                               
+
+                               // Count from 0% -> 50% => total = statementCount*2
+                               percentage[0] = updatePercentage(percentage[0], done[0]++, statementCount*2);
+               
                        }
                        
                }); 
                
-               tg.getValueCount();
-               
+               int valueCount = tg.getValueCount();
+               done[0] = 0;
+
                class ValueProcedure extends InputStream implements TransferableGraphSourceValueProcedure {
 
             private TGResourceUtil util = new TGResourceUtil();
@@ -526,7 +550,8 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
                     s.skip(this);
                 }
                 builder.endValue();
-                
+                work();
+
             }
 
             @Override
@@ -546,8 +571,13 @@ public class StreamingTransferableGraphImportProcess implements TransferableGrap
                 for (int i = 0; i < length; ++i)
                     builder.appendValue(input.readUnsignedByte());
                 builder.endValue();
+                work();
             }
 
+            private void work() {
+                // Count from 50% -> 100% => [valueCount, valueCount*2)
+                percentage[0] = updatePercentage(percentage[0], valueCount + done[0]++, valueCount*2);
+            }
                };
                
                tg.forValues2(null, new ValueProcedure());
index 6aa24637e4a075792121eace6f5b81447d722e6a..67689ff46320c44b71de5e2a19a404f42376f16b 100644 (file)
@@ -185,7 +185,7 @@ public class TransferableGraphs {
         
         final IImportAdvisor2 advisor = (advisor_ instanceof IImportAdvisor2) ? ((IImportAdvisor2)advisor_) : new WrapperAdvisor(advisor_);
 
-               final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor);
+               final StreamingTransferableGraphImportProcess process = new StreamingTransferableGraphImportProcess(session, vg, tg, advisor, monitor);
                session.syncRequest(new ReadRequest() {
                        @Override
                        public void run(ReadGraph graph) throws DatabaseException {