-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.File;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.nio.charset.Charset;\r
-import java.nio.file.FileVisitResult;\r
-import java.nio.file.Files;\r
-import java.nio.file.Path;\r
-import java.nio.file.SimpleFileVisitor;\r
-import java.nio.file.attribute.BasicFileAttributes;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.HashSet;\r
-import java.util.TreeMap;\r
-import java.util.concurrent.CopyOnWriteArrayList;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-\r
-import org.eclipse.core.runtime.Platform;\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.db.AsyncReadGraph;\r
-import org.simantics.db.ChangeSet;\r
-import org.simantics.db.DevelopmentKeys;\r
-import org.simantics.db.ExternalValueSupport;\r
-import org.simantics.db.Metadata;\r
-import org.simantics.db.MonitorContext;\r
-import org.simantics.db.MonitorHandler;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.ResourceSerializer;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.SessionManager;\r
-import org.simantics.db.SessionVariables;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.authentication.UserAuthenticationAgent;\r
-import org.simantics.db.authentication.UserAuthenticator;\r
-import org.simantics.db.common.Indexing;\r
-import org.simantics.db.common.TransactionPolicyRelease;\r
-import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
-import org.simantics.db.common.procedure.adapter.ProcedureAdapter;\r
-import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;\r
-import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;\r
-import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;\r
-import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;\r
-import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;\r
-import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;\r
-import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;\r
-import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;\r
-import org.simantics.db.common.service.ServiceActivityMonitorImpl;\r
-import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.event.ChangeEvent;\r
-import org.simantics.db.event.ChangeListener;\r
-import org.simantics.db.event.SessionEventListener;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.ClusterSetExistException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.ImmutableException;\r
-import org.simantics.db.exception.InvalidResourceReferenceException;\r
-import org.simantics.db.exception.ResourceNotFoundException;\r
-import org.simantics.db.exception.RuntimeDatabaseException;\r
-import org.simantics.db.exception.ServiceException;\r
-import org.simantics.db.exception.ServiceNotFoundException;\r
-import org.simantics.db.impl.ClusterBase;\r
-import org.simantics.db.impl.ClusterI;\r
-import org.simantics.db.impl.ClusterTraitsBase;\r
-import org.simantics.db.impl.ClusterTranslator;\r
-import org.simantics.db.impl.ResourceImpl;\r
-import org.simantics.db.impl.TransientGraph;\r
-import org.simantics.db.impl.VirtualGraphImpl;\r
-import org.simantics.db.impl.graph.DelayedWriteGraph;\r
-import org.simantics.db.impl.graph.ReadGraphImpl;\r
-import org.simantics.db.impl.graph.WriteGraphImpl;\r
-import org.simantics.db.impl.graph.WriteSupport;\r
-import org.simantics.db.impl.internal.RandomAccessValueSupport;\r
-import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;\r
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;\r
-import org.simantics.db.impl.query.QueryProcessor;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionRead;\r
-import org.simantics.db.impl.query.QueryProcessor.SessionTask;\r
-import org.simantics.db.impl.service.QueryDebug;\r
-import org.simantics.db.impl.support.VirtualGraphServerSupport;\r
-import org.simantics.db.impl.support.WriteRequestScheduleSupport;\r
-import org.simantics.db.procedure.AsyncListener;\r
-import org.simantics.db.procedure.AsyncMultiListener;\r
-import org.simantics.db.procedure.AsyncMultiProcedure;\r
-import org.simantics.db.procedure.AsyncProcedure;\r
-import org.simantics.db.procedure.Listener;\r
-import org.simantics.db.procedure.ListenerBase;\r
-import org.simantics.db.procedure.MultiListener;\r
-import org.simantics.db.procedure.MultiProcedure;\r
-import org.simantics.db.procedure.Procedure;\r
-import org.simantics.db.procedure.SyncListener;\r
-import org.simantics.db.procedure.SyncMultiListener;\r
-import org.simantics.db.procedure.SyncMultiProcedure;\r
-import org.simantics.db.procedure.SyncProcedure;\r
-import org.simantics.db.procore.cluster.ClusterImpl;\r
-import org.simantics.db.procore.cluster.ClusterTraits;\r
-import org.simantics.db.procore.protocol.Constants;\r
-import org.simantics.db.procore.protocol.DebugPolicy;\r
-import org.simantics.db.request.AsyncMultiRead;\r
-import org.simantics.db.request.AsyncRead;\r
-import org.simantics.db.request.DelayedWrite;\r
-import org.simantics.db.request.DelayedWriteResult;\r
-import org.simantics.db.request.ExternalRead;\r
-import org.simantics.db.request.MultiRead;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.db.request.ReadInterface;\r
-import org.simantics.db.request.UndoTraits;\r
-import org.simantics.db.request.Write;\r
-import org.simantics.db.request.WriteInterface;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.request.WriteOnlyResult;\r
-import org.simantics.db.request.WriteResult;\r
-import org.simantics.db.request.WriteTraits;\r
-import org.simantics.db.service.ByteReader;\r
-import org.simantics.db.service.ClusterBuilder;\r
-import org.simantics.db.service.ClusterBuilderFactory;\r
-import org.simantics.db.service.ClusterControl;\r
-import org.simantics.db.service.ClusterSetsSupport;\r
-import org.simantics.db.service.ClusterUID;\r
-import org.simantics.db.service.ClusteringSupport;\r
-import org.simantics.db.service.CollectionSupport;\r
-import org.simantics.db.service.DebugSupport;\r
-import org.simantics.db.service.DirectQuerySupport;\r
-import org.simantics.db.service.GraphChangeListenerSupport;\r
-import org.simantics.db.service.InitSupport;\r
-import org.simantics.db.service.LifecycleSupport;\r
-import org.simantics.db.service.ManagementSupport;\r
-import org.simantics.db.service.QueryControl;\r
-import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.db.service.ServerInformation;\r
-import org.simantics.db.service.ServiceActivityMonitor;\r
-import org.simantics.db.service.SessionEventSupport;\r
-import org.simantics.db.service.SessionMonitorSupport;\r
-import org.simantics.db.service.SessionUserSupport;\r
-import org.simantics.db.service.StatementSupport;\r
-import org.simantics.db.service.TransactionPolicySupport;\r
-import org.simantics.db.service.TransactionSupport;\r
-import org.simantics.db.service.TransferableGraphSupport;\r
-import org.simantics.db.service.UndoRedoSupport;\r
-import org.simantics.db.service.VirtualGraphSupport;\r
-import org.simantics.db.service.XSupport;\r
-import org.simantics.layer0.Layer0;\r
-import org.simantics.utils.DataContainer;\r
-import org.simantics.utils.Development;\r
-import org.simantics.utils.datastructures.Callback;\r
-import org.simantics.utils.threads.logger.ITask;\r
-import org.simantics.utils.threads.logger.ThreadLogger;\r
-\r
-import gnu.trove.procedure.TLongProcedure;\r
-import gnu.trove.set.hash.TLongHashSet;\r
-\r
-\r
-public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {\r
-\r
- protected static final boolean DEBUG = false;\r
-\r
- private static final boolean DIAGNOSTICS = false;\r
-\r
- private TransactionPolicySupport transactionPolicy;\r
-\r
- final private ClusterControl clusterControl;\r
-\r
- final protected BuiltinSupportImpl builtinSupport;\r
- final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;\r
- final protected ClusterSetsSupport clusterSetsSupport;\r
- final protected LifecycleSupportImpl lifecycleSupport;\r
-\r
- protected QuerySupportImpl querySupport;\r
- protected ResourceSupportImpl resourceSupport;\r
- protected WriteSupport writeSupport;\r
- public ClusterTranslator clusterTranslator;\r
-\r
- boolean dirtyPrimitives = false;\r
-\r
- public static final int SERVICE_MODE_CREATE = 2;\r
- public static final int SERVICE_MODE_ALLOW = 1;\r
- public int serviceMode = 0;\r
- public boolean createdImmutableClusters = false;\r
- public TLongHashSet createdClusters = new TLongHashSet();\r
-\r
- private Layer0 L0;\r
-\r
- /**\r
- * The service locator maintained by the workbench. These services are\r
- * initialized during workbench during the <code>init</code> method.\r
- */\r
- final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();\r
-\r
- final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();\r
-\r
- final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();\r
-\r
- final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();\r
-\r
- final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();\r
-\r
- final HashSet<Thread> sessionThreads = new HashSet<Thread>();\r
-\r
- final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();\r
-\r
- final protected State state = new State();\r
-\r
- protected GraphSession graphSession = null;\r
-\r
- protected SessionManager sessionManagerImpl = null;\r
-\r
- protected UserAuthenticationAgent authAgent = null;\r
-\r
- protected UserAuthenticator authenticator = null;\r
-\r
- protected Resource user = null;\r
-\r
- protected ClusterStream clusterStream = null;\r
-\r
- protected SessionRequestManager requestManager = null;\r
-\r
- public ClusterTable clusterTable = null;\r
-\r
- public QueryProcessor queryProvider2 = null;\r
-\r
- ClientChangesImpl clientChanges = null;\r
-\r
- MonitorHandler[] monitorHandlers = new MonitorHandler[0];\r
-\r
-// protected long newClusterId = Constants.NullClusterId;\r
-\r
- protected int flushCounter = 0;\r
-\r
- protected boolean writeOnly = false;\r
-\r
- WriteState<?> writeState = null;\r
- WriteStateBase<?> delayedWriteState = null;\r
- protected Resource defaultClusterSet = null; // If not null then used for newResource().\r
-\r
- public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {\r
-\r
-// if (authAgent == null)\r
-// throw new IllegalArgumentException("null authentication agent");\r
-\r
- File t = StaticSessionProperties.virtualGraphStoragePath;\r
- if (null == t)\r
- t = new File(".");\r
- this.clusterTable = new ClusterTable(this, t);\r
- this.builtinSupport = new BuiltinSupportImpl(this);\r
- this.sessionManagerImpl = sessionManagerImpl;\r
- this.user = null;\r
-// this.authAgent = authAgent;\r
-\r
- serviceLocator.registerService(Session.class, this);\r
- serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));\r
- serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));\r
- serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));\r
- serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));\r
- serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));\r
- serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));\r
- serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));\r
- serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));\r
- serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));\r
- serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));\r
- serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));\r
- serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));\r
- serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));\r
- serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));\r
- serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));\r
- serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));\r
- serviceLocator.registerService(XSupport.class, new XSupportImpl(this));\r
- serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());\r
- serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));\r
- serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());\r
- serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());\r
- ServiceActivityUpdaterForWriteTransactions.register(this);\r
-\r
- this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);\r
- serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);\r
- serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);\r
- this.lifecycleSupport = new LifecycleSupportImpl(this);\r
- serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);\r
- this.transactionPolicy = new TransactionPolicyRelease();\r
- serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);\r
- this.clusterControl = new ClusterControlImpl(this);\r
- serviceLocator.registerService(ClusterControl.class, clusterControl);\r
- this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.\r
- this.clusterSetsSupport.updateReadAndWriteDirectories(t.toPath(), t.toPath());\r
- serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);\r
- }\r
-\r
- /*\r
- *\r
- * Session interface\r
- */\r
-\r
-\r
- @Override\r
- public Resource getRootLibrary() {\r
- return queryProvider2.getRootLibraryResource();\r
- }\r
-\r
- void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {\r
- if (!graphSession.dbSession.refreshEnabled())\r
- return;\r
- try {\r
- getClusterTable().refresh(csid, this, clusterUID);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError("Refesh failed.", t);\r
- }\r
- }\r
-\r
- static final class TaskHelper {\r
- private final String name;\r
- private Object result;\r
- final Semaphore sema = new Semaphore(0);\r
- private Throwable throwable = null;\r
- final Callback<DatabaseException> callback = new Callback<DatabaseException>() {\r
- @Override\r
- public void run(DatabaseException e) {\r
- synchronized (TaskHelper.this) {\r
- throwable = e;\r
- }\r
- }\r
- };\r
- final Procedure<Object> proc = new Procedure<Object>() {\r
- @Override\r
- public void execute(Object result) {\r
- callback.run(null);\r
- }\r
- @Override\r
- public void exception(Throwable t) {\r
- if (t instanceof DatabaseException)\r
- callback.run((DatabaseException)t);\r
- else\r
- callback.run(new DatabaseException("" + name + "operation failed.", t));\r
- }\r
- };\r
- final WriteTraits writeTraits = new WriteTraits() {\r
- @Override\r
- public UndoTraits getUndoTraits() {\r
- return null;\r
- }\r
- @Override\r
- public VirtualGraph getProvider() {\r
- return null;\r
- }\r
- };\r
- TaskHelper(String name) {\r
- this.name = name;\r
- }\r
- <T> T getResult() {\r
- return (T)result;\r
- }\r
- void setResult(Object result) {\r
- this.result = result;\r
- }\r
-// Throwable throwableGet() {\r
-// return throwable;\r
-// }\r
- synchronized void throwableSet(Throwable t) {\r
- throwable = t;\r
- }\r
-// void throwableSet(String t) {\r
-// throwable = new InternalException("" + name + " operation failed. " + t);\r
-// }\r
- synchronized void throwableCheck()\r
- throws DatabaseException {\r
- if (null != throwable)\r
- if (throwable instanceof DatabaseException)\r
- throw (DatabaseException)throwable;\r
- else\r
- throw new DatabaseException("Undo operation failed.", throwable);\r
- }\r
- void throw_(String message)\r
- throws DatabaseException {\r
- throw new DatabaseException("" + name + " operation failed. " + message);\r
- }\r
- }\r
-\r
-\r
-\r
- private ListenerBase getListenerBase(Object procedure) {\r
- if (procedure instanceof ListenerBase)\r
- return (ListenerBase) procedure;\r
- else\r
- return null;\r
- }\r
-\r
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
- scheduleRequest(request, callback, notify, null);\r
- }\r
-\r
- /* (non-Javadoc)\r
- * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
- */\r
- @Override\r
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
-\r
- assert (request != null);\r
-\r
- if(Development.DEVELOPMENT) {\r
- try {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
- System.err.println("schedule write '" + request + "'");\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- if(Development.DEVELOPMENT) {\r
- try {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
- System.err.println("perform write '" + request + "'");\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
-\r
- ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- try {\r
-\r
- flushCounter = 0;\r
- clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-\r
- VirtualGraph vg = getProvider(request.getProvider());\r
-\r
- WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
- writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
-\r
- @Override\r
- public void execute(Object result) {\r
- if(callback != null) callback.run(null);\r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- if(callback != null) callback.run((DatabaseException)t);\r
- }\r
-\r
- });\r
-\r
- assert (null != writer);\r
-// writer.state.barrier.inc();\r
-\r
- try {\r
- request.perform(writer);\r
- assert (null != writer);\r
- } catch (Throwable t) {\r
- if (!(t instanceof CancelTransactionException))\r
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);\r
- writeState.except(t);\r
- } finally {\r
-// writer.state.barrier.dec();\r
-// writer.waitAsync(request);\r
- }\r
-\r
-\r
- assert(!queryProvider2.dirty);\r
-\r
- } catch (Throwable e) {\r
-\r
- // Log it first, just to be safe that the error is always logged.\r
- if (!(e instanceof CancelTransactionException))\r
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
-\r
-// writeState.getGraph().state.barrier.dec();\r
-// writeState.getGraph().waitAsync(request);\r
-\r
- writeState.except(e);\r
-\r
-// try {\r
-// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
-// // All we can do here is to log those, can't really pass them anywhere.\r
-// if (callback != null) {\r
-// if(e instanceof DatabaseException) callback.run((DatabaseException)e);\r
-// else callback.run(new DatabaseException(e));\r
-// }\r
-// } catch (Throwable e2) {\r
-// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
-// }\r
-// boolean empty = clusterStream.reallyFlush();\r
-// int callerThread = -1;\r
-// ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);\r
-// SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);\r
-// try {\r
-// // Send all local changes to server so it can calculate correct reverse change set.\r
-// // This will call clusterStream.accept().\r
-// state.cancelCommit(context, clusterStream);\r
-// if (!empty) {\r
-// if (!context.isOk()) // this is a blocking operation\r
-// throw new InternalException("Cancel failed. This should never happen. Contact application support.");\r
-// getQueryProvider2().performDirtyUpdates(writeState.getGraph());\r
-// }\r
-// state.cancelCommit2(context, clusterStream);\r
-// } catch (DatabaseException e3) {\r
-// Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);\r
-// } finally {\r
-// clusterStream.setOff(false);\r
-// clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-// }\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- }\r
-\r
- task.finish();\r
-\r
- if(Development.DEVELOPMENT) {\r
- try {\r
- if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))\r
- System.err.println("finish write '" + request + "'");\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
-\r
- }\r
-\r
- }, combine);\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
- scheduleRequest(request, procedure, notify, null);\r
- }\r
-\r
- /* (non-Javadoc)\r
- * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
- */\r
- @Override\r
- public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
-\r
- assert (request != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleWrite(new SessionTask(request, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- flushCounter = 0;\r
- clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-\r
- VirtualGraph vg = getProvider(request.getProvider());\r
- WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
-\r
- try {\r
- WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);\r
- writeState = writeStateT;\r
-\r
- assert (null != writer);\r
-// writer.state.barrier.inc();\r
- writeStateT.setResult(request.perform(writer));\r
- assert (null != writer);\r
-\r
-// writer.state.barrier.dec();\r
-// writer.waitAsync(null);\r
-\r
- } catch (Throwable e) {\r
-\r
-// writer.state.barrier.dec();\r
-// writer.waitAsync(null);\r
-\r
- writeState.except(e);\r
-\r
-// state.stopWriteTransaction(clusterStream);\r
-//\r
-// } catch (Throwable e) {\r
-// // Log it first, just to be safe that the error is always logged.\r
-// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
-//\r
-// try {\r
-// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.\r
-// // All we can do here is to log those, can't really pass them anywhere.\r
-// if (procedure != null) {\r
-// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);\r
-// else procedure.exception(new DatabaseException(e));\r
-// }\r
-// } catch (Throwable e2) {\r
-// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);\r
-// }\r
-//\r
-// clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-//\r
-// state.stopWriteTransaction(clusterStream);\r
-\r
- } finally {\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
- }\r
-\r
-// if(notify != null) notify.release();\r
-\r
- task.finish();\r
-\r
- }\r
-\r
- }, combine);\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
- scheduleRequest(request, callback, notify, null);\r
- }\r
-\r
- /* (non-Javadoc)\r
- * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)\r
- */\r
- @Override\r
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
-\r
- final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");\r
-\r
- assert (request != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleWrite(new SessionTask(request, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- Procedure<Object> stateProcedure = new Procedure<Object>() {\r
- @Override\r
- public void execute(Object result) {\r
- if (callback != null)\r
- callback.run(null);\r
- }\r
- @Override\r
- public void exception(Throwable t) {\r
- if (callback != null) {\r
- if (t instanceof DatabaseException) callback.run((DatabaseException) t);\r
- else callback.run(new DatabaseException(t));\r
- } else\r
- Logger.defaultLogError("Unhandled exception", t);\r
- }\r
- };\r
-\r
- final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
- delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);\r
- DelayedWriteGraph dwg = null;\r
-// newGraph.state.barrier.inc();\r
-\r
- try {\r
- dwg = new DelayedWriteGraph(newGraph);\r
- request.perform(dwg);\r
- } catch (Throwable e) {\r
- delayedWriteState.except(e);\r
- total.finish();\r
- return;\r
- } finally {\r
-// newGraph.state.barrier.dec();\r
-// newGraph.waitAsync(request);\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
- }\r
-\r
- delayedWriteState = null;\r
-\r
- ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- flushCounter = 0;\r
- clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-\r
- acquireWriteOnly();\r
-\r
- VirtualGraph vg = getProvider(request.getProvider());\r
- WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
-\r
- WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
-\r
- writeState = new WriteState<Object>(writer, request, notify, stateProcedure);\r
-\r
- assert (null != writer);\r
-// writer.state.barrier.inc();\r
-\r
- try {\r
- // Cannot cancel\r
- dwg.commit(writer, request);\r
-\r
- if(defaultClusterSet != null) {\r
- XSupport xs = getService(XSupport.class);\r
- defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);\r
- ClusteringSupport cs = getService(ClusteringSupport.class);\r
- clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));\r
- }\r
-\r
- // This makes clusters available from server\r
- clusterStream.reallyFlush();\r
-\r
- releaseWriteOnly(writer);\r
- handleUpdatesAndMetadata(writer);\r
-\r
- } catch (ServiceException e) {\r
-// writer.state.barrier.dec();\r
-// writer.waitAsync(null);\r
-\r
- // These shall be requested from server\r
- clusterTable.removeWriteOnlyClusters();\r
- // This makes clusters available from server\r
- clusterStream.reallyFlush();\r
-\r
- releaseWriteOnly(writer);\r
- writeState.except(e);\r
- } finally {\r
- // Debugging & Profiling\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
- task2.finish();\r
- total.finish();\r
- }\r
- }\r
-\r
- }, combine);\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {\r
- scheduleRequest(request, procedure, notify, null);\r
- }\r
-\r
- /* (non-Javadoc)\r
- * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
- */\r
- @Override\r
- public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {\r
- throw new Error("Not implemented");\r
- }\r
-\r
- protected ClusterImpl getNewResourceCluster() throws DatabaseException {\r
- ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);\r
- if((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
- createdClusters.add(cluster.clusterId);\r
- }\r
- return cluster;\r
- }\r
-\r
- class WriteOnlySupport implements WriteSupport {\r
-\r
- ClusterStream stream;\r
- ClusterImpl currentCluster;\r
-\r
- public WriteOnlySupport() {\r
- this.stream = clusterStream;\r
- }\r
-\r
- @Override\r
- public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {\r
- claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );\r
- }\r
-\r
- @Override\r
- public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {\r
-\r
- ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);\r
-\r
- if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
- if(s != queryProvider2.getRootLibrary())\r
- throw new ImmutableException("Trying to modify immutable resource key=" + s);\r
-\r
- try {\r
- maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- //throw e;\r
- }\r
-\r
- clientChanges.invalidate(s);\r
-\r
- if (cluster.isWriteOnly())\r
- return;\r
- queryProvider2.updateStatements(s, p);\r
-\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
- claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {\r
-\r
- ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
- try {\r
- maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- clientChanges.invalidate(rid);\r
-\r
- if (cluster.isWriteOnly())\r
- return;\r
- queryProvider2.updateValue(rid);\r
-\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
-\r
- if(amount < 65536) {\r
- claimValue(provider,resource, reader.readBytes(null, amount));\r
- return;\r
- }\r
-\r
- byte[] bytes = new byte[65536];\r
-\r
- int rid = ((ResourceImpl)resource).id;\r
- ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);\r
- try {\r
- int left = amount;\r
- while(left > 0) {\r
- int block = Math.min(left, 65536);\r
- reader.readBytes(bytes, block);\r
- maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));\r
- left -= block;\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- clientChanges.invalidate(rid);\r
-\r
- if (cluster.isWriteOnly())\r
- return;\r
- queryProvider2.updateValue(rid);\r
-\r
- }\r
-\r
- private void maintainCluster(ClusterImpl before, ClusterI after_) {\r
- if(after_ != null && after_ != before) {\r
- ClusterImpl after = (ClusterImpl)after_;\r
- if(currentCluster == before) currentCluster = after;\r
- clusterTable.replaceCluster(after);\r
- }\r
- }\r
-\r
- public int createResourceKey(int foreignCounter) throws DatabaseException {\r
- if(currentCluster == null)\r
- currentCluster = getNewResourceCluster();\r
- if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {\r
- ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();\r
- newCluster.foreignLookup = new byte[foreignCounter];\r
- currentCluster = newCluster;\r
- if (DEBUG)\r
- System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);\r
- }\r
- return currentCluster.createResource(clusterTranslator);\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider) throws DatabaseException {\r
- if(currentCluster == null) {\r
- if (null != defaultClusterSet) {\r
- ResourceImpl result = getNewResource(defaultClusterSet);\r
- currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
- return result;\r
- } else {\r
- currentCluster = getNewResourceCluster();\r
- }\r
- }\r
- if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
- if (null != defaultClusterSet) {\r
- ResourceImpl result = getNewResource(defaultClusterSet);\r
- currentCluster = clusterTable.getClusterByResourceKey(result.id);\r
- return result;\r
- } else {\r
- currentCluster = getNewResourceCluster();\r
- }\r
- }\r
- return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider, long clusterId)\r
- throws DatabaseException {\r
- return getNewResource(clusterId);\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
- throws DatabaseException {\r
- return getNewResource(clusterSet);\r
- }\r
-\r
- @Override\r
- public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
- throws DatabaseException {\r
- getNewClusterSet(clusterSet);\r
- }\r
-\r
- @Override\r
- public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)\r
- throws ServiceException {\r
- return containsClusterSet(clusterSet);\r
- }\r
-\r
- public void selectCluster(long cluster) {\r
- currentCluster = clusterTable.getClusterByClusterId(cluster);\r
- long setResourceId = clusterSetsSupport.getSet(cluster);\r
- clusterSetsSupport.put(setResourceId, cluster);\r
- }\r
-\r
- @Override\r
- public Resource setDefaultClusterSet(Resource clusterSet)\r
- throws ServiceException {\r
- Resource result = setDefaultClusterSet4NewResource(clusterSet);\r
- if(clusterSet != null) {\r
- long id = clusterSetsSupport.get(clusterSet.getResourceId());\r
- currentCluster = clusterTable.getClusterByClusterId(id);\r
- return result;\r
- } else {\r
- currentCluster = null;\r
- return null;\r
- }\r
- }\r
-\r
- @Override\r
- public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {\r
-\r
- provider = getProvider(provider);\r
- if (null == provider) {\r
- int key = ((ResourceImpl)resource).id;\r
-\r
-\r
-\r
- ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);\r
-// if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)\r
-// if(key != queryProvider2.getRootLibrary())\r
-// throw new ImmutableException("Trying to modify immutable resource key=" + key);\r
-\r
-// try {\r
-// cluster.removeValue(key, clusterTranslator);\r
-// } catch (DatabaseException e) {\r
-// Logger.defaultLogError(e);\r
-// return;\r
-// }\r
-\r
- clusterTable.writeOnlyInvalidate(cluster);\r
-\r
- try {\r
- int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);\r
- clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);\r
- clusterTranslator.removeValue(cluster);\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- queryProvider2.invalidateResource(key);\r
- clientChanges.invalidate(key);\r
-\r
- } else {\r
- ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
- queryProvider2.updateValue(querySupport.getId(resource));\r
- clientChanges.claimValue(resource);\r
- }\r
-\r
-\r
- }\r
-\r
- @Override\r
- public void flush(boolean intermediate) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void flushCluster() {\r
- clusterTable.flushCluster(graphSession);\r
- if(defaultClusterSet != null) {\r
- clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);\r
- }\r
- currentCluster = null;\r
- }\r
-\r
- @Override\r
- public void flushCluster(Resource r) {\r
- throw new UnsupportedOperationException("flushCluster resource " + r);\r
- }\r
-\r
- @Override\r
- public void gc() {\r
- }\r
-\r
- @Override\r
- public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
- Resource object) {\r
-\r
- int s = ((ResourceImpl)subject).id;\r
- int p = ((ResourceImpl)predicate).id;\r
- int o = ((ResourceImpl)object).id;\r
-\r
- provider = getProvider(provider);\r
- if (null == provider) {\r
-\r
- ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);\r
- clusterTable.writeOnlyInvalidate(cluster);\r
-\r
- try {\r
-\r
- int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);\r
- int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);\r
- int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);\r
-\r
- ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);\r
- ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);\r
-\r
- clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);\r
- clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
- clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);\r
- clusterTranslator.removeStatement(cluster);\r
-\r
- queryProvider2.invalidateResource(s);\r
- clientChanges.invalidate(s);\r
-\r
- } catch (DatabaseException e) {\r
-\r
- Logger.defaultLogError(e);\r
-\r
- }\r
-\r
- return true;\r
-\r
- } else {\r
- \r
- ((VirtualGraphImpl)provider).deny(s, p, o);\r
- queryProvider2.invalidateResource(s);\r
- clientChanges.invalidate(s);\r
-\r
- return true;\r
-\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean writeOnly() {\r
- return true;\r
- }\r
-\r
- @Override\r
- public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
- writeSupport.performWriteRequest(graph, request);\r
- }\r
-\r
- @Override\r
- public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void addMetadata(Metadata data) throws ServiceException {\r
- writeSupport.addMetadata(data);\r
- }\r
-\r
- @Override\r
- public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
- return writeSupport.getMetadata(clazz);\r
- }\r
-\r
- @Override\r
- public TreeMap<String, byte[]> getMetadata() {\r
- return writeSupport.getMetadata();\r
- }\r
-\r
- @Override\r
- public void commitDone(WriteTraits writeTraits, long csid) {\r
- writeSupport.commitDone(writeTraits, csid);\r
- }\r
-\r
- @Override\r
- public void clearUndoList(WriteTraits writeTraits) {\r
- writeSupport.clearUndoList(writeTraits);\r
- }\r
- @Override\r
- public int clearMetadata() {\r
- return writeSupport.clearMetadata();\r
- }\r
-\r
- @Override\r
- public void startUndo() {\r
- writeSupport.startUndo();\r
- }\r
- }\r
-\r
- class VirtualWriteOnlySupport implements WriteSupport {\r
-\r
-// @Override\r
-// public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,\r
-// Resource object) {\r
-// throw new UnsupportedOperationException();\r
-// }\r
-\r
- @Override\r
- public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
-\r
- TransientGraph impl = (TransientGraph)provider;\r
- impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
- getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
- clientChanges.claim(subject, predicate, object);\r
-\r
- }\r
-\r
- @Override\r
- public void claim(VirtualGraph provider, int subject, int predicate, int object) {\r
-\r
- TransientGraph impl = (TransientGraph)provider;\r
- impl.claim(subject, predicate, object);\r
- getQueryProvider2().updateStatements(subject, predicate);\r
- clientChanges.claim(subject, predicate, object);\r
-\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {\r
- claimValue(provider, ((ResourceImpl)resource).id, value, value.length);\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {\r
- ((VirtualGraphImpl)provider).claimValue(resource, value, length);\r
- getQueryProvider2().updateValue(resource);\r
- clientChanges.claimValue(resource);\r
- }\r
-\r
- @Override\r
- public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {\r
- byte[] value = reader.readBytes(null, amount);\r
- claimValue(provider, resource, value);\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider) {\r
- TransientGraph impl = (TransientGraph)provider;\r
- return impl.getResource(impl.newResource(false));\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider, long clusterId) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public Resource createResource(VirtualGraph provider, Resource clusterSet)\r
- throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void createClusterSet(VirtualGraph provider, Resource clusterSet)\r
- throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)\r
- throws ServiceException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public Resource setDefaultClusterSet(Resource clusterSet)\r
- throws ServiceException {\r
- return null;\r
- }\r
-\r
- @Override\r
- public void denyValue(VirtualGraph provider, Resource resource) {\r
- ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);\r
- getQueryProvider2().updateValue(querySupport.getId(resource));\r
- // NOTE: this only keeps track of value changes by-resource.\r
- clientChanges.claimValue(resource);\r
- }\r
-\r
- @Override\r
- public void flush(boolean intermediate) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void flushCluster() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void flushCluster(Resource r) {\r
- throw new UnsupportedOperationException("Resource " + r);\r
- }\r
-\r
- @Override\r
- public void gc() {\r
- }\r
-\r
- @Override\r
- public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {\r
- TransientGraph impl = (TransientGraph) provider;\r
- impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));\r
- getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));\r
- clientChanges.deny(subject, predicate, object);\r
- return true;\r
- }\r
-\r
- @Override\r
- public void setValue(VirtualGraph provider, Resource resource, byte[] value) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public boolean writeOnly() {\r
- return true;\r
- }\r
-\r
- @Override\r
- public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void addMetadata(Metadata data) throws ServiceException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public TreeMap<String, byte[]> getMetadata() {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public void commitDone(WriteTraits writeTraits, long csid) {\r
- }\r
-\r
- @Override\r
- public void clearUndoList(WriteTraits writeTraits) {\r
- }\r
-\r
- @Override\r
- public int clearMetadata() {\r
- return 0;\r
- }\r
-\r
- @Override\r
- public void startUndo() {\r
- }\r
- }\r
-\r
- private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {\r
-\r
- try {\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- flushCounter = 0;\r
- clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-\r
- acquireWriteOnly();\r
-\r
- WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
-\r
- WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());\r
-\r
- WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);\r
- writeState = writeStateT;\r
-\r
- assert (null != writer);\r
-// writer.state.barrier.inc();\r
- long start = System.nanoTime();\r
- T result = request.perform(writer);\r
- long duration = System.nanoTime() - start;\r
- if (DEBUG) {\r
- System.err.println("################");\r
- System.err.println("WriteOnly duration " + 1e-9*duration);\r
- }\r
- writeStateT.setResult(result);\r
-\r
- // This makes clusters available from server\r
- clusterStream.reallyFlush();\r
-\r
- // This will trigger query updates\r
- releaseWriteOnly(writer);\r
- assert (null != writer);\r
-\r
- handleUpdatesAndMetadata(writer);\r
-\r
- } catch (CancelTransactionException e) {\r
-\r
- releaseWriteOnly(writeState.getGraph());\r
-\r
- clusterTable.removeWriteOnlyClusters();\r
- state.stopWriteTransaction(clusterStream);\r
-\r
- } catch (Throwable e) {\r
-\r
- e.printStackTrace();\r
-\r
- releaseWriteOnly(writeState.getGraph());\r
-\r
- clusterTable.removeWriteOnlyClusters();\r
-\r
- if (callback != null)\r
- callback.exception(new DatabaseException(e));\r
-\r
- state.stopWriteTransaction(clusterStream);\r
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- }\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {\r
- scheduleRequest(request, callback, notify, null);\r
- }\r
-\r
- @Override\r
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {\r
-\r
- assertAlive();\r
-\r
- assert (request != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
-\r
- try {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- flushCounter = 0;\r
- clientChanges = new ClientChangesImpl(SessionImplSocket.this);\r
-\r
- acquireWriteOnly();\r
-\r
- VirtualGraph vg = getProvider(request.getProvider());\r
- WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();\r
-\r
- WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);\r
-\r
- writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {\r
-\r
- @Override\r
- public void execute(Object result) {\r
- if(callback != null) callback.run(null);\r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- if(callback != null) callback.run((DatabaseException)t);\r
- }\r
-\r
- });\r
-\r
- assert (null != writer);\r
-// writer.state.barrier.inc();\r
-\r
- try {\r
-\r
- request.perform(writer);\r
-\r
- } catch (Throwable e) {\r
-\r
-// writer.state.barrier.dec();\r
-// writer.waitAsync(null);\r
-\r
- releaseWriteOnly(writer);\r
-\r
- clusterTable.removeWriteOnlyClusters();\r
-\r
- if(!(e instanceof CancelTransactionException)) {\r
- if (callback != null)\r
- callback.run(new DatabaseException(e));\r
- }\r
-\r
- writeState.except(e);\r
-\r
- return;\r
-\r
- }\r
-\r
- // This makes clusters available from server\r
- boolean empty = clusterStream.reallyFlush();\r
- // This was needed to make WO requests call metadata listeners.\r
- // NOTE: the calling event does not contain clientChanges information.\r
- if (!empty && clientChanges.isEmpty())\r
- clientChanges.setNotEmpty(true);\r
- releaseWriteOnly(writer);\r
- assert (null != writer);\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_WRITES);\r
-\r
- }\r
-\r
-\r
- task.finish();\r
-\r
- }\r
-\r
- }, combine);\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {\r
- scheduleRequest(request, callback, notify, null);\r
- }\r
-\r
- /* (non-Javadoc)\r
- * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)\r
- */\r
- @Override\r
- public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {\r
-\r
- assert (request != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleWrite(new SessionTask(request, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);\r
-\r
- performWriteOnly(request, notify, callback);\r
-\r
- task.finish();\r
-\r
- }\r
-\r
- }, combine);\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
-\r
- assert (request != null);\r
- assert (procedure != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- ListenerBase listener = getListenerBase(procedure);\r
-\r
- final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
-\r
- try {\r
-\r
- if (listener != null) {\r
-\r
- try {\r
- newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- procedure.exception(graph, t);\r
- if(throwable != null) {\r
- throwable.set(t);\r
- } else {\r
- // ErrorLogger.defaultLogError("Unhandled exception", t);\r
- }\r
- }\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- if(result != null) result.set(t);\r
- procedure.execute(graph, t);\r
- }\r
-\r
- }, listener);\r
- } catch (Throwable t) {\r
- // This is handled by the AsyncProcedure\r
- //Logger.defaultLogError("Internal error", t);\r
- }\r
-\r
- } else {\r
-\r
- try {\r
-\r
-// newGraph.state.barrier.inc();\r
-\r
- T t = request.perform(newGraph);\r
-\r
- try {\r
-\r
- if(result != null) result.set(t);\r
- procedure.execute(newGraph, t);\r
-\r
- } catch (Throwable th) {\r
-\r
- if(throwable != null) {\r
- throwable.set(th);\r
- } else {\r
- Logger.defaultLogError("Unhandled exception", th);\r
- }\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
-\r
- if (DEBUG)\r
- t.printStackTrace();\r
-\r
- if(throwable != null) {\r
- throwable.set(t);\r
- } else {\r
- Logger.defaultLogError("Unhandled exception", t);\r
- }\r
-\r
- try {\r
-\r
- procedure.exception(newGraph, t);\r
-\r
- } catch (Throwable t2) {\r
-\r
- if(throwable != null) {\r
- throwable.set(t2);\r
- } else {\r
- Logger.defaultLogError("Unhandled exception", t2);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-// newGraph.state.barrier.dec();\r
-// newGraph.waitAsync(request);\r
-\r
- }\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {\r
-\r
- assert (request != null);\r
- assert (procedure != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
-\r
- try {\r
-\r
- if (listener != null) {\r
-\r
- newGraph.processor.query(newGraph, request, null, procedure, listener);\r
-\r
-// newGraph.waitAsync(request);\r
-\r
- } else {\r
-\r
- final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(\r
- procedure, "request");\r
-\r
- try {\r
-\r
-// newGraph.state.barrier.inc();\r
-\r
- request.perform(newGraph, wrapper);\r
-\r
-// newGraph.waitAsync(request);\r
-\r
- } catch (Throwable t) {\r
-\r
- wrapper.exception(newGraph, t);\r
-// newGraph.waitAsync(request);\r
-\r
-\r
- }\r
-\r
- }\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {\r
-\r
- assert (request != null);\r
- assert (procedure != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- int sync = notify != null ? thread : -1;\r
-\r
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- ListenerBase listener = getListenerBase(procedure);\r
-\r
- final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
-\r
- try {\r
-\r
- if (listener != null) {\r
-\r
- newGraph.processor.query(newGraph, request, null, procedure, listener);\r
-\r
-// newGraph.waitAsync(request);\r
-\r
- } else {\r
-\r
- final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);\r
-\r
- try {\r
-\r
- request.perform(newGraph, wrapper);\r
-\r
- } catch (Throwable t) {\r
-\r
- t.printStackTrace();\r
-\r
- }\r
-\r
- }\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {\r
-\r
- assert (request != null);\r
- assert (procedure != null);\r
-\r
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;\r
-\r
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {\r
-\r
- @Override\r
- public void run(int thread) {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- ListenerBase listener = getListenerBase(procedure);\r
-\r
- final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());\r
-\r
- try {\r
-\r
- if (listener != null) {\r
-\r
- newGraph.processor.query(newGraph, request, null, new Procedure<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- procedure.exception(t);\r
- if(throwable != null) {\r
- throwable.set(t);\r
- }\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- if(result != null) result.set(t);\r
- procedure.execute(t);\r
- }\r
-\r
- }, listener);\r
-\r
-// newGraph.waitAsync(request);\r
-\r
- } else {\r
-\r
-// newGraph.state.barrier.inc();\r
-\r
- request.register(newGraph, new Listener<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- if(throwable != null) throwable.set(t);\r
- procedure.exception(t);\r
-// newGraph.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- if(result != null) result.set(t);\r
- procedure.execute(t);\r
-// newGraph.state.barrier.dec();\r
- }\r
-\r
- @Override\r
- public boolean isDisposed() {\r
- return true;\r
- }\r
-\r
- });\r
-\r
-// newGraph.waitAsync(request);\r
-\r
- }\r
-\r
- } finally {\r
-\r
- fireSessionVariableChange(SessionVariables.QUEUED_READS);\r
-\r
- }\r
-\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
-\r
- @Override\r
- public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null, null, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- DataContainer<Throwable> container = new DataContainer<Throwable>();\r
- DataContainer<T> result = new DataContainer<T>();\r
- scheduleRequest(request, procedure, notify, container, result);\r
- acquire(notify, request);\r
- Throwable throwable = container.get();\r
- if(throwable != null) {\r
- if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
- else throw new DatabaseException("Unexpected exception", throwable);\r
- }\r
- return result.get();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
- final DataContainer<T> resultContainer = new DataContainer<T>();\r
- scheduleRequest(request, new AsyncProcedure<T>() {\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- exceptionContainer.set(throwable);\r
- procedure.exception(graph, throwable);\r
- }\r
- @Override\r
- public void execute(AsyncReadGraph graph, T result) {\r
- resultContainer.set(result);\r
- procedure.execute(graph, result);\r
- }\r
- }, getListenerBase(procedure), notify);\r
- acquire(notify, request, procedure);\r
- Throwable throwable = exceptionContainer.get();\r
- if (throwable != null) {\r
- if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
- else throw new DatabaseException("Unexpected exception", throwable);\r
- }\r
- return resultContainer.get();\r
- }\r
-\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();\r
- scheduleRequest(request, new AsyncMultiProcedure<T>() {\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable throwable) {\r
- exceptionContainer.set(throwable);\r
- procedure.exception(graph, throwable);\r
- }\r
- @Override\r
- public void execute(AsyncReadGraph graph, T result) {\r
- procedure.execute(graph, result);\r
- }\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
- procedure.finished(graph);\r
- }\r
- }, notify);\r
- acquire(notify, request, procedure);\r
- Throwable throwable = exceptionContainer.get();\r
- if (throwable != null) {\r
- if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
- else throw new DatabaseException("Unexpected exception", throwable);\r
- }\r
- // TODO: implement return value\r
- System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");\r
- return null;\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {\r
- return syncRequest(request, new ProcedureAdapter<T>());\r
-\r
-\r
-// assert(request != null);\r
-//\r
-// final DataContainer<T> result = new DataContainer<T>();\r
-// final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-//\r
-// syncRequest(request, new Procedure<T>() {\r
-//\r
-// @Override\r
-// public void execute(T t) {\r
-// result.set(t);\r
-// }\r
-//\r
-// @Override\r
-// public void exception(Throwable t) {\r
-// exception.set(t);\r
-// }\r
-//\r
-// });\r
-//\r
-// Throwable t = exception.get();\r
-// if(t != null) {\r
-// if(t instanceof DatabaseException) throw (DatabaseException)t;\r
-// else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);\r
-// }\r
-//\r
-// return result.get();\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {\r
- return syncRequest(request, (Procedure<T>)procedure);\r
- }\r
-\r
-\r
-// @Override\r
-// public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {\r
-// assertNotSession();\r
-// Semaphore notify = new Semaphore(0);\r
-// DataContainer<Throwable> container = new DataContainer<Throwable>();\r
-// DataContainer<T> result = new DataContainer<T>();\r
-// scheduleRequest(request, procedure, notify, container, result);\r
-// acquire(notify, request);\r
-// Throwable throwable = container.get();\r
-// if(throwable != null) {\r
-// if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
-// else throw new DatabaseException("Unexpected exception", throwable);\r
-// }\r
-// return result.get();\r
-// }\r
-\r
-\r
- @Override\r
- public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> container = new DataContainer<Throwable>();\r
- final DataContainer<T> result = new DataContainer<T>();\r
- scheduleRequest(request, procedure, notify, container, result);\r
- acquire(notify, request);\r
- Throwable throwable = container.get();\r
- if (throwable != null) {\r
- if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;\r
- else throw new DatabaseException("Unexpected exception", throwable);\r
- }\r
- return result.get();\r
- }\r
-\r
- @Override\r
- public void syncRequest(Write request) throws DatabaseException {\r
- assertNotSession();\r
- assertAlive();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
- scheduleRequest(request, new Callback<DatabaseException>() {\r
-\r
- @Override\r
- public void run(DatabaseException e) {\r
- exception.set(e);\r
- }\r
-\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) throw exception.get();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
- final DataContainer<T> result = new DataContainer<T>();\r
- scheduleRequest(request, new Procedure<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- result.set(t);\r
- }\r
-\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) {\r
- Throwable t = exception.get();\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException(t);\r
- }\r
- return result.get();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
- final DataContainer<T> result = new DataContainer<T>();\r
- scheduleRequest(request, new Procedure<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- result.set(t);\r
- }\r
-\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) {\r
- Throwable t = exception.get();\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException(t);\r
- }\r
- return result.get();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
- final DataContainer<T> result = new DataContainer<T>();\r
- scheduleRequest(request, new Procedure<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- @Override\r
- public void execute(T t) {\r
- result.set(t);\r
- }\r
-\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) {\r
- Throwable t = exception.get();\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException(t);\r
- }\r
- return result.get();\r
- }\r
-\r
- @Override\r
- public void syncRequest(DelayedWrite request) throws DatabaseException {\r
- assertNotSession();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
- scheduleRequest(request, new Callback<DatabaseException>() {\r
- @Override\r
- public void run(DatabaseException e) {\r
- exception.set(e);\r
- }\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) throw exception.get();\r
- }\r
-\r
- @Override\r
- public void syncRequest(WriteOnly request) throws DatabaseException {\r
- assertNotSession();\r
- assertAlive();\r
- Semaphore notify = new Semaphore(0);\r
- final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();\r
- scheduleRequest(request, new Callback<DatabaseException>() {\r
- @Override\r
- public void run(DatabaseException e) {\r
- exception.set(e);\r
- }\r
- }, notify);\r
- acquire(notify, request);\r
- if(exception.get() != null) throw exception.get();\r
- }\r
-\r
- /*\r
- *\r
- * GraphRequestProcessor interface\r
- */\r
-\r
- @Override\r
- public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null, null, null);\r
-\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {\r
-\r
- scheduleRequest(request, callback, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {\r
-\r
- scheduleRequest(request, procedure, null);\r
-\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {\r
-\r
- scheduleRequest(request, callback, null);\r
-\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final Write r) {\r
- asyncRequest(r, null);\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final DelayedWrite r) {\r
- asyncRequest(r, null);\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {\r
-\r
- scheduleRequest(request, callback, null);\r
-\r
- }\r
-\r
- @Override\r
- public void asyncRequest(final WriteOnly request) {\r
-\r
- asyncRequest(request, null);\r
-\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, Listener<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> T sync(ReadInterface<T> r) throws DatabaseException {\r
- return r.request(this);\r
- }\r
-\r
- @Override\r
- public <T> T sync(WriteInterface<T> r) throws DatabaseException {\r
- return r.request(this);\r
- }\r
-\r
- @Override\r
- public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {\r
- r.request(this, procedure);\r
- }\r
-\r
- @Override\r
- public <T> void async(WriteInterface<T> r) {\r
- r.request(this, new ProcedureAdapter<T>());\r
- }\r
-\r
- //@Override\r
- public void incAsync() {\r
- state.incAsync();\r
- }\r
-\r
- //@Override\r
- public void decAsync() {\r
- state.decAsync();\r
- }\r
-\r
- public long getCluster(ResourceImpl resource) {\r
- ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);\r
- return cluster.getClusterId();\r
- }\r
-\r
- public long getCluster(int id) {\r
- if (clusterTable == null)\r
- System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");\r
- return clusterTable.getClusterIdByResourceKeyNoThrow(id);\r
- }\r
-\r
- public ResourceImpl getResource(int id) {\r
- return new ResourceImpl(resourceSupport, id);\r
- }\r
-\r
- public ResourceImpl getResource(int resourceIndex, long clusterId) {\r
- assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));\r
- ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);\r
- int key = proxy.getClusterKey();\r
- int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);\r
- return new ResourceImpl(resourceSupport, resourceKey);\r
- }\r
-\r
- public ResourceImpl getResource2(int id) {\r
- assert (id != 0);\r
- return new ResourceImpl(resourceSupport, id);\r
- }\r
-\r
- final public int getId(ResourceImpl impl) {\r
- return impl.id;\r
- }\r
-\r
- public static final Charset UTF8 = Charset.forName("utf-8");\r
-\r
-\r
-\r
-\r
- static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {\r
- for(TransientGraph g : support.providers) {\r
- if(g.isPending(subject)) return false;\r
- }\r
- return true;\r
- }\r
-\r
- static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {\r
- for(TransientGraph g : support.providers) {\r
- if(g.isPending(subject, predicate)) return false;\r
- }\r
- return true;\r
- }\r
-\r
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {\r
-\r
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
-\r
- AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
-\r
- @Override\r
- public void run(ReadGraphImpl graph) {\r
- if(ready.decrementAndGet() == 0) {\r
- runnable.run(graph);\r
- }\r
- }\r
-\r
- };\r
-\r
- for(TransientGraph g : support.providers) {\r
- if(g.isPending(subject)) {\r
- try {\r
- g.load(graph, subject, composite);\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- } else {\r
- composite.run(graph);\r
- }\r
- }\r
-\r
- composite.run(graph);\r
-\r
- }\r
-\r
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {\r
-\r
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {\r
-\r
- AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);\r
-\r
- @Override\r
- public void run(ReadGraphImpl graph) {\r
- if(ready.decrementAndGet() == 0) {\r
- runnable.run(graph);\r
- }\r
- }\r
-\r
- };\r
-\r
- for(TransientGraph g : support.providers) {\r
- if(g.isPending(subject, predicate)) {\r
- try {\r
- g.load(graph, subject, predicate, composite);\r
- } catch (DatabaseException e) {\r
- e.printStackTrace();\r
- }\r
- } else {\r
- composite.run(graph);\r
- }\r
- }\r
-\r
- composite.run(graph);\r
-\r
- }\r
-\r
-// void dumpHeap() {\r
-//\r
-// try {\r
-// ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),\r
-// "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(\r
-// "d:/heap" + flushCounter + ".txt", true);\r
-// } catch (IOException e) {\r
-// e.printStackTrace();\r
-// }\r
-//\r
-// }\r
-\r
- void fireReactionsToSynchronize(ChangeSet cs) {\r
-\r
- // Do not fire empty events\r
- if (cs.isEmpty())\r
- return;\r
-\r
- ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());\r
-// g.state.barrier.inc();\r
-\r
- try {\r
-\r
- if (!cs.isEmpty()) {\r
- ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);\r
- for (ChangeListener l : changeListeners2) {\r
- try {\r
- l.graphChanged(e2);\r
- } catch (Exception ex) {\r
- ex.printStackTrace();\r
- }\r
- }\r
- }\r
-\r
- } finally {\r
-\r
-// g.state.barrier.dec();\r
-// g.waitAsync(null);\r
-\r
- }\r
-\r
- }\r
-\r
- void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {\r
- try {\r
-\r
- // Do not fire empty events\r
- if (cs2.isEmpty())\r
- return;\r
-\r
-// graph.restart();\r
-// graph.state.barrier.inc();\r
-\r
- try {\r
-\r
- if (!cs2.isEmpty()) {\r
-\r
- ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);\r
- for (ChangeListener l : changeListeners2) {\r
- try {\r
- l.graphChanged(e2);\r
-// System.out.println("changelistener " + l);\r
- } catch (Exception ex) {\r
- ex.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
-\r
- } finally {\r
-\r
-// graph.state.barrier.dec();\r
-// graph.waitAsync(null);\r
-\r
- }\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
-\r
- }\r
- }\r
- void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {\r
-\r
- try {\r
-\r
- // Do not fire empty events\r
- if (cs2.isEmpty())\r
- return;\r
-\r
- // Do not fire on virtual requests\r
- if(graph.getProvider() != null)\r
- return;\r
-\r
- WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);\r
-\r
- try {\r
-\r
- ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);\r
- for (ChangeListener l : metadataListeners) {\r
- try {\r
- l.graphChanged(e2);\r
- } catch (Throwable ex) {\r
- ex.printStackTrace();\r
- }\r
- }\r
-\r
- } finally {\r
-\r
- }\r
-\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
-\r
- }\r
-\r
-\r
- /*\r
- * (non-Javadoc)\r
- *\r
- * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)\r
- */\r
- @Override\r
- public <T> T getService(Class<T> api) {\r
- T t = peekService(api);\r
- if (t == null) {\r
- if (state.isClosed())\r
- throw new ServiceNotFoundException(this, api, "Session has been shut down");\r
- throw new ServiceNotFoundException(this, api);\r
- }\r
- return t;\r
- }\r
-\r
- /**\r
- * @return\r
- */\r
- protected abstract ServerInformation getCachedServerInformation();\r
-\r
- private Class<?> serviceKey1 = null;\r
- private Class<?> serviceKey2 = null;\r
- private Object service1 = null;\r
- private Object service2 = null;\r
-\r
- /*\r
- * (non-Javadoc)\r
- *\r
- * @see\r
- * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)\r
- */\r
- @SuppressWarnings("unchecked")\r
- @Override\r
- public synchronized <T> T peekService(Class<T> api) {\r
-\r
- if(serviceKey1 == api) {\r
- return (T)service1;\r
- } else if (serviceKey2 == api) {\r
- // Promote this key\r
- Object result = service2;\r
- service2 = service1;\r
- serviceKey2 = serviceKey1;\r
- service1 = result;\r
- serviceKey1 = api;\r
- return (T)result;\r
- }\r
-\r
- if (Layer0.class == api)\r
- return (T) L0;\r
- if (ServerInformation.class == api)\r
- return (T) getCachedServerInformation();\r
- else if (WriteGraphImpl.class == api)\r
- return (T) writeState.getGraph();\r
- else if (ClusterBuilder.class == api)\r
- return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);\r
- else if (ClusterBuilderFactory.class == api)\r
- return (T)new ClusterBuilderFactoryImpl(this);\r
-\r
- service2 = service1;\r
- serviceKey2 = serviceKey1;\r
-\r
- service1 = serviceLocator.peekService(api);\r
- serviceKey1 = api;\r
-\r
- return (T)service1;\r
-\r
- }\r
-\r
- /*\r
- * (non-Javadoc)\r
- *\r
- * @see\r
- * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)\r
- */\r
- @Override\r
- public boolean hasService(Class<?> api) {\r
- return serviceLocator.hasService(api);\r
- }\r
-\r
- /**\r
- * @param api the api that must be implemented by the specified service\r
- * @param service the service implementation\r
- */\r
- @Override\r
- public <T> void registerService(Class<T> api, T service) {\r
- if(Layer0.class == api) {\r
- L0 = (Layer0)service;\r
- return;\r
- }\r
- serviceLocator.registerService(api, service);\r
- if (TransactionPolicySupport.class == api) {\r
- transactionPolicy = (TransactionPolicySupport)service;\r
- state.resetTransactionPolicy();\r
- }\r
- if (api == serviceKey1)\r
- service1 = service;\r
- else if (api == serviceKey2)\r
- service2 = service;\r
- }\r
-\r
- // ----------------\r
- // SessionMonitor\r
- // ----------------\r
-\r
- void fireSessionVariableChange(String variable) {\r
- for (MonitorHandler h : monitorHandlers) {\r
- MonitorContext ctx = monitorContexts.getLeft(h);\r
- assert ctx != null;\r
- // SafeRunner functionality repeated here to avoid dependency.\r
- try {\r
- h.valuesChanged(ctx);\r
- } catch (Exception e) {\r
- Logger.defaultLogError("monitor handler notification produced the following exception", e);\r
- } catch (LinkageError e) {\r
- Logger.defaultLogError("monitor handler notification produced a linkage error", e);\r
- }\r
- }\r
- }\r
-\r
-\r
- class ResourceSerializerImpl implements ResourceSerializer {\r
-\r
- public long createRandomAccessId(int id) throws DatabaseException {\r
- if(id < 0)\r
- return id;\r
- int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);\r
- long cluster = getCluster(id);\r
- if (0 == cluster)\r
- return 0; // Better to return 0 then invalid id.\r
- long result = ClusterTraitsBase.createResourceId(cluster, index);\r
- return result;\r
- }\r
-\r
- @Override\r
- public long getRandomAccessId(Resource resource) throws DatabaseException {\r
-\r
- ResourceImpl resourceImpl = (ResourceImpl) resource;\r
- return createRandomAccessId(resourceImpl.id);\r
-\r
- }\r
-\r
- @Override\r
- public int getTransientId(Resource resource) throws DatabaseException {\r
-\r
- ResourceImpl resourceImpl = (ResourceImpl) resource;\r
- return resourceImpl.id;\r
-\r
- }\r
-\r
- @Override\r
- public String createRandomAccessId(Resource resource)\r
- throws InvalidResourceReferenceException {\r
-\r
- if(resource == null) throw new IllegalArgumentException();\r
-\r
- ResourceImpl resourceImpl = (ResourceImpl) resource;\r
-\r
- if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";\r
-\r
- int r;\r
- try {\r
- r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);\r
- } catch (DatabaseException e1) {\r
- throw new InvalidResourceReferenceException(e1);\r
- }\r
- try {\r
- // Serialize as '<resource index>_<cluster id>'\r
- return "" + r + "_" + getCluster(resourceImpl);\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- throw new InvalidResourceReferenceException(e);\r
- } finally {\r
- }\r
- }\r
-\r
- public int getTransientId(long serialized) throws DatabaseException {\r
- if (serialized <= 0)\r
- return (int)serialized;\r
- int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);\r
- long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);\r
- ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
- if (cluster == null)\r
- throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);\r
- int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);\r
- return key;\r
- }\r
-\r
- @Override\r
- public Resource getResource(long randomAccessId) throws DatabaseException {\r
- return getResourceByKey(getTransientId(randomAccessId));\r
- }\r
-\r
- @Override\r
- public Resource getResource(int transientId) throws DatabaseException {\r
- return getResourceByKey(transientId);\r
- }\r
-\r
- @Override\r
- public Resource getResource(String randomAccessId)\r
- throws InvalidResourceReferenceException {\r
- try {\r
- int i = randomAccessId.indexOf('_');\r
- if (i == -1)\r
- throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"\r
- + randomAccessId + "'");\r
- int r = Integer.parseInt(randomAccessId.substring(0, i));\r
- if(r < 0) return getResourceByKey(r);\r
- long c = Long.parseLong(randomAccessId.substring(i + 1));\r
- ClusterI cluster = clusterTranslator.getClusterByClusterId(c);\r
- int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);\r
- if (cluster.hasResource(key, clusterTranslator))\r
- return getResourceByKey(key);\r
- } catch (InvalidResourceReferenceException e) {\r
- throw e;\r
- } catch (NumberFormatException e) {\r
- throw new InvalidResourceReferenceException(e);\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- throw new InvalidResourceReferenceException(e);\r
- } finally {\r
- }\r
- throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);\r
- }\r
-\r
- @Override\r
- public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {\r
- try {\r
- return true;\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- throw new InvalidResourceReferenceException(e);\r
- } finally {\r
- }\r
- }\r
- }\r
-\r
- // Copied from old SessionImpl\r
-\r
- void check() {\r
- if (state.isClosed())\r
- throw new Error("Session closed.");\r
- }\r
-\r
-\r
-\r
- public ResourceImpl getNewResource(long clusterId) {\r
- ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);\r
- int newId;\r
- try {\r
- newId = cluster.createResource(clusterTranslator);\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- return null;\r
- }\r
- return new ResourceImpl(resourceSupport, newId);\r
- }\r
-\r
- public ResourceImpl getNewResource(Resource clusterSet)\r
- throws DatabaseException {\r
- long resourceId = clusterSet.getResourceId();\r
- Long clusterId = clusterSetsSupport.get(resourceId);\r
- if (null == clusterId)\r
- throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);\r
- if (Constants.NewClusterId == clusterId) {\r
- clusterId = getService(ClusteringSupport.class).createCluster();\r
- if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
- createdClusters.add(clusterId);\r
- clusterSetsSupport.put(resourceId, clusterId);\r
- return getNewResource(clusterId);\r
- } else {\r
- ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);\r
- ResourceImpl result;\r
- if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {\r
- clusterId = getService(ClusteringSupport.class).createCluster();\r
- if ((serviceMode & SERVICE_MODE_CREATE) > 0)\r
- createdClusters.add(clusterId);\r
- clusterSetsSupport.put(resourceId, clusterId);\r
- return getNewResource(clusterId);\r
- } else {\r
- result = getNewResource(clusterId);\r
- int resultKey = querySupport.getId(result);\r
- long resultCluster = querySupport.getClusterId(resultKey);\r
- if (clusterId != resultCluster)\r
- clusterSetsSupport.put(resourceId, resultCluster);\r
- return result;\r
- }\r
- }\r
- }\r
-\r
- public void getNewClusterSet(Resource clusterSet)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("new cluster set=" + clusterSet);\r
- long resourceId = clusterSet.getResourceId();\r
- if (clusterSetsSupport.containsKey(resourceId))\r
- throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);\r
- clusterSetsSupport.put(resourceId, Constants.NewClusterId);\r
- }\r
- public boolean containsClusterSet(Resource clusterSet)\r
- throws ServiceException {\r
- long resourceId = clusterSet.getResourceId();\r
- return clusterSetsSupport.containsKey(resourceId);\r
- }\r
- public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {\r
- Resource r = defaultClusterSet;\r
- defaultClusterSet = clusterSet;\r
- return r;\r
- }\r
- void printDiagnostics() {\r
-\r
- if (DIAGNOSTICS) {\r
-\r
- final DataContainer<Long> totalMem = new DataContainer<Long>(0L);\r
- final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);\r
- for(ClusterI cluster : clusterTable.getClusters()) {\r
- try {\r
- if (cluster.isLoaded() && !cluster.isEmpty()) {\r
- residentClusters.set(residentClusters.get() + 1);\r
- totalMem.set(totalMem.get() + cluster.getUsedSpace());\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- }\r
-\r
- System.out.println("--------------------------------");\r
- System.out.println("Cluster information:");\r
- System.out.println("-amount of resident clusters=" + residentClusters.get());\r
- System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");\r
-\r
- for(ClusterI cluster : clusterTable.getClusters()) {\r
- System.out.print("Cluster " + cluster.getClusterId() + " [");\r
- try {\r
- if (!cluster.isLoaded())\r
- System.out.println("not loaded]");\r
- else if (cluster.isEmpty())\r
- System.out.println("is empty]");\r
- else {\r
- System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));\r
- System.out.print(",references=" + cluster.getReferenceCount());\r
- System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- System.out.println("is corrupted]");\r
- }\r
- }\r
-\r
- queryProvider2.printDiagnostics();\r
- System.out.println("--------------------------------");\r
- }\r
-\r
- }\r
-\r
- public void fireStartReadTransaction() {\r
-\r
- if (DIAGNOSTICS)\r
- System.out.println("StartReadTransaction");\r
-\r
- for (SessionEventListener listener : eventListeners) {\r
- try {\r
- listener.readTransactionStarted();\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
-\r
- public void fireFinishReadTransaction() {\r
-\r
- if (DIAGNOSTICS)\r
- System.out.println("FinishReadTransaction");\r
-\r
- for (SessionEventListener listener : eventListeners) {\r
- try {\r
- listener.readTransactionFinished();\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
-\r
- public void fireStartWriteTransaction() {\r
-\r
- if (DIAGNOSTICS)\r
- System.out.println("StartWriteTransaction");\r
-\r
- for (SessionEventListener listener : eventListeners) {\r
- try {\r
- listener.writeTransactionStarted();\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- }\r
-\r
- public void fireFinishWriteTransaction() {\r
-\r
- if (DIAGNOSTICS)\r
- System.out.println("FinishWriteTransaction");\r
-\r
- for (SessionEventListener listener : eventListeners) {\r
- try {\r
- listener.writeTransactionFinished();\r
- } catch (Throwable t) {\r
- t.printStackTrace();\r
- }\r
- }\r
-\r
- Indexing.resetDependenciesIndexingDisabled();\r
-\r
- }\r
-\r
- Resource deserialize(final long resourceId, final long clusterId) throws IOException {\r
- throw new Error("Not supported at the moment.");\r
- }\r
-\r
- State getState() {\r
- return state;\r
- }\r
-\r
- ClusterTable getClusterTable() {\r
- return clusterTable;\r
- }\r
-\r
- public GraphSession getGraphSession() {\r
- return graphSession;\r
- }\r
-\r
- public QueryProcessor getQueryProvider2() {\r
- return queryProvider2;\r
- }\r
-\r
- ClientChangesImpl getClientChanges() {\r
- return clientChanges;\r
- }\r
-\r
- boolean getWriteOnly() {\r
- return writeOnly;\r
- }\r
-\r
- static int counter = 0;\r
-\r
- public void onClusterLoaded(long clusterId) {\r
-\r
- clusterTable.updateSize();\r
-\r
- }\r
-\r
-\r
-\r
-\r
- /*\r
- * Implementation of the interface RequestProcessor\r
- */\r
-\r
- @Override\r
- public <T> T syncRequest(final Read<T> request) throws DatabaseException {\r
-\r
- assertNotSession();\r
- assertAlive();\r
-\r
- assert(request != null);\r
-\r
- final DataContainer<T> result = new DataContainer<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-\r
- syncRequest(request, new AsyncProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- result.set(t);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- });\r
-\r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
- }\r
-\r
- return result.get();\r
-\r
- }\r
-\r
-// @Override\r
-// public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {\r
-// assertNotSession();\r
-// return syncRequest(request, (AsyncProcedure<T>)procedure);\r
-// }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {\r
-\r
- assertNotSession();\r
-\r
- assert(request != null);\r
-\r
- final DataContainer<T> result = new DataContainer<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-\r
- syncRequest(request, new AsyncProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- result.set(t);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- });\r
-\r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
- }\r
-\r
- return result.get();\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, (AsyncProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {\r
-\r
- assertNotSession();\r
-\r
- assert(request != null);\r
-\r
- final ArrayList<T> result = new ArrayList<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-\r
- syncRequest(request, new AsyncMultiProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.add(t);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
-\r
- });\r
-\r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {\r
-\r
- assertNotSession();\r
-\r
- assert(request != null);\r
-\r
- final ArrayList<T> result = new ArrayList<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-\r
- syncRequest(request, new AsyncMultiProcedure<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.add(t);\r
- }\r
- }\r
-\r
- @Override\r
- public void finished(AsyncReadGraph graph) {\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
-\r
- });\r
-\r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {\r
- assertNotSession();\r
- return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request) {\r
-\r
- asyncRequest(request, new ProcedureAdapter<T>() {\r
- @Override\r
- public void exception(Throwable t) {\r
- t.printStackTrace();\r
- }\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {\r
- asyncRequest(request, (AsyncProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> void asyncRequest(final AsyncRead<T> request) {\r
-\r
- assert(request != null);\r
-\r
- asyncRequest(request, new ProcedureAdapter<T>() {\r
- @Override\r
- public void exception(Throwable t) {\r
- t.printStackTrace();\r
- }\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {\r
- scheduleRequest(request, procedure, procedure, null);\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request) {\r
-\r
- assert(request != null);\r
-\r
- asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- t.printStackTrace();\r
- }\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {\r
- asyncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> void asyncRequest(AsyncMultiRead<T> request) {\r
-\r
- assert(request != null);\r
-\r
- asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- t.printStackTrace();\r
- }\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {\r
- asyncRequest(request, (AsyncMultiProcedure<T>)procedure);\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {\r
- asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {\r
- assertNotSession();\r
- throw new Error("Not implemented!");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {\r
- throw new Error("Not implemented!");\r
- }\r
-\r
- @Override\r
- final public <T> void asyncRequest(final ExternalRead<T> request) {\r
-\r
- assert(request != null);\r
-\r
- asyncRequest(request, new ProcedureAdapter<T>() {\r
- @Override\r
- public void exception(Throwable t) {\r
- t.printStackTrace();\r
- }\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {\r
- asyncRequest(request, (Procedure<T>)procedure);\r
- }\r
-\r
-\r
-\r
- void check(Throwable t) throws DatabaseException {\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception", t);\r
- }\r
- }\r
-\r
- void check(DataContainer<Throwable> container) throws DatabaseException {\r
- Throwable t = container.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception", t);\r
- }\r
- }\r
-\r
-\r
-\r
-\r
-\r
-\r
- boolean sameProvider(Write request) {\r
- if(writeState.getGraph().provider != null) {\r
- return writeState.getGraph().provider.equals(request.getProvider());\r
- } else {\r
- return request.getProvider() == null;\r
- }\r
- }\r
-\r
- boolean plainWrite(WriteGraphImpl graph) {\r
- if(graph == null) return false;\r
- if(graph.writeSupport.writeOnly()) return false;\r
- return true;\r
- }\r
-\r
-\r
- public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");\r
- private void assertNotSession() throws DatabaseException {\r
- Thread current = Thread.currentThread();\r
- if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");\r
- }\r
-\r
- void assertAlive() {\r
- if (!state.isAlive())\r
- throw new RuntimeDatabaseException("Session has been shut down.");\r
- }\r
-\r
- public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {\r
- return querySupport.getValueStream(graph, querySupport.getId(resource));\r
- }\r
-\r
- public byte[] getValue(ReadGraphImpl graph, Resource resource) {\r
- return querySupport.getValue(graph, querySupport.getId(resource));\r
- }\r
-\r
- <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {\r
- acquire(semaphore, request, null);\r
- }\r
-\r
- private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {\r
- assertAlive();\r
- try {\r
- long tryCount = 0;\r
- // Loop until the semaphore is acquired reporting requests that take a long time.\r
- while (true) {\r
-\r
- if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {\r
- // Acquired OK.\r
- return;\r
- } else {\r
- if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {\r
- ++tryCount;\r
- long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT\r
- .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)\r
- * tryCount;\r
- System.err.println("DB client request '" + request + "' is taking long to execute, so far "\r
- + (waitTime) + " ms. (procedure=" + procedure + ")");\r
- }\r
- }\r
-\r
- assertAlive();\r
-\r
- }\r
- } catch (InterruptedException e) {\r
- e.printStackTrace();\r
- // FIXME: Should perhaps do something else in this case ??\r
- }\r
- }\r
-\r
-\r
-\r
-// @Override\r
-// public boolean holdOnToTransactionAfterCancel() {\r
-// return transactionPolicy.holdOnToTransactionAfterCancel();\r
-// }\r
-//\r
-// @Override\r
-// public boolean holdOnToTransactionAfterCommit() {\r
-// return transactionPolicy.holdOnToTransactionAfterCommit();\r
-// }\r
-//\r
-// @Override\r
-// public boolean holdOnToTransactionAfterRead() {\r
-// return transactionPolicy.holdOnToTransactionAfterRead();\r
-// }\r
-//\r
-// @Override\r
-// public void onRelinquish() {\r
-// transactionPolicy.onRelinquish();\r
-// }\r
-//\r
-// @Override\r
-// public void onRelinquishDone() {\r
-// transactionPolicy.onRelinquishDone();\r
-// }\r
-//\r
-// @Override\r
-// public void onRelinquishError() {\r
-// transactionPolicy.onRelinquishError();\r
-// }\r
-\r
-\r
- @Override\r
- public Session getSession() {\r
- return this;\r
- }\r
-\r
-\r
- protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);\r
- protected abstract ResourceImpl getNewResource() throws DatabaseException;\r
-\r
- public void ceased(int thread) {\r
-\r
- requestManager.ceased(thread);\r
-\r
- }\r
-\r
- public int getAmountOfQueryThreads() {\r
- // This must be a power of two\r
- return 1;\r
-// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());\r
- }\r
-\r
- public Resource getResourceByKey(int key) throws ResourceNotFoundException {\r
-\r
- return new ResourceImpl(resourceSupport, key);\r
-\r
- }\r
-\r
- public void acquireWriteOnly() {\r
- writeOnly = true;\r
- }\r
-\r
- public void releaseWriteOnly(ReadGraphImpl graph) {\r
- writeOnly = false;\r
- queryProvider2.releaseWrite(graph);\r
- }\r
-\r
- public void handleUpdatesAndMetadata(WriteGraphImpl writer) {\r
-\r
- long start = System.nanoTime();\r
-\r
- while(dirtyPrimitives) {\r
- dirtyPrimitives = false;\r
- getQueryProvider2().performDirtyUpdates(writer);\r
- getQueryProvider2().performScheduledUpdates(writer);\r
- }\r
-\r
- fireMetadataListeners(writer, clientChanges);\r
-\r
- if(DebugPolicy.PERFORMANCE_DATA) {\r
- long end = System.nanoTime();\r
- System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));\r
- }\r
-\r
- }\r
-\r
- void removeTemporaryData() {\r
- File platform = Platform.getLocation().toFile();\r
- File tempFiles = new File(platform, "tempFiles");\r
- File temp = new File(tempFiles, "db");\r
- if(!temp.exists()) \r
- return;\r
- try {\r
- Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {\r
- @Override\r
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {\r
- try {\r
- Files.delete(file);\r
- } catch (IOException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- return FileVisitResult.CONTINUE;\r
- }\r
- });\r
- } catch (IOException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- }\r
-\r
- public void handleCreatedClusters() {\r
- if ((serviceMode & SERVICE_MODE_CREATE) > 0) {\r
- createdClusters.forEach(new TLongProcedure() {\r
-\r
- @Override\r
- public boolean execute(long value) {\r
- ClusterImpl cluster = clusterTable.getClusterByClusterId(value);\r
- cluster.setImmutable(true, clusterTranslator);\r
- return true;\r
- }\r
- });\r
- }\r
- createdClusters.clear();\r
- }\r
-\r
- @Override\r
- public Object getModificationCounter() {\r
- return queryProvider2.modificationCounter;\r
- }\r
- \r
- @Override\r
- public void markUndoPoint() {\r
- state.setCombine(false);\r
- }\r
-\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.eclipse.core.runtime.Platform;
+import org.simantics.databoard.Bindings;
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.ChangeSet;
+import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
+import org.simantics.db.ExternalValueSupport;
+import org.simantics.db.Metadata;
+import org.simantics.db.MonitorContext;
+import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.ResourceSerializer;
+import org.simantics.db.Session;
+import org.simantics.db.SessionManager;
+import org.simantics.db.SessionVariables;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.authentication.UserAuthenticationAgent;
+import org.simantics.db.authentication.UserAuthenticator;
+import org.simantics.db.common.Indexing;
+import org.simantics.db.common.TransactionPolicyRelease;
+import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
+import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
+import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
+import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
+import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
+import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
+import org.simantics.db.common.service.ServiceActivityMonitorImpl;
+import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.event.ChangeEvent;
+import org.simantics.db.event.ChangeListener;
+import org.simantics.db.event.SessionEventListener;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.ClusterSetExistException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.ImmutableException;
+import org.simantics.db.exception.InvalidResourceReferenceException;
+import org.simantics.db.exception.ResourceNotFoundException;
+import org.simantics.db.exception.RuntimeDatabaseException;
+import org.simantics.db.exception.ServiceException;
+import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
+import org.simantics.db.impl.ClusterBase;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ClusterTraitsBase;
+import org.simantics.db.impl.ClusterTranslator;
+import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.impl.TransientGraph;
+import org.simantics.db.impl.VirtualGraphImpl;
+import org.simantics.db.impl.graph.DelayedWriteGraph;
+import org.simantics.db.impl.graph.ReadGraphImpl;
+import org.simantics.db.impl.graph.WriteGraphImpl;
+import org.simantics.db.impl.graph.WriteSupport;
+import org.simantics.db.impl.internal.RandomAccessValueSupport;
+import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
+import org.simantics.db.impl.query.QueryProcessor;
+import org.simantics.db.impl.query.QueryProcessor.SessionRead;
+import org.simantics.db.impl.query.QueryProcessor.SessionTask;
+import org.simantics.db.impl.service.QueryDebug;
+import org.simantics.db.impl.support.VirtualGraphServerSupport;
+import org.simantics.db.impl.support.WriteRequestScheduleSupport;
+import org.simantics.db.procedure.AsyncListener;
+import org.simantics.db.procedure.AsyncMultiListener;
+import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
+import org.simantics.db.procedure.ListenerBase;
+import org.simantics.db.procedure.MultiListener;
+import org.simantics.db.procedure.MultiProcedure;
+import org.simantics.db.procedure.Procedure;
+import org.simantics.db.procedure.SyncListener;
+import org.simantics.db.procedure.SyncMultiListener;
+import org.simantics.db.procedure.SyncMultiProcedure;
+import org.simantics.db.procedure.SyncProcedure;
+import org.simantics.db.procore.cluster.ClusterImpl;
+import org.simantics.db.procore.cluster.ClusterTraits;
+import org.simantics.db.procore.protocol.Constants;
+import org.simantics.db.procore.protocol.DebugPolicy;
+import org.simantics.db.request.AsyncMultiRead;
+import org.simantics.db.request.AsyncRead;
+import org.simantics.db.request.DelayedWrite;
+import org.simantics.db.request.DelayedWriteResult;
+import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.MultiRead;
+import org.simantics.db.request.Read;
+import org.simantics.db.request.ReadInterface;
+import org.simantics.db.request.Write;
+import org.simantics.db.request.WriteInterface;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteOnlyResult;
+import org.simantics.db.request.WriteResult;
+import org.simantics.db.request.WriteTraits;
+import org.simantics.db.service.ByteReader;
+import org.simantics.db.service.ClusterBuilder;
+import org.simantics.db.service.ClusterBuilderFactory;
+import org.simantics.db.service.ClusterControl;
+import org.simantics.db.service.ClusterSetsSupport;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.ClusteringSupport;
+import org.simantics.db.service.CollectionSupport;
+import org.simantics.db.service.DebugSupport;
+import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.EventSupport;
+import org.simantics.db.service.GraphChangeListenerSupport;
+import org.simantics.db.service.InitSupport;
+import org.simantics.db.service.LifecycleSupport;
+import org.simantics.db.service.ManagementSupport;
+import org.simantics.db.service.QueryControl;
+import org.simantics.db.service.SerialisationSupport;
+import org.simantics.db.service.ServerInformation;
+import org.simantics.db.service.ServiceActivityMonitor;
+import org.simantics.db.service.SessionEventSupport;
+import org.simantics.db.service.SessionMonitorSupport;
+import org.simantics.db.service.SessionUserSupport;
+import org.simantics.db.service.StatementSupport;
+import org.simantics.db.service.TransactionPolicySupport;
+import org.simantics.db.service.TransactionSupport;
+import org.simantics.db.service.TransferableGraphSupport;
+import org.simantics.db.service.UndoRedoSupport;
+import org.simantics.db.service.VirtualGraphSupport;
+import org.simantics.db.service.XSupport;
+import org.simantics.layer0.Layer0;
+import org.simantics.utils.DataContainer;
+import org.simantics.utils.Development;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.LoggerFactory;
+
+import gnu.trove.procedure.TLongProcedure;
+import gnu.trove.set.hash.TLongHashSet;
+
+
+public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
+
+ private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
+
+ protected static final boolean DEBUG = false;
+
+ private static final boolean DIAGNOSTICS = false;
+
+ private TransactionPolicySupport transactionPolicy;
+
+ final private ClusterControl clusterControl;
+
+ final protected BuiltinSupportImpl builtinSupport;
+ final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
+ final protected ClusterSetsSupport clusterSetsSupport;
+ final protected LifecycleSupportImpl lifecycleSupport;
+
+ protected QuerySupportImpl querySupport;
+ protected ResourceSupportImpl resourceSupport;
+ protected WriteSupport writeSupport;
+ public ClusterTranslator clusterTranslator;
+
+ boolean dirtyPrimitives = false;
+
+ public static final int SERVICE_MODE_CREATE = 2;
+ public static final int SERVICE_MODE_ALLOW = 1;
+ public int serviceMode = 0;
+ public boolean createdImmutableClusters = false;
+ public TLongHashSet createdClusters = new TLongHashSet();
+
+ private Layer0 L0;
+
+ /**
+ * The service locator maintained by the workbench. These services are
+ * initialized during workbench during the <code>init</code> method.
+ */
+ final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
+
+ final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
+
+ final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
+
+ final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
+
+ final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
+
+ final HashSet<Thread> sessionThreads = new HashSet<Thread>();
+
+ final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
+
+ final protected State state = new State();
+
+ protected GraphSession graphSession = null;
+
+ protected SessionManager sessionManagerImpl = null;
+
+ protected UserAuthenticationAgent authAgent = null;
+
+ protected UserAuthenticator authenticator = null;
+
+ protected Resource user = null;
+
+ protected ClusterStream clusterStream = null;
+
+ protected SessionRequestManager requestManager = null;
+
+ public ClusterTable clusterTable = null;
+
+ public QueryProcessor queryProvider2 = null;
+
+ ClientChangesImpl clientChanges = null;
+
+ MonitorHandler[] monitorHandlers = new MonitorHandler[0];
+
+// protected long newClusterId = Constants.NullClusterId;
+
+ protected int flushCounter = 0;
+
+ protected boolean writeOnly = false;
+
+ WriteState<?> writeState = null;
+ WriteStateBase<?> delayedWriteState = null;
+ protected Resource defaultClusterSet = null; // If not null then used for newResource().
+
+ public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
+
+// if (authAgent == null)
+// throw new IllegalArgumentException("null authentication agent");
+
+ File t = StaticSessionProperties.virtualGraphStoragePath;
+ if (null == t)
+ t = new File(".");
+ this.clusterTable = new ClusterTable(this, t);
+ this.builtinSupport = new BuiltinSupportImpl(this);
+ this.sessionManagerImpl = sessionManagerImpl;
+ this.user = null;
+// this.authAgent = authAgent;
+
+ serviceLocator.registerService(Session.class, this);
+ serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
+ serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
+ serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
+ serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
+ serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
+ serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
+ serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
+ serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
+ serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
+ serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
+ serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
+ serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
+ serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
+ serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
+ serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
+ serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
+ serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
+ serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
+ serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
+ serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
+ serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
+ serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
+ ServiceActivityUpdaterForWriteTransactions.register(this);
+
+ this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
+ serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
+ serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
+ this.lifecycleSupport = new LifecycleSupportImpl(this);
+ serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
+ this.transactionPolicy = new TransactionPolicyRelease();
+ serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
+ this.clusterControl = new ClusterControlImpl(this);
+ serviceLocator.registerService(ClusterControl.class, clusterControl);
+ this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
+ this.clusterSetsSupport.setReadDirectory(t.toPath());
+ this.clusterSetsSupport.updateWriteDirectory(t.toPath());
+ serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
+ }
+
+ /*
+ *
+ * Session interface
+ */
+
+
+ @Override
+ public Resource getRootLibrary() {
+ return queryProvider2.getRootLibraryResource();
+ }
+
+ void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
+ if (!graphSession.dbSession.refreshEnabled())
+ return;
+ try {
+ getClusterTable().refresh(csid, this, clusterUID);
+ } catch (Throwable t) {
+ LOGGER.error("refresh({}, {}) failed", thread, csid, t);
+ }
+ }
+
+ static final class TaskHelper {
+ private final String name;
+ private Object result;
+ final Semaphore sema = new Semaphore(0);
+ private Throwable throwable = null;
+ final Consumer<DatabaseException> callback = e -> {
+ synchronized (TaskHelper.this) {
+ throwable = e;
+ }
+ };
+ final Procedure<Object> proc = new Procedure<Object>() {
+ @Override
+ public void execute(Object result) {
+ callback.accept(null);
+ }
+ @Override
+ public void exception(Throwable t) {
+ if (t instanceof DatabaseException)
+ callback.accept((DatabaseException)t);
+ else
+ callback.accept(new DatabaseException("" + name + "operation failed.", t));
+ }
+ };
+ final WriteTraits writeTraits = new WriteTraits() {};
+ TaskHelper(String name) {
+ this.name = name;
+ }
+ @SuppressWarnings("unchecked")
+ <T> T getResult() {
+ return (T)result;
+ }
+ void setResult(Object result) {
+ this.result = result;
+ }
+// Throwable throwableGet() {
+// return throwable;
+// }
+ synchronized void throwableSet(Throwable t) {
+ throwable = t;
+ }
+// void throwableSet(String t) {
+// throwable = new InternalException("" + name + " operation failed. " + t);
+// }
+ synchronized void throwableCheck()
+ throws DatabaseException {
+ if (null != throwable)
+ if (throwable instanceof DatabaseException)
+ throw (DatabaseException)throwable;
+ else
+ throw new DatabaseException("Undo operation failed.", throwable);
+ }
+ void throw_(String message)
+ throws DatabaseException {
+ throw new DatabaseException("" + name + " operation failed. " + message);
+ }
+ }
+
+
+
+ private ListenerBase getListenerBase(Object procedure) {
+ if (procedure instanceof ListenerBase)
+ return (ListenerBase) procedure;
+ else
+ return null;
+ }
+
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
+ scheduleRequest(request, callback, notify, null);
+ }
+
+ /* (non-Javadoc)
+ * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
+ */
+ @Override
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+
+ assert (request != null);
+
+ if(Development.DEVELOPMENT) {
+ try {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
+ System.err.println("schedule write '" + request + "'");
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+
+ requestManager.scheduleWrite(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ if(Development.DEVELOPMENT) {
+ try {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
+ System.err.println("perform write '" + request + "'");
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+
+ ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ try {
+
+ flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
+ clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+
+ VirtualGraph vg = getProvider(request.getProvider());
+
+ WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
+ writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
+
+ @Override
+ public void execute(Object result) {
+ if(callback != null) callback.accept(null);
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ if(callback != null) callback.accept((DatabaseException)t);
+ }
+
+ });
+
+ assert (null != writer);
+ writer.asyncBarrier.inc();
+
+ try {
+ request.perform(writer);
+ assert (null != writer);
+ } catch (Throwable t) {
+ if (!(t instanceof CancelTransactionException))
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
+ writeState.except(t);
+ } finally {
+ writer.asyncBarrier.dec();
+// writer.waitAsync(request);
+ }
+
+
+ assert(!queryProvider2.cache.dirty);
+
+ } catch (Throwable e) {
+
+ // Log it first, just to be safe that the error is always logged.
+ if (!(e instanceof CancelTransactionException))
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
+
+// writeState.getGraph().state.barrier.dec();
+// writeState.getGraph().waitAsync(request);
+
+ writeState.except(e);
+
+// try {
+// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
+// // All we can do here is to log those, can't really pass them anywhere.
+// if (callback != null) {
+// if(e instanceof DatabaseException) callback.run((DatabaseException)e);
+// else callback.run(new DatabaseException(e));
+// }
+// } catch (Throwable e2) {
+// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
+// }
+// boolean empty = clusterStream.reallyFlush();
+// int callerThread = -1;
+// ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
+// SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
+// try {
+// // Send all local changes to server so it can calculate correct reverse change set.
+// // This will call clusterStream.accept().
+// state.cancelCommit(context, clusterStream);
+// if (!empty) {
+// if (!context.isOk()) // this is a blocking operation
+// throw new InternalException("Cancel failed. This should never happen.");
+// getQueryProvider2().performDirtyUpdates(writeState.getGraph());
+// }
+// state.cancelCommit2(context, clusterStream);
+// } catch (DatabaseException e3) {
+// Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
+// } finally {
+// clusterStream.setOff(false);
+// clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+// }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ }
+
+ task.finish();
+
+ if(Development.DEVELOPMENT) {
+ try {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
+ System.err.println("finish write '" + request + "'");
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+
+ }
+
+ }, combine);
+
+ }
+
+ public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
+ scheduleRequest(request, procedure, notify, null);
+ }
+
+ /* (non-Javadoc)
+ * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
+ */
+ @Override
+ public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
+
+ assert (request != null);
+
+ requestManager.scheduleWrite(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
+ clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+
+ VirtualGraph vg = getProvider(request.getProvider());
+ WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
+
+ WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+ writeState = writeStateT;
+ assert (null != writer);
+
+ try {
+ writer.asyncBarrier.inc();
+ writeStateT.setResult(request.perform(writer));
+ assert (null != writer);
+ } catch (Throwable e) {
+ writeState.except(e);
+ } finally {
+ writer.asyncBarrier.dec();
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+ }
+
+ task.finish();
+
+ }
+
+ }, combine);
+
+ }
+
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
+ scheduleRequest(request, callback, notify, null);
+ }
+
+ /* (non-Javadoc)
+ * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
+ */
+ @Override
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+
+ final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
+
+ assert (request != null);
+
+ requestManager.scheduleWrite(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ Procedure<Object> stateProcedure = new Procedure<Object>() {
+ @Override
+ public void execute(Object result) {
+ if (callback != null)
+ callback.accept(null);
+ }
+ @Override
+ public void exception(Throwable t) {
+ if (callback != null) {
+ if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
+ else callback.accept(new DatabaseException(t));
+ } else
+ LOGGER.error("Unhandled exception", t);
+ }
+ };
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
+ DelayedWriteGraph dwg = null;
+// newGraph.state.barrier.inc();
+
+ try {
+ dwg = new DelayedWriteGraph(newGraph);
+ request.perform(dwg);
+ } catch (Throwable e) {
+ delayedWriteState.except(e);
+ total.finish();
+ dwg.close();
+ return;
+ } finally {
+// newGraph.state.barrier.dec();
+// newGraph.waitAsync(request);
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ }
+
+ delayedWriteState = null;
+
+ ITask task2 = ThreadLogger.task("DelayedWriteCommit");
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
+ clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+
+ acquireWriteOnly();
+
+ VirtualGraph vg = getProvider(request.getProvider());
+ WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
+
+ WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
+
+ writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
+
+ assert (null != writer);
+// writer.state.barrier.inc();
+
+ try {
+ // Cannot cancel
+ dwg.commit(writer, request);
+
+ if(defaultClusterSet != null) {
+ XSupport xs = getService(XSupport.class);
+ defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
+ ClusteringSupport cs = getService(ClusteringSupport.class);
+ clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
+ }
+
+ // This makes clusters available from server
+ clusterStream.reallyFlush();
+
+ releaseWriteOnly(writer);
+ handleUpdatesAndMetadata(writer);
+
+ } catch (ServiceException e) {
+// writer.state.barrier.dec();
+// writer.waitAsync(null);
+
+ // These shall be requested from server
+ clusterTable.removeWriteOnlyClusters();
+ // This makes clusters available from server
+ clusterStream.reallyFlush();
+
+ releaseWriteOnly(writer);
+ writeState.except(e);
+ } finally {
+ // Debugging & Profiling
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+ task2.finish();
+ total.finish();
+ }
+ }
+
+ }, combine);
+
+ }
+
+ public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
+ scheduleRequest(request, procedure, notify, null);
+ }
+
+ /* (non-Javadoc)
+ * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
+ */
+ @Override
+ public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
+ throw new Error("Not implemented");
+ }
+
+ protected ClusterImpl getNewResourceCluster() throws DatabaseException {
+ ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
+ if((serviceMode & SERVICE_MODE_CREATE) > 0) {
+ createdClusters.add(cluster.clusterId);
+ }
+ return cluster;
+ }
+
+ class WriteOnlySupport implements WriteSupport {
+
+ ClusterStream stream;
+ ClusterImpl currentCluster;
+
+ public WriteOnlySupport() {
+ this.stream = clusterStream;
+ }
+
+ @Override
+ public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
+ claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
+ }
+
+ @Override
+ public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
+
+ ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
+
+ if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
+ if(s != queryProvider2.getRootLibrary())
+ throw new ImmutableException("Trying to modify immutable resource key=" + s);
+
+ try {
+ maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ //throw e;
+ }
+
+ clientChanges.invalidate(s);
+
+ if (cluster.isWriteOnly())
+ return;
+ queryProvider2.updateStatements(s, p);
+
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
+ claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
+
+ ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
+ try {
+ maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ clientChanges.invalidate(rid);
+
+ if (cluster.isWriteOnly())
+ return;
+ queryProvider2.updateValue(rid);
+
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
+
+ if(amount < 65536) {
+ claimValue(provider,resource, reader.readBytes(null, amount));
+ return;
+ }
+
+ byte[] bytes = new byte[65536];
+
+ int rid = ((ResourceImpl)resource).id;
+ ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
+ try {
+ int left = amount;
+ while(left > 0) {
+ int block = Math.min(left, 65536);
+ reader.readBytes(bytes, block);
+ maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
+ left -= block;
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ clientChanges.invalidate(rid);
+
+ if (cluster.isWriteOnly())
+ return;
+ queryProvider2.updateValue(rid);
+
+ }
+
+ private void maintainCluster(ClusterImpl before, ClusterI after_) {
+ if(after_ != null && after_ != before) {
+ ClusterImpl after = (ClusterImpl)after_;
+ if(currentCluster == before) {
+ currentCluster = after;
+ }
+ clusterTable.replaceCluster(after);
+ }
+ }
+
+ public int createResourceKey(int foreignCounter) throws DatabaseException {
+ if(currentCluster == null) {
+ currentCluster = getNewResourceCluster();
+ }
+ if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
+ ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
+ newCluster.foreignLookup = new byte[foreignCounter];
+ currentCluster = newCluster;
+ if (DEBUG)
+ System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
+ }
+ return currentCluster.createResource(clusterTranslator);
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider) throws DatabaseException {
+ if(currentCluster == null) {
+ if (null != defaultClusterSet) {
+ ResourceImpl result = getNewResource(defaultClusterSet);
+ currentCluster = clusterTable.getClusterByResourceKey(result.id);
+ return result;
+ } else {
+ currentCluster = getNewResourceCluster();
+ }
+ }
+ if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
+ if (null != defaultClusterSet) {
+ ResourceImpl result = getNewResource(defaultClusterSet);
+ currentCluster = clusterTable.getClusterByResourceKey(result.id);
+ return result;
+ } else {
+ currentCluster = getNewResourceCluster();
+ }
+ }
+ return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider, long clusterId)
+ throws DatabaseException {
+ return getNewResource(clusterId);
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider, Resource clusterSet)
+ throws DatabaseException {
+ return getNewResource(clusterSet);
+ }
+
+ @Override
+ public void createClusterSet(VirtualGraph provider, Resource clusterSet)
+ throws DatabaseException {
+ getNewClusterSet(clusterSet);
+ }
+
+ @Override
+ public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
+ throws ServiceException {
+ return containsClusterSet(clusterSet);
+ }
+
+ public void selectCluster(long cluster) {
+ currentCluster = clusterTable.getClusterByClusterId(cluster);
+ long setResourceId = clusterSetsSupport.getSet(cluster);
+ clusterSetsSupport.put(setResourceId, cluster);
+ }
+
+ @Override
+ public Resource setDefaultClusterSet(Resource clusterSet)
+ throws ServiceException {
+ Resource result = setDefaultClusterSet4NewResource(clusterSet);
+ if(clusterSet != null) {
+ long id = clusterSetsSupport.get(clusterSet.getResourceId());
+ currentCluster = clusterTable.getClusterByClusterId(id);
+ return result;
+ } else {
+ currentCluster = null;
+ return null;
+ }
+ }
+
+ @Override
+ public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
+
+ provider = getProvider(provider);
+ if (null == provider) {
+ int key = ((ResourceImpl)resource).id;
+
+
+
+ ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
+// if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
+// if(key != queryProvider2.getRootLibrary())
+// throw new ImmutableException("Trying to modify immutable resource key=" + key);
+
+// try {
+// cluster.removeValue(key, clusterTranslator);
+// } catch (DatabaseException e) {
+// Logger.defaultLogError(e);
+// return;
+// }
+
+ clusterTable.writeOnlyInvalidate(cluster);
+
+ try {
+ int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
+ clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
+ clusterTranslator.removeValue(cluster);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ queryProvider2.invalidateResource(key);
+ clientChanges.invalidate(key);
+
+ } else {
+ ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
+ queryProvider2.updateValue(querySupport.getId(resource));
+ clientChanges.claimValue(resource);
+ }
+
+
+ }
+
+ @Override
+ public void flush(boolean intermediate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flushCluster() {
+ clusterTable.flushCluster(graphSession);
+ if(defaultClusterSet != null) {
+ clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
+ }
+ currentCluster = null;
+ }
+
+ @Override
+ public void flushCluster(Resource r) {
+ throw new UnsupportedOperationException("flushCluster resource " + r);
+ }
+
+ @Override
+ public void gc() {
+ }
+
+ @Override
+ public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
+ Resource object) {
+
+ int s = ((ResourceImpl)subject).id;
+ int p = ((ResourceImpl)predicate).id;
+ int o = ((ResourceImpl)object).id;
+
+ provider = getProvider(provider);
+ if (null == provider) {
+
+ ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
+ clusterTable.writeOnlyInvalidate(cluster);
+
+ try {
+
+ int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
+
+ ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
+ ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
+
+ clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.removeStatement(cluster);
+
+ queryProvider2.invalidateResource(s);
+ clientChanges.invalidate(s);
+
+ } catch (DatabaseException e) {
+
+ Logger.defaultLogError(e);
+
+ }
+
+ return true;
+
+ } else {
+
+ ((VirtualGraphImpl)provider).deny(s, p, o);
+ queryProvider2.invalidateResource(s);
+ clientChanges.invalidate(s);
+
+ return true;
+
+ }
+
+ }
+
+ @Override
+ public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean writeOnly() {
+ return true;
+ }
+
+ @Override
+ public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
+ writeSupport.performWriteRequest(graph, request);
+ }
+
+ @Override
+ public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void addMetadata(Metadata data) throws ServiceException {
+ writeSupport.addMetadata(data);
+ }
+
+ @Override
+ public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
+ return writeSupport.getMetadata(clazz);
+ }
+
+ @Override
+ public TreeMap<String, byte[]> getMetadata() {
+ return writeSupport.getMetadata();
+ }
+
+ @Override
+ public void commitDone(WriteTraits writeTraits, long csid) {
+ writeSupport.commitDone(writeTraits, csid);
+ }
+
+ @Override
+ public void clearUndoList(WriteTraits writeTraits) {
+ writeSupport.clearUndoList(writeTraits);
+ }
+ @Override
+ public int clearMetadata() {
+ return writeSupport.clearMetadata();
+ }
+
+ @Override
+ public void startUndo() {
+ writeSupport.startUndo();
+ }
+ }
+
+ class VirtualWriteOnlySupport implements WriteSupport {
+
+// @Override
+// public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
+// Resource object) {
+// throw new UnsupportedOperationException();
+// }
+
+ @Override
+ public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
+
+ TransientGraph impl = (TransientGraph)provider;
+ impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
+ getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
+ clientChanges.claim(subject, predicate, object);
+
+ }
+
+ @Override
+ public void claim(VirtualGraph provider, int subject, int predicate, int object) {
+
+ TransientGraph impl = (TransientGraph)provider;
+ impl.claim(subject, predicate, object);
+ getQueryProvider2().updateStatements(subject, predicate);
+ clientChanges.claim(subject, predicate, object);
+
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
+ claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
+ ((VirtualGraphImpl)provider).claimValue(resource, value, length);
+ getQueryProvider2().updateValue(resource);
+ clientChanges.claimValue(resource);
+ }
+
+ @Override
+ public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
+ byte[] value = reader.readBytes(null, amount);
+ claimValue(provider, resource, value);
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider) {
+ TransientGraph impl = (TransientGraph)provider;
+ return impl.getResource(impl.newResource(false));
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider, long clusterId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Resource createResource(VirtualGraph provider, Resource clusterSet)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createClusterSet(VirtualGraph provider, Resource clusterSet)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
+ throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Resource setDefaultClusterSet(Resource clusterSet)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public void denyValue(VirtualGraph provider, Resource resource) {
+ ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
+ getQueryProvider2().updateValue(querySupport.getId(resource));
+ // NOTE: this only keeps track of value changes by-resource.
+ clientChanges.claimValue(resource);
+ }
+
+ @Override
+ public void flush(boolean intermediate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flushCluster() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flushCluster(Resource r) {
+ throw new UnsupportedOperationException("Resource " + r);
+ }
+
+ @Override
+ public void gc() {
+ }
+
+ @Override
+ public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
+ TransientGraph impl = (TransientGraph) provider;
+ impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
+ getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
+ clientChanges.deny(subject, predicate, object);
+ return true;
+ }
+
+ @Override
+ public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean writeOnly() {
+ return true;
+ }
+
+ @Override
+ public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void addMetadata(Metadata data) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TreeMap<String, byte[]> getMetadata() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void commitDone(WriteTraits writeTraits, long csid) {
+ }
+
+ @Override
+ public void clearUndoList(WriteTraits writeTraits) {
+ }
+
+ @Override
+ public int clearMetadata() {
+ return 0;
+ }
+
+ @Override
+ public void startUndo() {
+ }
+ }
+
+ private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
+
+ try {
+
+ //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
+ clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+
+ acquireWriteOnly();
+
+ WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
+
+ WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
+
+ WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
+ writeState = writeStateT;
+
+ assert (null != writer);
+// writer.state.barrier.inc();
+ long start = System.nanoTime();
+ T result = request.perform(writer);
+ long duration = System.nanoTime() - start;
+ if (DEBUG) {
+ System.err.println("################");
+ System.err.println("WriteOnly duration " + 1e-9*duration);
+ }
+ writeStateT.setResult(result);
+
+ // This makes clusters available from server
+ clusterStream.reallyFlush();
+
+ // This will trigger query updates
+ releaseWriteOnly(writer);
+ assert (null != writer);
+
+ handleUpdatesAndMetadata(writer);
+
+ } catch (CancelTransactionException e) {
+
+ releaseWriteOnly(writeState.getGraph());
+
+ clusterTable.removeWriteOnlyClusters();
+ state.stopWriteTransaction(clusterStream);
+
+ } catch (Throwable e) {
+
+ e.printStackTrace();
+
+ releaseWriteOnly(writeState.getGraph());
+
+ clusterTable.removeWriteOnlyClusters();
+
+ if (callback != null)
+ callback.exception(new DatabaseException(e));
+
+ state.stopWriteTransaction(clusterStream);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ }
+
+ }
+
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
+ scheduleRequest(request, callback, notify, null);
+ }
+
+ @Override
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+
+ assertAlive();
+
+ assert (request != null);
+
+ requestManager.scheduleWrite(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
+
+ try {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
+ clientChanges = new ClientChangesImpl(SessionImplSocket.this);
+
+ acquireWriteOnly();
+
+ VirtualGraph vg = getProvider(request.getProvider());
+ WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
+
+ WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
+
+ writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
+
+ @Override
+ public void execute(Object result) {
+ if(callback != null) callback.accept(null);
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ if(callback != null) callback.accept((DatabaseException)t);
+ }
+
+ });
+
+ assert (null != writer);
+// writer.state.barrier.inc();
+
+ try {
+
+ request.perform(writer);
+
+ } catch (Throwable e) {
+
+// writer.state.barrier.dec();
+// writer.waitAsync(null);
+
+ releaseWriteOnly(writer);
+
+ clusterTable.removeWriteOnlyClusters();
+
+ if(!(e instanceof CancelTransactionException)) {
+ if (callback != null)
+ callback.accept(new DatabaseException(e));
+ }
+
+ writeState.except(e);
+
+ return;
+
+ }
+
+ // This makes clusters available from server
+ boolean empty = clusterStream.reallyFlush();
+ // This was needed to make WO requests call metadata listeners.
+ // NOTE: the calling event does not contain clientChanges information.
+ if (!empty && clientChanges.isEmpty())
+ clientChanges.setNotEmpty(true);
+ releaseWriteOnly(writer);
+ assert (null != writer);
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
+
+ }
+
+
+ task.finish();
+
+ }
+
+ }, combine);
+
+ }
+
+ public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
+ scheduleRequest(request, callback, notify, null);
+ }
+
+ /* (non-Javadoc)
+ * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
+ */
+ @Override
+ public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
+
+ assert (request != null);
+
+ requestManager.scheduleWrite(new SessionTask(null) {
+
+ @Override
+ public void run0(int thread) {
+
+ ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
+
+ performWriteOnly(request, notify, callback);
+
+ task.finish();
+
+ }
+
+ }, combine);
+
+ }
+
+ public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ITask task = ThreadLogger.task(request);
+
+ ListenerBase listener = getListenerBase(procedure);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+ // This is never synced but increase to prevent it from visiting 0
+ newGraph.asyncBarrier.inc();
+
+ try {
+
+ if (listener != null) {
+
+ try {
+
+ AsyncProcedure ap = new AsyncProcedure<T>() {
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ procedure.exception(graph, t);
+ if(throwable != null) {
+ throwable.set(t);
+ } else {
+ // ErrorLogger.defaultLogError("Unhandled exception", t);
+ }
+ }
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ if(result != null) result.set(t);
+ procedure.execute(graph, t);
+ }
+
+ };
+
+ QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+
+ } catch (Throwable t) {
+ // This is handled by the AsyncProcedure
+ //Logger.defaultLogError("Internal error", t);
+ }
+
+ } else {
+
+ try {
+
+ T t = request.perform(newGraph);
+
+ try {
+
+ if(result != null) result.set(t);
+ procedure.execute(newGraph, t);
+
+ } catch (Throwable th) {
+
+ if(throwable != null) {
+ throwable.set(th);
+ } else {
+ LOGGER.error("Unhandled exception", th);
+ }
+
+ }
+
+ } catch (Throwable t) {
+
+ if (DEBUG)
+ t.printStackTrace();
+
+ if(throwable != null) {
+ throwable.set(t);
+ } else {
+ LOGGER.error("Unhandled exception", t);
+ }
+
+ try {
+
+ procedure.exception(newGraph, t);
+
+ } catch (Throwable t2) {
+
+ if(throwable != null) {
+ throwable.set(t2);
+ } else {
+ LOGGER.error("Unhandled exception", t2);
+ }
+
+ }
+
+ }
+
+ task.finish();
+
+ }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ requestManager.scheduleRead(new SessionRead(null, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ITask task = ThreadLogger.task(request);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ newGraph.asyncBarrier.inc();
+
+ try {
+
+ if (listener != null) {
+
+ try {
+ QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ LOGGER.error("Unhandled query exception", e);
+ }
+
+ } else {
+
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
+
+ public void execute(AsyncReadGraph graph_, T result) {
+ task.finish();
+ super.execute(graph_, result);
+ }
+
+ public void exception(AsyncReadGraph graph_, Throwable t) {
+ task.finish();
+ super.exception(graph_, t);
+ }
+
+ };
+
+ try {
+ wrap.performSync(request);
+ } catch (DatabaseException e) {
+ LOGGER.error("Unhandled query exception", e);
+ }
+
+ }
+
+ } finally {
+
+ newGraph.asyncBarrier.dec();
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ int sync = notify != null ? thread : -1;
+
+ requestManager.scheduleRead(new SessionRead(null, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ListenerBase listener = getListenerBase(procedure);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+ try {
+
+ if (listener != null) {
+
+ newGraph.processor.query(newGraph, request, null, procedure, listener);
+
+// newGraph.waitAsync(request);
+
+ } else {
+
+ final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
+
+ try {
+
+ request.perform(newGraph, wrapper);
+
+ } catch (Throwable t) {
+
+ t.printStackTrace();
+
+ }
+
+ }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ int sync = notify != null ? thread : -1;
+
+ requestManager.scheduleRead(new SessionRead(null, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ListenerBase listener = getListenerBase(procedure);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+ try {
+
+ if (listener != null) {
+
+ newGraph.processor.query(newGraph, request, null, procedure, listener);
+
+// newGraph.waitAsync(request);
+
+ } else {
+
+ final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
+
+ try {
+
+ request.perform(newGraph, wrapper);
+
+ } catch (Throwable t) {
+
+ t.printStackTrace();
+
+ }
+
+ }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ListenerBase listener = getListenerBase(procedure);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+ try {
+
+ if (listener != null) {
+
+ try {
+ QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ } else {
+
+// newGraph.state.barrier.inc();
+
+ request.register(newGraph, new Listener<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ if(throwable != null) throwable.set(t);
+ procedure.exception(t);
+// newGraph.state.barrier.dec();
+ }
+
+ @Override
+ public void execute(T t) {
+ if(result != null) result.set(t);
+ procedure.execute(t);
+// newGraph.state.barrier.dec();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return true;
+ }
+
+ });
+
+// newGraph.waitAsync(request);
+
+ }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+
+ @Override
+ public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null, null, null);
+
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ DataContainer<Throwable> container = new DataContainer<Throwable>();
+ DataContainer<T> result = new DataContainer<T>();
+ scheduleRequest(request, procedure, notify, container, result);
+ acquire(notify, request);
+ Throwable throwable = container.get();
+ if(throwable != null) {
+ if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+ else throw new DatabaseException("Unexpected exception", throwable);
+ }
+ return result.get();
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
+ final DataContainer<T> resultContainer = new DataContainer<T>();
+ scheduleRequest(request, new AsyncProcedure<T>() {
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ exceptionContainer.set(throwable);
+ procedure.exception(graph, throwable);
+ }
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ resultContainer.set(result);
+ procedure.execute(graph, result);
+ }
+ }, getListenerBase(procedure), notify);
+ acquire(notify, request, procedure);
+ Throwable throwable = exceptionContainer.get();
+ if (throwable != null) {
+ if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+ else throw new DatabaseException("Unexpected exception", throwable);
+ }
+ return resultContainer.get();
+ }
+
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
+ scheduleRequest(request, new AsyncMultiProcedure<T>() {
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ exceptionContainer.set(throwable);
+ procedure.exception(graph, throwable);
+ }
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ procedure.execute(graph, result);
+ }
+ @Override
+ public void finished(AsyncReadGraph graph) {
+ procedure.finished(graph);
+ }
+ }, notify);
+ acquire(notify, request, procedure);
+ Throwable throwable = exceptionContainer.get();
+ if (throwable != null) {
+ if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+ else throw new DatabaseException("Unexpected exception", throwable);
+ }
+ // TODO: implement return value
+ System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
+ return null;
+ }
+
+ @Override
+ public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
+ return syncRequest(request, new ProcedureAdapter<T>());
+
+
+// assert(request != null);
+//
+// final DataContainer<T> result = new DataContainer<T>();
+// final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+//
+// syncRequest(request, new Procedure<T>() {
+//
+// @Override
+// public void execute(T t) {
+// result.set(t);
+// }
+//
+// @Override
+// public void exception(Throwable t) {
+// exception.set(t);
+// }
+//
+// });
+//
+// Throwable t = exception.get();
+// if(t != null) {
+// if(t instanceof DatabaseException) throw (DatabaseException)t;
+// else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
+// }
+//
+// return result.get();
+
+ }
+
+ @Override
+ public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
+ return syncRequest(request, (Procedure<T>)procedure);
+ }
+
+
+// @Override
+// public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
+// assertNotSession();
+// Semaphore notify = new Semaphore(0);
+// DataContainer<Throwable> container = new DataContainer<Throwable>();
+// DataContainer<T> result = new DataContainer<T>();
+// scheduleRequest(request, procedure, notify, container, result);
+// acquire(notify, request);
+// Throwable throwable = container.get();
+// if(throwable != null) {
+// if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+// else throw new DatabaseException("Unexpected exception", throwable);
+// }
+// return result.get();
+// }
+
+
+ @Override
+ public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> container = new DataContainer<Throwable>();
+ final DataContainer<T> result = new DataContainer<T>();
+ scheduleRequest(request, procedure, notify, container, result);
+ acquire(notify, request);
+ Throwable throwable = container.get();
+ if (throwable != null) {
+ if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
+ else throw new DatabaseException("Unexpected exception", throwable);
+ }
+ return result.get();
+ }
+
+ @Override
+ public void syncRequest(Write request) throws DatabaseException {
+ assertNotSession();
+ assertAlive();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
+ scheduleRequest(request, e -> exception.set(e), notify);
+ acquire(notify, request);
+ if(exception.get() != null) throw exception.get();
+ }
+
+ @Override
+ public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+ final DataContainer<T> result = new DataContainer<T>();
+ scheduleRequest(request, new Procedure<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ exception.set(t);
+ }
+
+ @Override
+ public void execute(T t) {
+ result.set(t);
+ }
+
+ }, notify);
+ acquire(notify, request);
+ if(exception.get() != null) {
+ Throwable t = exception.get();
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException(t);
+ }
+ return result.get();
+ }
+
+ @Override
+ public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+ final DataContainer<T> result = new DataContainer<T>();
+ scheduleRequest(request, new Procedure<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ exception.set(t);
+ }
+
+ @Override
+ public void execute(T t) {
+ result.set(t);
+ }
+
+ }, notify);
+ acquire(notify, request);
+ if(exception.get() != null) {
+ Throwable t = exception.get();
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException(t);
+ }
+ return result.get();
+ }
+
+ @Override
+ public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+ final DataContainer<T> result = new DataContainer<T>();
+ scheduleRequest(request, new Procedure<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ exception.set(t);
+ }
+
+ @Override
+ public void execute(T t) {
+ result.set(t);
+ }
+
+ }, notify);
+ acquire(notify, request);
+ if(exception.get() != null) {
+ Throwable t = exception.get();
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException(t);
+ }
+ return result.get();
+ }
+
+ @Override
+ public void syncRequest(DelayedWrite request) throws DatabaseException {
+ assertNotSession();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
+ scheduleRequest(request, e -> exception.set(e), notify);
+ acquire(notify, request);
+ if(exception.get() != null) throw exception.get();
+ }
+
+ @Override
+ public void syncRequest(WriteOnly request) throws DatabaseException {
+ assertNotSession();
+ assertAlive();
+ Semaphore notify = new Semaphore(0);
+ final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
+ scheduleRequest(request, e -> exception.set(e), notify);
+ acquire(notify, request);
+ if(exception.get() != null) throw exception.get();
+ }
+
+ /*
+ *
+ * GraphRequestProcessor interface
+ */
+
+ @Override
+ public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null, null);
+
+ }
+
+ @Override
+ public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null);
+
+ }
+
+ @Override
+ public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null, null, null);
+
+ }
+
+ @Override
+ public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
+
+ scheduleRequest(request, callback, null);
+
+ }
+
+ @Override
+ public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null);
+
+ }
+
+ @Override
+ public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null);
+
+ }
+
+ @Override
+ public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
+
+ scheduleRequest(request, procedure, null);
+
+ }
+
+ @Override
+ public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
+
+ scheduleRequest(request, callback, null);
+
+ }
+
+ @Override
+ public void asyncRequest(final Write r) {
+ asyncRequest(r, null);
+ }
+
+ @Override
+ public void asyncRequest(final DelayedWrite r) {
+ asyncRequest(r, null);
+ }
+
+ @Override
+ public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
+
+ scheduleRequest(request, callback, null);
+
+ }
+
+ @Override
+ public void asyncRequest(final WriteOnly request) {
+
+ asyncRequest(request, null);
+
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> T sync(ReadInterface<T> r) throws DatabaseException {
+ return r.request(this);
+ }
+
+ @Override
+ public <T> T sync(WriteInterface<T> r) throws DatabaseException {
+ return r.request(this);
+ }
+
+ @Override
+ public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
+ r.request(this, procedure);
+ }
+
+ @Override
+ public <T> void async(WriteInterface<T> r) {
+ r.request(this, new ProcedureAdapter<T>());
+ }
+
+ //@Override
+ public void incAsync() {
+ state.incAsync();
+ }
+
+ //@Override
+ public void decAsync() {
+ state.decAsync();
+ }
+
+ public long getCluster(ResourceImpl resource) {
+ ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
+ return cluster.getClusterId();
+ }
+
+ public long getCluster(int id) {
+ if (clusterTable == null)
+ System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
+ return clusterTable.getClusterIdByResourceKeyNoThrow(id);
+ }
+
+ public ResourceImpl getResource(int id) {
+ return new ResourceImpl(resourceSupport, id);
+ }
+
+ public ResourceImpl getResource(int resourceIndex, long clusterId) {
+ assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
+ ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
+ int key = proxy.getClusterKey();
+ int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
+ return new ResourceImpl(resourceSupport, resourceKey);
+ }
+
+ public ResourceImpl getResource2(int id) {
+ assert (id != 0);
+ return new ResourceImpl(resourceSupport, id);
+ }
+
+ final public int getId(ResourceImpl impl) {
+ return impl.id;
+ }
+
+ public static final Charset UTF8 = Charset.forName("utf-8");
+
+
+
+
+ static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
+ for(TransientGraph g : support.providers) {
+ if(g.isPending(subject)) return false;
+ }
+ return true;
+ }
+
+ static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
+ for(TransientGraph g : support.providers) {
+ if(g.isPending(subject, predicate)) return false;
+ }
+ return true;
+ }
+
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
+
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
+
+ AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
+
+ @Override
+ public void accept(ReadGraphImpl graph) {
+ if(ready.decrementAndGet() == 0) {
+ runnable.accept(graph);
+ }
+ }
+
+ };
+
+ for(TransientGraph g : support.providers) {
+ if(g.isPending(subject)) {
+ try {
+ g.load(graph, subject, composite);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ } else {
+ composite.accept(graph);
+ }
+ }
+
+ composite.accept(graph);
+
+ }
+
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
+
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
+
+ AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
+
+ @Override
+ public void accept(ReadGraphImpl graph) {
+ if(ready.decrementAndGet() == 0) {
+ runnable.accept(graph);
+ }
+ }
+
+ };
+
+ for(TransientGraph g : support.providers) {
+ if(g.isPending(subject, predicate)) {
+ try {
+ g.load(graph, subject, predicate, composite);
+ } catch (DatabaseException e) {
+ e.printStackTrace();
+ }
+ } else {
+ composite.accept(graph);
+ }
+ }
+
+ composite.accept(graph);
+
+ }
+
+// void dumpHeap() {
+//
+// try {
+// ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
+// "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
+// "d:/heap" + flushCounter + ".txt", true);
+// } catch (IOException e) {
+// e.printStackTrace();
+// }
+//
+// }
+
+ void fireReactionsToSynchronize(ChangeSet cs) {
+
+ // Do not fire empty events
+ if (cs.isEmpty())
+ return;
+
+ ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
+// g.state.barrier.inc();
+
+ try {
+
+ if (!cs.isEmpty()) {
+ ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
+ for (ChangeListener l : changeListeners2) {
+ try {
+ l.graphChanged(e2);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ } finally {
+
+// g.state.barrier.dec();
+// g.waitAsync(null);
+
+ }
+
+ }
+
+ void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
+ try {
+
+ // Do not fire empty events
+ if (cs2.isEmpty())
+ return;
+
+// graph.restart();
+// graph.state.barrier.inc();
+
+ try {
+
+ if (!cs2.isEmpty()) {
+
+ ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
+ for (ChangeListener l : changeListeners2) {
+ try {
+ l.graphChanged(e2);
+// System.out.println("changelistener " + l);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ }
+
+ } finally {
+
+// graph.state.barrier.dec();
+// graph.waitAsync(null);
+
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+
+ }
+ }
+ void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
+
+ try {
+
+ // Do not fire empty events
+ if (cs2.isEmpty())
+ return;
+
+ // Do not fire on virtual requests
+ if(graph.getProvider() != null)
+ return;
+
+ WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+ reactionGraph.asyncBarrier.inc();
+
+ try {
+
+ ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
+ for (ChangeListener l : metadataListeners) {
+ try {
+ l.graphChanged(e2);
+ } catch (Throwable ex) {
+ LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
+ }
+ }
+
+ } finally {
+
+ reactionGraph.asyncBarrier.dec();
+
+ }
+
+ } catch (Throwable t) {
+ LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
+ }
+
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
+ */
+ @Override
+ public <T> T getService(Class<T> api) {
+ T t = peekService(api);
+ if (t == null) {
+ if (state.isClosed())
+ throw new ServiceNotFoundException(this, api, "Session has been shut down");
+ throw new ServiceNotFoundException(this, api);
+ }
+ return t;
+ }
+
+ /**
+ * @return
+ */
+ protected abstract ServerInformation getCachedServerInformation();
+
+ private Class<?> serviceKey1 = null;
+ private Class<?> serviceKey2 = null;
+ private Object service1 = null;
+ private Object service2 = null;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T peekService(Class<T> api) {
+ if (Layer0.class == api)
+ return (T) L0;
+
+ synchronized (this) {
+ if (serviceKey1 == api) {
+ return (T) service1;
+ }
+ if (serviceKey2 == api) {
+ // Promote this key
+ Object result = service2;
+ service2 = service1;
+ serviceKey2 = serviceKey1;
+ service1 = result;
+ serviceKey1 = api;
+ return (T)result;
+ }
+
+ if (ServerInformation.class == api)
+ return (T) getCachedServerInformation();
+ else if (WriteGraphImpl.class == api)
+ return (T) writeState.getGraph();
+ else if (ClusterBuilder.class == api)
+ return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
+ else if (ClusterBuilderFactory.class == api)
+ return (T)new ClusterBuilderFactoryImpl(this);
+
+ service2 = service1;
+ serviceKey2 = serviceKey1;
+
+ service1 = serviceLocator.peekService(api);
+ serviceKey1 = api;
+
+ return (T)service1;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
+ */
+ @Override
+ public boolean hasService(Class<?> api) {
+ return serviceLocator.hasService(api);
+ }
+
+ /**
+ * @param api the api that must be implemented by the specified service
+ * @param service the service implementation
+ */
+ @Override
+ public <T> void registerService(Class<T> api, T service) {
+ if(Layer0.class == api) {
+ L0 = (Layer0)service;
+ return;
+ }
+ serviceLocator.registerService(api, service);
+ if (TransactionPolicySupport.class == api) {
+ transactionPolicy = (TransactionPolicySupport)service;
+ state.resetTransactionPolicy();
+ }
+ if (api == serviceKey1)
+ service1 = service;
+ else if (api == serviceKey2)
+ service2 = service;
+ }
+
+ // ----------------
+ // SessionMonitor
+ // ----------------
+
+ void fireSessionVariableChange(String variable) {
+ for (MonitorHandler h : monitorHandlers) {
+ MonitorContext ctx = monitorContexts.getLeft(h);
+ assert ctx != null;
+ // SafeRunner functionality repeated here to avoid dependency.
+ try {
+ h.valuesChanged(ctx);
+ } catch (Exception e) {
+ LOGGER.error("monitor handler notification produced the following exception", e);
+ } catch (LinkageError e) {
+ LOGGER.error("monitor handler notification produced a linkage error", e);
+ }
+ }
+ }
+
+
+ class ResourceSerializerImpl implements ResourceSerializer {
+
+ public long createRandomAccessId(int id) throws DatabaseException {
+ if(id < 0)
+ return id;
+ int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
+ long cluster = getCluster(id);
+ if (0 == cluster)
+ return 0; // Better to return 0 then invalid id.
+ long result = ClusterTraitsBase.createResourceId(cluster, index);
+ return result;
+ }
+
+ @Override
+ public long getRandomAccessId(Resource resource) throws DatabaseException {
+
+ ResourceImpl resourceImpl = (ResourceImpl) resource;
+ return createRandomAccessId(resourceImpl.id);
+
+ }
+
+ @Override
+ public int getTransientId(Resource resource) throws DatabaseException {
+
+ ResourceImpl resourceImpl = (ResourceImpl) resource;
+ return resourceImpl.id;
+
+ }
+
+ @Override
+ public String createRandomAccessId(Resource resource)
+ throws InvalidResourceReferenceException {
+
+ if(resource == null) throw new IllegalArgumentException();
+
+ ResourceImpl resourceImpl = (ResourceImpl) resource;
+
+ if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
+
+ int r;
+ try {
+ r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
+ } catch (DatabaseException e1) {
+ throw new InvalidResourceReferenceException(e1);
+ }
+ try {
+ // Serialize as '<resource index>_<cluster id>'
+ return "" + r + "_" + getCluster(resourceImpl);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new InvalidResourceReferenceException(e);
+ } finally {
+ }
+ }
+
+ public int getTransientId(long serialized) throws DatabaseException {
+ if (serialized <= 0)
+ return (int)serialized;
+ int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
+ long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
+ ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
+ if (cluster == null)
+ throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
+ int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
+ return key;
+ }
+
+ @Override
+ public Resource getResource(long randomAccessId) throws DatabaseException {
+ return getResourceByKey(getTransientId(randomAccessId));
+ }
+
+ @Override
+ public Resource getResource(int transientId) throws DatabaseException {
+ return getResourceByKey(transientId);
+ }
+
+ @Override
+ public Resource getResource(String randomAccessId)
+ throws InvalidResourceReferenceException {
+ try {
+ int i = randomAccessId.indexOf('_');
+ if (i == -1)
+ throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
+ + randomAccessId + "'");
+ int r = Integer.parseInt(randomAccessId.substring(0, i));
+ if(r < 0) return getResourceByKey(r);
+ long c = Long.parseLong(randomAccessId.substring(i + 1));
+ ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
+ int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
+ if (cluster.hasResource(key, clusterTranslator))
+ return getResourceByKey(key);
+ } catch (InvalidResourceReferenceException e) {
+ throw e;
+ } catch (NumberFormatException e) {
+ throw new InvalidResourceReferenceException(e);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new InvalidResourceReferenceException(e);
+ } finally {
+ }
+ throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
+ }
+
+ @Override
+ public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
+ try {
+ return true;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new InvalidResourceReferenceException(e);
+ } finally {
+ }
+ }
+ }
+
+ // Copied from old SessionImpl
+
+ void check() {
+ if (state.isClosed())
+ throw new Error("Session closed.");
+ }
+
+
+
+ public ResourceImpl getNewResource(long clusterId) {
+ ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
+ int newId;
+ try {
+ newId = cluster.createResource(clusterTranslator);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ return null;
+ }
+ return new ResourceImpl(resourceSupport, newId);
+ }
+
+ public ResourceImpl getNewResource(Resource clusterSet)
+ throws DatabaseException {
+ long resourceId = clusterSet.getResourceId();
+ Long clusterId = clusterSetsSupport.get(resourceId);
+ if (null == clusterId)
+ throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
+ if (Constants.NewClusterId == clusterId) {
+ clusterId = getService(ClusteringSupport.class).createCluster();
+ if ((serviceMode & SERVICE_MODE_CREATE) > 0)
+ createdClusters.add(clusterId);
+ clusterSetsSupport.put(resourceId, clusterId);
+ return getNewResource(clusterId);
+ } else {
+ ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
+ ResourceImpl result;
+ if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
+ clusterId = getService(ClusteringSupport.class).createCluster();
+ if ((serviceMode & SERVICE_MODE_CREATE) > 0)
+ createdClusters.add(clusterId);
+ clusterSetsSupport.put(resourceId, clusterId);
+ return getNewResource(clusterId);
+ } else {
+ result = getNewResource(clusterId);
+ int resultKey = querySupport.getId(result);
+ long resultCluster = querySupport.getClusterId(resultKey);
+ if (clusterId != resultCluster)
+ clusterSetsSupport.put(resourceId, resultCluster);
+ return result;
+ }
+ }
+ }
+
+ public void getNewClusterSet(Resource clusterSet)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("new cluster set=" + clusterSet);
+ long resourceId = clusterSet.getResourceId();
+ if (clusterSetsSupport.containsKey(resourceId))
+ throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
+ clusterSetsSupport.put(resourceId, Constants.NewClusterId);
+ }
+ public boolean containsClusterSet(Resource clusterSet)
+ throws ServiceException {
+ long resourceId = clusterSet.getResourceId();
+ return clusterSetsSupport.containsKey(resourceId);
+ }
+ public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
+ Resource r = defaultClusterSet;
+ defaultClusterSet = clusterSet;
+ return r;
+ }
+ void printDiagnostics() {
+
+ if (DIAGNOSTICS) {
+
+ final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
+ final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
+ for(ClusterI cluster : clusterTable.getClusters()) {
+ try {
+ if (cluster.isLoaded() && !cluster.isEmpty()) {
+ residentClusters.set(residentClusters.get() + 1);
+ totalMem.set(totalMem.get() + cluster.getUsedSpace());
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+
+ System.out.println("--------------------------------");
+ System.out.println("Cluster information:");
+ System.out.println("-amount of resident clusters=" + residentClusters.get());
+ System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
+
+ for(ClusterI cluster : clusterTable.getClusters()) {
+ System.out.print("Cluster " + cluster.getClusterId() + " [");
+ try {
+ if (!cluster.isLoaded())
+ System.out.println("not loaded]");
+ else if (cluster.isEmpty())
+ System.out.println("is empty]");
+ else {
+ System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
+ System.out.print(",references=" + cluster.getReferenceCount());
+ System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ System.out.println("is corrupted]");
+ }
+ }
+
+ queryProvider2.printDiagnostics();
+ System.out.println("--------------------------------");
+ }
+
+ }
+
+ public void fireStartReadTransaction() {
+
+ if (DIAGNOSTICS)
+ System.out.println("StartReadTransaction");
+
+ for (SessionEventListener listener : eventListeners) {
+ try {
+ listener.readTransactionStarted();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ }
+
+ public void fireFinishReadTransaction() {
+
+ if (DIAGNOSTICS)
+ System.out.println("FinishReadTransaction");
+
+ for (SessionEventListener listener : eventListeners) {
+ try {
+ listener.readTransactionFinished();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ }
+
+ public void fireStartWriteTransaction() {
+
+ if (DIAGNOSTICS)
+ System.out.println("StartWriteTransaction");
+
+ for (SessionEventListener listener : eventListeners) {
+ try {
+ listener.writeTransactionStarted();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ }
+
+ public void fireFinishWriteTransaction() {
+
+ if (DIAGNOSTICS)
+ System.out.println("FinishWriteTransaction");
+
+ for (SessionEventListener listener : eventListeners) {
+ try {
+ listener.writeTransactionFinished();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ Indexing.resetDependenciesIndexingDisabled();
+
+ }
+
+ Resource deserialize(final long resourceId, final long clusterId) throws IOException {
+ throw new Error("Not supported at the moment.");
+ }
+
+ State getState() {
+ return state;
+ }
+
+ ClusterTable getClusterTable() {
+ return clusterTable;
+ }
+
+ public GraphSession getGraphSession() {
+ return graphSession;
+ }
+
+ public QueryProcessor getQueryProvider2() {
+ return queryProvider2;
+ }
+
+ ClientChangesImpl getClientChanges() {
+ return clientChanges;
+ }
+
+ boolean getWriteOnly() {
+ return writeOnly;
+ }
+
+ static int counter = 0;
+
+ public void onClusterLoaded(long clusterId) {
+
+ clusterTable.updateSize();
+
+ }
+
+
+
+
+ /*
+ * Implementation of the interface RequestProcessor
+ */
+
+ @Override
+ public <T> T syncRequest(final Read<T> request) throws DatabaseException {
+
+ assertNotSession();
+ assertAlive();
+
+ assert(request != null);
+
+ final DataContainer<T> result = new DataContainer<T>();
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+
+ syncRequest(request, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ result.set(t);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ exception.set(t);
+ }
+
+ });
+
+ Throwable t = exception.get();
+ if(t != null) {
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ }
+
+ return result.get();
+
+ }
+
+// @Override
+// public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
+// assertNotSession();
+// return syncRequest(request, (AsyncProcedure<T>)procedure);
+// }
+
+ @Override
+ public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
+
+ assertNotSession();
+
+ assert(request != null);
+
+ final DataContainer<T> result = new DataContainer<T>();
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+
+ syncRequest(request, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ result.set(t);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ exception.set(t);
+ }
+
+ });
+
+ Throwable t = exception.get();
+ if(t != null) {
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ }
+
+ return result.get();
+
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, (AsyncProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
+
+ assertNotSession();
+
+ assert(request != null);
+
+ final ArrayList<T> result = new ArrayList<T>();
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+
+ syncRequest(request, new SyncMultiProcedure<T>() {
+
+ @Override
+ public void execute(ReadGraph graph, T t) {
+ synchronized(result) {
+ result.add(t);
+ }
+ }
+
+ @Override
+ public void finished(ReadGraph graph) {
+ }
+
+ @Override
+ public void exception(ReadGraph graph, Throwable t) {
+ exception.set(t);
+ }
+
+
+ });
+
+ Throwable t = exception.get();
+ if(t != null) {
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
+ assertNotSession();
+ throw new Error("Not implemented!");
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, (SyncMultiProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
+
+ assertNotSession();
+
+ assert(request != null);
+
+ final ArrayList<T> result = new ArrayList<T>();
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+
+ syncRequest(request, new AsyncMultiProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ synchronized(result) {
+ result.add(t);
+ }
+ }
+
+ @Override
+ public void finished(AsyncReadGraph graph) {
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ exception.set(t);
+ }
+
+
+ });
+
+ Throwable t = exception.get();
+ if (t != null) {
+ if (t instanceof DatabaseException)
+ throw (DatabaseException) t;
+ else
+ throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
+ assertNotSession();
+ return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ }
+
+
+ @Override
+ public <T> void asyncRequest(Read<T> request) {
+
+ asyncRequest(request, new ProcedureAdapter<T>() {
+ @Override
+ public void exception(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
+ asyncRequest(request, (AsyncProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
+ asyncRequest(request, new SyncToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
+ asyncRequest(request, new NoneToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
+ asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
+ asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> void asyncRequest(final AsyncRead<T> request) {
+
+ assert(request != null);
+
+ asyncRequest(request, new ProcedureAdapter<T>() {
+ @Override
+ public void exception(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
+ scheduleRequest(request, procedure, procedure, null);
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
+ asyncRequest(request, new SyncToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
+ asyncRequest(request, new NoneToAsyncListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
+ asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
+ asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request) {
+
+ assert(request != null);
+
+ asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
+ @Override
+ public void exception(ReadGraph graph, Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
+ asyncRequest(request, (SyncMultiProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
+ asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
+ scheduleRequest(request, procedure, null);
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
+ asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> void asyncRequest(AsyncMultiRead<T> request) {
+
+ assert(request != null);
+
+ asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
+ asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
+ asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
+ asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
+ asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
+ asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ }
+
+ @Override
+ final public <T> void asyncRequest(final ExternalRead<T> request) {
+
+ assert(request != null);
+
+ asyncRequest(request, new ProcedureAdapter<T>() {
+ @Override
+ public void exception(Throwable t) {
+ t.printStackTrace();
+ }
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
+ asyncRequest(request, (Procedure<T>)procedure);
+ }
+
+ boolean sameProvider(Write request) {
+ if(writeState.getGraph().provider != null) {
+ return writeState.getGraph().provider.equals(request.getProvider());
+ } else {
+ return request.getProvider() == null;
+ }
+ }
+
+ boolean plainWrite(WriteGraphImpl graph) {
+ if(graph == null) return false;
+ if(graph.writeSupport.writeOnly()) return false;
+ return true;
+ }
+
+
+ public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
+
+ private void assertNotSession() throws DatabaseException {
+ Thread current = Thread.currentThread();
+ if (sessionThreads.contains(current))
+ throw new ServiceException("Caller is already inside a transaction.");
+ }
+
+ void assertAlive() {
+ if (!state.isAlive())
+ throw new RuntimeDatabaseException("Session has been shut down.");
+ }
+
+ public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
+ return querySupport.getValueStream(graph, querySupport.getId(resource));
+ }
+
+ public byte[] getValue(ReadGraphImpl graph, Resource resource) {
+ return querySupport.getValue(graph, querySupport.getId(resource));
+ }
+
+ <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
+ acquire(semaphore, request, null);
+ }
+
+ private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
+ assertAlive();
+ try {
+ long tryCount = 0;
+ // Loop until the semaphore is acquired reporting requests that take a long time.
+ while (true) {
+
+ if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
+ // Acquired OK.
+ return;
+ } else {
+ if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
+ ++tryCount;
+ long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
+ .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
+ * tryCount;
+ System.err.println("DB client request '" + request + "' is taking long to execute, so far "
+ + (waitTime) + " ms. (procedure=" + procedure + ")");
+ }
+ }
+
+ assertAlive();
+
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ // FIXME: Should perhaps do something else in this case ??
+ }
+ }
+
+
+
+// @Override
+// public boolean holdOnToTransactionAfterCancel() {
+// return transactionPolicy.holdOnToTransactionAfterCancel();
+// }
+//
+// @Override
+// public boolean holdOnToTransactionAfterCommit() {
+// return transactionPolicy.holdOnToTransactionAfterCommit();
+// }
+//
+// @Override
+// public boolean holdOnToTransactionAfterRead() {
+// return transactionPolicy.holdOnToTransactionAfterRead();
+// }
+//
+// @Override
+// public void onRelinquish() {
+// transactionPolicy.onRelinquish();
+// }
+//
+// @Override
+// public void onRelinquishDone() {
+// transactionPolicy.onRelinquishDone();
+// }
+//
+// @Override
+// public void onRelinquishError() {
+// transactionPolicy.onRelinquishError();
+// }
+
+
+ @Override
+ public Session getSession() {
+ return this;
+ }
+
+
+ protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
+ protected abstract ResourceImpl getNewResource() throws DatabaseException;
+
+ public void ceased(int thread) {
+
+ requestManager.ceased(thread);
+
+ }
+
+ public int getAmountOfQueryThreads() {
+ // This must be a power of two
+ return 1;
+// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
+ }
+
+ public Resource getResourceByKey(int key) throws ResourceNotFoundException {
+
+ return new ResourceImpl(resourceSupport, key);
+
+ }
+
+ public void acquireWriteOnly() {
+ writeOnly = true;
+ }
+
+ public void releaseWriteOnly(ReadGraphImpl graph) {
+ writeOnly = false;
+ queryProvider2.releaseWrite(graph);
+ }
+
+ public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
+
+ long start = System.nanoTime();
+
+ while(dirtyPrimitives) {
+ dirtyPrimitives = false;
+ getQueryProvider2().propagateChangesInQueryCache(writer);
+ getQueryProvider2().listening.fireListeners(writer);
+ }
+
+ fireMetadataListeners(writer, clientChanges);
+
+ if(DebugPolicy.PERFORMANCE_DATA) {
+ long end = System.nanoTime();
+ System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
+ }
+
+ }
+
+ void removeTemporaryData() {
+ File platform = Platform.getLocation().toFile();
+ File tempFiles = new File(platform, "tempFiles");
+ File temp = new File(tempFiles, "db");
+ if(!temp.exists())
+ return;
+ try {
+ Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+ try {
+ Files.delete(file);
+ } catch (IOException e) {
+ Logger.defaultLogError(e);
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+
+ public void handleCreatedClusters() {
+ if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
+ createdClusters.forEach(new TLongProcedure() {
+
+ @Override
+ public boolean execute(long value) {
+ ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
+ cluster.setImmutable(true, clusterTranslator);
+ return true;
+ }
+ });
+ }
+ createdClusters.clear();
+ }
+
+ @Override
+ public Object getModificationCounter() {
+ return queryProvider2.modificationCounter;
+ }
+
+ @Override
+ public void markUndoPoint() {
+ state.setCombine(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T l0() {
+ return (T) L0;
+ }
+
+}