/**/bin/
+/features/*/target/
+/bundles/*/target/
+/releng/**/target/
+/tests/*/target/
/**/.polyglot.build.properties
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
+	<classpathentry exported="true" kind="lib" path="lib/xml-apis-ext-1.3.04.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/fop-transcoder-allinone-svn-trunk.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/batik-codec-1.8.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/batik-extension-1.8.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/js.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/xalan-2.7.0.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/xerces_2_5_0.jar"/>
-	<classpathentry exported="true" kind="lib" path="lib/xml-apis-1.3.04.jar"/>
-	<classpathentry exported="true" kind="lib" path="lib/xml-apis-ext-1.3.04.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/batik-anim-1.8.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/xmlgraphics-commons-2.0.jar"/>
	<classpathentry exported="true" kind="lib" path="lib/batik-css-1.8.jar"/>
Bundle-Name: Batik
Bundle-SymbolicName: org.apache.batik;singleton:=true
Bundle-Version: 1.8.0.qualifier
-Bundle-Activator: org.apache.batik.Activator
-Require-Bundle: org.eclipse.ui,
- org.eclipse.core.runtime
Bundle-RequiredExecutionEnvironment: JavaSE-1.8
-Bundle-ActivationPolicy: lazy
Export-Package: java_cup.runtime,
- javax.xml,
- javax.xml.datatype,
- javax.xml.namespace,
- javax.xml.parsers,
- javax.xml.transform,
- javax.xml.transform.dom,
- javax.xml.transform.sax,
- javax.xml.transform.stream,
- javax.xml.validation,
- javax.xml.xpath,
org.apache.avalon.framework,
org.apache.avalon.framework.activity,
org.apache.avalon.framework.configuration,
org.apache.fop,
org.apache.fop.accessibility,
org.apache.fop.apps,
+ org.apache.fop.apps.io,
+ org.apache.fop.complexscripts.bidi,
org.apache.fop.complexscripts.fonts,
+ org.apache.fop.complexscripts.util,
+ org.apache.fop.events,
org.apache.fop.fo,
org.apache.fop.fonts,
org.apache.fop.fonts.apps,
org.apache.fop.fonts.autodetect,
org.apache.fop.fonts.base14,
+ org.apache.fop.fonts.cff,
org.apache.fop.fonts.substitute,
org.apache.fop.fonts.truetype,
org.apache.fop.fonts.type1,
org.apache.fop.pdf,
org.apache.fop.pdf.xref,
org.apache.fop.render,
+ org.apache.fop.render.gradient,
org.apache.fop.render.intermediate,
org.apache.fop.render.pdf,
org.apache.fop.render.pdf.extensions,
org.apache.fop.render.ps,
org.apache.fop.render.ps.extensions,
org.apache.fop.render.ps.fonts,
+ org.apache.fop.render.ps.svg,
org.apache.fop.svg,
+ org.apache.fop.svg.font,
+ org.apache.fop.svg.text,
+ org.apache.fop.traits,
org.apache.fop.util,
org.apache.html.dom,
org.apache.regexp,
org.apache.xml.serialize,
org.apache.xml.utils,
org.apache.xml.utils.res,
- org.apache.xmlcommons,
org.apache.xmlgraphics.fonts,
org.apache.xmlgraphics.image,
org.apache.xmlgraphics.image.codec.png,
org.w3c.css.sac,
org.w3c.css.sac.helpers,
org.w3c.dom,
- org.w3c.dom.bootstrap,
- org.w3c.dom.css,
org.w3c.dom.events,
org.w3c.dom.html,
org.w3c.dom.ls,
- org.w3c.dom.ranges,
org.w3c.dom.smil,
- org.w3c.dom.stylesheets,
- org.w3c.dom.svg,
- org.w3c.dom.traversal,
- org.w3c.dom.views,
- org.w3c.dom.xpath,
- org.xml.sax,
- org.xml.sax.ext,
- org.xml.sax.helpers
+ org.w3c.dom.svg
Bundle-ClassPath: lib/batik-awt-util-1.8.jar,
lib/batik-dom-1.8.jar,
lib/batik-ext-1.8.jar,
lib/js.jar,
lib/xalan-2.7.0.jar,
lib/xerces_2_5_0.jar,
- lib/xml-apis-1.3.04.jar,
- lib/xml-apis-ext-1.3.04.jar,
- lib/fop-transcoder-allinone-svn-trunk.jar
+ lib/fop-transcoder-allinone-svn-trunk.jar,
+ lib/xml-apis-ext-1.3.04.jar
+Require-Bundle: javax.xml;bundle-version="1.3.4"
 lib/js.jar,\
 lib/xalan-2.7.0.jar,\
 lib/xerces_2_5_0.jar,\
- lib/xml-apis-1.3.04.jar,\
- lib/xml-apis-ext-1.3.04.jar,\
- lib/fop-transcoder-allinone-svn-trunk.jar
+ lib/fop-transcoder-allinone-svn-trunk.jar,\
+ lib/xml-apis-ext-1.3.04.jar
+++ /dev/null
-package org.apache.batik;
-
-import org.eclipse.ui.plugin.AbstractUIPlugin;
-import org.osgi.framework.BundleContext;
-
-/**
- * The activator class controls the plug-in life cycle
- */
-public class Activator extends AbstractUIPlugin {
-
-	// The plug-in ID
-	public static final String PLUGIN_ID = "org.apache.batik"; //$NON-NLS-1$
-
-	// The shared instance
-	private static Activator plugin;
-
-	/**
-	 * The constructor
-	 */
-	public Activator() {
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * @see org.eclipse.ui.plugin.AbstractUIPlugin#start(org.osgi.framework.BundleContext)
-	 */
-	public void start(BundleContext context) throws Exception {
-		super.start(context);
-		plugin = this;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * @see org.eclipse.ui.plugin.AbstractUIPlugin#stop(org.osgi.framework.BundleContext)
-	 */
-	public void stop(BundleContext context) throws Exception {
-		plugin = null;
-		super.stop(context);
-	}
-
-	/**
-	 * Returns the shared instance
-	 *
-	 * @return the shared instance
-	 */
-	public static Activator getDefault() {
-		return plugin;
-	}
-
-}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
+ <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>org.simantics.acorn</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ManifestBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.SchemaBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.pde.ds.core.builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.pde.PluginNature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
--- /dev/null
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.8
--- /dev/null
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Acorn Database for Simantics
+Bundle-SymbolicName: org.simantics.acorn
+Bundle-Version: 1.1.2.qualifier
+Bundle-Vendor: Semantum Oy
+Require-Bundle: gnu.trove3;bundle-version="3.0.0",
+ gnu.trove2;bundle-version="2.0.4",
+ org.simantics.db.impl;bundle-version="0.8.0",
+ org.simantics.db.server;bundle-version="1.0.0",
+ org.simantics.compressions;bundle-version="1.0.0",
+ org.simantics.backup,
+ org.eclipse.core.runtime;bundle-version="3.11.1",
+ org.simantics.db.procore
+Bundle-RequiredExecutionEnvironment: JavaSE-1.8
+Bundle-ActivationPolicy: lazy
+Bundle-Activator: org.simantics.acorn.internal.Activator
+Service-Component: OSGI-INF/component.xml, OSGI-INF/org.simantics.acorn.AcornDriver.xml
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="org.simantics.acorn.backupProvider">
+ <implementation class="org.simantics.acorn.backup.AcornBackupProvider"/>
+ <service>
+ <provide interface="org.simantics.backup.IBackupProvider"/>
+ </service>
+</scr:component>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="AcornDriver">
+ <implementation class="org.simantics.acorn.AcornDriver"/>
+ <service>
+ <provide interface="org.simantics.db.Driver"/>
+ </service>
+</scr:component>
--- /dev/null
+###############################################################################
+# Copyright (c) 2007, 2010 Association for Decentralized Information Management
+# in Industry THTH ry.
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Contributors:
+# VTT Technical Research Centre of Finland - initial API and implementation
+###############################################################################
+output.. = bin/
+bin.includes = META-INF/,\
+ .,\
+ log4j.properties,\
+ OSGI-INF/
+source.. = src/
--- /dev/null
+###############################################################################
+# Copyright (c) 2007, 2010 Association for Decentralized Information Management
+# in Industry THTH ry.
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Contributors:
+# VTT Technical Research Centre of Finland - initial API and implementation
+###############################################################################
+# For the general syntax of property based configuration files see the
+# documentation of org.apache.log4j.PropertyConfigurator.
+
+# The root category uses the appender called stdout. If no priority is
+# specified, the root category assumes the default priority for root
+# which is DEBUG in log4j. The root category is the only category that
+# has a default priority. All other categories need not be assigned a
+# priority in which case they inherit their priority from the
+# hierarchy.
+
+# This will provide console output on log4j configuration loading
+#log4j.debug=true
+
+log4j.rootCategory=warn, stdout
+#log4j.rootCategory=warn
+
+# BEGIN APPENDER: CONSOLE APPENDER (stdout)
+# first: type of appender (fully qualified class name)
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+
+# second: Any configuration information needed for that appender.
+# Many appenders require a layout.
+log4j.appender.stdout.layout=org.apache.log4j.TTCCLayout
+# log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
+
+# Possible information overload?
+# log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+# additionally, some layouts can take additional information --
+# like the ConversionPattern for the PatternLayout.
+# log4j.appender.stdout.layout.ConversionPattern=%d %-5p %-17c{2} (%30F:%L) %3x - %m%n
+# END APPENDER: CONSOLE APPENDER (stdout)
+
+# BEGIN APPENDER: ROLLING FILE APPENDER (rolling)
+#log4j.appender.rolling=com.tools.logging.PluginFileAppender
+#log4j.appender.rolling=org.apache.log4j.FileAppender
+log4j.appender.rolling=org.apache.log4j.RollingFileAppender
+log4j.appender.rolling.File=procore.log
+log4j.appender.rolling.append=true
+log4j.appender.rolling.MaxFileSize=8192KB
+# Keep one backup file
+log4j.appender.rolling.MaxBackupIndex=1
+log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
+#log4j.appender.rolling.layout.ConversionPattern=%p %t %c - %m%n
+log4j.appender.rolling.layout.ConversionPattern=%-6r [%15.15t] %-5p %30.30c - %m%n
+# END APPENDER: ROLLING FILE APPENDER (rolling)
+
+# BEGIN APPENDER: PLUG-IN LOG APPENDER (plugin)
+log4j.appender.plugin=com.tools.logging.PluginLogAppender
+log4j.appender.plugin.layout=org.apache.log4j.PatternLayout
+#log4j.appender.plugin.layout.ConversionPattern=%p %t %c - %m%n
+log4j.appender.plugin.layout.ConversionPattern=%-6r [%15.15t] %-5p %30.30c - %m%n
+# END APPENDER: PLUG-IN LOG APPENDER (plugin)
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.simantics.acorn.internal.AcornDatabase;
+import org.simantics.db.Database;
+import org.simantics.db.server.ProCoreException;
+
+/**
+ * @author Tuukka Lehtonen
+ */
+public class AcornDatabaseManager {
+
+ private static Map<String, Database> dbs = new HashMap<String, Database>();
+
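+    /*
+     * Returns the Database for the given folder, creating the folder on first
+     * use. Instances are cached by canonical path so that different spellings
+     * of the same folder resolve to the same Database object.
+     */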
+ public static synchronized Database getDatabase(Path folder) throws ProCoreException {
+ Path canonical;
+ try {
+ if (!Files.exists(folder))
+ Files.createDirectories(folder);
+ canonical = folder.toRealPath();
+ } catch (IOException e) {
+ throw new ProCoreException("Could not get canonical path.", e);
+ }
+
+ String canonicalPath = canonical.toString();
+ Database db = dbs.get(canonicalPath);
+ if (null != db)
+ return db;
+
+ db = new AcornDatabase(canonical);
+ dbs.put(canonicalPath, db);
+ return db;
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.simantics.acorn;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.simantics.db.Database;
+import org.simantics.db.DatabaseUserAgent;
+import org.simantics.db.Driver;
+import org.simantics.db.ServerI;
+import org.simantics.db.ServerReference;
+import org.simantics.db.Session;
+import org.simantics.db.SessionReference;
+import org.simantics.db.exception.DatabaseException;
+
+public class AcornDriver implements Driver {
+
+ public static final String AcornDriverName = "acorn";
+
+ private Map<String, ServerI> servers = new HashMap<>();
+ private Map<String, Management> managements = new HashMap<>();
+
+ @Override
+ public String getName() {
+ return AcornDriverName;
+ }
+
+ @Override
+ public DatabaseUserAgent getDatabaseUserAgent(String address) throws DatabaseException {
+ return AcornDatabaseManager.getDatabase(Paths.get(address)).getUserAgent();
+ }
+
+ @Override
+ public void setDatabaseUserAgent(String address, DatabaseUserAgent dbUserAgent) throws DatabaseException {
+ AcornDatabaseManager.getDatabase(Paths.get(address)).setUserAgent(dbUserAgent);
+ }
+
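+    /*
+     * Opens a session against the Acorn database located in the folder named
+     * by address. A clientId property defaulting to the absolute database
+     * folder path is registered with the session, as is the session itself as
+     * a Session service if one is not already present.
+     */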
+ @Override
+ public Session getSession(String address, Properties properties) throws DatabaseException {
+ Path dbFolder = Paths.get(address);
+ Session session = AcornSessionManagerImpl.getInstance().createSession(new SessionReference() {
+
+ @Override
+ public ServerReference getServerReference() {
+ return new ServerReference() {
+
+ @Override
+ public Path getDBFolder() {
+ return dbFolder;
+ }
+ };
+ }
+
+ @Override
+ public long getSessionId() {
+ return 0L;
+ }
+ }, null);
+ if (!properties.containsKey("clientId"))
+ properties.put("clientId", dbFolder.toAbsolutePath().toString());
+ session.registerService(Properties.class, properties);
+ Session s = session.peekService(Session.class);
+ if (null == s)
+ session.registerService(Session.class, session);
+ return session;
+ }
+
+ @Override
+ public ServerI getServer(String address, Properties properties) throws DatabaseException {
+ ServerI server = servers.get(address);
+ if (server == null) {
+ server = new AcornServerI(AcornDatabaseManager.getDatabase(Paths.get(address)), address);
+ servers.put(address, server);
+ }
+ return server;
+ }
+
+ @Override
+ public Management getManagement(String address, Properties properties) throws DatabaseException {
+ Management mgmt = managements.get(address);
+ if (mgmt == null) {
+ mgmt = new AcornManagement(AcornDatabaseManager.getDatabase(Paths.get(address)), properties);
+ managements.put(address, mgmt);
+ }
+ return mgmt;
+ }
+
+ private static class AcornServerI implements ServerI {
+
+ private Database database;
+ private String address;
+
+ public AcornServerI(Database db, String address) {
+ this.database = db;
+ this.address = address;
+ }
+
+ @Override
+ public void stop() throws DatabaseException {
+ database.tryToStop();
+ }
+
+ @Override
+ public void start() throws DatabaseException {
+ database.start();
+ }
+
+ @Override
+ public boolean isActive() throws DatabaseException {
+ return database.isRunning();
+ }
+
+ @Override
+ public String getAddress() throws DatabaseException {
+ return address;
+ }
+
+ @Override
+ public String executeAndDisconnect(String command) throws DatabaseException {
+ return "";
+ }
+
+ @Override
+ public String execute(String command) throws DatabaseException {
+ return "";
+ }
+ }
+
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.nio.file.Path;
+import java.util.Properties;
+
+import org.simantics.db.Database;
+import org.simantics.db.Driver.Management;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.server.ProCoreException;
+
+public class AcornManagement implements Management {
+
+ private final Database db;
+ private final Properties properties;
+
+ AcornManagement(Database db, Properties properties) throws ProCoreException {
+ this.db = db;
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean exist() throws DatabaseException {
+ return db.isFolderOk();
+ }
+
+ @Override
+ public void delete() throws DatabaseException {
+ db.deleteFiles();
+ if (exist())
+ throw new DatabaseException("Failed to delete database. folder=" + db.getFolder());
+ }
+
+ @Override
+ public void create() throws DatabaseException {
+ db.initFolder(properties);
+ if (!exist())
+ throw new DatabaseException("Failed to create Acorn database. folder=" + db.getFolder());
+ }
+
+ @Override
+ public void purge() throws DatabaseException {
+ db.purgeDatabase();
+ }
+
+ @Override
+ public void shutdown() throws DatabaseException {
+ db.tryToStop();
+ db.disconnect();
+ }
+
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.nio.file.Path;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.simantics.db.Database;
+import org.simantics.db.Session;
+import org.simantics.db.SessionErrorHandler;
+import org.simantics.db.SessionManager;
+import org.simantics.db.SessionReference;
+import org.simantics.db.authentication.UserAuthenticationAgent;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.event.SessionEvent;
+import org.simantics.db.event.SessionListener;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.service.LifecycleSupport;
+import org.simantics.utils.datastructures.ListenerList;
+
+import fi.vtt.simantics.procore.internal.SessionImplDb;
+import fi.vtt.simantics.procore.internal.SessionImplSocket;
+
+public class AcornSessionManagerImpl implements SessionManager {
+
+ private static AcornSessionManagerImpl INSTANCE;
+
+ private ConcurrentHashMap<SessionImplDb, Database.Session> sessionMap = new ConcurrentHashMap<>();
+ private ListenerList<SessionListener> sessionListeners = new ListenerList<>(SessionListener.class);
+ private SessionErrorHandler errorHandler;
+
+ private Database database;
+
+ private AcornSessionManagerImpl() {}
+
+ void finish() {
+ sessionMap = null;
+ sessionListeners = null;
+ }
+
+ @Override
+ public void addSessionListener(SessionListener listener) {
+ sessionListeners.add(listener);
+ }
+
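+    /*
+     * Connects a new SessionImplDb to the database in the folder given by the
+     * session reference. Listeners are notified of the opened session; on
+     * failure a sessionClosed event is fired and the half-initialized session
+     * is closed through its LifecycleSupport service.
+     */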
+ @Override
+ public Session createSession(SessionReference sessionReference, UserAuthenticationAgent authAgent)
+ throws DatabaseException {
+ SessionImplDb sessionImpl = new SessionImplDb(this, authAgent);
+ boolean ok = false;
+ try {
+ Path dbFolder = sessionReference.getServerReference().getDBFolder();
+ database = AcornDatabaseManager.getDatabase(dbFolder);
+ Database.Session dbSession = database.newSession(sessionImpl);
+ sessionImpl.connect(sessionReference, dbSession);
+ sessionMap.put(sessionImpl, dbSession);
+ fireSessionOpened(sessionImpl);
+ ok = true;
+ } catch (Throwable e) {
+ Logger.defaultLogError("Connection failed. See exception for details.", e);
+ try {
+ fireSessionClosed(sessionImpl, e);
+ sessionMap.remove(sessionImpl);
+ sessionImpl = null;
+ } catch (Throwable t) {
+ }
+ throw new DatabaseException(e);
+ } finally {
+ if (!ok && null != sessionImpl)
+ sessionImpl.getService(LifecycleSupport.class).close();
+ }
+ return sessionImpl;
+ }
+
+ @Override
+ public void removeSessionListener(SessionListener listener) {
+ sessionListeners.remove(listener);
+ }
+
+ private void fireSessionOpened(SessionImplSocket session) {
+ SessionEvent se = new SessionEvent(session, null);
+ for (SessionListener listener : sessionListeners.getListeners()) {
+ listener.sessionOpened(se);
+ }
+ }
+
+ private void fireSessionClosed(SessionImplSocket session, Throwable cause) {
+ SessionEvent se = new SessionEvent(session, cause);
+ for (SessionListener listener : sessionListeners.getListeners()) {
+ listener.sessionClosed(se);
+ }
+ }
+
+ @Override
+ public void shutdown(Session s, Throwable cause) {
+ SessionImplSocket sis = (SessionImplSocket) s;
+ if (null == sis)
+ return;
+ try {
+ fireSessionClosed(sis, cause);
+ } finally {
+ sessionMap.remove(s);
+ }
+ }
+
+ @Override
+ public SessionErrorHandler getErrorHandler() {
+ return errorHandler;
+ }
+
+ @Override
+ public void setErrorHandler(SessionErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ public synchronized static AcornSessionManagerImpl getInstance() {
+ if (INSTANCE == null)
+ INSTANCE = new AcornSessionManagerImpl();
+ return INSTANCE;
+ }
+
+ @Override
+ public Database getDatabase() {
+ return database;
+ }
+
+ public GraphClientImpl2 getClient() {
+ if (sessionMap.values().size() > 1)
+ throw new RuntimeDatabaseException("Currently only one GraphClientImpl2 per session is supported!");
+ org.simantics.db.Database.Session client = sessionMap.values().iterator().next();
+ return (GraphClientImpl2) client;
+ }
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.simantics.acorn.cluster.ClusterImpl;
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.exception.InvalidHeadStateException;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.acorn.lru.ChangeSetInfo;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterLRU;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.FileInfo;
+import org.simantics.acorn.lru.LRU;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database.Session.ClusterIds;
+import org.simantics.db.Database.Session.ResourceSegment;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+
+public class ClusterManager {
+
+ private ArrayList<String> currentChanges = new ArrayList<String>();
+
+ public final Path dbFolder;
+ public Path lastSessionDirectory;
+ public Path workingDirectory;
+
+ public LRU<String, ClusterStreamChunk> streamLRU;
+ public LRU<Long, ChangeSetInfo> csLRU;
+ public ClusterLRU clusterLRU;
+ public LRU<String, FileInfo> fileLRU;
+
+ public MainState mainState;
+ public HeadState state;
+
+ private long lastSnapshot = System.nanoTime();
+
+ final public ClusterSupport2 support = new ClusterSupport2(this);
+
+ /*
+ * Public interface
+ *
+ */
+
+ public ClusterManager(Path dbFolder) {
+ this.dbFolder = dbFolder;
+ }
+
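+    /*
+     * LRU entries are fetched with getWithoutMutex and then read under the
+     * entry's own mutex; makeResident ensures the data has been loaded from
+     * disk before it is accessed.
+     */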
+ public ArrayList<String> getChanges(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
+ ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+ info.acquireMutex();
+ try {
+ info.makeResident();
+ return info.getCSSIds();
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public ClusterBase getClusterByClusterKey(int clusterKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
+ return clusterLRU.getClusterByClusterKey(clusterKey);
+ }
+
+ public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
+ return clusterLRU.getClusterByClusterUIDOrMake(clusterUID);
+ }
+
+ public ClusterImpl getClusterByClusterUIDOrMakeProxy(ClusterUID clusterUID) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
+ return clusterLRU.getClusterByClusterUIDOrMakeProxy(clusterUID);
+ }
+
+ public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) throws AcornAccessVerificationException {
+ return clusterLRU.getClusterKeyByClusterUIDOrMake(clusterUID);
+ }
+
+ public int getClusterKeyByClusterUIDOrMakeWithoutMutex(ClusterUID clusterUID) throws IllegalAcornStateException, AcornAccessVerificationException {
+ return clusterLRU.getClusterKeyByClusterUIDOrMakeWithoutMutex(clusterUID);
+ }
+
+ public int getClusterKeyByUID(long id1, long id2) throws DatabaseException, IllegalAcornStateException {
+ return clusterLRU.getClusterKeyByUIDWithoutMutex(id1, id2);
+ }
+
+ public <T extends ClusterI> T getClusterProxyByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
+ return clusterLRU.getClusterProxyByResourceKey(resourceKey);
+ }
+
+ public ClusterUID getClusterUIDByResourceKey(int resourceKey) throws DatabaseException, AcornAccessVerificationException {
+ return clusterLRU.getClusterUIDByResourceKey(resourceKey);
+ }
+
+ public ClusterUID getClusterUIDByResourceKeyWithoutMutex(int resourceKey) throws DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
+ return clusterLRU.getClusterUIDByResourceKeyWithoutMutex(resourceKey);
+ }
+
+ /*
+ * Private implementation
+ *
+ */
+
+ private static long countFiles(Path directory) throws IOException {
+ try (DirectoryStream<Path> ds = Files.newDirectoryStream(directory)) {
+ int count = 0;
+ for (@SuppressWarnings("unused") Path p : ds)
+ ++count;
+ return count;
+ }
+ }
+
+    // Guard to ensure it is safe to make a snapshot (used with cancel, which is not yet supported and may cause corrupted head.state writes)
+ private AtomicBoolean safeToMakeSnapshot = new AtomicBoolean(true);
+ private IllegalAcornStateException cause;
+
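+    /*
+     * Snapshot protocol: refresh the head state, shut the LRUs down so that
+     * all pending data is written out, persist head.state into the current
+     * working directory, and then resume the LRUs against the next working
+     * directory (mainState.headDir is advanced by persistHeadState). Returns
+     * false when an autosave is skipped.
+     */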
+ public synchronized boolean makeSnapshot(ServiceLocator locator, boolean fullSave) throws IllegalAcornStateException {
+ try {
+ if (!safeToMakeSnapshot.get())
+ throw cause;
+            // Limit autosave frequency: skip if the previous snapshot is less than 10 s old
+ if(!fullSave && System.nanoTime() - lastSnapshot < 10*1000000000L) {
+ // System.err.println("lastSnapshot too early");
+ return false;
+ }
+
+ // Cluster files are always there
+ // Nothing has been written => no need to do anything
+ long amountOfFiles = countFiles(workingDirectory);
+ if(!fullSave && amountOfFiles < 3) {
+ // System.err.println("amountOfFiles < 3");
+ return false;
+ }
+
+ System.err.println("makeSnapshot");
+
+ // Schedule writing of all data to disk
+ refreshHeadState();
+
+ // Wait for all files to be written
+ clusterLRU.shutdown();
+ fileLRU.shutdown();
+ streamLRU.shutdown();
+ csLRU.shutdown();
+
+            // Let's check that it is still safe to make a snapshot
+ if (!safeToMakeSnapshot.get())
+ throw cause;
+
+ persistHeadState();
+
+ if (fullSave)
+ mainState.save(dbFolder);
+
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ cssi.save();
+
+ amountOfFiles = countFiles(workingDirectory);
+
+ System.err.println(" -finished: amount of files is " + amountOfFiles);
+
+ workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+ if (!Files.exists(workingDirectory)) {
+ Files.createDirectories(workingDirectory);
+ }
+
+ cssi.updateWriteDirectory(workingDirectory);
+
+ clusterLRU.setWriteDir(workingDirectory);
+ fileLRU.setWriteDir(workingDirectory);
+ streamLRU.setWriteDir(workingDirectory);
+ csLRU.setWriteDir(workingDirectory);
+
+ clusterLRU.resume();
+ fileLRU.resume();
+ streamLRU.resume();
+ csLRU.resume();
+
+ lastSnapshot = System.nanoTime();
+
+ return true;
+ } catch (IllegalAcornStateException e) {
+ notSafeToMakeSnapshot(e);
+ throw e;
+ } catch (IOException e) {
+ IllegalAcornStateException e1 = new IllegalAcornStateException(e);
+ notSafeToMakeSnapshot(e1);
+ throw e1;
+ }
+ }
+
+ private void refreshHeadState() throws IOException, IllegalAcornStateException {
+ state.clusters.clear();
+ state.files.clear();
+ state.stream.clear();
+ state.cs.clear();
+
+ clusterLRU.persist(state.clusters);
+ fileLRU.persist(state.files);
+ streamLRU.persist(state.stream);
+ csLRU.persist(state.cs);
+ }
+
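+    /*
+     * Syncs every regular file in the working directory to disk, saves
+     * head.state there, and advances headDir so that the next round of writes
+     * goes to a fresh directory.
+     */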
+ private void persistHeadState() throws IOException {
+ // Sync current working directory
+ Files.walk(workingDirectory, 1).filter(Files::isRegularFile).forEach(FileIO::uncheckedSyncPath);
+ state.save(workingDirectory);
+ mainState.headDir++;
+ }
+
+
+// public void save() throws IOException {
+//
+// refreshHeadState();
+//
+// clusterLRU.shutdown();
+// fileLRU.shutdown();
+// streamLRU.shutdown();
+// csLRU.shutdown();
+//
+// persistHeadState();
+//
+// mainState.save(getBaseDirectory());
+
+// try {
+// ThreadLogVisualizer visualizer = new ThreadLogVisualizer();
+// visualizer.read(new DataInputStream(new FileInputStream(
+// ThreadLogger.LOG_FILE)));
+// visualizer.visualize3(new PrintStream(ThreadLogger.LOG_FILE
+// + ".svg"));
+// } catch (FileNotFoundException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+
+ // System.err.println("-- load statistics --");
+ // for(Pair<ClusterUID, Integer> entry :
+ // CollectionUtils.valueSortedEntries(histogram)) {
+ // System.err.println(" " + entry.second + " " + entry.first);
+ // }
+
+// }
+
+ private void acquireAll() throws IllegalAcornStateException {
+ clusterLRU.acquireMutex();
+ fileLRU.acquireMutex();
+ streamLRU.acquireMutex();
+ csLRU.acquireMutex();
+ }
+
+ private void releaseAll() {
+ csLRU.releaseMutex();
+ streamLRU.releaseMutex();
+ fileLRU.releaseMutex();
+ clusterLRU.releaseMutex();
+ }
+
+ private AtomicBoolean rollback = new AtomicBoolean(false);
+
+ boolean rolledback() {
+ return rollback.get();
+ }
+
+ public void load() throws IOException {
+
+ // Main state
+ mainState = MainState.load(dbFolder, t -> rollback.set(true));
+
+ lastSessionDirectory = dbFolder.resolve(Integer.toString(mainState.headDir - 1));
+
+ // Head State
+ try {
+ state = HeadState.load(lastSessionDirectory);
+ } catch (InvalidHeadStateException e) {
+ // For backwards compatibility only!
+ Throwable cause = e.getCause();
+            if (cause != null) {
+ try {
+ org.simantics.db.javacore.HeadState oldState = org.simantics.db.javacore.HeadState.load(lastSessionDirectory);
+
+ HeadState newState = new HeadState();
+ newState.clusters = oldState.clusters;
+ newState.cs = oldState.cs;
+ newState.files = oldState.files;
+ newState.stream = oldState.stream;
+ newState.headChangeSetId = oldState.headChangeSetId;
+ newState.reservedIds = oldState.reservedIds;
+ newState.transactionId = oldState.transactionId;
+ state = newState;
+ } catch (InvalidHeadStateException e1) {
+ throw new IOException("Could not load HeadState due to corruption", e1);
+ }
+ } else {
+ // This should never happen as MainState.load() checks the integrity
+ // of head.state files and rolls back in cases of corruption until a
+ // consistent state is found (could be case 0 - initial db state)
+ // IF this does happen something is completely wrong
+ throw new IOException("Could not load HeadState due to corruption", e);
+ }
+ }
+ try {
+ workingDirectory = dbFolder.resolve(Integer.toString(mainState.headDir));
+ Files.createDirectories(workingDirectory);
+
+ csLRU = new LRU<Long, ChangeSetInfo>(this, "Change Set", workingDirectory);
+ streamLRU = new LRU<String, ClusterStreamChunk>(this, "Cluster Stream", workingDirectory);
+ clusterLRU = new ClusterLRU(this, "Cluster", workingDirectory);
+ fileLRU = new LRU<String, FileInfo>(this, "External Value", workingDirectory);
+
+ acquireAll();
+
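+            // Head state entries have the form <key>#<dir>#<offset>#<length>:
+            // <key> is a cluster UID (two hex words separated by '.'), a file
+            // or stream key, or a change set id; <dir> names a subdirectory of
+            // dbFolder, and <offset>/<length> locate the value within it.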
+ // Clusters
+ for (String clusterKey : state.clusters) {
+ String[] parts1 = clusterKey.split("#");
+ String[] parts = parts1[0].split("\\.");
+ long first = new BigInteger(parts[0], 16).longValue();
+ long second = new BigInteger(parts[1], 16).longValue();
+ ClusterUID uuid = ClusterUID.make(first, second);
+ Path readDir = dbFolder.resolve(parts1[1]);
+ int offset = Integer.parseInt(parts1[2]);
+ int length = Integer.parseInt(parts1[3]);
+ clusterLRU.map(new ClusterInfo(this, clusterLRU, readDir, uuid, offset, length));
+ }
+ // Files
+ for (String fileKey : state.files) {
+ // System.err.println("loadFile: " + fileKey);
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ FileInfo info = new FileInfo(fileLRU, readDir, parts[0], offset, length);
+ fileLRU.map(info);
+ }
+ // Update chunks
+ for (String fileKey : state.stream) {
+ // System.err.println("loadStream: " + fileKey);
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ ClusterStreamChunk info = new ClusterStreamChunk(this,
+ streamLRU, readDir, parts[0], offset, length);
+ streamLRU.map(info);
+ }
+ // Change sets
+ for (String fileKey : state.cs) {
+ String[] parts = fileKey.split("#");
+ Path readDir = dbFolder.resolve(parts[1]);
+ Long revisionId = Long.parseLong(parts[0]);
+ int offset = Integer.parseInt(parts[2]);
+ int length = Integer.parseInt(parts[3]);
+ ChangeSetInfo info = new ChangeSetInfo(csLRU, readDir, revisionId, offset, length);
+ csLRU.map(info);
+ }
+
+ releaseAll();
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ // ROLLBACK ONE DIR UNTIL WE ARE FINE!
+ throw new IOException(e);
+ }
+ }
+
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException, IOException {
+
+ clusterLRU.ensureUpdates(uid);
+
+ ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+ return info.clone(uid, creator);
+ }
+
+ //private int loadCounter = 0;
+
+ public static void startLog(String msg) {
+ tasks.put(msg, ThreadLogger.getInstance().begin(msg));
+ }
+
+ public static void endLog(String msg) {
+ ITask task = tasks.get(msg);
+ if (task != null)
+ task.finish();
+ }
+
+ static Map<String, ITask> tasks = new HashMap<String, ITask>();
+
+ public void update(ClusterUID uid, ClusterImpl clu) throws AcornAccessVerificationException, IllegalAcornStateException {
+ ClusterInfo info = clusterLRU.getWithoutMutex(uid);
+ info.acquireMutex();
+ try {
+ info.update(clu);
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public long getClusterIdOrCreate(ClusterUID clusterUID) {
+ return 1;
+ }
+
+ public int getResourceKey(ClusterUID uid, int index) throws AcornAccessVerificationException {
+ return clusterLRU.getResourceKey(uid, index);
+ }
+
+ public int getResourceKeyWitoutMutex(ClusterUID uid, int index) throws IllegalAcornStateException {
+ return clusterLRU.getResourceKeyWithoutMutex(uid, index);
+ }
+
+ public ClusterIds getClusterIds() throws IllegalAcornStateException {
+ clusterLRU.acquireMutex();
+
+ try {
+ Collection<ClusterInfo> infos = clusterLRU.values();
+ final int status = infos.size();
+ final long[] firsts = new long[status];
+ final long[] seconds = new long[status];
+
+ int index = 0;
+ for (ClusterInfo info : infos) {
+ firsts[index] = 0;
+ seconds[index] = info.getKey().second;
+ index++;
+ }
+
+ return new ClusterIds() {
+
+ @Override
+ public int getStatus() {
+ return status;
+ }
+
+ @Override
+ public long[] getFirst() {
+ return firsts;
+ }
+
+ @Override
+ public long[] getSecond() {
+ return seconds;
+ }
+
+ };
+
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ clusterLRU.releaseMutex();
+ }
+ }
+
+ public void addIntoCurrentChangeSet(String ccs) throws IllegalAcornStateException {
+ csLRU.acquireMutex();
+
+ try {
+ currentChanges.add(ccs);
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ csLRU.releaseMutex();
+ }
+ }
+
+ public void commitChangeSet(long changeSetId, byte[] data) throws IllegalAcornStateException {
+ csLRU.acquireMutex();
+ try {
+ ArrayList<String> csids = new ArrayList<String>(currentChanges);
+ currentChanges = new ArrayList<String>();
+ new ChangeSetInfo(csLRU, changeSetId, data, csids);
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ csLRU.releaseMutex();
+ }
+ }
+
+ public byte[] getMetadata(long changeSetId) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ ChangeSetInfo info = csLRU.getWithoutMutex(changeSetId);
+ if (info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getMetadataBytes();
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ ClusterUID uid = ClusterUID.make(clusterUID, 0);
+ String key = uid.toString() + "_" + resourceIndex;
+ FileInfo info = fileLRU.getWithoutMutex(key);
+ if(info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getResourceFile();
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws AcornAccessVerificationException, IllegalAcornStateException {
+ ClusterUID uid = ClusterUID.make(clusterUID, 0);
+
+ String key = uid.toString() + "_" + resourceIndex;
+ FileInfo info = fileLRU.getWithoutMutex(key);
+ if(info == null) return null;
+ info.acquireMutex();
+ try {
+ return info.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+ }
+
+ public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) throws IllegalAcornStateException {
+ try {
+ String key = uid.toString() + "_" + ClusterTraits.getResourceIndexFromResourceKey(resourceKey);
+
+ FileInfo info = null;
+ fileLRU.acquireMutex();
+ try {
+ info = fileLRU.get(key);
+ if (info == null) {
+ info = new FileInfo(fileLRU, key, (int) (offset + size));
+ }
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ fileLRU.releaseMutex();
+ }
+
+ info.acquireMutex();
+ try {
+ info.updateData(bytes, offset, pos, size);
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ info.releaseMutex();
+ }
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void shutdown() {
+ clusterLRU.shutdown();
+ fileLRU.shutdown();
+ streamLRU.shutdown();
+ csLRU.shutdown();
+ }
+
+ public void notSafeToMakeSnapshot(IllegalAcornStateException t) {
+ this.safeToMakeSnapshot.compareAndSet(true, false);
+ this.cause = t;
+ }
+
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+
+public class ExternalizableExample implements Externalizable {
+
+ public int first;
+ private long second;
+
+    // Externalizable also requires a public no-arg constructor for deserialization
+    public ExternalizableExample() {
+    }
+
+    public ExternalizableExample(int first, long second) {
+        this.first = first;
+        this.second = second;
+    }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(first);
+ out.writeLong(second);
+ }
+
+ @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // Read the fields back in the order they were written
+        this.first = in.readInt();
+        this.second = in.readLong();
+    }
+
+
+ public static void main(String[] args) {
+ Externalizable test = new ExternalizableExample(123, 3456);
+
+ try (ObjectOutputStream stream = new ObjectOutputStream(Files.newOutputStream(Paths.get("C:/Users/Jani Simomaa/Desktop/test"), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))) {
+ stream.writeObject(test);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.simantics.databoard.file.RuntimeIOException;
+
+public class FileIO {
+
+ private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
+
+ private static final Set<OpenOption> CREATE_OPTIONS = new HashSet<>(2);
+ private static final Set<OpenOption> APPEND_OPTIONS = new HashSet<>(1);
+
+ static {
+ CREATE_OPTIONS.add(StandardOpenOption.WRITE);
+ CREATE_OPTIONS.add(StandardOpenOption.CREATE);
+
+ APPEND_OPTIONS.add(StandardOpenOption.APPEND);
+ }
+
+ private Path path;
+ private int writePosition = 0;
+
+ private FileIO(Path path) {
+ this.path = path;
+ }
+
+ private static Map<Path, FileIO> map = new HashMap<Path, FileIO>();
+
+ public static FileIO get(Path path) {
+ synchronized(map) {
+ FileIO existing = map.get(path);
+ if(existing == null) {
+ existing = new FileIO(path);
+ map.put(path, existing);
+ }
+ return existing;
+ }
+ }
+
+ //private static final boolean TRACE_SWAP = false;
+ private static final boolean TRACE_PERF = false;
+
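+    /*
+     * Writes length bytes from the array into this file and returns the offset
+     * at which they were written. When overwrite is true the write position is
+     * reset to 0 and the head of the file is overwritten (the file is not
+     * truncated); otherwise the bytes are appended at the current position.
+     */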
+ public synchronized int saveBytes(byte[] bytes, int length, boolean overwrite) throws IOException {
+ if(overwrite) writePosition = 0;
+ int result = writePosition;
+ long start = System.nanoTime();
+ Set<OpenOption> options = writePosition == 0 ? CREATE_OPTIONS : APPEND_OPTIONS;
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes, 0, length);
+ try (FileChannel fc = FileChannel.open(path, options, NO_ATTRIBUTES)) {
+ fc.write(bb);
+
+ writePosition += length;
+ if(TRACE_PERF) {
+ long duration = System.nanoTime()-start;
+ double ds = 1e-9*duration;
+ System.err.println("Wrote " + bytes.length + " bytes @ " + 1e-6*bytes.length / ds + "MB/s");
+ }
+ return result;
+ } catch (Throwable t) {
+            throw new IOException("An error occurred while saving bytes to file " + path.toAbsolutePath().toString(), t);
+ }
+ }
+
+ public synchronized byte[] readBytes(int offset, int length) throws IOException {
+ long start = System.nanoTime();
+ try (SeekableByteChannel channel = Files.newByteChannel(path)) {
+ channel.position(offset);
+ ByteBuffer buf = ByteBuffer.allocate(length);
+            int read = 0;
+            while (read < length) {
+                int n = channel.read(buf);
+                if (n < 0)
+                    throw new IOException("Unexpected end of file " + path + " at offset " + (offset + read));
+                read += n;
+            }
+ byte[] result = buf.array();
+            if (result.length != length)
+                System.err.println("readBytes: expected " + length + " bytes, got " + result.length);
+ if (TRACE_PERF) {
+ long duration = System.nanoTime() - start;
+ double ds = 1e-9 * duration;
+ System.err.println("Read " + result.length + " bytes @ " + 1e-6 * result.length / ds + "MB/s");
+ }
+ return result;
+ }
+ }
+
+ public static void syncPath(Path f) throws IOException {
+ // Does not seem to need 's' according to unit test in Windows
+ try (RandomAccessFile raf = new RandomAccessFile(f.toFile(), "rw")) {
+ raf.getFD().sync();
+ }
+ }
+
+ static void uncheckedSyncPath(Path f) {
+ try {
+ syncPath(f);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ byte[] buf = new byte[1024*1024];
+
+ long s = System.nanoTime();
+
+ Path test = Paths.get("e:/work/test.dat");
+ OutputStream fs = Files.newOutputStream(test);
+ OutputStream os = new BufferedOutputStream(fs, 128*1024);
+
+ for(int i=0;i<40;i++) {
+ os.write(buf);
+ }
+
+ os.flush();
+ //fs.getFD().sync();
+ os.close();
+
+ syncPath(test);
+
+ long duration = System.nanoTime()-s;
+ System.err.println("Took " + 1e-6*duration + "ms.");
+
+
+ }
+
+}
--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
+import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
+import org.simantics.acorn.lru.ClusterInfo;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.acorn.lru.ClusterChangeSet.Entry;
+import org.simantics.db.ClusterCreator;
+import org.simantics.db.Database;
+import org.simantics.db.ServiceLocator;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.SDBException;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.LifecycleSupport;
+import org.simantics.utils.datastructures.Pair;
+import org.simantics.utils.logging.TimeLogger;
+
+import gnu.trove.map.hash.TLongObjectHashMap;
+
+public class GraphClientImpl2 implements Database.Session {
+
+ public static final boolean DEBUG = false;
+
+ public final ClusterManager clusters;
+
+ private TransactionManager transactionManager = new TransactionManager();
+ private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
+ private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
+
+ private Path dbFolder;
+ private final Database database;
+ private ServiceLocator locator;
+ private MainProgram mainProgram;
+
+ static class ClientThreadFactory implements ThreadFactory {
+
+ final String name;
+ final boolean daemon;
+
+ public ClientThreadFactory(String name, boolean daemon) {
+ this.name = name;
+ this.daemon = daemon;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name);
+ thread.setDaemon(daemon);
+ return thread;
+ }
+ }
+
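+    /*
+     * Opens the database: loads the persisted cluster state from dbFolder,
+     * points ClusterSetsSupport at the previous session directory, and starts
+     * the MainProgram update loop on the single-threaded core executor.
+     */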
+ public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
+ this.database = database;
+ this.dbFolder = dbFolder;
+ this.locator = locator;
+ this.clusters = new ClusterManager(dbFolder);
+ load();
+ ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class);
+ cssi.setReadDirectory(clusters.lastSessionDirectory);
+ mainProgram = new MainProgram(this, clusters);
+ executor.execute(mainProgram);
+ }
+
+ public Path getDbFolder() {
+ return dbFolder;
+ }
+
+ public void tryMakeSnapshot() throws IOException {
+
+ if (isClosing || unexpectedClose)
+ return;
+
+ saver.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ Transaction tr = null;
+ try {
+ // First take a write transaction
+ tr = askWriteTransaction(-1);
+ // Then make sure that MainProgram is idling
+ mainProgram.mutex.acquire();
+ try {
+ synchronized(mainProgram) {
+ if(mainProgram.operations.isEmpty()) {
+ makeSnapshot(false);
+ } else {
+ // MainProgram is becoming busy again - delay snapshotting
+ return;
+ }
+ }
+ } finally {
+ mainProgram.mutex.release();
+ }
+ } catch (IllegalAcornStateException | ProCoreException e) {
+ Logger.defaultLogError(e);
+ unexpectedClose = true;
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ } finally {
+ try {
+ if(tr != null)
+ endTransaction(tr.getTransactionId());
+ if (unexpectedClose) {
+ LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
+ try {
+ support.close();
+ } catch (DatabaseException e1) {
+ Logger.defaultLogError(e1);
+ }
+ }
+ } catch (ProCoreException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+ }
+ });
+ }
+
+ public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
+ clusters.makeSnapshot(locator, fullSave);
+ }
+
+ public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
+ try {
+ return clusters.clone(uid, creator);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
+ unexpectedClose = true;
+ throw new DatabaseException(e);
+ }
+ }
+
+// private void save() throws IOException {
+// clusters.save();
+// }
+
+ public void load() throws IOException {
+ clusters.load();
+ }
+
+// public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
+// clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
+// }
+
+ @Override
+ public Database getDatabase() {
+ return database;
+ }
+
+ private boolean closed = false;
+ private boolean isClosing = false;
+ private boolean unexpectedClose = false;
+
+ @Override
+ public void close() throws ProCoreException {
+ System.err.println("Closing " + this + " and mainProgram " + mainProgram);
+ if(!closed && !isClosing) {
+ isClosing = true;
+ try {
+ if (!unexpectedClose)
+ makeSnapshot(true);
+
+ mainProgram.close();
+ clusters.shutdown();
+ executor.shutdown();
+ saver.shutdown();
+ boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+ System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
+
+ mainProgram = null;
+ executor = null;
+ saver = null;
+
+ } catch (IllegalAcornStateException | InterruptedException e) {
+ throw new ProCoreException(e);
+ }
+ closed = true;
+ }
+ //impl.close();
+ }
+
+ @Override
+ public void open() throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isClosed() throws ProCoreException {
+ return closed;
+ }
+
+ @Override
+ public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
+ clusters.state.headChangeSetId++;
+ long committedChangeSetId = changeSetId + 1;
+ try {
+ clusters.commitChangeSet(committedChangeSetId, metadata);
+
+ clusters.state.transactionId = transactionId;
+
+ mainProgram.committed();
+
+ TimeLogger.log("Accepted commit");
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
+ }
+
+ @Override
+ public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
+ UnsupportedOperationException e = new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
+ clusters.notSafeToMakeSnapshot(new IllegalAcornStateException(e));
+ throw e;
+// System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
+// try {
+// undo(new long[] {changeSetId}, onChangeSetUpdate);
+// } catch (SDBException e) {
+// e.printStackTrace();
+// throw new ProCoreException(e);
+// }
+// clusters.state.headChangeSetId++;
+// return clusters.state.headChangeSetId;
+ }
+
+ @Override
+ public Transaction askReadTransaction() throws ProCoreException {
+ return transactionManager.askReadTransaction();
+ }
+
+ enum TransactionState {
+        IDLE, WRITE, READ
+ }
+
+ class TransactionRequest {
+ public TransactionState state;
+ public Semaphore semaphore;
+ public TransactionRequest(TransactionState state, Semaphore semaphore) {
+ this.state = state;
+ this.semaphore = semaphore;
+ }
+ }
+
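+    /*
+     * A small readers-writer scheduler for transactions: any number of READ
+     * requests may be active at once, a WRITE request runs alone, and queued
+     * requests are dispatched in FIFO order whenever the manager returns to
+     * IDLE (or another READ is active and the next queued request is a READ).
+     */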
+ class TransactionManager {
+
+ private TransactionState currentTransactionState = TransactionState.IDLE;
+
+ private int reads = 0;
+
+ LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
+
+ TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
+
+ private synchronized Transaction makeTransaction(TransactionRequest req) {
+
+ final int csId = clusters.state.headChangeSetId;
+ final long trId = clusters.state.transactionId+1;
+ requestMap.put(trId, req);
+ return new Transaction() {
+
+ @Override
+ public long getTransactionId() {
+ return trId;
+ }
+
+ @Override
+ public long getHeadChangeSetId() {
+ return csId;
+ }
+ };
+ }
+
+ /*
+ * This method cannot be synchronized since it waits and must support multiple entries
+ * by query thread(s) and internal transactions such as snapshot saver
+ */
+ public Transaction askReadTransaction() throws ProCoreException {
+
+ Semaphore semaphore = new Semaphore(0);
+
+ TransactionRequest req = queue(TransactionState.READ, semaphore);
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new ProCoreException(e);
+ }
+
+ return makeTransaction(req);
+
+ }
+
+ private synchronized void dispatch() {
+ TransactionRequest r = requests.removeFirst();
+ if(r.state == TransactionState.READ) reads++;
+ r.semaphore.release();
+ }
+
+ private synchronized void processRequests() {
+
+ while(true) {
+
+ if(requests.isEmpty()) return;
+ TransactionRequest req = requests.peek();
+
+ if(currentTransactionState == TransactionState.IDLE) {
+
+ // Accept anything while IDLE
+ currentTransactionState = req.state;
+ dispatch();
+
+ } else if (currentTransactionState == TransactionState.READ) {
+
+ if(req.state == currentTransactionState) {
+
+ // Allow other reads
+ dispatch();
+
+ } else {
+
+ // Wait
+ return;
+
+ }
+
+ } else if (currentTransactionState == TransactionState.WRITE) {
+
+ // Wait
+ return;
+
+ }
+
+ }
+
+ }
+
+ private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
+ TransactionRequest req = new TransactionRequest(state, semaphore);
+ requests.addLast(req);
+ processRequests();
+ return req;
+ }
+
+ /*
+ * This method cannot be synchronized since it waits and must support multiple entries
+ * by query thread(s) and internal transactions such as snapshot saver
+ */
+ public Transaction askWriteTransaction() throws IllegalAcornStateException {
+
+ Semaphore semaphore = new Semaphore(0);
+ TransactionRequest req = queue(TransactionState.WRITE, semaphore);
+
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new IllegalAcornStateException(e);
+ }
+ mainProgram.startTransaction(clusters.state.headChangeSetId+1);
+ return makeTransaction(req);
+ }
+
+ public synchronized long endTransaction(long transactionId) throws ProCoreException {
+
+ TransactionRequest req = requestMap.remove(transactionId);
+ if(req.state == TransactionState.WRITE) {
+ currentTransactionState = TransactionState.IDLE;
+ processRequests();
+ } else {
+ reads--;
+ if(reads == 0) {
+ currentTransactionState = TransactionState.IDLE;
+ processRequests();
+ }
+ }
+ return clusters.state.transactionId;
+ }
+
+ }
+
+ @Override
+ public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
+ try {
+ if (isClosing || unexpectedClose || closed) {
+ throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
+ }
+ return transactionManager.askWriteTransaction();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
+ }
+
+ @Override
+ public long endTransaction(long transactionId) throws ProCoreException {
+ return transactionManager.endTransaction(transactionId);
+ }
+
+ @Override
+ public String execute(String command) throws ProCoreException {
+ // This is called only by WriteGraphImpl.commitAccessorChanges
+ // We can ignore this in Acorn
+ return "";
+ }
+
+ @Override
+ public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
+ try {
+ return clusters.getMetadata(changeSetId);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
+ }
+
+ @Override
+ public ChangeSetData getChangeSetData(long minChangeSetId,
+ long maxChangeSetId, OnChangeSetUpdate onChangeSetUpdate)
+ throws ProCoreException {
+
+ new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
+ return null;
+
+ }
+
+ @Override
+ public ChangeSetIds getChangeSetIds() throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cluster getCluster(byte[] clusterId) throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
+ throws ProCoreException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterIds getClusterIds() throws ProCoreException {
+ try {
+ return clusters.getClusterIds();
+ } catch (IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
+ }
+
+ @Override
+ public Information getInformation() throws ProCoreException {
+ return new Information() {
+
+ @Override
+ public String getServerId() {
+ return "server";
+ }
+
+ @Override
+ public String getProtocolId() {
+ return "";
+ }
+
+ @Override
+ public String getDatabaseId() {
+ return "database";
+ }
+
+ @Override
+ public long getFirstChangeSetId() {
+ return 0;
+ }
+
+ };
+ }
+
+ @Override
+ public Refresh getRefresh(long changeSetId) throws ProCoreException {
+
+ final ClusterIds ids = getClusterIds();
+
+ return new Refresh() {
+
+ @Override
+ public long getHeadChangeSetId() {
+ return clusters.state.headChangeSetId;
+ }
+
+ @Override
+ public long[] getFirst() {
+ return ids.getFirst();
+ }
+
+ @Override
+ public long[] getSecond() {
+ return ids.getSecond();
+ }
+
+ };
+
+ }
+
+ public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
+ return clusters.getResourceFile(clusterUID, resourceIndex);
+ }
+
+ @Override
+ public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
+ try {
+ return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ throw new ProCoreException(e);
+ }
+ }
+
+ @Override
+ public long reserveIds(int count) throws ProCoreException {
+ return clusters.state.reservedIds++;
+ }
+
+ @Override
+ public void updateCluster(byte[] operations) throws ProCoreException {
+ ClusterInfo info = null;
+ try {
+ ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
+ info = clusters.clusterLRU.getOrCreate(operation.uid, true);
+ if(info == null)
+ throw new IllegalAcornStateException("info == null for operation " + operation);
+ info.acquireMutex();
+ info.scheduleUpdate();
+ mainProgram.schedule(operation);
+ } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
+ throw new ProCoreException(e);
+ } finally {
+ if (info != null)
+ info.releaseMutex();
+ }
+ }
+
+ private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
+
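+ // ccsId has the form "<chunkKey>.<operationOffset>"; see MainProgram.schedule().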
+ String[] ss = ccsId.split("\\.");
+ String chunkKey = ss[0];
+ int chunkOffset = Integer.parseInt(ss[1]);
+ ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
+ if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
+ chunk.acquireMutex();
+ try {
+ return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
+ } catch (DatabaseException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ chunk.releaseMutex();
+ }
+ }
+
+ private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
+ UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
+
+ int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
+
+ clusters.clusterLRU.acquireMutex();
+ try {
+
+ ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
+ for(int i=0;i<proc.entries.size();i++) {
+
+ Entry e = proc.entries.get(proc.entries.size() - 1 - i);
+ e.process(clusters, cs, clusterKey);
+ }
+ cs.flush();
+
+ } finally {
+ clusters.clusterLRU.releaseMutex();
+ }
+ }
+
+ @Override
+ public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
+ try {
+ final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
+
+ UndoClusterSupport support = new UndoClusterSupport(clusters);
+
+ final int changeSetId = clusters.state.headChangeSetId;
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === BEGIN UNDO ===");
+
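+ // Undo newest-first: iterate the change sets in reverse order, and each
+ // change set's cluster change sets in reverse as well.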
+ for(int i=0;i<changeSetIds.length;i++) {
+ final long id = changeSetIds[changeSetIds.length-1-i];
+ ArrayList<String> ccss = clusters.getChanges(id);
+
+ for(int j=0;j<ccss.size();j++) {
+ try {
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
+ performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if(ClusterUpdateProcessorBase.DEBUG)
+ System.err.println(" === END UNDO ===");
+
+ for(int i=0;i<clusterChanges.size();i++) {
+
+ final int changeSetIndex = i;
+
+ final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
+
+ final ClusterUID cuid = pair.first;
+ final byte[] data = pair.second;
+
+ onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
+
+ @Override
+ public long getChangeSetId() {
+ return changeSetId;
+ }
+
+ @Override
+ public int getChangeSetIndex() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfClusterChangeSets() {
+ return clusterChanges.size();
+ }
+
+ @Override
+ public int getIndexOfClusterChangeSet() {
+ return changeSetIndex;
+ }
+
+ @Override
+ public byte[] getClusterId() {
+ return cuid.asBytes();
+ }
+
+ @Override
+ public boolean getNewCluster() {
+ return false;
+ }
+
+ @Override
+ public byte[] getData() {
+ return data;
+ }
+
+ });
+ }
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
+ throw new ProCoreException(e1);
+ }
+ return false;
+ }
+
+ public ServiceLocator getServiceLocator() {
+ return locator;
+ }
+
+ @Override
+ public boolean refreshEnabled() {
+ return false;
+ }
+
+ @Override
+ public boolean rolledback() {
+ return clusters.rolledback();
+ }
+
+}
+
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.simantics.acorn.exception.InvalidHeadStateException;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.mutable.MutableVariant;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.util.binary.BinaryMemory;
+
+public class HeadState {
+
+ public static final String HEAD_STATE = "head.state";
+ public static final String SHA_1 = "SHA-1";
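+
+ /*
+ * On-disk layout of head.state: a 20-byte SHA-1 digest followed by the
+ * Databoard-serialized HeadState variant; the digest covers only the
+ * payload bytes that follow it.
+ */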
+
+ public int headChangeSetId = 0;
+ public long transactionId = 1;
+ public long reservedIds = 3;
+
+ public ArrayList<String> clusters = new ArrayList<>();
+ public ArrayList<String> files = new ArrayList<>();
+ public ArrayList<String> stream = new ArrayList<>();
+ public ArrayList<String> cs = new ArrayList<>();
+// public ArrayList<String> ccs = new ArrayList<String>();
+
+ public static HeadState load(Path directory) throws InvalidHeadStateException {
+ Path f = directory.resolve(HEAD_STATE);
+
+ try {
+ byte[] bytes = Files.readAllBytes(f);
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ int digestLength = sha1.getDigestLength();
+
+ sha1.update(bytes, digestLength, bytes.length - digestLength);
+ byte[] newChecksum = sha1.digest();
+ if (!Arrays.equals(newChecksum, Arrays.copyOfRange(bytes, 0, digestLength))) {
+ throw new InvalidHeadStateException(
+ "Checksum " + Arrays.toString(newChecksum) + " does not match excpected "
+ + Arrays.toString(Arrays.copyOfRange(bytes, 0, digestLength)) + " for " + f.toAbsolutePath());
+ }
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes, digestLength, bytes.length - digestLength)) {
+ HeadState object = (HeadState) org.simantics.databoard.Files.readFile(bais, Bindings.getBindingUnchecked(HeadState.class));
+ return object;
+ }
+ } catch (IOException i) {
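+ // Missing or unreadable head.state: fall back to a fresh default state.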
+ return new HeadState();
+// throw new InvalidHeadStateException(i);
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 Algorithm not found", e);
+ } catch (Throwable t) {
+ throw new InvalidHeadStateException(t);
+ }
+ }
+
+ public void save(Path directory) throws IOException {
+ Path f = directory.resolve(HEAD_STATE);
+ try {
+ BinaryMemory rf = new BinaryMemory(4096);
+ try {
+ MutableVariant v = new MutableVariant(Bindings.getBindingUnchecked(HeadState.class), this);
+ Serializer s = Bindings.getSerializerUnchecked( Bindings.VARIANT );
+ s.serialize(rf, v);
+ } finally {
+ rf.close();
+ }
+
+ byte[] bytes = rf.toByteBuffer().array();
+
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ sha1.update(bytes);
+ byte[] checksum = sha1.digest();
+
+ try (OutputStream out = Files.newOutputStream(f)) {
+ out.write(checksum);
+ out.write(bytes);
+ }
+ FileIO.syncPath(f);
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 digest not found, should not happen", e);
+ }
+ }
+
+ public static void validateHeadStateIntegrity(Path headState) throws InvalidHeadStateException, IOException {
+ try {
+ byte[] bytes = Files.readAllBytes(headState);
+ MessageDigest sha1 = MessageDigest.getInstance(SHA_1);
+ int digestLength = sha1.getDigestLength();
+ sha1.update(bytes, digestLength, bytes.length - digestLength);
+ byte[] newChecksum = sha1.digest();
+ if (!Arrays.equals(newChecksum, Arrays.copyOfRange(bytes, 0, digestLength))) {
+ throw new InvalidHeadStateException(
+ "Checksum " + Arrays.toString(newChecksum) + " does not match excpected "
+ + Arrays.toString(Arrays.copyOfRange(bytes, 0, digestLength)) + " for " + headState.toAbsolutePath());
+ }
+ } catch (NoSuchAlgorithmException e) {
+ throw new Error("SHA-1 digest not found, should not happen", e);
+ }
+ }
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.logging.TimeLogger;
+
+public class MainProgram implements Runnable, Closeable {
+
+ private static final int CLUSTER_THREADS = 4;
+ private static final int CHUNK_CACHE_SIZE = 100;
+
+ private final GraphClientImpl2 client;
+ private final ClusterManager clusters;
+ private final ExecutorService[] clusterUpdateThreads;
+ private final List<ClusterUpdateOperation>[] updateSchedules;
+
+ private int residentOperationBytes = 0;
+ private long currentChangeSetId = -1;
+ private int nextChunkId = 0;
+ private boolean alive = true;
+ private Semaphore deathBarrier = new Semaphore(0);
+
+ final Semaphore mutex = new Semaphore(1);
+ final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+ static class ClusterThreadFactory implements ThreadFactory {
+
+ final String name;
+ final boolean daemon;
+
+ public ClusterThreadFactory(String name, boolean daemon) {
+ this.name = name;
+ this.daemon = daemon;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name);
+ thread.setDaemon(daemon);
+ return thread;
+ }
+ }
+
+ public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+
+ this.client = client;
+ this.clusters = clusters;
+ this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
+ this.updateSchedules = new ArrayList[CLUSTER_THREADS];
+ for(int i=0;i<clusterUpdateThreads.length;i++) {
+ clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
+ updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
+ }
+ }
+
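+ /** Begins a new write transaction context; chunk ids generated by schedule() have the form "<changeSetId>-<n>", with n restarting at zero for each transaction. */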
+ public void startTransaction(long id) {
+ currentChangeSetId = id;
+ nextChunkId = 0;
+ }
+
+ private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
+
+ @Override
+ public int compare(ClusterUID o1, ClusterUID o2) {
+ return Long.compare(o1.second, o2.second);
+ }
+ };
+
+ @Override
+ public void run() {
+ try {
+
+ mutex.acquire();
+ main:
+ while(alive) {
+
+ TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
+
+ synchronized(MainProgram.this) {
+
+ while(!operations.isEmpty() && updates.size() < 100) {
+
+ ClusterStreamChunk chunk = operations.pollFirst();
+
+ for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+ ClusterUpdateOperation o = chunk.operations.get(i);
+ ClusterUID uid = o.uid;
+ List<ClusterUpdateOperation> ops = updates.get(uid);
+ if(ops == null) {
+ ops = new ArrayList<ClusterUpdateOperation>();
+ updates.put(uid, ops);
+ }
+ ops.add(o);
+ }
+
+ chunk.nextToProcess = chunk.operations.size();
+
+ if(!chunk.isCommitted()) {
+ assert(operations.isEmpty());
+ operations.add(chunk);
+ break;
+ }
+
+ }
+
+ if(updates.isEmpty()) {
+ try {
+ long start = System.nanoTime();
+ mutex.release();
+ MainProgram.this.wait(5000);
+ mutex.acquire();
+ if (!alive)
+ break main;
+ long duration = System.nanoTime()-start;
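+ // Waited roughly the full 5 s (threshold 4_000_000_000 ns): treat the wakeup
+ // as an idle timeout rather than a new stream request.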
+ if(duration > 4000000000L) {
+
+ // Was this a time-out or a new stream request?
+ if(operations.isEmpty()) {
+
+ /*
+ * We are idling here.
+ * Flush all caches gradually
+ */
+
+ // Write pending cs to disk
+ boolean written = clusters.csLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.csLRU.swapForced();
+ }
+ // Write pending chunks to disk
+ written = clusters.streamLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.streamLRU.swapForced();
+ }
+ // Write pending files to disk
+ written = clusters.fileLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.fileLRU.swapForced();
+ }
+ // Write pending clusters to disk
+ written = clusters.clusterLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.clusterLRU.swapForced();
+ }
+
+ client.tryMakeSnapshot();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+// long sss = System.nanoTime();
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
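+ // Partition the operations by cluster so all updates to one cluster run on the
+ // same single-threaded executor, preserving per-cluster order (CLUSTER_THREADS
+ // is a power of two, so the mask below acts as a modulo).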
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ //long st = System.nanoTime();
+ try {
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ } finally {
+ s.release();
+ }
+ return null;
+
+ // long duration = System.nanoTime()-st;
+ // elapsed.addAndGet(duration);
+ // double dur = 1e-9*duration;
+ // if(dur > 0.05)
+ // System.err.println("duration=" + dur + "s. " + ops.size());
+ }
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);
+
+ /*
+ * Here we are actively processing updates from client.
+ * Maintain necessary caching here.
+ */
+
+ clusters.streamLRU.acquireMutex();
+ try {
+ swapChunks();
+ } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
+ e.printStackTrace();
+ } finally {
+ clusters.streamLRU.releaseMutex();
+ }
+ clusters.csLRU.acquireMutex();
+ try {
+ swapCS();
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ clusters.csLRU.releaseMutex();
+ }
+
+ TimeLogger.log("Performed updates");
+
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ deathBarrier.release();
+ }
+ }
+
+ /*
+ * The caller is assumed to hold the streamLRU mutex.
+ */
+ private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ // Cache chunks during update operations
+ boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ while(written) {
+ written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ }
+ }
+
+ private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
+
+ // Cache change set entries during update operations
+ boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ while(written) {
+ written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ }
+ }
+
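+ /** Marks the newest pending stream chunk as committed so the main loop can retire it once fully processed. */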
+ public synchronized void committed() {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if (!alive) {
+ System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
+// return;
+ }
+ if(last != null) last.commit();
+
+ }
+
+ public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
+ if (!alive) {
+ System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
+// return;
+ }
+ clusters.streamLRU.acquireMutex();
+
+ try {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last == null || last.isCommitted()) {
+ String id = "" + currentChangeSetId + "-" + nextChunkId++;
+ last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
+ operations.add(last);
+ }
+
+ String chunkId = last.getKey();
+ int chunkOffset = last.operations.size();
+ operation.scheduled(chunkId + "." + chunkOffset);
+
+ last.addOperation(operation);
+
+ swapChunks();
+
+ notifyAll();
+ } catch (IllegalAcornStateException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new IllegalAcornStateException(t);
+ } finally {
+ clusters.streamLRU.releaseMutex();
+ }
+ }
+
+ @Override
+ public void close() {
+ alive = false;
+ synchronized (this) {
+ notifyAll();
+ }
+ try {
+ deathBarrier.acquire();
+ } catch (InterruptedException e) {
+ // Interrupted while waiting for the main loop to terminate; continue shutdown.
+ }
+
+ for (ExecutorService executor : clusterUpdateThreads)
+ executor.shutdown();
+
+ for (int i = 0; i < clusterUpdateThreads.length; i++) {
+ try {
+ ExecutorService executor = clusterUpdateThreads[i];
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ clusterUpdateThreads[i] = null;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.simantics.acorn.exception.InvalidHeadStateException;
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.mutable.MutableVariant;
+import org.simantics.databoard.file.RuntimeIOException;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.databoard.util.binary.BinaryMemory;
+import org.simantics.utils.FileUtils;
+
+public class MainState implements Serializable {
+
+ private static final long serialVersionUID = 6237383147637270225L;
+
+ public static final String MAIN_STATE = "main.state";
+
+ public int headDir = 0;
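+ // Index of the next head state directory to write; the newest existing directory is headDir - 1.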
+
+ public MainState() {
+ }
+
+ private MainState(int headDir) {
+ this.headDir = headDir;
+ }
+
+ public static MainState load(Path directory, Consumer<Exception> callback) throws IOException {
+ Files.createDirectories(directory);
+ Path mainState = directory.resolve(MAIN_STATE);
+ try {
+ byte[] bytes = Files.readAllBytes(mainState);
+ MainState state = null;
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+ state = (MainState) org.simantics.databoard.Files.readFile(bais, Bindings.getBindingUnchecked(MainState.class));
+ }
+
+ while (true) {
+ Path latest = directory.resolve(Integer.toString(state.headDir - 1));
+ try {
+ Path headState = latest.resolve(HeadState.HEAD_STATE);
+ HeadState.validateHeadStateIntegrity(headState);
+ break;
+ } catch (InvalidHeadStateException e) {
+ e.printStackTrace();
+ state.headDir--;
+ callback.accept(e);
+ } finally {
+ cleanBaseDirectory(directory, latest, callback);
+ }
+ }
+ return state;
+ } catch(Exception i) {
+ callback.accept(i);
+ int largest = -1;
+ Path latest = findNewHeadStateDir(directory, callback);
+ if (latest != null)
+ largest = safeParseInt(-1, latest.getFileName().toString());
+ // +1 because we want to return the next head version to use,
+ // not the latest existing version.
+ largest++;
+ MainState state = new MainState( largest );
+ cleanBaseDirectory(directory, latest, callback);
+ return state;
+ } finally {
+ if (Files.exists(mainState)) {
+ Files.delete(mainState);
+ }
+ }
+ }
+
+ public void save(Path directory) throws IOException {
+ Path f = directory.resolve(MAIN_STATE);
+ BinaryMemory rf = new BinaryMemory(4096);
+ try {
+ MutableVariant v = new MutableVariant(Bindings.getBindingUnchecked(MainState.class), this);
+ Serializer s = Bindings.getSerializerUnchecked( Bindings.VARIANT );
+ s.serialize(rf, v);
+ } finally {
+ rf.close();
+ }
+ byte[] bytes = rf.toByteBuffer().array();
+ try (OutputStream out = Files.newOutputStream(f)) {
+ out.write(bytes);
+ }
+ FileIO.syncPath(f);
+ }
+
+ private static boolean isInteger(Path p) {
+ try {
+ Integer.parseInt(p.getFileName().toString());
+ return true;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Scans the numbered subdirectories of the given base directory in reverse
+ * numeric order and returns the newest one whose head.state passes integrity
+ * validation, or <code>null</code> if none does.
+ *
+ * @param directory database base directory
+ * @param callback receives validation failures encountered while scanning
+ * @return newest valid head state directory, or <code>null</code>
+ * @throws IOException
+ */
+ private static Path findNewHeadStateDir(Path directory, Consumer<Exception> callback) throws IOException {
+ try (Stream<Path> s = Files.walk(directory, 1)) {
+ List<Path> reverseSortedPaths = s
+ .filter(p -> !p.equals(directory) && isInteger(p) && Files.isDirectory(p))
+ .sorted((p1, p2) -> {
+ int p1Name = Integer.parseInt(p1.getFileName().toString());
+ int p2Name = Integer.parseInt(p2.getFileName().toString());
+ return Integer.compare(p2Name, p1Name);
+ }).collect(Collectors.toList());
+
+ Path latest = null;
+ for (Path last : reverseSortedPaths) {
+ Path headState = last.resolve(HeadState.HEAD_STATE);
+ try {
+ HeadState.validateHeadStateIntegrity(headState);
+ latest = last;
+ break;
+ } catch (IOException | InvalidHeadStateException e) {
+ // Cleanup is done in {@link cleanBaseDirectory} method
+ callback.accept(e);
+ }
+ }
+ return latest;
+ }
+ }
+
+ private static int safeParseInt(int defaultValue, String s) {
+ try {
+ return Integer.parseInt(s);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ private static void cleanBaseDirectory(Path directory, Path latest, Consumer<Exception> callback) throws IOException {
+ try (Stream<Path> s = Files.walk(directory, 1)) {
+ List<Path> reverseSortedPaths = s
+ .filter(p -> !p.equals(directory) && isInteger(p) && Files.isDirectory(p))
+ .sorted((p1, p2) -> {
+ int p1Name = Integer.parseInt(p1.getFileName().toString());
+ int p2Name = Integer.parseInt(p2.getFileName().toString());
+ return Integer.compare(p2Name, p1Name);
+ }).collect(Collectors.toList());
+
+ for (Path p : reverseSortedPaths) {
+ if (!p.equals(latest)) {
+ if (Files.exists(p.resolve(HeadState.HEAD_STATE))) {
+ // Folders numbered higher than the one recorded in the current head.state
+ // indicate that the index and virtual graphs may be out of sync.
+ callback.accept(null);
+ }
+ uncheckedDeleteAll(p);
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ private static void uncheckedDeleteAll(Path path) {
+ try {
+ FileUtils.deleteAll(path.toFile());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+}
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.db.exception.SDBException;
+
+public interface Persistable {
+
+ void toFile(Path path) throws IOException ;
+ void fromFile(byte[] data) throws IllegalAcornStateException, AcornAccessVerificationException;
+
+}
\ No newline at end of file
--- /dev/null
+package org.simantics.acorn;
+
+import java.io.InputStream;
+
+import org.simantics.db.Session;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.impl.IClusterTable;
+import org.simantics.db.service.ClusterUID;
+
+public class UndoClusterSupport implements ClusterSupport {
+
+ final ClusterManager impl;
+
+ public UndoClusterSupport(ClusterManager impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public int createClusterKeyByClusterUID(ClusterUID clusterUID,
+ long clusterId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterBase getClusterByClusterUIDOrMake(ClusterUID clusterUID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterBase getClusterByClusterId(long clusterId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterBase getClusterByClusterKey(int clusterKey) {
+ try {
+ return impl.getClusterByClusterKey(clusterKey);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ @Override
+ public int getClusterKeyByClusterUIDOrMake(ClusterUID clusterUID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClusterBase getClusterByResourceKey(int resourceKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getClusterIdOrCreate(ClusterUID clusterUID) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addStatement(Object cluster) {
+ }
+
+ @Override
+ public void cancelStatement(Object cluster) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeStatement(Object cluster) {
+ }
+
+ @Override
+ public void cancelValue(Object cluster) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeValue(Object cluster) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setValue(Object cluster, long clusterId, byte[] bytes,
+ int length) {
+ }
+
+ @Override
+ public void modiValue(Object cluster, long clusterId, long voffset,
+ int length, byte[] bytes, int offset) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setImmutable(Object cluster, boolean immutable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDeleted(Object cluster, boolean deleted) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createResource(Object cluster, short resourceIndex,
+ long clusterId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addStatementIndex(Object cluster, int resourceKey,
+ ClusterUID clusterUID, byte op) {
+ }
+
+ @Override
+ public void setStreamOff(boolean setOff) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getStreamOff() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InputStream getValueStreamEx(int resourceIndex, long clusterId)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getValueEx(int resourceIndex, long clusterId)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getValueEx(int resourceIndex, long clusterId, long voffset,
+ int length) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getValueSizeEx(int resourceIndex, long clusterId)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int wait4RequestsLess(int limit) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Session getSession() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IClusterTable getClusterTable() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getClusterKeyByClusterUIDOrMake(long id1, long id2) {
+ throw new UnsupportedOperationException();
+ }
+
+}
--- /dev/null
+package org.simantics.acorn.backup;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.simantics.acorn.AcornSessionManagerImpl;
+import org.simantics.acorn.GraphClientImpl2;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.backup.BackupException;
+import org.simantics.backup.IBackupProvider;
+import org.simantics.db.server.ProCoreException;
+import org.simantics.utils.FileUtils;
+
+/**
+ * @author Jani
+ *
+ * TODO: find a cleaner way to eliminate the {@link GraphClientImpl2#getInstance()} invocations
+ */
+public class AcornBackupProvider implements IBackupProvider {
+
+ private static final String IDENTIFIER = "AcornBackupProvider";
+ private long trId = -1;
+ private final Semaphore lock = new Semaphore(1);
+ private final GraphClientImpl2 client;
+
+ public AcornBackupProvider() {
+ this.client = AcornSessionManagerImpl.getInstance().getClient();
+ }
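+
+ /*
+ * Backup protocol sketch (a hedged illustration of how this IBackupProvider
+ * is meant to be driven; the caller shown here is hypothetical):
+ *
+ * provider.lock(); // opens a write transaction that freezes the head
+ * Future<BackupException> f = provider.backup(targetPath, revision);
+ * BackupException failure = f.get(); // null on success
+ * provider.unlock(); // ends the freezing transaction
+ *
+ * backup() snapshots the database and copies only the head state directories
+ * numbered after the last backed-up one recorded in the metadata file.
+ */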
+
+ private static Path getAcornMetadataFile(Path dbFolder) {
+ return dbFolder.getParent().resolve(IDENTIFIER);
+ }
+
+ @Override
+ public void lock() throws BackupException {
+ try {
+ if (trId != -1)
+ throw new IllegalStateException(this + " backup provider is already locked");
+ trId = client.askWriteTransaction(-1).getTransactionId();
+ } catch (ProCoreException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Future<BackupException> backup(Path targetPath, int revision) throws BackupException {
+ boolean releaseLock = true;
+ try {
+ lock.acquire();
+
+ client.makeSnapshot(true);
+
+ Path dbDir = client.getDbFolder();
+ int newestFolder = client.clusters.mainState.headDir - 1;
+ int latestFolder = -2;
+ Path acornMetadataFile = getAcornMetadataFile(dbDir);
+ if (Files.exists(acornMetadataFile)) {
+ try (BufferedReader br = Files.newBufferedReader(acornMetadataFile)) {
+ latestFolder = Integer.parseInt( br.readLine() );
+ }
+ }
+
+ AcornBackupRunnable r = new AcornBackupRunnable(
+ lock, targetPath, revision, dbDir, latestFolder, newestFolder);
+ new Thread(r, "Acorn backup thread").start();
+
+ releaseLock = false;
+ return r;
+ } catch (InterruptedException e) {
+ releaseLock = false;
+ throw new BackupException("Failed to lock Acorn for backup.", e);
+ } catch (NumberFormatException e) {
+ throw new BackupException("Failed to read Acorn head state file.", e);
+ } catch (IllegalAcornStateException | IOException e) {
+ throw new BackupException("I/O problem during Acorn backup.", e);
+ } finally {
+ if (releaseLock)
+ lock.release();
+ }
+ }
+
+ @Override
+ public void unlock() throws BackupException {
+ try {
+ if (trId == -1)
+ throw new BackupException(this + " backup provider is not locked");
+ client.endTransaction(trId);
+ trId = -1;
+ } catch (ProCoreException e) {
+ throw new BackupException(e);
+ }
+ }
+
+ @Override
+ public void restore(Path fromPath, int revision) {
+ try {
+ // 1. Resolve initial backup restore target.
+ // This can be DB directory directly or a temporary directory that
+ // will replace the DB directory.
+ Path dbRoot = client.getDbFolder();
+ Path restorePath = dbRoot;
+ if (!Files.exists(dbRoot, LinkOption.NOFOLLOW_LINKS)) {
+ Files.createDirectories(dbRoot);
+ } else {
+ Path dbRootParent = dbRoot.getParent();
+ restorePath = dbRootParent == null ? Files.createTempDirectory("restore")
+ : Files.createTempDirectory(dbRootParent, "restore");
+ }
+
+ // 2. Restore the backup.
+ Files.walkFileTree(fromPath, new RestoreCopyVisitor(restorePath, revision));
+
+ // 3. Override existing DB root with restored temporary copy if necessary.
+ if (dbRoot != restorePath) {
+ FileUtils.deleteAll(dbRoot.toFile());
+ Files.move(restorePath, dbRoot);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private class RestoreCopyVisitor extends SimpleFileVisitor<Path> {
+
+ private final Path toPath;
+ private final int revision;
+ private Path currentSubFolder;
+
+ public RestoreCopyVisitor(Path toPath, int revision) {
+ this.toPath = toPath;
+ this.revision = revision;
+ }
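+
+ // Copies files from the backup's AcornBackupProvider subtree into toPath,
+ // descending only into revision directories numbered at most the requested
+ // revision.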
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ Path dirName = dir.getFileName();
+ if (dirName.toString().equals(IDENTIFIER)) {
+ currentSubFolder = dir;
+ return FileVisitResult.CONTINUE;
+ } else if (dir.getParent().getFileName().toString().equals(IDENTIFIER)) {
+ Path targetPath = toPath.resolve(dirName);
+ if (!Files.exists(targetPath)) {
+ Files.createDirectory(targetPath);
+ }
+ return FileVisitResult.CONTINUE;
+ } else if (dirName.toString().matches("\\d+")) {
+ // Accept any fully numeric name; revision directories may have more than one digit.
+ int dirNameInt = Integer.parseInt(dirName.toString());
+ if (dirNameInt <= revision) {
+ return FileVisitResult.CONTINUE;
+ } else {
+ return FileVisitResult.SKIP_SUBTREE;
+ }
+ } else {
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ if (file.getFileName().toString().endsWith(".tar.gz"))
+ return FileVisitResult.CONTINUE;
+ System.out.println("Restore " + file + " to " + toPath.resolve(currentSubFolder.relativize(file)));
+ Files.copy(file, toPath.resolve(currentSubFolder.relativize(file)), StandardCopyOption.REPLACE_EXISTING);
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ private static class AcornBackupRunnable implements Runnable, Future<BackupException> {
+
+ private final Semaphore lock;
+ private final Path targetPath;
+ private final int revision;
+ private final Path baseDir;
+ private final int latestFolder;
+ private final int newestFolder;
+
+ private boolean done = false;
+ private final Semaphore completion = new Semaphore(0);
+ private BackupException exception = null;
+
+ public AcornBackupRunnable(Semaphore lock, Path targetPath, int revision,
+ Path baseDir, int latestFolder, int newestFolder) {
+ this.lock = lock;
+ this.targetPath = targetPath;
+ this.revision = revision;
+ this.baseDir = baseDir;
+ this.latestFolder = latestFolder;
+ this.newestFolder = newestFolder;
+ }
+
+ @Override
+ public void run() {
+ try {
+ doBackup();
+ writeHeadStateFile();
+ } catch (IOException e) {
+ exception = new BackupException("Acorn backup failed", e);
+ rollback();
+ } finally {
+ done = true;
+ lock.release();
+ completion.release();
+ }
+ }
+
+ private void doBackup() throws IOException {
+ Path target = targetPath.resolve(String.valueOf(revision)).resolve(IDENTIFIER);
+ if (!Files.exists(target))
+ Files.createDirectories(target);
+ Files.walkFileTree(baseDir,
+ new BackupCopyVisitor(baseDir, target));
+ }
+
+ private void writeHeadStateFile() throws IOException {
+ Path acornMetadataFile = getAcornMetadataFile(baseDir);
+ if (!Files.exists(acornMetadataFile)) {
+ Files.createFile(acornMetadataFile);
+ }
+ Files.write(acornMetadataFile,
+ Arrays.asList(Integer.toString(newestFolder)),
+ StandardOpenOption.WRITE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.CREATE);
+ }
+
+ private void rollback() {
+ // TODO
+ }
+
+ private class BackupCopyVisitor extends SimpleFileVisitor<Path> {
+
+ private Path fromPath;
+ private Path toPath;
+
+ public BackupCopyVisitor(Path fromPath, Path toPath) {
+ this.fromPath = fromPath;
+ this.toPath = toPath;
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir,
+ BasicFileAttributes attrs) throws IOException {
+ Path dirName = dir.getFileName();
+ if (dirName.equals(fromPath)) {
+ Path targetPath = toPath.resolve(fromPath.relativize(dir));
+ if (!Files.exists(targetPath)) {
+ Files.createDirectory(targetPath);
+ }
+ return FileVisitResult.CONTINUE;
+ } else {
+ int dirNameInt = Integer.parseInt(dirName.toString());
+ if (latestFolder < dirNameInt && dirNameInt <= newestFolder) {
+ Path targetPath = toPath.resolve(fromPath
+ .relativize(dir));
+ if (!Files.exists(targetPath)) {
+ Files.createDirectory(targetPath);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ return FileVisitResult.SKIP_SUBTREE;
+ }
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file,
+ BasicFileAttributes attrs) throws IOException {
+ System.out.println("Backup " + file + " to "
+ + toPath.resolve(fromPath.relativize(file)));
+ Files.copy(file, toPath.resolve(fromPath.relativize(file)),
+ StandardCopyOption.REPLACE_EXISTING);
+ return FileVisitResult.CONTINUE;
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public BackupException get() throws InterruptedException {
+ completion.acquire();
+ completion.release();
+ return exception;
+ }
+
+ @Override
+ public BackupException get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+ if (completion.tryAcquire(timeout, unit))
+ completion.release();
+ else
+ throw new TimeoutException("Acorn backup completion waiting timed out.");
+ return exception;
+ }
+
+ }
+
+}
--- /dev/null
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.acorn.cluster;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.simantics.acorn.exception.AcornAccessVerificationException;
+import org.simantics.acorn.exception.IllegalAcornStateException;
+import org.simantics.acorn.internal.ClusterChange;
+import org.simantics.acorn.internal.ClusterStream;
+import org.simantics.acorn.internal.ClusterSupport2;
+import org.simantics.acorn.internal.DebugPolicy;
+import org.simantics.db.Resource;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ExternalValueException;
+import org.simantics.db.exception.ValidationException;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterI.PredicateProcedure;
+import org.simantics.db.impl.ClusterSupport;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.impl.ForEachObjectContextProcedure;
+import org.simantics.db.impl.ForEachObjectProcedure;
+import org.simantics.db.impl.ForPossibleRelatedValueContextProcedure;
+import org.simantics.db.impl.ForPossibleRelatedValueProcedure;
+import org.simantics.db.impl.IClusterTable;
+import org.simantics.db.impl.Table;
+import org.simantics.db.impl.TableHeader;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.procedure.AsyncContextMultiProcedure;
+import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.procore.cluster.ClusterMap;
+import org.simantics.db.procore.cluster.ClusterPrintDebugInfo;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.procore.cluster.CompleteTable;
+import org.simantics.db.procore.cluster.FlatTable;
+import org.simantics.db.procore.cluster.ForeignTable;
+import org.simantics.db.procore.cluster.ObjectTable;
+import org.simantics.db.procore.cluster.PredicateTable;
+import org.simantics.db.procore.cluster.ResourceTable;
+import org.simantics.db.procore.cluster.ValueTable;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.datastructures.Callback;
+
+final public class ClusterBig extends ClusterImpl {
+ private static final int TABLE_HEADER_SIZE = TableHeader.HEADER_SIZE + TableHeader.EXTRA_SIZE;
+ private static final int RESOURCE_TABLE_OFFSET = 0;
+ private static final int PREDICATE_TABLE_OFFSET = RESOURCE_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int OBJECT_TABLE_OFFSET = PREDICATE_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int VALUE_TABLE_OFFSET = OBJECT_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int FLAT_TABLE_OFFSET = VALUE_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int COMPLETE_TABLE_OFFSET = FLAT_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int FOREIGN_TABLE_OFFSET = COMPLETE_TABLE_OFFSET + TABLE_HEADER_SIZE;
+ private static final int INT_HEADER_SIZE = FOREIGN_TABLE_OFFSET + TABLE_HEADER_SIZE;
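+ // Seven TableHeaders are packed back-to-back into the int header, one per
+ // table: resource, predicate, object, value, flat, complete and foreign.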
+ private final int clusterBits;
+ final private ResourceTable resourceTable;
+ //final private ResourceTable movedResourceTable;
+ final private PredicateTable predicateTable;
+ final private ObjectTable objectTable;
+ final private ValueTable valueTable;
+ final private FlatTable flatTable;
+ final private ForeignTable foreignTable;
+ final private CompleteTable completeTable;
+ final private ClusterMap clusterMap;
+ final private int[] headerTable;
+ final private ClusterSupport2 clusterSupport;
+
+ public ClusterBig(IClusterTable clusterTable, ClusterUID clusterUID, int clusterKey, ClusterSupport2 support) {
+ super(clusterTable, clusterUID, clusterKey, support);
+ if(DebugPolicy.REPORT_CLUSTER_EVENTS)
+ new Exception(getClusterUID().toString()).printStackTrace();
+ this.headerTable = new int[INT_HEADER_SIZE];
+ this.resourceTable = new ResourceTable(this, headerTable, RESOURCE_TABLE_OFFSET);
+ this.foreignTable = new ForeignTable(this, headerTable, FOREIGN_TABLE_OFFSET);
+ this.predicateTable = new PredicateTable(this, headerTable, PREDICATE_TABLE_OFFSET);
+ this.objectTable = new ObjectTable(this, headerTable, OBJECT_TABLE_OFFSET);
+ this.valueTable = new ValueTable(this, headerTable, VALUE_TABLE_OFFSET);
+ this.completeTable = new CompleteTable(this, headerTable, COMPLETE_TABLE_OFFSET);
+ this.flatTable = null;
+ this.clusterMap = new ClusterMap(foreignTable, flatTable);
+ this.clusterSupport = support;
+ this.clusterBits = ClusterTraitsBase.getClusterBits(clusterKey);
+ this.importance = 0;
+// clusterTable.setDirtySizeInBytes(true);
+ }
+ protected ClusterBig(IClusterTable clusterTable, long[] longs, int[] ints, byte[] bytes, ClusterSupport2 support, int clusterKey)
+ throws DatabaseException {
+ super(clusterTable, checkValidity(0, longs, ints, bytes), clusterKey, support);
+ if(DebugPolicy.REPORT_CLUSTER_EVENTS)
+ new Exception(getClusterUID().toString()).printStackTrace();
+ if (ints.length < INT_HEADER_SIZE)
+ throw new IllegalArgumentException("Too small integer table for cluster.");
+ this.headerTable = ints;
+ this.resourceTable = new ResourceTable(this, ints, RESOURCE_TABLE_OFFSET, longs);
+ this.foreignTable = new ForeignTable(this, headerTable, FOREIGN_TABLE_OFFSET, longs);
+ this.predicateTable = new PredicateTable(this, ints, PREDICATE_TABLE_OFFSET, ints);
+ this.objectTable = new ObjectTable(this, ints, OBJECT_TABLE_OFFSET, ints);
+ this.valueTable = new ValueTable(this, ints, VALUE_TABLE_OFFSET, bytes);
+ this.flatTable = null;
+ this.completeTable = new CompleteTable(this, headerTable, COMPLETE_TABLE_OFFSET, ints);
+ this.clusterMap = new ClusterMap(foreignTable, flatTable);
+ this.clusterSupport = support;
+ this.clusterBits = ClusterTraitsBase.getClusterBits(clusterKey);
+ }
+ void analyse() {
+ System.out.println("Cluster " + clusterId);
+ System.out.println("-size:" + getUsedSpace());
+ System.out.println(" -rt:" + (resourceTable.getTableCapacity() * 8 + 8));
+ System.out.println(" -ft:" + foreignTable.getTableCapacity() * 8);
+ System.out.println(" -pt:" + predicateTable.getTableCapacity() * 4);
+ System.out.println(" -ot:" + objectTable.getTableCapacity() * 4);
+ System.out.println(" -ct:" + completeTable.getTableCapacity() * 4);
+ System.out.println(" -vt:" + valueTable.getTableCapacity());
+
+ System.out.println("-resourceTable:");
+ System.out.println(" -resourceCount=" + resourceTable.getResourceCount());
+ System.out.println(" -size=" + resourceTable.getTableSize());
+ System.out.println(" -capacity=" + resourceTable.getTableCapacity());
+ System.out.println(" -count=" + resourceTable.getTableCount());
+ System.out.println(" -size=" + resourceTable.getTableSize());
+ //resourceTable.analyse();
+ }
+ public void checkDirectReference(int dr)
+ throws DatabaseException {
+ if (!ClusterTraits.statementIndexIsDirect(dr))
+ throw new ValidationException("Reference is not direct. Reference=" + dr);
+ if (ClusterTraits.isFlat(dr))
+ throw new ValidationException("Reference is flat. Reference=" + dr);
+ if (ClusterTraits.isLocal(dr)) {
+ if (dr < 1 || dr > resourceTable.getUsedSize())
+ throw new ValidationException("Illegal local reference. Reference=" + dr);
+ } else {
+ int fi = ClusterTraits.getForeignIndexFromReference(dr);
+ int ri = ClusterTraits.getResourceIndexFromForeignReference(dr);
+ if (fi < 1 || fi > foreignTable.getUsedSize())
+ throw new ValidationException("Illegal foreign reference. Reference=" + dr + " foreign index=" + fi);
+ if (ri < 1 || ri > ClusterTraits.getMaxNumberOfResources())
+ throw new ValidationException("Illegal foreign reference. Reference=" + dr + " resource index=" + ri);
+ }
+ }
+ public void checkPredicateIndex(int pi)
+ throws DatabaseException {
+ predicateTable.checkPredicateSetIndex(this, pi);
+ }
+ public void checkObjectSetReference(int or)
+ throws DatabaseException {
+ if (ClusterTraits.statementIndexIsDirect(or))
+ throw new ValidationException("Illegal object set reference. Reference=" + or);
+ int oi = ClusterTraits.statementIndexGet(or);
+ this.objectTable.checkObjectSetIndex(this, oi);
+ }
+
+ public void checkValueInit()
+ throws DatabaseException {
+ valueTable.checkValueInit();
+ }
+ public void checkValue(int capacity, int index)
+ throws DatabaseException {
+ valueTable.checkValue(capacity, index);
+ }
+ public void checkValueFini()
+ throws DatabaseException {
+ valueTable.checkValueFini();
+ }
+ public void checkForeingIndex(int fi)
+ throws DatabaseException {
+ if (fi<1 || fi > foreignTable.getUsedSize())
+ throw new ValidationException("Illegal foreign index=" + fi);
+ }
+ public void checkCompleteSetReference(int cr)
+ throws DatabaseException {
+ if (!ClusterTraits.completeReferenceIsMultiple(cr))
+ throw new ValidationException("Illegal complete set reference. Reference=" + cr);
+ int ci = cr;
+ this.completeTable.checkCompleteSetIndex(this, ci);
+ }
+ public void check()
+ throws DatabaseException {
+ this.completeTable.check(this);
+ this.objectTable.check(this);
+ // Must be after object table check.
+ this.predicateTable.check(this);
+ this.resourceTable.check(this);
+ }
+ @Override
+ public CompleteTypeEnum getCompleteType(int resourceKey, ClusterSupport support)
+ throws DatabaseException {
+ final int resourceRef = getLocalReference(resourceKey);
+ int completeRef = resourceTable.getCompleteObjectRef(resourceRef);
+ CompleteTypeEnum ct = ClusterTraits.completeReferenceGetType(completeRef);
+ if (DEBUG)
+ System.out.println("Cluster.getCompleteType rk=" + resourceKey + " ct=" + ct);
+ int i = ct.getValue();
+ switch (i) {
+ case 0: return CompleteTypeEnum.NotComplete;
+ case 1: return CompleteTypeEnum.InstanceOf;
+ case 2: return CompleteTypeEnum.Inherits;
+ case 3: return CompleteTypeEnum.SubrelationOf;
+ default: throw new DatabaseException("Illegal complete type enumeration.");
+ }
+ }
+
+ @Override
+ public int getCompleteObjectKey(int resourceKey, ClusterSupport support)
+ throws DatabaseException {
+ final int resourceRef = getLocalReference(resourceKey);
+ int completeRef = resourceTable.getCompleteObjectRef(resourceRef);
+ int clusterIndex;
+ int resourceIndex = ClusterTraits.completeReferenceGetResourceIndex(completeRef);
+
+ ClusterI.CompleteTypeEnum completeType = ClusterTraits.completeReferenceGetType(completeRef);
+ if (completeType == ClusterI.CompleteTypeEnum.NotComplete)
+ throw new DatabaseException("Resource has multiple complete objects. Resource key=" + resourceKey + ".");
+
+ if (ClusterTraits.completeReferenceIsLocal(completeRef)) {
+ clusterIndex = clusterKey;
+ } else {
+ int foreignIndex = ClusterTraits.completeReferenceGetForeignIndex(completeRef);
+// System.err.println("completeRef=" + completeRef + " foreignIndex=" + foreignIndex );
+ ClusterUID clusterUID = foreignTable.getResourceUID(foreignIndex).asCID();
+ ClusterI c = support.getClusterByClusterUIDOrMake(clusterUID);
+ clusterIndex = c.getClusterKey();
+ }
+ int key = ClusterTraits.createResourceKey(clusterIndex, resourceIndex);
+ if (DEBUG)
+ System.out.println("Cluster.complete object rk=" + resourceKey + " ck=" + key);
+ return key;
+ }
+
+ @Override
+ public boolean isComplete(int resourceKey, ClusterSupport support)
+ throws DatabaseException {
+ final int resourceRef = getLocalReference(resourceKey);
+ int completeRef = resourceTable.getCompleteObjectRef(resourceRef);
+ ClusterI.CompleteTypeEnum completeType = ClusterTraits.completeReferenceGetType(completeRef);
+ boolean complete = completeType != ClusterI.CompleteTypeEnum.NotComplete;
+ if (DEBUG)
+ System.out.println("Cluster.key=" + resourceKey + " isComplete=" + complete);
+ return complete;
+ }
+
+ public int getSingleObject(int resourceKey, int predicateKey, int objectIndex, ClusterSupport support) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("Cluster.getSingleObject: rk=" + resourceKey + " pk=" + predicateKey);
+ if (0 == objectIndex) {
+ final int resourceIndex = getLocalReference(resourceKey);
+ final int pRef = getInternalReferenceOrZero(predicateKey, support);
+ final ClusterI.CompleteTypeEnum pCompleteType = ClusterTraitsBase.getCompleteTypeFromResourceKey(predicateKey);
+ return resourceTable.getSingleObject(resourceIndex, support, pRef, pCompleteType, completeTable, this);
+ }
+ return objectTable.getSingleObject(objectIndex, support, this);
+ }
+
+ public void forObjects(int resourceKey, int predicateKey, int objectIndex, QueryProcessor processor, ReadGraphImpl graph, AsyncMultiProcedure<Resource> procedure,
+ ClusterSupport support) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("Cluster.forObjects1: rk=" + resourceKey + " pk=" + predicateKey);
+ if (0 == objectIndex) {
+ final int resourceIndex = getLocalReference(resourceKey);
+ final int pRef = getInternalReferenceOrZero(predicateKey, support);
+ final ClusterI.CompleteTypeEnum pCompleteType = ClusterTraitsBase.getCompleteTypeFromResourceKey(predicateKey);
+ resourceTable.foreachObject(resourceIndex, graph, procedure, support, pRef, pCompleteType, completeTable, this);
+ return;
+ }
+ objectTable.foreachObject(graph, objectIndex, procedure, this);
+ }
+ public <C> void forObjects(int resourceKey, int predicateKey, int objectIndex, QueryProcessor processor, ReadGraphImpl graph, C context, AsyncContextMultiProcedure<C, Resource> procedure,
+ ClusterSupport support) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("Cluster.forObjects1: rk=" + resourceKey + " pk=" + predicateKey);
+ if (0 == objectIndex) {
+ final int resourceIndex = getLocalReference(resourceKey);
+ final int pRef = getInternalReferenceOrZero(predicateKey, support);
+ final ClusterI.CompleteTypeEnum pCompleteType = ClusterTraitsBase.getCompleteTypeFromResourceKey(predicateKey);
+ resourceTable.foreachObject(resourceIndex, graph, context, procedure, support, pRef, pCompleteType, completeTable, this);
+ return;
+ }
+ objectTable.foreachObject(graph, objectIndex, context, procedure, this);
+ }
+ @Override
+ public <Context> boolean forObjects(int resourceKey, int predicateKey, int objectIndex, ObjectProcedure<Context> procedure,
+ Context context, ClusterSupport support) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("Cluster.forObjects2: rk=" + resourceKey + " pk=" + predicateKey);
+ if (0 == objectIndex) {
+ final int resourceIndex = getLocalReference(resourceKey);
+ final int pRef = getInternalReferenceOrZero(predicateKey, support);
+ final ClusterI.CompleteTypeEnum pCompleteType = ClusterTraitsBase.getCompleteTypeFromResourceKey(predicateKey);
+ return resourceTable.foreachObject(resourceIndex, procedure, context, support, this, pRef, pCompleteType, completeTable);
+ }
+ return objectTable.foreachObject(objectIndex, procedure, context, support, this);
+ }
+
+ @Override
+ public int getSingleObject(int resourceKey, int predicateKey, ClusterSupport support) throws DatabaseException {
+ if (DEBUG)
+ System.out.println("Cluster.getSingleObject2: rk=" + resourceKey + " pk=" + predicateKey);
+ final int resourceIndex = getLocalReference(resourceKey);
+ final int pRef = getInternalReferenceOrZero(predicateKey, support);
+ final ClusterI.CompleteTypeEnum pCompleteType = ClusterTraitsBase.getCompleteTypeFromResourceKey(predicateKey);
+ if (ClusterI.CompleteTypeEnum.NotComplete != pCompleteType)
+ return resourceTable.getSingleObject(resourceIndex, support, pRef, pCompleteType, completeTable, this);
+ final int predicateIndex = resourceTable.getPredicateIndex(resourceIndex);
+ if (0 == predicateIndex)
+ return resourceTable.getSingleObject(resourceIndex, support, pRef, pCompleteType, completeTable, this);
+ int objectIndex = predicateTable.getObjectIndex(predicateIndex, pRef);
+ return getSingleObject(resourceKey, predicateKey, objectIndex, support);
+ }
+
+ @Override
+ public <T> int getSingleObject(int resourceKey, ForPossibleRelatedValueProcedure<T> procedure, ClusterSupport support) throws DatabaseException {
+ final int predicateKey = procedure.predicateKey;
+ if (DEBUG)
+ System.out.println("Cluster.getSingleObject2: rk=" + resource