X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=2ff47c29266912afa68ee0515fd5c47d1b809b89;hp=7302d13d45d6b0a9f071945d301ba80eed42f1a2;hb=cec265e1216a0a211e6e6dc4f91d5fda4c5747db;hpb=ffdf83729b496d5afe74c7888075bb17ce1c4bbb diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 7302d13d4..2ff47c292 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -1,3615 +1,3609 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package fi.vtt.simantics.procore.internal; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.TreeMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; - -import org.eclipse.core.runtime.Platform; -import org.simantics.databoard.Bindings; -import org.simantics.db.AsyncReadGraph; -import org.simantics.db.ChangeSet; -import org.simantics.db.DevelopmentKeys; -import org.simantics.db.ExternalValueSupport; -import org.simantics.db.Metadata; -import org.simantics.db.MonitorContext; -import org.simantics.db.MonitorHandler; -import org.simantics.db.Resource; -import org.simantics.db.ResourceSerializer; -import org.simantics.db.Session; -import org.simantics.db.SessionManager; -import org.simantics.db.SessionVariables; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteGraph; -import org.simantics.db.authentication.UserAuthenticationAgent; -import org.simantics.db.authentication.UserAuthenticator; -import org.simantics.db.common.Indexing; -import org.simantics.db.common.TransactionPolicyRelease; -import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; -import org.simantics.db.common.procedure.adapter.ProcedureAdapter; -import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; -import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; -import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; -import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; -import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; -import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; -import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; -import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure; -import org.simantics.db.common.service.ServiceActivityMonitorImpl; -import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.event.ChangeEvent; -import org.simantics.db.event.ChangeListener; -import org.simantics.db.event.SessionEventListener; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.ClusterSetExistException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.ImmutableException; -import org.simantics.db.exception.InvalidResourceReferenceException; -import org.simantics.db.exception.ResourceNotFoundException; -import org.simantics.db.exception.RuntimeDatabaseException; -import org.simantics.db.exception.ServiceException; -import org.simantics.db.exception.ServiceNotFoundException; -import org.simantics.db.impl.ClusterBase; -import org.simantics.db.impl.ClusterI; -import org.simantics.db.impl.ClusterTraitsBase; -import org.simantics.db.impl.ClusterTranslator; -import org.simantics.db.impl.ResourceImpl; -import org.simantics.db.impl.TransientGraph; -import org.simantics.db.impl.VirtualGraphImpl; -import org.simantics.db.impl.graph.DelayedWriteGraph; -import org.simantics.db.impl.graph.ReadGraphImpl; -import org.simantics.db.impl.graph.WriteGraphImpl; -import org.simantics.db.impl.graph.WriteSupport; -import org.simantics.db.impl.internal.RandomAccessValueSupport; -import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; -import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; -import org.simantics.db.impl.query.QueryProcessor; -import org.simantics.db.impl.query.QueryProcessor.SessionRead; -import org.simantics.db.impl.query.QueryProcessor.SessionTask; -import org.simantics.db.impl.service.QueryDebug; -import org.simantics.db.impl.support.VirtualGraphServerSupport; -import org.simantics.db.impl.support.WriteRequestScheduleSupport; -import org.simantics.db.procedure.AsyncListener; -import org.simantics.db.procedure.AsyncMultiListener; -import org.simantics.db.procedure.AsyncMultiProcedure; -import org.simantics.db.procedure.AsyncProcedure; -import org.simantics.db.procedure.Listener; -import org.simantics.db.procedure.ListenerBase; -import org.simantics.db.procedure.MultiListener; -import org.simantics.db.procedure.MultiProcedure; -import org.simantics.db.procedure.Procedure; -import org.simantics.db.procedure.SyncListener; -import org.simantics.db.procedure.SyncMultiListener; -import org.simantics.db.procedure.SyncMultiProcedure; -import org.simantics.db.procedure.SyncProcedure; -import org.simantics.db.procore.cluster.ClusterImpl; -import org.simantics.db.procore.cluster.ClusterTraits; -import org.simantics.db.procore.protocol.Constants; -import org.simantics.db.procore.protocol.DebugPolicy; -import org.simantics.db.request.AsyncMultiRead; -import org.simantics.db.request.AsyncRead; -import org.simantics.db.request.DelayedWrite; -import org.simantics.db.request.DelayedWriteResult; -import org.simantics.db.request.ExternalRead; -import org.simantics.db.request.MultiRead; -import org.simantics.db.request.Read; -import org.simantics.db.request.ReadInterface; -import org.simantics.db.request.UndoTraits; -import org.simantics.db.request.Write; -import org.simantics.db.request.WriteInterface; -import org.simantics.db.request.WriteOnly; -import org.simantics.db.request.WriteOnlyResult; -import org.simantics.db.request.WriteResult; -import org.simantics.db.request.WriteTraits; -import org.simantics.db.service.ByteReader; -import org.simantics.db.service.ClusterBuilder; -import org.simantics.db.service.ClusterBuilderFactory; -import org.simantics.db.service.ClusterControl; -import org.simantics.db.service.ClusterSetsSupport; -import org.simantics.db.service.ClusterUID; -import org.simantics.db.service.ClusteringSupport; -import org.simantics.db.service.CollectionSupport; -import org.simantics.db.service.DebugSupport; -import org.simantics.db.service.DirectQuerySupport; -import org.simantics.db.service.GraphChangeListenerSupport; -import org.simantics.db.service.InitSupport; -import org.simantics.db.service.LifecycleSupport; -import org.simantics.db.service.ManagementSupport; -import org.simantics.db.service.QueryControl; -import org.simantics.db.service.SerialisationSupport; -import org.simantics.db.service.ServerInformation; -import org.simantics.db.service.ServiceActivityMonitor; -import org.simantics.db.service.SessionEventSupport; -import org.simantics.db.service.SessionMonitorSupport; -import org.simantics.db.service.SessionUserSupport; -import org.simantics.db.service.StatementSupport; -import org.simantics.db.service.TransactionPolicySupport; -import org.simantics.db.service.TransactionSupport; -import org.simantics.db.service.TransferableGraphSupport; -import org.simantics.db.service.UndoRedoSupport; -import org.simantics.db.service.VirtualGraphSupport; -import org.simantics.db.service.XSupport; -import org.simantics.layer0.Layer0; -import org.simantics.utils.DataContainer; -import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; -import org.simantics.utils.threads.logger.ITask; -import org.simantics.utils.threads.logger.ThreadLogger; - -import gnu.trove.procedure.TLongProcedure; -import gnu.trove.set.hash.TLongHashSet; - - -public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { - - protected static final boolean DEBUG = false; - - private static final boolean DIAGNOSTICS = false; - - private TransactionPolicySupport transactionPolicy; - - final private ClusterControl clusterControl; - - final protected BuiltinSupportImpl builtinSupport; - final protected VirtualGraphServerSupportImpl virtualGraphServerSupport; - final protected ClusterSetsSupport clusterSetsSupport; - final protected LifecycleSupportImpl lifecycleSupport; - - protected QuerySupportImpl querySupport; - protected ResourceSupportImpl resourceSupport; - protected WriteSupport writeSupport; - public ClusterTranslator clusterTranslator; - - boolean dirtyPrimitives = false; - - public static final int SERVICE_MODE_CREATE = 2; - public static final int SERVICE_MODE_ALLOW = 1; - public int serviceMode = 0; - public boolean createdImmutableClusters = false; - public TLongHashSet createdClusters = new TLongHashSet(); - - private Layer0 L0; - - /** - * The service locator maintained by the workbench. These services are - * initialized during workbench during the init method. - */ - final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl(); - - final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl(); - - final CopyOnWriteArrayList changeListeners2 = new CopyOnWriteArrayList(); - - final CopyOnWriteArrayList metadataListeners = new CopyOnWriteArrayList(); - - final CopyOnWriteArrayList eventListeners = new CopyOnWriteArrayList(); - - final HashSet sessionThreads = new HashSet(); - - final BijectionMap monitorContexts = new BijectionMap(); - - final protected State state = new State(); - - protected GraphSession graphSession = null; - - protected SessionManager sessionManagerImpl = null; - - protected UserAuthenticationAgent authAgent = null; - - protected UserAuthenticator authenticator = null; - - protected Resource user = null; - - protected ClusterStream clusterStream = null; - - protected SessionRequestManager requestManager = null; - - public ClusterTable clusterTable = null; - - public QueryProcessor queryProvider2 = null; - - ClientChangesImpl clientChanges = null; - - MonitorHandler[] monitorHandlers = new MonitorHandler[0]; - -// protected long newClusterId = Constants.NullClusterId; - - protected int flushCounter = 0; - - protected boolean writeOnly = false; - - WriteState writeState = null; - WriteStateBase delayedWriteState = null; - protected Resource defaultClusterSet = null; // If not null then used for newResource(). - - public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) { - -// if (authAgent == null) -// throw new IllegalArgumentException("null authentication agent"); - - File t = StaticSessionProperties.virtualGraphStoragePath; - if (null == t) - t = new File("."); - this.clusterTable = new ClusterTable(this, t); - this.builtinSupport = new BuiltinSupportImpl(this); - this.sessionManagerImpl = sessionManagerImpl; - this.user = null; -// this.authAgent = authAgent; - - serviceLocator.registerService(Session.class, this); - serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this)); - serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this)); - serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this)); - serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this)); - serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this)); - serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this)); - serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this)); - serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this)); - serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this)); - serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this)); - serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this)); - serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this)); - serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this)); - serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this)); - serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this)); - serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this)); - serviceLocator.registerService(XSupport.class, new XSupportImpl(this)); - serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl()); - serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); - serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); - serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); - ServiceActivityUpdaterForWriteTransactions.register(this); - - this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); - serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport); - serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport); - this.lifecycleSupport = new LifecycleSupportImpl(this); - serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport); - this.transactionPolicy = new TransactionPolicyRelease(); - serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy); - this.clusterControl = new ClusterControlImpl(this); - serviceLocator.registerService(ClusterControl.class, clusterControl); - this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs. - this.clusterSetsSupport.setReadDirectory(t.toPath()); - this.clusterSetsSupport.updateWriteDirectory(t.toPath()); - serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport); - } - - /* - * - * Session interface - */ - - - @Override - public Resource getRootLibrary() { - return queryProvider2.getRootLibraryResource(); - } - - void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException { - if (!graphSession.dbSession.refreshEnabled()) - return; - try { - getClusterTable().refresh(csid, this, clusterUID); - } catch (Throwable t) { - Logger.defaultLogError("Refesh failed.", t); - } - } - - static final class TaskHelper { - private final String name; - private Object result; - final Semaphore sema = new Semaphore(0); - private Throwable throwable = null; - final Callback callback = new Callback() { - @Override - public void run(DatabaseException e) { - synchronized (TaskHelper.this) { - throwable = e; - } - } - }; - final Procedure proc = new Procedure() { - @Override - public void execute(Object result) { - callback.run(null); - } - @Override - public void exception(Throwable t) { - if (t instanceof DatabaseException) - callback.run((DatabaseException)t); - else - callback.run(new DatabaseException("" + name + "operation failed.", t)); - } - }; - final WriteTraits writeTraits = new WriteTraits() { - @Override - public UndoTraits getUndoTraits() { - return null; - } - @Override - public VirtualGraph getProvider() { - return null; - } - }; - TaskHelper(String name) { - this.name = name; - } - T getResult() { - return (T)result; - } - void setResult(Object result) { - this.result = result; - } -// Throwable throwableGet() { -// return throwable; -// } - synchronized void throwableSet(Throwable t) { - throwable = t; - } -// void throwableSet(String t) { -// throwable = new InternalException("" + name + " operation failed. " + t); -// } - synchronized void throwableCheck() - throws DatabaseException { - if (null != throwable) - if (throwable instanceof DatabaseException) - throw (DatabaseException)throwable; - else - throw new DatabaseException("Undo operation failed.", throwable); - } - void throw_(String message) - throws DatabaseException { - throw new DatabaseException("" + name + " operation failed. " + message); - } - } - - - - private ListenerBase getListenerBase(Object procedure) { - if (procedure instanceof ListenerBase) - return (ListenerBase) procedure; - else - return null; - } - - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { - scheduleRequest(request, callback, notify, null); - } - - /* (non-Javadoc) - * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) - */ - @Override - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { - - assert (request != null); - - if(Development.DEVELOPMENT) { - try { - if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) - System.err.println("schedule write '" + request + "'"); - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { - - @Override - public void run(int thread) { - - if(Development.DEVELOPMENT) { - try { - if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) - System.err.println("perform write '" + request + "'"); - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - - ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - try { - - flushCounter = 0; - clientChanges = new ClientChangesImpl(SessionImplSocket.this); - - VirtualGraph vg = getProvider(request.getProvider()); - - WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - writeState = new WriteState(writer, request, notify, new Procedure() { - - @Override - public void execute(Object result) { - if(callback != null) callback.run(null); - } - - @Override - public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); - } - - }); - - assert (null != writer); -// writer.state.barrier.inc(); - - try { - request.perform(writer); - assert (null != writer); - } catch (Throwable t) { - if (!(t instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); - writeState.except(t); - } finally { -// writer.state.barrier.dec(); -// writer.waitAsync(request); - } - - - assert(!queryProvider2.dirty); - - } catch (Throwable e) { - - // Log it first, just to be safe that the error is always logged. - if (!(e instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); - -// writeState.getGraph().state.barrier.dec(); -// writeState.getGraph().waitAsync(request); - - writeState.except(e); - -// try { -// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. -// // All we can do here is to log those, can't really pass them anywhere. -// if (callback != null) { -// if(e instanceof DatabaseException) callback.run((DatabaseException)e); -// else callback.run(new DatabaseException(e)); -// } -// } catch (Throwable e2) { -// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); -// } -// boolean empty = clusterStream.reallyFlush(); -// int callerThread = -1; -// ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this); -// SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1); -// try { -// // Send all local changes to server so it can calculate correct reverse change set. -// // This will call clusterStream.accept(). -// state.cancelCommit(context, clusterStream); -// if (!empty) { -// if (!context.isOk()) // this is a blocking operation -// throw new InternalException("Cancel failed. This should never happen. Contact application support."); -// getQueryProvider2().performDirtyUpdates(writeState.getGraph()); -// } -// state.cancelCommit2(context, clusterStream); -// } catch (DatabaseException e3) { -// Logger.defaultLogError("Write request cancel caused an unexpected error.", e3); -// } finally { -// clusterStream.setOff(false); -// clientChanges = new ClientChangesImpl(SessionImplSocket.this); -// } - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - } - - task.finish(); - - if(Development.DEVELOPMENT) { - try { - if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) - System.err.println("finish write '" + request + "'"); - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - - } - - }, combine); - - } - - public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify) { - scheduleRequest(request, procedure, notify, null); - } - - /* (non-Javadoc) - * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) - */ - @Override - public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { - - assert (request != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { - - @Override - public void run(int thread) { - - ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - flushCounter = 0; - clientChanges = new ClientChangesImpl(SessionImplSocket.this); - - VirtualGraph vg = getProvider(request.getProvider()); - WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - - try { - WriteState writeStateT = new WriteState(writer, request, notify, procedure); - writeState = writeStateT; - - assert (null != writer); -// writer.state.barrier.inc(); - writeStateT.setResult(request.perform(writer)); - assert (null != writer); - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - - } catch (Throwable e) { - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - - writeState.except(e); - -// state.stopWriteTransaction(clusterStream); -// -// } catch (Throwable e) { -// // Log it first, just to be safe that the error is always logged. -// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); -// -// try { -// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. -// // All we can do here is to log those, can't really pass them anywhere. -// if (procedure != null) { -// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); -// else procedure.exception(new DatabaseException(e)); -// } -// } catch (Throwable e2) { -// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); -// } -// -// clientChanges = new ClientChangesImpl(SessionImplSocket.this); -// -// state.stopWriteTransaction(clusterStream); - - } finally { - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - } - -// if(notify != null) notify.release(); - - task.finish(); - - } - - }, combine); - - } - - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { - scheduleRequest(request, callback, notify, null); - } - - /* (non-Javadoc) - * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) - */ - @Override - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { - - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); - - assert (request != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { - - @Override - public void run(int thread) { - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - Procedure stateProcedure = new Procedure() { - @Override - public void execute(Object result) { - if (callback != null) - callback.run(null); - } - @Override - public void exception(Throwable t) { - if (callback != null) { - if (t instanceof DatabaseException) callback.run((DatabaseException) t); - else callback.run(new DatabaseException(t)); - } else - Logger.defaultLogError("Unhandled exception", t); - } - }; - - final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); - delayedWriteState = new WriteStateBase(request, notify, stateProcedure); - DelayedWriteGraph dwg = null; -// newGraph.state.barrier.inc(); - - try { - dwg = new DelayedWriteGraph(newGraph); - request.perform(dwg); - } catch (Throwable e) { - delayedWriteState.except(e); - total.finish(); - return; - } finally { -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); - fireSessionVariableChange(SessionVariables.QUEUED_READS); - } - - delayedWriteState = null; - - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - flushCounter = 0; - clientChanges = new ClientChangesImpl(SessionImplSocket.this); - - acquireWriteOnly(); - - VirtualGraph vg = getProvider(request.getProvider()); - WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); - - WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - - writeState = new WriteState(writer, request, notify, stateProcedure); - - assert (null != writer); -// writer.state.barrier.inc(); - - try { - // Cannot cancel - dwg.commit(writer, request); - - if(defaultClusterSet != null) { - XSupport xs = getService(XSupport.class); - defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet); - ClusteringSupport cs = getService(ClusteringSupport.class); - clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet)); - } - - // This makes clusters available from server - clusterStream.reallyFlush(); - - releaseWriteOnly(writer); - handleUpdatesAndMetadata(writer); - - } catch (ServiceException e) { -// writer.state.barrier.dec(); -// writer.waitAsync(null); - - // These shall be requested from server - clusterTable.removeWriteOnlyClusters(); - // This makes clusters available from server - clusterStream.reallyFlush(); - - releaseWriteOnly(writer); - writeState.except(e); - } finally { - // Debugging & Profiling - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - task2.finish(); - total.finish(); - } - } - - }, combine); - - } - - public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify) { - scheduleRequest(request, procedure, notify, null); - } - - /* (non-Javadoc) - * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) - */ - @Override - public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { - throw new Error("Not implemented"); - } - - protected ClusterImpl getNewResourceCluster() throws DatabaseException { - ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly); - if((serviceMode & SERVICE_MODE_CREATE) > 0) { - createdClusters.add(cluster.clusterId); - } - return cluster; - } - - class WriteOnlySupport implements WriteSupport { - - ClusterStream stream; - ClusterImpl currentCluster; - - public WriteOnlySupport() { - this.stream = clusterStream; - } - - @Override - public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException { - claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id ); - } - - @Override - public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException { - - ClusterImpl cluster = clusterTable.getClusterByResourceKey(s); - - if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) - if(s != queryProvider2.getRootLibrary()) - throw new ImmutableException("Trying to modify immutable resource key=" + s); - - try { - maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator)); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - //throw e; - } - - clientChanges.invalidate(s); - - if (cluster.isWriteOnly()) - return; - queryProvider2.updateStatements(s, p); - - } - - @Override - public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { - claimValue(provider, ((ResourceImpl)resource).id, value, value.length); - } - - @Override - public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) { - - ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); - try { - maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator)); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - clientChanges.invalidate(rid); - - if (cluster.isWriteOnly()) - return; - queryProvider2.updateValue(rid); - - } - - @Override - public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { - - if(amount < 65536) { - claimValue(provider,resource, reader.readBytes(null, amount)); - return; - } - - byte[] bytes = new byte[65536]; - - int rid = ((ResourceImpl)resource).id; - ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); - try { - int left = amount; - while(left > 0) { - int block = Math.min(left, 65536); - reader.readBytes(bytes, block); - maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator)); - left -= block; - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - clientChanges.invalidate(rid); - - if (cluster.isWriteOnly()) - return; - queryProvider2.updateValue(rid); - - } - - private void maintainCluster(ClusterImpl before, ClusterI after_) { - if(after_ != null && after_ != before) { - ClusterImpl after = (ClusterImpl)after_; - if(currentCluster == before) currentCluster = after; - clusterTable.replaceCluster(after); - } - } - - public int createResourceKey(int foreignCounter) throws DatabaseException { - if(currentCluster == null) - currentCluster = getNewResourceCluster(); - if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { - ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); - newCluster.foreignLookup = new byte[foreignCounter]; - currentCluster = newCluster; - if (DEBUG) - System.err.println("foreignLookup " + currentCluster + " " + foreignCounter); - } - return currentCluster.createResource(clusterTranslator); - } - - @Override - public Resource createResource(VirtualGraph provider) throws DatabaseException { - if(currentCluster == null) { - if (null != defaultClusterSet) { - ResourceImpl result = getNewResource(defaultClusterSet); - currentCluster = clusterTable.getClusterByResourceKey(result.id); - return result; - } else { - currentCluster = getNewResourceCluster(); - } - } - if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { - if (null != defaultClusterSet) { - ResourceImpl result = getNewResource(defaultClusterSet); - currentCluster = clusterTable.getClusterByResourceKey(result.id); - return result; - } else { - currentCluster = getNewResourceCluster(); - } - } - return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator)); - } - - @Override - public Resource createResource(VirtualGraph provider, long clusterId) - throws DatabaseException { - return getNewResource(clusterId); - } - - @Override - public Resource createResource(VirtualGraph provider, Resource clusterSet) - throws DatabaseException { - return getNewResource(clusterSet); - } - - @Override - public void createClusterSet(VirtualGraph provider, Resource clusterSet) - throws DatabaseException { - getNewClusterSet(clusterSet); - } - - @Override - public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet) - throws ServiceException { - return containsClusterSet(clusterSet); - } - - public void selectCluster(long cluster) { - currentCluster = clusterTable.getClusterByClusterId(cluster); - long setResourceId = clusterSetsSupport.getSet(cluster); - clusterSetsSupport.put(setResourceId, cluster); - } - - @Override - public Resource setDefaultClusterSet(Resource clusterSet) - throws ServiceException { - Resource result = setDefaultClusterSet4NewResource(clusterSet); - if(clusterSet != null) { - long id = clusterSetsSupport.get(clusterSet.getResourceId()); - currentCluster = clusterTable.getClusterByClusterId(id); - return result; - } else { - currentCluster = null; - return null; - } - } - - @Override - public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException { - - provider = getProvider(provider); - if (null == provider) { - int key = ((ResourceImpl)resource).id; - - - - ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key); -// if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) -// if(key != queryProvider2.getRootLibrary()) -// throw new ImmutableException("Trying to modify immutable resource key=" + key); - -// try { -// cluster.removeValue(key, clusterTranslator); -// } catch (DatabaseException e) { -// Logger.defaultLogError(e); -// return; -// } - - clusterTable.writeOnlyInvalidate(cluster); - - try { - int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key); - clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION); - clusterTranslator.removeValue(cluster); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - queryProvider2.invalidateResource(key); - clientChanges.invalidate(key); - - } else { - ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); - queryProvider2.updateValue(querySupport.getId(resource)); - clientChanges.claimValue(resource); - } - - - } - - @Override - public void flush(boolean intermediate) { - throw new UnsupportedOperationException(); - } - - @Override - public void flushCluster() { - clusterTable.flushCluster(graphSession); - if(defaultClusterSet != null) { - clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId); - } - currentCluster = null; - } - - @Override - public void flushCluster(Resource r) { - throw new UnsupportedOperationException("flushCluster resource " + r); - } - - @Override - public void gc() { - } - - @Override - public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, - Resource object) { - - int s = ((ResourceImpl)subject).id; - int p = ((ResourceImpl)predicate).id; - int o = ((ResourceImpl)object).id; - - provider = getProvider(provider); - if (null == provider) { - - ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s); - clusterTable.writeOnlyInvalidate(cluster); - - try { - - int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s); - int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p); - int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o); - - ClusterI pc = clusterTable.getClusterProxyByResourceKey(p); - ClusterI oc = clusterTable.getClusterProxyByResourceKey(o); - - clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION); - clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION); - clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION); - clusterTranslator.removeStatement(cluster); - - queryProvider2.invalidateResource(s); - clientChanges.invalidate(s); - - } catch (DatabaseException e) { - - Logger.defaultLogError(e); - - } - - return true; - - } else { - - ((VirtualGraphImpl)provider).deny(s, p, o); - queryProvider2.invalidateResource(s); - clientChanges.invalidate(s); - - return true; - - } - - } - - @Override - public void setValue(VirtualGraph provider, Resource resource, byte[] value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean writeOnly() { - return true; - } - - @Override - public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { - writeSupport.performWriteRequest(graph, request); - } - - @Override - public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void addMetadata(Metadata data) throws ServiceException { - writeSupport.addMetadata(data); - } - - @Override - public T getMetadata(Class clazz) throws ServiceException { - return writeSupport.getMetadata(clazz); - } - - @Override - public TreeMap getMetadata() { - return writeSupport.getMetadata(); - } - - @Override - public void commitDone(WriteTraits writeTraits, long csid) { - writeSupport.commitDone(writeTraits, csid); - } - - @Override - public void clearUndoList(WriteTraits writeTraits) { - writeSupport.clearUndoList(writeTraits); - } - @Override - public int clearMetadata() { - return writeSupport.clearMetadata(); - } - - @Override - public void startUndo() { - writeSupport.startUndo(); - } - } - - class VirtualWriteOnlySupport implements WriteSupport { - -// @Override -// public void addStatement(VirtualGraph provider, Resource subject, Resource predicate, -// Resource object) { -// throw new UnsupportedOperationException(); -// } - - @Override - public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { - - TransientGraph impl = (TransientGraph)provider; - impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); - getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); - clientChanges.claim(subject, predicate, object); - - } - - @Override - public void claim(VirtualGraph provider, int subject, int predicate, int object) { - - TransientGraph impl = (TransientGraph)provider; - impl.claim(subject, predicate, object); - getQueryProvider2().updateStatements(subject, predicate); - clientChanges.claim(subject, predicate, object); - - } - - @Override - public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { - claimValue(provider, ((ResourceImpl)resource).id, value, value.length); - } - - @Override - public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) { - ((VirtualGraphImpl)provider).claimValue(resource, value, length); - getQueryProvider2().updateValue(resource); - clientChanges.claimValue(resource); - } - - @Override - public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { - byte[] value = reader.readBytes(null, amount); - claimValue(provider, resource, value); - } - - @Override - public Resource createResource(VirtualGraph provider) { - TransientGraph impl = (TransientGraph)provider; - return impl.getResource(impl.newResource(false)); - } - - @Override - public Resource createResource(VirtualGraph provider, long clusterId) { - throw new UnsupportedOperationException(); - } - - @Override - public Resource createResource(VirtualGraph provider, Resource clusterSet) - throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void createClusterSet(VirtualGraph provider, Resource clusterSet) - throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet) - throws ServiceException { - throw new UnsupportedOperationException(); - } - - @Override - public Resource setDefaultClusterSet(Resource clusterSet) - throws ServiceException { - return null; - } - - @Override - public void denyValue(VirtualGraph provider, Resource resource) { - ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); - getQueryProvider2().updateValue(querySupport.getId(resource)); - // NOTE: this only keeps track of value changes by-resource. - clientChanges.claimValue(resource); - } - - @Override - public void flush(boolean intermediate) { - throw new UnsupportedOperationException(); - } - - @Override - public void flushCluster() { - throw new UnsupportedOperationException(); - } - - @Override - public void flushCluster(Resource r) { - throw new UnsupportedOperationException("Resource " + r); - } - - @Override - public void gc() { - } - - @Override - public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { - TransientGraph impl = (TransientGraph) provider; - impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); - getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); - clientChanges.deny(subject, predicate, object); - return true; - } - - @Override - public void setValue(VirtualGraph provider, Resource resource, byte[] value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean writeOnly() { - return true; - } - - @Override - public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void addMetadata(Metadata data) throws ServiceException { - throw new UnsupportedOperationException(); - } - - @Override - public T getMetadata(Class clazz) throws ServiceException { - throw new UnsupportedOperationException(); - } - - @Override - public TreeMap getMetadata() { - throw new UnsupportedOperationException(); - } - - @Override - public void commitDone(WriteTraits writeTraits, long csid) { - } - - @Override - public void clearUndoList(WriteTraits writeTraits) { - } - - @Override - public int clearMetadata() { - return 0; - } - - @Override - public void startUndo() { - } - } - - private void performWriteOnly(WriteOnlyResult request, Semaphore notify, Procedure callback) { - - try { - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - flushCounter = 0; - clientChanges = new ClientChangesImpl(SessionImplSocket.this); - - acquireWriteOnly(); - - WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); - - WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider()); - - WriteState writeStateT = new WriteState(writer, request, notify, callback); - writeState = writeStateT; - - assert (null != writer); -// writer.state.barrier.inc(); - long start = System.nanoTime(); - T result = request.perform(writer); - long duration = System.nanoTime() - start; - if (DEBUG) { - System.err.println("################"); - System.err.println("WriteOnly duration " + 1e-9*duration); - } - writeStateT.setResult(result); - - // This makes clusters available from server - clusterStream.reallyFlush(); - - // This will trigger query updates - releaseWriteOnly(writer); - assert (null != writer); - - handleUpdatesAndMetadata(writer); - - } catch (CancelTransactionException e) { - - releaseWriteOnly(writeState.getGraph()); - - clusterTable.removeWriteOnlyClusters(); - state.stopWriteTransaction(clusterStream); - - } catch (Throwable e) { - - e.printStackTrace(); - - releaseWriteOnly(writeState.getGraph()); - - clusterTable.removeWriteOnlyClusters(); - - if (callback != null) - callback.exception(new DatabaseException(e)); - - state.stopWriteTransaction(clusterStream); - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - } - - } - - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { - scheduleRequest(request, callback, notify, null); - } - - @Override - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { - - assertAlive(); - - assert (request != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { - - @Override - public void run(int thread) { - - ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); - - try { - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - flushCounter = 0; - clientChanges = new ClientChangesImpl(SessionImplSocket.this); - - acquireWriteOnly(); - - VirtualGraph vg = getProvider(request.getProvider()); - WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); - - WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - - writeState = new WriteState(writer, request, notify, new Procedure() { - - @Override - public void execute(Object result) { - if(callback != null) callback.run(null); - } - - @Override - public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); - } - - }); - - assert (null != writer); -// writer.state.barrier.inc(); - - try { - - request.perform(writer); - - } catch (Throwable e) { - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - - releaseWriteOnly(writer); - - clusterTable.removeWriteOnlyClusters(); - - if(!(e instanceof CancelTransactionException)) { - if (callback != null) - callback.run(new DatabaseException(e)); - } - - writeState.except(e); - - return; - - } - - // This makes clusters available from server - boolean empty = clusterStream.reallyFlush(); - // This was needed to make WO requests call metadata listeners. - // NOTE: the calling event does not contain clientChanges information. - if (!empty && clientChanges.isEmpty()) - clientChanges.setNotEmpty(true); - releaseWriteOnly(writer); - assert (null != writer); - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_WRITES); - - } - - - task.finish(); - - } - - }, combine); - - } - - public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify) { - scheduleRequest(request, callback, notify, null); - } - - /* (non-Javadoc) - * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) - */ - @Override - public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify, Boolean combine) { - - assert (request != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { - - @Override - public void run(int thread) { - - ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); - - performWriteOnly(request, notify, callback); - - task.finish(); - - } - - }, combine); - - } - - public void scheduleRequest(final Read request, final AsyncProcedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { - - assert (request != null); - assert (procedure != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { - - @Override - public void run(int thread) { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - ListenerBase listener = getListenerBase(procedure); - - final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); - - try { - - if (listener != null) { - - try { - newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - procedure.exception(graph, t); - if(throwable != null) { - throwable.set(t); - } else { - // ErrorLogger.defaultLogError("Unhandled exception", t); - } - } - - @Override - public void execute(AsyncReadGraph graph, T t) { - if(result != null) result.set(t); - procedure.execute(graph, t); - } - - }, listener); - } catch (Throwable t) { - // This is handled by the AsyncProcedure - //Logger.defaultLogError("Internal error", t); - } - - } else { - - try { - -// newGraph.state.barrier.inc(); - - T t = request.perform(newGraph); - - try { - - if(result != null) result.set(t); - procedure.execute(newGraph, t); - - } catch (Throwable th) { - - if(throwable != null) { - throwable.set(th); - } else { - Logger.defaultLogError("Unhandled exception", th); - } - - } - - } catch (Throwable t) { - - if (DEBUG) - t.printStackTrace(); - - if(throwable != null) { - throwable.set(t); - } else { - Logger.defaultLogError("Unhandled exception", t); - } - - try { - - procedure.exception(newGraph, t); - - } catch (Throwable t2) { - - if(throwable != null) { - throwable.set(t2); - } else { - Logger.defaultLogError("Unhandled exception", t2); - } - - } - - } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); - - } - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - } - - } - - }); - - } - - public void scheduleRequest(final AsyncRead request, final AsyncProcedure procedure, final ListenerBase listener, final Semaphore notify) { - - assert (request != null); - assert (procedure != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { - - @Override - public void run(int thread) { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); - - try { - - if (listener != null) { - - newGraph.processor.query(newGraph, request, null, procedure, listener); - -// newGraph.waitAsync(request); - - } else { - - final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( - procedure, "request"); - - try { - -// newGraph.state.barrier.inc(); - - request.perform(newGraph, wrapper); - -// newGraph.waitAsync(request); - - } catch (Throwable t) { - - wrapper.exception(newGraph, t); -// newGraph.waitAsync(request); - - - } - - } - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - } - - } - - }); - - } - - public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { - - assert (request != null); - assert (procedure != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - int sync = notify != null ? thread : -1; - - requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { - - @Override - public void run(int thread) { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - ListenerBase listener = getListenerBase(procedure); - - final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); - - try { - - if (listener != null) { - - newGraph.processor.query(newGraph, request, null, procedure, listener); - -// newGraph.waitAsync(request); - - } else { - - final ResultCallWrappedQueryProcedure4 wrapper = new ResultCallWrappedQueryProcedure4(procedure); - - try { - - request.perform(newGraph, wrapper); - - } catch (Throwable t) { - - t.printStackTrace(); - - } - - } - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - } - - } - - }); - - } - - public void scheduleRequest(final ExternalRead request, final Procedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { - - assert (request != null); - assert (procedure != null); - - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { - - @Override - public void run(int thread) { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - ListenerBase listener = getListenerBase(procedure); - - final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); - - try { - - if (listener != null) { - - newGraph.processor.query(newGraph, request, null, new Procedure() { - - @Override - public void exception(Throwable t) { - procedure.exception(t); - if(throwable != null) { - throwable.set(t); - } - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); - } - - }, listener); - -// newGraph.waitAsync(request); - - } else { - -// newGraph.state.barrier.inc(); - - request.register(newGraph, new Listener() { - - @Override - public void exception(Throwable t) { - if(throwable != null) throwable.set(t); - procedure.exception(t); -// newGraph.state.barrier.dec(); - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); -// newGraph.state.barrier.dec(); - } - - @Override - public boolean isDisposed() { - return true; - } - - }); - -// newGraph.waitAsync(request); - - } - - } finally { - - fireSessionVariableChange(SessionVariables.QUEUED_READS); - - } - - } - - }); - - } - - - @Override - public void asyncRequest(final Read request, final AsyncProcedure procedure) { - - scheduleRequest(request, procedure, null, null, null); - - } - - @Override - public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - DataContainer container = new DataContainer(); - DataContainer result = new DataContainer(); - scheduleRequest(request, procedure, notify, container, result); - acquire(notify, request); - Throwable throwable = container.get(); - if(throwable != null) { - if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; - else throw new DatabaseException("Unexpected exception", throwable); - } - return result.get(); - } - - @Override - public T syncRequest(AsyncRead request, final AsyncProcedure procedure) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exceptionContainer = new DataContainer(); - final DataContainer resultContainer = new DataContainer(); - scheduleRequest(request, new AsyncProcedure() { - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - exceptionContainer.set(throwable); - procedure.exception(graph, throwable); - } - @Override - public void execute(AsyncReadGraph graph, T result) { - resultContainer.set(result); - procedure.execute(graph, result); - } - }, getListenerBase(procedure), notify); - acquire(notify, request, procedure); - Throwable throwable = exceptionContainer.get(); - if (throwable != null) { - if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; - else throw new DatabaseException("Unexpected exception", throwable); - } - return resultContainer.get(); - } - - - @Override - public Collection syncRequest(AsyncMultiRead request, final AsyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exceptionContainer = new DataContainer(); - scheduleRequest(request, new AsyncMultiProcedure() { - @Override - public void exception(AsyncReadGraph graph, Throwable throwable) { - exceptionContainer.set(throwable); - procedure.exception(graph, throwable); - } - @Override - public void execute(AsyncReadGraph graph, T result) { - procedure.execute(graph, result); - } - @Override - public void finished(AsyncReadGraph graph) { - procedure.finished(graph); - } - }, notify); - acquire(notify, request, procedure); - Throwable throwable = exceptionContainer.get(); - if (throwable != null) { - if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; - else throw new DatabaseException("Unexpected exception", throwable); - } - // TODO: implement return value - System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)"); - return null; - } - - @Override - public T syncRequest(final ExternalRead request) throws DatabaseException { - return syncRequest(request, new ProcedureAdapter()); - - -// assert(request != null); -// -// final DataContainer result = new DataContainer(); -// final DataContainer exception = new DataContainer(); -// -// syncRequest(request, new Procedure() { -// -// @Override -// public void execute(T t) { -// result.set(t); -// } -// -// @Override -// public void exception(Throwable t) { -// exception.set(t); -// } -// -// }); -// -// Throwable t = exception.get(); -// if(t != null) { -// if(t instanceof DatabaseException) throw (DatabaseException)t; -// else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t); -// } -// -// return result.get(); - - } - - @Override - public T syncRequest(ExternalRead request, final Listener procedure) throws DatabaseException { - return syncRequest(request, (Procedure)procedure); - } - - -// @Override -// public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { -// assertNotSession(); -// Semaphore notify = new Semaphore(0); -// DataContainer container = new DataContainer(); -// DataContainer result = new DataContainer(); -// scheduleRequest(request, procedure, notify, container, result); -// acquire(notify, request); -// Throwable throwable = container.get(); -// if(throwable != null) { -// if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; -// else throw new DatabaseException("Unexpected exception", throwable); -// } -// return result.get(); -// } - - - @Override - public T syncRequest(ExternalRead request, final Procedure procedure) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer container = new DataContainer(); - final DataContainer result = new DataContainer(); - scheduleRequest(request, procedure, notify, container, result); - acquire(notify, request); - Throwable throwable = container.get(); - if (throwable != null) { - if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; - else throw new DatabaseException("Unexpected exception", throwable); - } - return result.get(); - } - - @Override - public void syncRequest(Write request) throws DatabaseException { - assertNotSession(); - assertAlive(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - - @Override - public void run(DatabaseException e) { - exception.set(e); - } - - }, notify); - acquire(notify, request); - if(exception.get() != null) throw exception.get(); - } - - @Override - public T syncRequest(WriteResult request) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - final DataContainer result = new DataContainer(); - scheduleRequest(request, new Procedure() { - - @Override - public void exception(Throwable t) { - exception.set(t); - } - - @Override - public void execute(T t) { - result.set(t); - } - - }, notify); - acquire(notify, request); - if(exception.get() != null) { - Throwable t = exception.get(); - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException(t); - } - return result.get(); - } - - @Override - public T syncRequest(WriteOnlyResult request) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - final DataContainer result = new DataContainer(); - scheduleRequest(request, new Procedure() { - - @Override - public void exception(Throwable t) { - exception.set(t); - } - - @Override - public void execute(T t) { - result.set(t); - } - - }, notify); - acquire(notify, request); - if(exception.get() != null) { - Throwable t = exception.get(); - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException(t); - } - return result.get(); - } - - @Override - public T syncRequest(DelayedWriteResult request) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - final DataContainer result = new DataContainer(); - scheduleRequest(request, new Procedure() { - - @Override - public void exception(Throwable t) { - exception.set(t); - } - - @Override - public void execute(T t) { - result.set(t); - } - - }, notify); - acquire(notify, request); - if(exception.get() != null) { - Throwable t = exception.get(); - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException(t); - } - return result.get(); - } - - @Override - public void syncRequest(DelayedWrite request) throws DatabaseException { - assertNotSession(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); - acquire(notify, request); - if(exception.get() != null) throw exception.get(); - } - - @Override - public void syncRequest(WriteOnly request) throws DatabaseException { - assertNotSession(); - assertAlive(); - Semaphore notify = new Semaphore(0); - final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); - acquire(notify, request); - if(exception.get() != null) throw exception.get(); - } - - /* - * - * GraphRequestProcessor interface - */ - - @Override - public void asyncRequest(final AsyncRead request, final AsyncProcedure procedure) { - - scheduleRequest(request, procedure, null, null); - - } - - @Override - public void asyncRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure) { - - scheduleRequest(request, procedure, null); - - } - - @Override - public void asyncRequest(final ExternalRead request, final Procedure procedure) { - - scheduleRequest(request, procedure, null, null, null); - - } - - @Override - public void asyncRequest(final Write request, final Callback callback) { - - scheduleRequest(request, callback, null); - - } - - @Override - public void asyncRequest(final WriteResult request, final Procedure procedure) { - - scheduleRequest(request, procedure, null); - - } - - @Override - public void asyncRequest(final WriteOnlyResult request, final Procedure procedure) { - - scheduleRequest(request, procedure, null); - - } - - @Override - public void asyncRequest(final DelayedWriteResult request, final Procedure procedure) { - - scheduleRequest(request, procedure, null); - - } - - @Override - public void asyncRequest(final DelayedWrite request, final Callback callback) { - - scheduleRequest(request, callback, null); - - } - - @Override - public void asyncRequest(final Write r) { - asyncRequest(r, null); - } - - @Override - public void asyncRequest(final DelayedWrite r) { - asyncRequest(r, null); - } - - @Override - public void asyncRequest(final WriteOnly request, final Callback callback) { - - scheduleRequest(request, callback, null); - - } - - @Override - public void asyncRequest(final WriteOnly request) { - - asyncRequest(request, null); - - } - - @Override - public void async(ReadInterface r, AsyncProcedure procedure) { - r.request(this, procedure); - } - - @Override - public void async(ReadInterface r, Procedure procedure) { - r.request(this, procedure); - } - - @Override - public void async(ReadInterface r, SyncProcedure procedure) { - r.request(this, procedure); - } - - @Override - public void async(ReadInterface r, AsyncListener procedure) { - r.request(this, procedure); - } - - @Override - public void async(ReadInterface r, Listener procedure) { - r.request(this, procedure); - } - - @Override - public void async(ReadInterface r, SyncListener procedure) { - r.request(this, procedure); - } - - @Override - public T sync(ReadInterface r) throws DatabaseException { - return r.request(this); - } - - @Override - public T sync(WriteInterface r) throws DatabaseException { - return r.request(this); - } - - @Override - public void async(WriteInterface r, Procedure procedure) { - r.request(this, procedure); - } - - @Override - public void async(WriteInterface r) { - r.request(this, new ProcedureAdapter()); - } - - //@Override - public void incAsync() { - state.incAsync(); - } - - //@Override - public void decAsync() { - state.decAsync(); - } - - public long getCluster(ResourceImpl resource) { - ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id); - return cluster.getClusterId(); - } - - public long getCluster(int id) { - if (clusterTable == null) - System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come"); - return clusterTable.getClusterIdByResourceKeyNoThrow(id); - } - - public ResourceImpl getResource(int id) { - return new ResourceImpl(resourceSupport, id); - } - - public ResourceImpl getResource(int resourceIndex, long clusterId) { - assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex)); - ClusterI proxy = clusterTable.getClusterByClusterId(clusterId); - int key = proxy.getClusterKey(); - int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex); - return new ResourceImpl(resourceSupport, resourceKey); - } - - public ResourceImpl getResource2(int id) { - assert (id != 0); - return new ResourceImpl(resourceSupport, id); - } - - final public int getId(ResourceImpl impl) { - return impl.id; - } - - public static final Charset UTF8 = Charset.forName("utf-8"); - - - - - static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) { - for(TransientGraph g : support.providers) { - if(g.isPending(subject)) return false; - } - return true; - } - - static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) { - for(TransientGraph g : support.providers) { - if(g.isPending(subject, predicate)) return false; - } - return true; - } - - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { - - Callback composite = new Callback() { - - AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); - - @Override - public void run(ReadGraphImpl graph) { - if(ready.decrementAndGet() == 0) { - runnable.run(graph); - } - } - - }; - - for(TransientGraph g : support.providers) { - if(g.isPending(subject)) { - try { - g.load(graph, subject, composite); - } catch (DatabaseException e) { - e.printStackTrace(); - } - } else { - composite.run(graph); - } - } - - composite.run(graph); - - } - - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { - - Callback composite = new Callback() { - - AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); - - @Override - public void run(ReadGraphImpl graph) { - if(ready.decrementAndGet() == 0) { - runnable.run(graph); - } - } - - }; - - for(TransientGraph g : support.providers) { - if(g.isPending(subject, predicate)) { - try { - g.load(graph, subject, predicate, composite); - } catch (DatabaseException e) { - e.printStackTrace(); - } - } else { - composite.run(graph); - } - } - - composite.run(graph); - - } - -// void dumpHeap() { -// -// try { -// ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(), -// "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap( -// "d:/heap" + flushCounter + ".txt", true); -// } catch (IOException e) { -// e.printStackTrace(); -// } -// -// } - - void fireReactionsToSynchronize(ChangeSet cs) { - - // Do not fire empty events - if (cs.isEmpty()) - return; - - ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2()); -// g.state.barrier.inc(); - - try { - - if (!cs.isEmpty()) { - ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs); - for (ChangeListener l : changeListeners2) { - try { - l.graphChanged(e2); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - } - - } finally { - -// g.state.barrier.dec(); -// g.waitAsync(null); - - } - - } - - void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) { - try { - - // Do not fire empty events - if (cs2.isEmpty()) - return; - -// graph.restart(); -// graph.state.barrier.inc(); - - try { - - if (!cs2.isEmpty()) { - - ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2); - for (ChangeListener l : changeListeners2) { - try { - l.graphChanged(e2); -// System.out.println("changelistener " + l); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - - } - - } finally { - -// graph.state.barrier.dec(); -// graph.waitAsync(null); - - } - } catch (Throwable t) { - t.printStackTrace(); - - } - } - void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) { - - try { - - // Do not fire empty events - if (cs2.isEmpty()) - return; - - // Do not fire on virtual requests - if(graph.getProvider() != null) - return; - - WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); - - try { - - ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2); - for (ChangeListener l : metadataListeners) { - try { - l.graphChanged(e2); - } catch (Throwable ex) { - ex.printStackTrace(); - } - } - - } finally { - - } - - } catch (Throwable t) { - t.printStackTrace(); - } - - } - - - /* - * (non-Javadoc) - * - * @see org.simantics.db.ServiceLocator#getService(java.lang.Class) - */ - @Override - public T getService(Class api) { - T t = peekService(api); - if (t == null) { - if (state.isClosed()) - throw new ServiceNotFoundException(this, api, "Session has been shut down"); - throw new ServiceNotFoundException(this, api); - } - return t; - } - - /** - * @return - */ - protected abstract ServerInformation getCachedServerInformation(); - - private Class serviceKey1 = null; - private Class serviceKey2 = null; - private Object service1 = null; - private Object service2 = null; - - /* - * (non-Javadoc) - * - * @see - * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class) - */ - @SuppressWarnings("unchecked") - @Override - public synchronized T peekService(Class api) { - - if(serviceKey1 == api) { - return (T)service1; - } else if (serviceKey2 == api) { - // Promote this key - Object result = service2; - service2 = service1; - serviceKey2 = serviceKey1; - service1 = result; - serviceKey1 = api; - return (T)result; - } - - if (Layer0.class == api) - return (T) L0; - if (ServerInformation.class == api) - return (T) getCachedServerInformation(); - else if (WriteGraphImpl.class == api) - return (T) writeState.getGraph(); - else if (ClusterBuilder.class == api) - return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); - else if (ClusterBuilderFactory.class == api) - return (T)new ClusterBuilderFactoryImpl(this); - - service2 = service1; - serviceKey2 = serviceKey1; - - service1 = serviceLocator.peekService(api); - serviceKey1 = api; - - return (T)service1; - - } - - /* - * (non-Javadoc) - * - * @see - * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class) - */ - @Override - public boolean hasService(Class api) { - return serviceLocator.hasService(api); - } - - /** - * @param api the api that must be implemented by the specified service - * @param service the service implementation - */ - @Override - public void registerService(Class api, T service) { - if(Layer0.class == api) { - L0 = (Layer0)service; - return; - } - serviceLocator.registerService(api, service); - if (TransactionPolicySupport.class == api) { - transactionPolicy = (TransactionPolicySupport)service; - state.resetTransactionPolicy(); - } - if (api == serviceKey1) - service1 = service; - else if (api == serviceKey2) - service2 = service; - } - - // ---------------- - // SessionMonitor - // ---------------- - - void fireSessionVariableChange(String variable) { - for (MonitorHandler h : monitorHandlers) { - MonitorContext ctx = monitorContexts.getLeft(h); - assert ctx != null; - // SafeRunner functionality repeated here to avoid dependency. - try { - h.valuesChanged(ctx); - } catch (Exception e) { - Logger.defaultLogError("monitor handler notification produced the following exception", e); - } catch (LinkageError e) { - Logger.defaultLogError("monitor handler notification produced a linkage error", e); - } - } - } - - - class ResourceSerializerImpl implements ResourceSerializer { - - public long createRandomAccessId(int id) throws DatabaseException { - if(id < 0) - return id; - int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id); - long cluster = getCluster(id); - if (0 == cluster) - return 0; // Better to return 0 then invalid id. - long result = ClusterTraitsBase.createResourceId(cluster, index); - return result; - } - - @Override - public long getRandomAccessId(Resource resource) throws DatabaseException { - - ResourceImpl resourceImpl = (ResourceImpl) resource; - return createRandomAccessId(resourceImpl.id); - - } - - @Override - public int getTransientId(Resource resource) throws DatabaseException { - - ResourceImpl resourceImpl = (ResourceImpl) resource; - return resourceImpl.id; - - } - - @Override - public String createRandomAccessId(Resource resource) - throws InvalidResourceReferenceException { - - if(resource == null) throw new IllegalArgumentException(); - - ResourceImpl resourceImpl = (ResourceImpl) resource; - - if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0"; - - int r; - try { - r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id); - } catch (DatabaseException e1) { - throw new InvalidResourceReferenceException(e1); - } - try { - // Serialize as '_' - return "" + r + "_" + getCluster(resourceImpl); - } catch (Throwable e) { - e.printStackTrace(); - throw new InvalidResourceReferenceException(e); - } finally { - } - } - - public int getTransientId(long serialized) throws DatabaseException { - if (serialized <= 0) - return (int)serialized; - int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized); - long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized); - ClusterI cluster = clusterTranslator.getClusterByClusterId(c); - if (cluster == null) - throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized); - int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index); - return key; - } - - @Override - public Resource getResource(long randomAccessId) throws DatabaseException { - return getResourceByKey(getTransientId(randomAccessId)); - } - - @Override - public Resource getResource(int transientId) throws DatabaseException { - return getResourceByKey(transientId); - } - - @Override - public Resource getResource(String randomAccessId) - throws InvalidResourceReferenceException { - try { - int i = randomAccessId.indexOf('_'); - if (i == -1) - throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '" - + randomAccessId + "'"); - int r = Integer.parseInt(randomAccessId.substring(0, i)); - if(r < 0) return getResourceByKey(r); - long c = Long.parseLong(randomAccessId.substring(i + 1)); - ClusterI cluster = clusterTranslator.getClusterByClusterId(c); - int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r); - if (cluster.hasResource(key, clusterTranslator)) - return getResourceByKey(key); - } catch (InvalidResourceReferenceException e) { - throw e; - } catch (NumberFormatException e) { - throw new InvalidResourceReferenceException(e); - } catch (Throwable e) { - e.printStackTrace(); - throw new InvalidResourceReferenceException(e); - } finally { - } - throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId); - } - - @Override - public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException { - try { - return true; - } catch (Throwable e) { - e.printStackTrace(); - throw new InvalidResourceReferenceException(e); - } finally { - } - } - } - - // Copied from old SessionImpl - - void check() { - if (state.isClosed()) - throw new Error("Session closed."); - } - - - - public ResourceImpl getNewResource(long clusterId) { - ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId); - int newId; - try { - newId = cluster.createResource(clusterTranslator); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - return null; - } - return new ResourceImpl(resourceSupport, newId); - } - - public ResourceImpl getNewResource(Resource clusterSet) - throws DatabaseException { - long resourceId = clusterSet.getResourceId(); - Long clusterId = clusterSetsSupport.get(resourceId); - if (null == clusterId) - throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); - if (Constants.NewClusterId == clusterId) { - clusterId = getService(ClusteringSupport.class).createCluster(); - if ((serviceMode & SERVICE_MODE_CREATE) > 0) - createdClusters.add(clusterId); - clusterSetsSupport.put(resourceId, clusterId); - return getNewResource(clusterId); - } else { - ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId); - ResourceImpl result; - if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { - clusterId = getService(ClusteringSupport.class).createCluster(); - if ((serviceMode & SERVICE_MODE_CREATE) > 0) - createdClusters.add(clusterId); - clusterSetsSupport.put(resourceId, clusterId); - return getNewResource(clusterId); - } else { - result = getNewResource(clusterId); - int resultKey = querySupport.getId(result); - long resultCluster = querySupport.getClusterId(resultKey); - if (clusterId != resultCluster) - clusterSetsSupport.put(resourceId, resultCluster); - return result; - } - } - } - - public void getNewClusterSet(Resource clusterSet) - throws DatabaseException { - if (DEBUG) - System.out.println("new cluster set=" + clusterSet); - long resourceId = clusterSet.getResourceId(); - if (clusterSetsSupport.containsKey(resourceId)) - throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); - clusterSetsSupport.put(resourceId, Constants.NewClusterId); - } - public boolean containsClusterSet(Resource clusterSet) - throws ServiceException { - long resourceId = clusterSet.getResourceId(); - return clusterSetsSupport.containsKey(resourceId); - } - public Resource setDefaultClusterSet4NewResource(Resource clusterSet) { - Resource r = defaultClusterSet; - defaultClusterSet = clusterSet; - return r; - } - void printDiagnostics() { - - if (DIAGNOSTICS) { - - final DataContainer totalMem = new DataContainer(0L); - final DataContainer residentClusters = new DataContainer(0L); - for(ClusterI cluster : clusterTable.getClusters()) { - try { - if (cluster.isLoaded() && !cluster.isEmpty()) { - residentClusters.set(residentClusters.get() + 1); - totalMem.set(totalMem.get() + cluster.getUsedSpace()); - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - } - - System.out.println("--------------------------------"); - System.out.println("Cluster information:"); - System.out.println("-amount of resident clusters=" + residentClusters.get()); - System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB"); - - for(ClusterI cluster : clusterTable.getClusters()) { - System.out.print("Cluster " + cluster.getClusterId() + " ["); - try { - if (!cluster.isLoaded()) - System.out.println("not loaded]"); - else if (cluster.isEmpty()) - System.out.println("is empty]"); - else { - System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator)); - System.out.print(",references=" + cluster.getReferenceCount()); - System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]"); - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - System.out.println("is corrupted]"); - } - } - - queryProvider2.printDiagnostics(); - System.out.println("--------------------------------"); - } - - } - - public void fireStartReadTransaction() { - - if (DIAGNOSTICS) - System.out.println("StartReadTransaction"); - - for (SessionEventListener listener : eventListeners) { - try { - listener.readTransactionStarted(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - } - - public void fireFinishReadTransaction() { - - if (DIAGNOSTICS) - System.out.println("FinishReadTransaction"); - - for (SessionEventListener listener : eventListeners) { - try { - listener.readTransactionFinished(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - } - - public void fireStartWriteTransaction() { - - if (DIAGNOSTICS) - System.out.println("StartWriteTransaction"); - - for (SessionEventListener listener : eventListeners) { - try { - listener.writeTransactionStarted(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - } - - public void fireFinishWriteTransaction() { - - if (DIAGNOSTICS) - System.out.println("FinishWriteTransaction"); - - for (SessionEventListener listener : eventListeners) { - try { - listener.writeTransactionFinished(); - } catch (Throwable t) { - t.printStackTrace(); - } - } - - Indexing.resetDependenciesIndexingDisabled(); - - } - - Resource deserialize(final long resourceId, final long clusterId) throws IOException { - throw new Error("Not supported at the moment."); - } - - State getState() { - return state; - } - - ClusterTable getClusterTable() { - return clusterTable; - } - - public GraphSession getGraphSession() { - return graphSession; - } - - public QueryProcessor getQueryProvider2() { - return queryProvider2; - } - - ClientChangesImpl getClientChanges() { - return clientChanges; - } - - boolean getWriteOnly() { - return writeOnly; - } - - static int counter = 0; - - public void onClusterLoaded(long clusterId) { - - clusterTable.updateSize(); - - } - - - - - /* - * Implementation of the interface RequestProcessor - */ - - @Override - public T syncRequest(final Read request) throws DatabaseException { - - assertNotSession(); - assertAlive(); - - assert(request != null); - - final DataContainer result = new DataContainer(); - final DataContainer exception = new DataContainer(); - - syncRequest(request, new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - result.set(t); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - exception.set(t); - } - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); - } - - return result.get(); - - } - -// @Override -// public T syncRequest(Read request, AsyncListener procedure) throws DatabaseException { -// assertNotSession(); -// return syncRequest(request, (AsyncProcedure)procedure); -// } - - @Override - public T syncRequest(Read request, SyncListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncListener(procedure)); - } - - @Override - public T syncRequest(Read request, final Listener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncListener(procedure)); - } - - @Override - public T syncRequest(final Read request, final SyncProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncProcedure(procedure)); - } - - @Override - public T syncRequest(Read request, Procedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public T syncRequest(final AsyncRead request) throws DatabaseException { - - assertNotSession(); - - assert(request != null); - - final DataContainer result = new DataContainer(); - final DataContainer exception = new DataContainer(); - - syncRequest(request, new AsyncProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - result.set(t); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - exception.set(t); - } - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); - } - - return result.get(); - - } - - @Override - public T syncRequest(AsyncRead request, AsyncListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, (AsyncProcedure)procedure); - } - - @Override - public T syncRequest(AsyncRead request, SyncListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncListener(procedure)); - } - - @Override - public T syncRequest(AsyncRead request, Listener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncListener(procedure)); - } - - @Override - public T syncRequest(AsyncRead request, final SyncProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncProcedure(procedure)); - } - - @Override - final public T syncRequest(final AsyncRead request, final Procedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request) throws DatabaseException { - - assertNotSession(); - - assert(request != null); - - final ArrayList result = new ArrayList(); - final DataContainer exception = new DataContainer(); - - syncRequest(request, new AsyncMultiProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.add(t); - } - } - - @Override - public void finished(AsyncReadGraph graph) { - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - exception.set(t); - } - - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); - } - - return result; - - } - - @Override - public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); - } - - @Override - final public Collection syncRequest(final AsyncMultiRead request) throws DatabaseException { - - assertNotSession(); - - assert(request != null); - - final ArrayList result = new ArrayList(); - final DataContainer exception = new DataContainer(); - - syncRequest(request, new AsyncMultiProcedure() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.add(t); - } - } - - @Override - public void finished(AsyncReadGraph graph) { - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - exception.set(t); - } - - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); - } - - return result; - - } - - @Override - public Collection syncRequest(AsyncMultiRead request, AsyncMultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); - } - - @Override - public Collection syncRequest(AsyncMultiRead request, SyncMultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(AsyncMultiRead request, MultiListener procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(AsyncMultiRead request, final SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); - } - - @Override - final public Collection syncRequest(final AsyncMultiRead request, final MultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); - } - - - @Override - public void asyncRequest(Read request) { - - asyncRequest(request, new ProcedureAdapter() { - @Override - public void exception(Throwable t) { - t.printStackTrace(); - } - }); - - } - - @Override - public void asyncRequest(Read request, AsyncListener procedure) { - asyncRequest(request, (AsyncProcedure)procedure); - } - - @Override - public void asyncRequest(Read request, final SyncListener procedure) { - asyncRequest(request, new SyncToAsyncListener(procedure)); - } - - @Override - public void asyncRequest(Read request, final Listener procedure) { - asyncRequest(request, new NoneToAsyncListener(procedure)); - } - - @Override - public void asyncRequest(Read request, SyncProcedure procedure) { - asyncRequest(request, new SyncToAsyncProcedure(procedure)); - } - - @Override - public void asyncRequest(Read request, final Procedure procedure) { - asyncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - final public void asyncRequest(final AsyncRead request) { - - assert(request != null); - - asyncRequest(request, new ProcedureAdapter() { - @Override - public void exception(Throwable t) { - t.printStackTrace(); - } - }); - - } - - @Override - public void asyncRequest(AsyncRead request, AsyncListener procedure) { - scheduleRequest(request, procedure, procedure, null); - } - - @Override - public void asyncRequest(AsyncRead request, final SyncListener procedure) { - asyncRequest(request, new SyncToAsyncListener(procedure)); - } - - @Override - public void asyncRequest(AsyncRead request, final Listener procedure) { - asyncRequest(request, new NoneToAsyncListener(procedure)); - } - - @Override - public void asyncRequest(AsyncRead request, SyncProcedure procedure) { - asyncRequest(request, new SyncToAsyncProcedure(procedure)); - } - - @Override - final public void asyncRequest(final AsyncRead request, final Procedure procedure) { - asyncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public void asyncRequest(MultiRead request) { - - assert(request != null); - - asyncRequest(request, new AsyncMultiProcedureAdapter() { - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - t.printStackTrace(); - } - }); - - } - - @Override - public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - - @Override - public void asyncRequest(MultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); - } - - @Override - public void asyncRequest(MultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); - } - - @Override - public void asyncRequest(MultiRead request, MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); - } - - @Override - final public void asyncRequest(AsyncMultiRead request) { - - assert(request != null); - - asyncRequest(request, new AsyncMultiProcedureAdapter() { - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - t.printStackTrace(); - } - }); - - } - - @Override - public void asyncRequest(AsyncMultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - - @Override - public void asyncRequest(AsyncMultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); - } - - @Override - public void asyncRequest(AsyncMultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public void asyncRequest(AsyncMultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); - } - - @Override - final public void asyncRequest(AsyncMultiRead request, final MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); - } - - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { - assertNotSession(); - throw new Error("Not implemented!"); - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { - throw new Error("Not implemented!"); - } - - @Override - final public void asyncRequest(final ExternalRead request) { - - assert(request != null); - - asyncRequest(request, new ProcedureAdapter() { - @Override - public void exception(Throwable t) { - t.printStackTrace(); - } - }); - - } - - @Override - public void asyncRequest(ExternalRead request, final Listener procedure) { - asyncRequest(request, (Procedure)procedure); - } - - - - void check(Throwable t) throws DatabaseException { - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - void check(DataContainer container) throws DatabaseException { - Throwable t = container.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - - - - - - boolean sameProvider(Write request) { - if(writeState.getGraph().provider != null) { - return writeState.getGraph().provider.equals(request.getProvider()); - } else { - return request.getProvider() == null; - } - } - - boolean plainWrite(WriteGraphImpl graph) { - if(graph == null) return false; - if(graph.writeSupport.writeOnly()) return false; - return true; - } - - - public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); - private void assertNotSession() throws DatabaseException { - Thread current = Thread.currentThread(); - if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); - } - - void assertAlive() { - if (!state.isAlive()) - throw new RuntimeDatabaseException("Session has been shut down."); - } - - public InputStream getValueStream(ReadGraphImpl graph, Resource resource) { - return querySupport.getValueStream(graph, querySupport.getId(resource)); - } - - public byte[] getValue(ReadGraphImpl graph, Resource resource) { - return querySupport.getValue(graph, querySupport.getId(resource)); - } - - void acquire(Semaphore semaphore, T request) throws DatabaseException { - acquire(semaphore, request, null); - } - - private void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException { - assertAlive(); - try { - long tryCount = 0; - // Loop until the semaphore is acquired reporting requests that take a long time. - while (true) { - - if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) { - // Acquired OK. - return; - } else { - if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) { - ++tryCount; - long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT - .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD) - * tryCount; - System.err.println("DB client request '" + request + "' is taking long to execute, so far " - + (waitTime) + " ms. (procedure=" + procedure + ")"); - } - } - - assertAlive(); - - } - } catch (InterruptedException e) { - e.printStackTrace(); - // FIXME: Should perhaps do something else in this case ?? - } - } - - - -// @Override -// public boolean holdOnToTransactionAfterCancel() { -// return transactionPolicy.holdOnToTransactionAfterCancel(); -// } -// -// @Override -// public boolean holdOnToTransactionAfterCommit() { -// return transactionPolicy.holdOnToTransactionAfterCommit(); -// } -// -// @Override -// public boolean holdOnToTransactionAfterRead() { -// return transactionPolicy.holdOnToTransactionAfterRead(); -// } -// -// @Override -// public void onRelinquish() { -// transactionPolicy.onRelinquish(); -// } -// -// @Override -// public void onRelinquishDone() { -// transactionPolicy.onRelinquishDone(); -// } -// -// @Override -// public void onRelinquishError() { -// transactionPolicy.onRelinquishError(); -// } - - - @Override - public Session getSession() { - return this; - } - - - protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph); - protected abstract ResourceImpl getNewResource() throws DatabaseException; - - public void ceased(int thread) { - - requestManager.ceased(thread); - - } - - public int getAmountOfQueryThreads() { - // This must be a power of two - return 1; -// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); - } - - public Resource getResourceByKey(int key) throws ResourceNotFoundException { - - return new ResourceImpl(resourceSupport, key); - - } - - public void acquireWriteOnly() { - writeOnly = true; - } - - public void releaseWriteOnly(ReadGraphImpl graph) { - writeOnly = false; - queryProvider2.releaseWrite(graph); - } - - public void handleUpdatesAndMetadata(WriteGraphImpl writer) { - - long start = System.nanoTime(); - - while(dirtyPrimitives) { - dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); - } - - fireMetadataListeners(writer, clientChanges); - - if(DebugPolicy.PERFORMANCE_DATA) { - long end = System.nanoTime(); - System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start)); - } - - } - - void removeTemporaryData() { - File platform = Platform.getLocation().toFile(); - File tempFiles = new File(platform, "tempFiles"); - File temp = new File(tempFiles, "db"); - if(!temp.exists()) - return; - try { - Files.walkFileTree(temp.toPath(), new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - try { - Files.delete(file); - } catch (IOException e) { - Logger.defaultLogError(e); - } - return FileVisitResult.CONTINUE; - } - }); - } catch (IOException e) { - Logger.defaultLogError(e); - } - } - - public void handleCreatedClusters() { - if ((serviceMode & SERVICE_MODE_CREATE) > 0) { - createdClusters.forEach(new TLongProcedure() { - - @Override - public boolean execute(long value) { - ClusterImpl cluster = clusterTable.getClusterByClusterId(value); - cluster.setImmutable(true, clusterTranslator); - return true; - } - }); - } - createdClusters.clear(); - } - - @Override - public Object getModificationCounter() { - return queryProvider2.modificationCounter; - } - - @Override - public void markUndoPoint() { - state.setCombine(false); - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package fi.vtt.simantics.procore.internal; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.core.runtime.Platform; +import org.simantics.databoard.Bindings; +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.ChangeSet; +import org.simantics.db.DevelopmentKeys; +import org.simantics.db.ExternalValueSupport; +import org.simantics.db.Metadata; +import org.simantics.db.MonitorContext; +import org.simantics.db.MonitorHandler; +import org.simantics.db.Resource; +import org.simantics.db.ResourceSerializer; +import org.simantics.db.Session; +import org.simantics.db.SessionManager; +import org.simantics.db.SessionVariables; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteGraph; +import org.simantics.db.authentication.UserAuthenticationAgent; +import org.simantics.db.authentication.UserAuthenticator; +import org.simantics.db.common.Indexing; +import org.simantics.db.common.TransactionPolicyRelease; +import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; +import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; +import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; +import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; +import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; +import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; +import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; +import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure; +import org.simantics.db.common.service.ServiceActivityMonitorImpl; +import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.event.ChangeEvent; +import org.simantics.db.event.ChangeListener; +import org.simantics.db.event.SessionEventListener; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.ClusterSetExistException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.ImmutableException; +import org.simantics.db.exception.InvalidResourceReferenceException; +import org.simantics.db.exception.ResourceNotFoundException; +import org.simantics.db.exception.RuntimeDatabaseException; +import org.simantics.db.exception.ServiceException; +import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.impl.ClusterBase; +import org.simantics.db.impl.ClusterI; +import org.simantics.db.impl.ClusterTraitsBase; +import org.simantics.db.impl.ClusterTranslator; +import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.impl.TransientGraph; +import org.simantics.db.impl.VirtualGraphImpl; +import org.simantics.db.impl.graph.DelayedWriteGraph; +import org.simantics.db.impl.graph.ReadGraphImpl; +import org.simantics.db.impl.graph.WriteGraphImpl; +import org.simantics.db.impl.graph.WriteSupport; +import org.simantics.db.impl.internal.RandomAccessValueSupport; +import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; +import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.query.QueryProcessor; +import org.simantics.db.impl.query.QueryProcessor.SessionRead; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; +import org.simantics.db.impl.service.QueryDebug; +import org.simantics.db.impl.support.VirtualGraphServerSupport; +import org.simantics.db.impl.support.WriteRequestScheduleSupport; +import org.simantics.db.procedure.AsyncListener; +import org.simantics.db.procedure.AsyncMultiListener; +import org.simantics.db.procedure.AsyncMultiProcedure; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.procedure.Listener; +import org.simantics.db.procedure.ListenerBase; +import org.simantics.db.procedure.MultiListener; +import org.simantics.db.procedure.MultiProcedure; +import org.simantics.db.procedure.Procedure; +import org.simantics.db.procedure.SyncListener; +import org.simantics.db.procedure.SyncMultiListener; +import org.simantics.db.procedure.SyncMultiProcedure; +import org.simantics.db.procedure.SyncProcedure; +import org.simantics.db.procore.cluster.ClusterImpl; +import org.simantics.db.procore.cluster.ClusterTraits; +import org.simantics.db.procore.protocol.Constants; +import org.simantics.db.procore.protocol.DebugPolicy; +import org.simantics.db.request.AsyncMultiRead; +import org.simantics.db.request.AsyncRead; +import org.simantics.db.request.DelayedWrite; +import org.simantics.db.request.DelayedWriteResult; +import org.simantics.db.request.ExternalRead; +import org.simantics.db.request.MultiRead; +import org.simantics.db.request.Read; +import org.simantics.db.request.ReadInterface; +import org.simantics.db.request.Write; +import org.simantics.db.request.WriteInterface; +import org.simantics.db.request.WriteOnly; +import org.simantics.db.request.WriteOnlyResult; +import org.simantics.db.request.WriteResult; +import org.simantics.db.request.WriteTraits; +import org.simantics.db.service.ByteReader; +import org.simantics.db.service.ClusterBuilder; +import org.simantics.db.service.ClusterBuilderFactory; +import org.simantics.db.service.ClusterControl; +import org.simantics.db.service.ClusterSetsSupport; +import org.simantics.db.service.ClusterUID; +import org.simantics.db.service.ClusteringSupport; +import org.simantics.db.service.CollectionSupport; +import org.simantics.db.service.DebugSupport; +import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.GraphChangeListenerSupport; +import org.simantics.db.service.InitSupport; +import org.simantics.db.service.LifecycleSupport; +import org.simantics.db.service.ManagementSupport; +import org.simantics.db.service.QueryControl; +import org.simantics.db.service.SerialisationSupport; +import org.simantics.db.service.ServerInformation; +import org.simantics.db.service.ServiceActivityMonitor; +import org.simantics.db.service.SessionEventSupport; +import org.simantics.db.service.SessionMonitorSupport; +import org.simantics.db.service.SessionUserSupport; +import org.simantics.db.service.StatementSupport; +import org.simantics.db.service.TransactionPolicySupport; +import org.simantics.db.service.TransactionSupport; +import org.simantics.db.service.TransferableGraphSupport; +import org.simantics.db.service.UndoRedoSupport; +import org.simantics.db.service.VirtualGraphSupport; +import org.simantics.db.service.XSupport; +import org.simantics.layer0.Layer0; +import org.simantics.utils.DataContainer; +import org.simantics.utils.Development; +import org.simantics.utils.datastructures.Callback; +import org.simantics.utils.threads.logger.ITask; +import org.simantics.utils.threads.logger.ThreadLogger; + +import gnu.trove.procedure.TLongProcedure; +import gnu.trove.set.hash.TLongHashSet; + + +public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + + protected static final boolean DEBUG = false; + + private static final boolean DIAGNOSTICS = false; + + private TransactionPolicySupport transactionPolicy; + + final private ClusterControl clusterControl; + + final protected BuiltinSupportImpl builtinSupport; + final protected VirtualGraphServerSupportImpl virtualGraphServerSupport; + final protected ClusterSetsSupport clusterSetsSupport; + final protected LifecycleSupportImpl lifecycleSupport; + + protected QuerySupportImpl querySupport; + protected ResourceSupportImpl resourceSupport; + protected WriteSupport writeSupport; + public ClusterTranslator clusterTranslator; + + boolean dirtyPrimitives = false; + + public static final int SERVICE_MODE_CREATE = 2; + public static final int SERVICE_MODE_ALLOW = 1; + public int serviceMode = 0; + public boolean createdImmutableClusters = false; + public TLongHashSet createdClusters = new TLongHashSet(); + + private Layer0 L0; + + /** + * The service locator maintained by the workbench. These services are + * initialized during workbench during the init method. + */ + final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl(); + + final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl(); + + final CopyOnWriteArrayList changeListeners2 = new CopyOnWriteArrayList(); + + final CopyOnWriteArrayList metadataListeners = new CopyOnWriteArrayList(); + + final CopyOnWriteArrayList eventListeners = new CopyOnWriteArrayList(); + + final HashSet sessionThreads = new HashSet(); + + final BijectionMap monitorContexts = new BijectionMap(); + + final protected State state = new State(); + + protected GraphSession graphSession = null; + + protected SessionManager sessionManagerImpl = null; + + protected UserAuthenticationAgent authAgent = null; + + protected UserAuthenticator authenticator = null; + + protected Resource user = null; + + protected ClusterStream clusterStream = null; + + protected SessionRequestManager requestManager = null; + + public ClusterTable clusterTable = null; + + public QueryProcessor queryProvider2 = null; + + ClientChangesImpl clientChanges = null; + + MonitorHandler[] monitorHandlers = new MonitorHandler[0]; + +// protected long newClusterId = Constants.NullClusterId; + + protected int flushCounter = 0; + + protected boolean writeOnly = false; + + WriteState writeState = null; + WriteStateBase delayedWriteState = null; + protected Resource defaultClusterSet = null; // If not null then used for newResource(). + + public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) { + +// if (authAgent == null) +// throw new IllegalArgumentException("null authentication agent"); + + File t = StaticSessionProperties.virtualGraphStoragePath; + if (null == t) + t = new File("."); + this.clusterTable = new ClusterTable(this, t); + this.builtinSupport = new BuiltinSupportImpl(this); + this.sessionManagerImpl = sessionManagerImpl; + this.user = null; +// this.authAgent = authAgent; + + serviceLocator.registerService(Session.class, this); + serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this)); + serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this)); + serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this)); + serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this)); + serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this)); + serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this)); + serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this)); + serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this)); + serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this)); + serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this)); + serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this)); + serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this)); + serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this)); + serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this)); + serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this)); + serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this)); + serviceLocator.registerService(XSupport.class, new XSupportImpl(this)); + serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl()); + serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); + serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); + serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + ServiceActivityUpdaterForWriteTransactions.register(this); + + this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); + serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport); + serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport); + this.lifecycleSupport = new LifecycleSupportImpl(this); + serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport); + this.transactionPolicy = new TransactionPolicyRelease(); + serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy); + this.clusterControl = new ClusterControlImpl(this); + serviceLocator.registerService(ClusterControl.class, clusterControl); + this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs. + this.clusterSetsSupport.setReadDirectory(t.toPath()); + this.clusterSetsSupport.updateWriteDirectory(t.toPath()); + serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport); + } + + /* + * + * Session interface + */ + + + @Override + public Resource getRootLibrary() { + return queryProvider2.getRootLibraryResource(); + } + + void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException { + if (!graphSession.dbSession.refreshEnabled()) + return; + try { + getClusterTable().refresh(csid, this, clusterUID); + } catch (Throwable t) { + Logger.defaultLogError("Refesh failed.", t); + } + } + + static final class TaskHelper { + private final String name; + private Object result; + final Semaphore sema = new Semaphore(0); + private Throwable throwable = null; + final Callback callback = new Callback() { + @Override + public void run(DatabaseException e) { + synchronized (TaskHelper.this) { + throwable = e; + } + } + }; + final Procedure proc = new Procedure() { + @Override + public void execute(Object result) { + callback.run(null); + } + @Override + public void exception(Throwable t) { + if (t instanceof DatabaseException) + callback.run((DatabaseException)t); + else + callback.run(new DatabaseException("" + name + "operation failed.", t)); + } + }; + final WriteTraits writeTraits = new WriteTraits() {}; + TaskHelper(String name) { + this.name = name; + } + T getResult() { + return (T)result; + } + void setResult(Object result) { + this.result = result; + } +// Throwable throwableGet() { +// return throwable; +// } + synchronized void throwableSet(Throwable t) { + throwable = t; + } +// void throwableSet(String t) { +// throwable = new InternalException("" + name + " operation failed. " + t); +// } + synchronized void throwableCheck() + throws DatabaseException { + if (null != throwable) + if (throwable instanceof DatabaseException) + throw (DatabaseException)throwable; + else + throw new DatabaseException("Undo operation failed.", throwable); + } + void throw_(String message) + throws DatabaseException { + throw new DatabaseException("" + name + " operation failed. " + message); + } + } + + + + private ListenerBase getListenerBase(Object procedure) { + if (procedure instanceof ListenerBase) + return (ListenerBase) procedure; + else + return null; + } + + public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { + scheduleRequest(request, callback, notify, null); + } + + /* (non-Javadoc) + * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) + */ + @Override + public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { + + assert (request != null); + + if(Development.DEVELOPMENT) { + try { + if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) + System.err.println("schedule write '" + request + "'"); + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + + @Override + public void run(int thread) { + + if(Development.DEVELOPMENT) { + try { + if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) + System.err.println("perform write '" + request + "'"); + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + + ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + try { + + flushCounter = 0; + clientChanges = new ClientChangesImpl(SessionImplSocket.this); + + VirtualGraph vg = getProvider(request.getProvider()); + + WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); + writeState = new WriteState(writer, request, notify, new Procedure() { + + @Override + public void execute(Object result) { + if(callback != null) callback.run(null); + } + + @Override + public void exception(Throwable t) { + if(callback != null) callback.run((DatabaseException)t); + } + + }); + + assert (null != writer); +// writer.state.barrier.inc(); + + try { + request.perform(writer); + assert (null != writer); + } catch (Throwable t) { + if (!(t instanceof CancelTransactionException)) + Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); + writeState.except(t); + } finally { +// writer.state.barrier.dec(); +// writer.waitAsync(request); + } + + + assert(!queryProvider2.dirty); + + } catch (Throwable e) { + + // Log it first, just to be safe that the error is always logged. + if (!(e instanceof CancelTransactionException)) + Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + +// writeState.getGraph().state.barrier.dec(); +// writeState.getGraph().waitAsync(request); + + writeState.except(e); + +// try { +// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. +// // All we can do here is to log those, can't really pass them anywhere. +// if (callback != null) { +// if(e instanceof DatabaseException) callback.run((DatabaseException)e); +// else callback.run(new DatabaseException(e)); +// } +// } catch (Throwable e2) { +// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); +// } +// boolean empty = clusterStream.reallyFlush(); +// int callerThread = -1; +// ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this); +// SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1); +// try { +// // Send all local changes to server so it can calculate correct reverse change set. +// // This will call clusterStream.accept(). +// state.cancelCommit(context, clusterStream); +// if (!empty) { +// if (!context.isOk()) // this is a blocking operation +// throw new InternalException("Cancel failed. This should never happen. Contact application support."); +// getQueryProvider2().performDirtyUpdates(writeState.getGraph()); +// } +// state.cancelCommit2(context, clusterStream); +// } catch (DatabaseException e3) { +// Logger.defaultLogError("Write request cancel caused an unexpected error.", e3); +// } finally { +// clusterStream.setOff(false); +// clientChanges = new ClientChangesImpl(SessionImplSocket.this); +// } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + } + + task.finish(); + + if(Development.DEVELOPMENT) { + try { + if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) + System.err.println("finish write '" + request + "'"); + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + + } + + }, combine); + + } + + public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify) { + scheduleRequest(request, procedure, notify, null); + } + + /* (non-Javadoc) + * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) + */ + @Override + public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { + + assert (request != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleWrite(new SessionTask(request, thread) { + + @Override + public void run(int thread) { + + ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + flushCounter = 0; + clientChanges = new ClientChangesImpl(SessionImplSocket.this); + + VirtualGraph vg = getProvider(request.getProvider()); + WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); + + try { + WriteState writeStateT = new WriteState(writer, request, notify, procedure); + writeState = writeStateT; + + assert (null != writer); +// writer.state.barrier.inc(); + writeStateT.setResult(request.perform(writer)); + assert (null != writer); + +// writer.state.barrier.dec(); +// writer.waitAsync(null); + + } catch (Throwable e) { + +// writer.state.barrier.dec(); +// writer.waitAsync(null); + + writeState.except(e); + +// state.stopWriteTransaction(clusterStream); +// +// } catch (Throwable e) { +// // Log it first, just to be safe that the error is always logged. +// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); +// +// try { +// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. +// // All we can do here is to log those, can't really pass them anywhere. +// if (procedure != null) { +// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); +// else procedure.exception(new DatabaseException(e)); +// } +// } catch (Throwable e2) { +// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); +// } +// +// clientChanges = new ClientChangesImpl(SessionImplSocket.this); +// +// state.stopWriteTransaction(clusterStream); + + } finally { + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + } + +// if(notify != null) notify.release(); + + task.finish(); + + } + + }, combine); + + } + + public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { + scheduleRequest(request, callback, notify, null); + } + + /* (non-Javadoc) + * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) + */ + @Override + public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { + + final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + + assert (request != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleWrite(new SessionTask(request, thread) { + + @Override + public void run(int thread) { + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + Procedure stateProcedure = new Procedure() { + @Override + public void execute(Object result) { + if (callback != null) + callback.run(null); + } + @Override + public void exception(Throwable t) { + if (callback != null) { + if (t instanceof DatabaseException) callback.run((DatabaseException) t); + else callback.run(new DatabaseException(t)); + } else + Logger.defaultLogError("Unhandled exception", t); + } + }; + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + delayedWriteState = new WriteStateBase(request, notify, stateProcedure); + DelayedWriteGraph dwg = null; +// newGraph.state.barrier.inc(); + + try { + dwg = new DelayedWriteGraph(newGraph); + request.perform(dwg); + } catch (Throwable e) { + delayedWriteState.except(e); + total.finish(); + dwg.close(); + return; + } finally { +// newGraph.state.barrier.dec(); +// newGraph.waitAsync(request); + fireSessionVariableChange(SessionVariables.QUEUED_READS); + } + + delayedWriteState = null; + + ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + flushCounter = 0; + clientChanges = new ClientChangesImpl(SessionImplSocket.this); + + acquireWriteOnly(); + + VirtualGraph vg = getProvider(request.getProvider()); + WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); + + WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); + + writeState = new WriteState(writer, request, notify, stateProcedure); + + assert (null != writer); +// writer.state.barrier.inc(); + + try { + // Cannot cancel + dwg.commit(writer, request); + + if(defaultClusterSet != null) { + XSupport xs = getService(XSupport.class); + defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet); + ClusteringSupport cs = getService(ClusteringSupport.class); + clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet)); + } + + // This makes clusters available from server + clusterStream.reallyFlush(); + + releaseWriteOnly(writer); + handleUpdatesAndMetadata(writer); + + } catch (ServiceException e) { +// writer.state.barrier.dec(); +// writer.waitAsync(null); + + // These shall be requested from server + clusterTable.removeWriteOnlyClusters(); + // This makes clusters available from server + clusterStream.reallyFlush(); + + releaseWriteOnly(writer); + writeState.except(e); + } finally { + // Debugging & Profiling + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + task2.finish(); + total.finish(); + } + } + + }, combine); + + } + + public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify) { + scheduleRequest(request, procedure, notify, null); + } + + /* (non-Javadoc) + * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) + */ + @Override + public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { + throw new Error("Not implemented"); + } + + protected ClusterImpl getNewResourceCluster() throws DatabaseException { + ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly); + if((serviceMode & SERVICE_MODE_CREATE) > 0) { + createdClusters.add(cluster.clusterId); + } + return cluster; + } + + class WriteOnlySupport implements WriteSupport { + + ClusterStream stream; + ClusterImpl currentCluster; + + public WriteOnlySupport() { + this.stream = clusterStream; + } + + @Override + public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException { + claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id ); + } + + @Override + public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException { + + ClusterImpl cluster = clusterTable.getClusterByResourceKey(s); + + if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) + if(s != queryProvider2.getRootLibrary()) + throw new ImmutableException("Trying to modify immutable resource key=" + s); + + try { + maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator)); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + //throw e; + } + + clientChanges.invalidate(s); + + if (cluster.isWriteOnly()) + return; + queryProvider2.updateStatements(s, p); + + } + + @Override + public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { + claimValue(provider, ((ResourceImpl)resource).id, value, value.length); + } + + @Override + public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) { + + ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); + try { + maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator)); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + clientChanges.invalidate(rid); + + if (cluster.isWriteOnly()) + return; + queryProvider2.updateValue(rid); + + } + + @Override + public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { + + if(amount < 65536) { + claimValue(provider,resource, reader.readBytes(null, amount)); + return; + } + + byte[] bytes = new byte[65536]; + + int rid = ((ResourceImpl)resource).id; + ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); + try { + int left = amount; + while(left > 0) { + int block = Math.min(left, 65536); + reader.readBytes(bytes, block); + maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator)); + left -= block; + } + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + clientChanges.invalidate(rid); + + if (cluster.isWriteOnly()) + return; + queryProvider2.updateValue(rid); + + } + + private void maintainCluster(ClusterImpl before, ClusterI after_) { + if(after_ != null && after_ != before) { + ClusterImpl after = (ClusterImpl)after_; + if(currentCluster == before) { + currentCluster = after; + } + clusterTable.replaceCluster(after); + } + } + + public int createResourceKey(int foreignCounter) throws DatabaseException { + if(currentCluster == null) { + currentCluster = getNewResourceCluster(); + } + if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { + ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); + newCluster.foreignLookup = new byte[foreignCounter]; + currentCluster = newCluster; + if (DEBUG) + System.err.println("foreignLookup " + currentCluster + " " + foreignCounter); + } + return currentCluster.createResource(clusterTranslator); + } + + @Override + public Resource createResource(VirtualGraph provider) throws DatabaseException { + if(currentCluster == null) { + if (null != defaultClusterSet) { + ResourceImpl result = getNewResource(defaultClusterSet); + currentCluster = clusterTable.getClusterByResourceKey(result.id); + return result; + } else { + currentCluster = getNewResourceCluster(); + } + } + if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { + if (null != defaultClusterSet) { + ResourceImpl result = getNewResource(defaultClusterSet); + currentCluster = clusterTable.getClusterByResourceKey(result.id); + return result; + } else { + currentCluster = getNewResourceCluster(); + } + } + return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator)); + } + + @Override + public Resource createResource(VirtualGraph provider, long clusterId) + throws DatabaseException { + return getNewResource(clusterId); + } + + @Override + public Resource createResource(VirtualGraph provider, Resource clusterSet) + throws DatabaseException { + return getNewResource(clusterSet); + } + + @Override + public void createClusterSet(VirtualGraph provider, Resource clusterSet) + throws DatabaseException { + getNewClusterSet(clusterSet); + } + + @Override + public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet) + throws ServiceException { + return containsClusterSet(clusterSet); + } + + public void selectCluster(long cluster) { + currentCluster = clusterTable.getClusterByClusterId(cluster); + long setResourceId = clusterSetsSupport.getSet(cluster); + clusterSetsSupport.put(setResourceId, cluster); + } + + @Override + public Resource setDefaultClusterSet(Resource clusterSet) + throws ServiceException { + Resource result = setDefaultClusterSet4NewResource(clusterSet); + if(clusterSet != null) { + long id = clusterSetsSupport.get(clusterSet.getResourceId()); + currentCluster = clusterTable.getClusterByClusterId(id); + return result; + } else { + currentCluster = null; + return null; + } + } + + @Override + public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException { + + provider = getProvider(provider); + if (null == provider) { + int key = ((ResourceImpl)resource).id; + + + + ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key); +// if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) +// if(key != queryProvider2.getRootLibrary()) +// throw new ImmutableException("Trying to modify immutable resource key=" + key); + +// try { +// cluster.removeValue(key, clusterTranslator); +// } catch (DatabaseException e) { +// Logger.defaultLogError(e); +// return; +// } + + clusterTable.writeOnlyInvalidate(cluster); + + try { + int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key); + clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION); + clusterTranslator.removeValue(cluster); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + queryProvider2.invalidateResource(key); + clientChanges.invalidate(key); + + } else { + ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); + queryProvider2.updateValue(querySupport.getId(resource)); + clientChanges.claimValue(resource); + } + + + } + + @Override + public void flush(boolean intermediate) { + throw new UnsupportedOperationException(); + } + + @Override + public void flushCluster() { + clusterTable.flushCluster(graphSession); + if(defaultClusterSet != null) { + clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId); + } + currentCluster = null; + } + + @Override + public void flushCluster(Resource r) { + throw new UnsupportedOperationException("flushCluster resource " + r); + } + + @Override + public void gc() { + } + + @Override + public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, + Resource object) { + + int s = ((ResourceImpl)subject).id; + int p = ((ResourceImpl)predicate).id; + int o = ((ResourceImpl)object).id; + + provider = getProvider(provider); + if (null == provider) { + + ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s); + clusterTable.writeOnlyInvalidate(cluster); + + try { + + int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s); + int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p); + int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o); + + ClusterI pc = clusterTable.getClusterProxyByResourceKey(p); + ClusterI oc = clusterTable.getClusterProxyByResourceKey(o); + + clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION); + clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.removeStatement(cluster); + + queryProvider2.invalidateResource(s); + clientChanges.invalidate(s); + + } catch (DatabaseException e) { + + Logger.defaultLogError(e); + + } + + return true; + + } else { + + ((VirtualGraphImpl)provider).deny(s, p, o); + queryProvider2.invalidateResource(s); + clientChanges.invalidate(s); + + return true; + + } + + } + + @Override + public void setValue(VirtualGraph provider, Resource resource, byte[] value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean writeOnly() { + return true; + } + + @Override + public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { + writeSupport.performWriteRequest(graph, request); + } + + @Override + public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void addMetadata(Metadata data) throws ServiceException { + writeSupport.addMetadata(data); + } + + @Override + public T getMetadata(Class clazz) throws ServiceException { + return writeSupport.getMetadata(clazz); + } + + @Override + public TreeMap getMetadata() { + return writeSupport.getMetadata(); + } + + @Override + public void commitDone(WriteTraits writeTraits, long csid) { + writeSupport.commitDone(writeTraits, csid); + } + + @Override + public void clearUndoList(WriteTraits writeTraits) { + writeSupport.clearUndoList(writeTraits); + } + @Override + public int clearMetadata() { + return writeSupport.clearMetadata(); + } + + @Override + public void startUndo() { + writeSupport.startUndo(); + } + } + + class VirtualWriteOnlySupport implements WriteSupport { + +// @Override +// public void addStatement(VirtualGraph provider, Resource subject, Resource predicate, +// Resource object) { +// throw new UnsupportedOperationException(); +// } + + @Override + public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { + + TransientGraph impl = (TransientGraph)provider; + impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); + getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); + clientChanges.claim(subject, predicate, object); + + } + + @Override + public void claim(VirtualGraph provider, int subject, int predicate, int object) { + + TransientGraph impl = (TransientGraph)provider; + impl.claim(subject, predicate, object); + getQueryProvider2().updateStatements(subject, predicate); + clientChanges.claim(subject, predicate, object); + + } + + @Override + public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { + claimValue(provider, ((ResourceImpl)resource).id, value, value.length); + } + + @Override + public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) { + ((VirtualGraphImpl)provider).claimValue(resource, value, length); + getQueryProvider2().updateValue(resource); + clientChanges.claimValue(resource); + } + + @Override + public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { + byte[] value = reader.readBytes(null, amount); + claimValue(provider, resource, value); + } + + @Override + public Resource createResource(VirtualGraph provider) { + TransientGraph impl = (TransientGraph)provider; + return impl.getResource(impl.newResource(false)); + } + + @Override + public Resource createResource(VirtualGraph provider, long clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public Resource createResource(VirtualGraph provider, Resource clusterSet) + throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void createClusterSet(VirtualGraph provider, Resource clusterSet) + throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet) + throws ServiceException { + throw new UnsupportedOperationException(); + } + + @Override + public Resource setDefaultClusterSet(Resource clusterSet) + throws ServiceException { + return null; + } + + @Override + public void denyValue(VirtualGraph provider, Resource resource) { + ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); + getQueryProvider2().updateValue(querySupport.getId(resource)); + // NOTE: this only keeps track of value changes by-resource. + clientChanges.claimValue(resource); + } + + @Override + public void flush(boolean intermediate) { + throw new UnsupportedOperationException(); + } + + @Override + public void flushCluster() { + throw new UnsupportedOperationException(); + } + + @Override + public void flushCluster(Resource r) { + throw new UnsupportedOperationException("Resource " + r); + } + + @Override + public void gc() { + } + + @Override + public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { + TransientGraph impl = (TransientGraph) provider; + impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); + getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); + clientChanges.deny(subject, predicate, object); + return true; + } + + @Override + public void setValue(VirtualGraph provider, Resource resource, byte[] value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean writeOnly() { + return true; + } + + @Override + public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void addMetadata(Metadata data) throws ServiceException { + throw new UnsupportedOperationException(); + } + + @Override + public T getMetadata(Class clazz) throws ServiceException { + throw new UnsupportedOperationException(); + } + + @Override + public TreeMap getMetadata() { + throw new UnsupportedOperationException(); + } + + @Override + public void commitDone(WriteTraits writeTraits, long csid) { + } + + @Override + public void clearUndoList(WriteTraits writeTraits) { + } + + @Override + public int clearMetadata() { + return 0; + } + + @Override + public void startUndo() { + } + } + + private void performWriteOnly(WriteOnlyResult request, Semaphore notify, Procedure callback) { + + try { + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + flushCounter = 0; + clientChanges = new ClientChangesImpl(SessionImplSocket.this); + + acquireWriteOnly(); + + WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); + + WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider()); + + WriteState writeStateT = new WriteState(writer, request, notify, callback); + writeState = writeStateT; + + assert (null != writer); +// writer.state.barrier.inc(); + long start = System.nanoTime(); + T result = request.perform(writer); + long duration = System.nanoTime() - start; + if (DEBUG) { + System.err.println("################"); + System.err.println("WriteOnly duration " + 1e-9*duration); + } + writeStateT.setResult(result); + + // This makes clusters available from server + clusterStream.reallyFlush(); + + // This will trigger query updates + releaseWriteOnly(writer); + assert (null != writer); + + handleUpdatesAndMetadata(writer); + + } catch (CancelTransactionException e) { + + releaseWriteOnly(writeState.getGraph()); + + clusterTable.removeWriteOnlyClusters(); + state.stopWriteTransaction(clusterStream); + + } catch (Throwable e) { + + e.printStackTrace(); + + releaseWriteOnly(writeState.getGraph()); + + clusterTable.removeWriteOnlyClusters(); + + if (callback != null) + callback.exception(new DatabaseException(e)); + + state.stopWriteTransaction(clusterStream); + Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + } + + } + + public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { + scheduleRequest(request, callback, notify, null); + } + + @Override + public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { + + assertAlive(); + + assert (request != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + + @Override + public void run(int thread) { + + ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); + + try { + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + flushCounter = 0; + clientChanges = new ClientChangesImpl(SessionImplSocket.this); + + acquireWriteOnly(); + + VirtualGraph vg = getProvider(request.getProvider()); + WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); + + WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); + + writeState = new WriteState(writer, request, notify, new Procedure() { + + @Override + public void execute(Object result) { + if(callback != null) callback.run(null); + } + + @Override + public void exception(Throwable t) { + if(callback != null) callback.run((DatabaseException)t); + } + + }); + + assert (null != writer); +// writer.state.barrier.inc(); + + try { + + request.perform(writer); + + } catch (Throwable e) { + +// writer.state.barrier.dec(); +// writer.waitAsync(null); + + releaseWriteOnly(writer); + + clusterTable.removeWriteOnlyClusters(); + + if(!(e instanceof CancelTransactionException)) { + if (callback != null) + callback.run(new DatabaseException(e)); + } + + writeState.except(e); + + return; + + } + + // This makes clusters available from server + boolean empty = clusterStream.reallyFlush(); + // This was needed to make WO requests call metadata listeners. + // NOTE: the calling event does not contain clientChanges information. + if (!empty && clientChanges.isEmpty()) + clientChanges.setNotEmpty(true); + releaseWriteOnly(writer); + assert (null != writer); + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_WRITES); + + } + + + task.finish(); + + } + + }, combine); + + } + + public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify) { + scheduleRequest(request, callback, notify, null); + } + + /* (non-Javadoc) + * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) + */ + @Override + public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify, Boolean combine) { + + assert (request != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleWrite(new SessionTask(request, thread) { + + @Override + public void run(int thread) { + + ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); + + performWriteOnly(request, notify, callback); + + task.finish(); + + } + + }, combine); + + } + + public void scheduleRequest(final Read request, final AsyncProcedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + try { + newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + procedure.exception(graph, t); + if(throwable != null) { + throwable.set(t); + } else { + // ErrorLogger.defaultLogError("Unhandled exception", t); + } + } + + @Override + public void execute(AsyncReadGraph graph, T t) { + if(result != null) result.set(t); + procedure.execute(graph, t); + } + + }, listener); + } catch (Throwable t) { + // This is handled by the AsyncProcedure + //Logger.defaultLogError("Internal error", t); + } + + } else { + + try { + +// newGraph.state.barrier.inc(); + + T t = request.perform(newGraph); + + try { + + if(result != null) result.set(t); + procedure.execute(newGraph, t); + + } catch (Throwable th) { + + if(throwable != null) { + throwable.set(th); + } else { + Logger.defaultLogError("Unhandled exception", th); + } + + } + + } catch (Throwable t) { + + if (DEBUG) + t.printStackTrace(); + + if(throwable != null) { + throwable.set(t); + } else { + Logger.defaultLogError("Unhandled exception", t); + } + + try { + + procedure.exception(newGraph, t); + + } catch (Throwable t2) { + + if(throwable != null) { + throwable.set(t2); + } else { + Logger.defaultLogError("Unhandled exception", t2); + } + + } + + } + +// newGraph.state.barrier.dec(); +// newGraph.waitAsync(request); + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + public void scheduleRequest(final AsyncRead request, final AsyncProcedure procedure, final ListenerBase listener, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + newGraph.processor.query(newGraph, request, null, procedure, listener); + +// newGraph.waitAsync(request); + + } else { + + final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( + procedure, "request"); + + try { + +// newGraph.state.barrier.inc(); + + request.perform(newGraph, wrapper); + +// newGraph.waitAsync(request); + + } catch (Throwable t) { + + wrapper.exception(newGraph, t); +// newGraph.waitAsync(request); + + + } + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + int sync = notify != null ? thread : -1; + + requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + newGraph.processor.query(newGraph, request, null, procedure, listener); + +// newGraph.waitAsync(request); + + } else { + + final ResultCallWrappedQueryProcedure4 wrapper = new ResultCallWrappedQueryProcedure4(procedure); + + try { + + request.perform(newGraph, wrapper); + + } catch (Throwable t) { + + t.printStackTrace(); + + } + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + public void scheduleRequest(final ExternalRead request, final Procedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + newGraph.processor.query(newGraph, request, null, new Procedure() { + + @Override + public void exception(Throwable t) { + procedure.exception(t); + if(throwable != null) { + throwable.set(t); + } + } + + @Override + public void execute(T t) { + if(result != null) result.set(t); + procedure.execute(t); + } + + }, listener); + +// newGraph.waitAsync(request); + + } else { + +// newGraph.state.barrier.inc(); + + request.register(newGraph, new Listener() { + + @Override + public void exception(Throwable t) { + if(throwable != null) throwable.set(t); + procedure.exception(t); +// newGraph.state.barrier.dec(); + } + + @Override + public void execute(T t) { + if(result != null) result.set(t); + procedure.execute(t); +// newGraph.state.barrier.dec(); + } + + @Override + public boolean isDisposed() { + return true; + } + + }); + +// newGraph.waitAsync(request); + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + + @Override + public void asyncRequest(final Read request, final AsyncProcedure procedure) { + + scheduleRequest(request, procedure, null, null, null); + + } + + @Override + public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + DataContainer container = new DataContainer(); + DataContainer result = new DataContainer(); + scheduleRequest(request, procedure, notify, container, result); + acquire(notify, request); + Throwable throwable = container.get(); + if(throwable != null) { + if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; + else throw new DatabaseException("Unexpected exception", throwable); + } + return result.get(); + } + + @Override + public T syncRequest(AsyncRead request, final AsyncProcedure procedure) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exceptionContainer = new DataContainer(); + final DataContainer resultContainer = new DataContainer(); + scheduleRequest(request, new AsyncProcedure() { + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + exceptionContainer.set(throwable); + procedure.exception(graph, throwable); + } + @Override + public void execute(AsyncReadGraph graph, T result) { + resultContainer.set(result); + procedure.execute(graph, result); + } + }, getListenerBase(procedure), notify); + acquire(notify, request, procedure); + Throwable throwable = exceptionContainer.get(); + if (throwable != null) { + if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; + else throw new DatabaseException("Unexpected exception", throwable); + } + return resultContainer.get(); + } + + + @Override + public Collection syncRequest(AsyncMultiRead request, final AsyncMultiProcedure procedure) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exceptionContainer = new DataContainer(); + scheduleRequest(request, new AsyncMultiProcedure() { + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + exceptionContainer.set(throwable); + procedure.exception(graph, throwable); + } + @Override + public void execute(AsyncReadGraph graph, T result) { + procedure.execute(graph, result); + } + @Override + public void finished(AsyncReadGraph graph) { + procedure.finished(graph); + } + }, notify); + acquire(notify, request, procedure); + Throwable throwable = exceptionContainer.get(); + if (throwable != null) { + if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; + else throw new DatabaseException("Unexpected exception", throwable); + } + // TODO: implement return value + System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)"); + return null; + } + + @Override + public T syncRequest(final ExternalRead request) throws DatabaseException { + return syncRequest(request, new ProcedureAdapter()); + + +// assert(request != null); +// +// final DataContainer result = new DataContainer(); +// final DataContainer exception = new DataContainer(); +// +// syncRequest(request, new Procedure() { +// +// @Override +// public void execute(T t) { +// result.set(t); +// } +// +// @Override +// public void exception(Throwable t) { +// exception.set(t); +// } +// +// }); +// +// Throwable t = exception.get(); +// if(t != null) { +// if(t instanceof DatabaseException) throw (DatabaseException)t; +// else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t); +// } +// +// return result.get(); + + } + + @Override + public T syncRequest(ExternalRead request, final Listener procedure) throws DatabaseException { + return syncRequest(request, (Procedure)procedure); + } + + +// @Override +// public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { +// assertNotSession(); +// Semaphore notify = new Semaphore(0); +// DataContainer container = new DataContainer(); +// DataContainer result = new DataContainer(); +// scheduleRequest(request, procedure, notify, container, result); +// acquire(notify, request); +// Throwable throwable = container.get(); +// if(throwable != null) { +// if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; +// else throw new DatabaseException("Unexpected exception", throwable); +// } +// return result.get(); +// } + + + @Override + public T syncRequest(ExternalRead request, final Procedure procedure) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer container = new DataContainer(); + final DataContainer result = new DataContainer(); + scheduleRequest(request, procedure, notify, container, result); + acquire(notify, request); + Throwable throwable = container.get(); + if (throwable != null) { + if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; + else throw new DatabaseException("Unexpected exception", throwable); + } + return result.get(); + } + + @Override + public void syncRequest(Write request) throws DatabaseException { + assertNotSession(); + assertAlive(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + scheduleRequest(request, new Callback() { + + @Override + public void run(DatabaseException e) { + exception.set(e); + } + + }, notify); + acquire(notify, request); + if(exception.get() != null) throw exception.get(); + } + + @Override + public T syncRequest(WriteResult request) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + final DataContainer result = new DataContainer(); + scheduleRequest(request, new Procedure() { + + @Override + public void exception(Throwable t) { + exception.set(t); + } + + @Override + public void execute(T t) { + result.set(t); + } + + }, notify); + acquire(notify, request); + if(exception.get() != null) { + Throwable t = exception.get(); + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException(t); + } + return result.get(); + } + + @Override + public T syncRequest(WriteOnlyResult request) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + final DataContainer result = new DataContainer(); + scheduleRequest(request, new Procedure() { + + @Override + public void exception(Throwable t) { + exception.set(t); + } + + @Override + public void execute(T t) { + result.set(t); + } + + }, notify); + acquire(notify, request); + if(exception.get() != null) { + Throwable t = exception.get(); + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException(t); + } + return result.get(); + } + + @Override + public T syncRequest(DelayedWriteResult request) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + final DataContainer result = new DataContainer(); + scheduleRequest(request, new Procedure() { + + @Override + public void exception(Throwable t) { + exception.set(t); + } + + @Override + public void execute(T t) { + result.set(t); + } + + }, notify); + acquire(notify, request); + if(exception.get() != null) { + Throwable t = exception.get(); + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException(t); + } + return result.get(); + } + + @Override + public void syncRequest(DelayedWrite request) throws DatabaseException { + assertNotSession(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + scheduleRequest(request, new Callback() { + @Override + public void run(DatabaseException e) { + exception.set(e); + } + }, notify); + acquire(notify, request); + if(exception.get() != null) throw exception.get(); + } + + @Override + public void syncRequest(WriteOnly request) throws DatabaseException { + assertNotSession(); + assertAlive(); + Semaphore notify = new Semaphore(0); + final DataContainer exception = new DataContainer(); + scheduleRequest(request, new Callback() { + @Override + public void run(DatabaseException e) { + exception.set(e); + } + }, notify); + acquire(notify, request); + if(exception.get() != null) throw exception.get(); + } + + /* + * + * GraphRequestProcessor interface + */ + + @Override + public void asyncRequest(final AsyncRead request, final AsyncProcedure procedure) { + + scheduleRequest(request, procedure, null, null); + + } + + @Override + public void asyncRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure) { + + scheduleRequest(request, procedure, null); + + } + + @Override + public void asyncRequest(final ExternalRead request, final Procedure procedure) { + + scheduleRequest(request, procedure, null, null, null); + + } + + @Override + public void asyncRequest(final Write request, final Callback callback) { + + scheduleRequest(request, callback, null); + + } + + @Override + public void asyncRequest(final WriteResult request, final Procedure procedure) { + + scheduleRequest(request, procedure, null); + + } + + @Override + public void asyncRequest(final WriteOnlyResult request, final Procedure procedure) { + + scheduleRequest(request, procedure, null); + + } + + @Override + public void asyncRequest(final DelayedWriteResult request, final Procedure procedure) { + + scheduleRequest(request, procedure, null); + + } + + @Override + public void asyncRequest(final DelayedWrite request, final Callback callback) { + + scheduleRequest(request, callback, null); + + } + + @Override + public void asyncRequest(final Write r) { + asyncRequest(r, null); + } + + @Override + public void asyncRequest(final DelayedWrite r) { + asyncRequest(r, null); + } + + @Override + public void asyncRequest(final WriteOnly request, final Callback callback) { + + scheduleRequest(request, callback, null); + + } + + @Override + public void asyncRequest(final WriteOnly request) { + + asyncRequest(request, null); + + } + + @Override + public void async(ReadInterface r, AsyncProcedure procedure) { + r.request(this, procedure); + } + + @Override + public void async(ReadInterface r, Procedure procedure) { + r.request(this, procedure); + } + + @Override + public void async(ReadInterface r, SyncProcedure procedure) { + r.request(this, procedure); + } + + @Override + public void async(ReadInterface r, AsyncListener procedure) { + r.request(this, procedure); + } + + @Override + public void async(ReadInterface r, Listener procedure) { + r.request(this, procedure); + } + + @Override + public void async(ReadInterface r, SyncListener procedure) { + r.request(this, procedure); + } + + @Override + public T sync(ReadInterface r) throws DatabaseException { + return r.request(this); + } + + @Override + public T sync(WriteInterface r) throws DatabaseException { + return r.request(this); + } + + @Override + public void async(WriteInterface r, Procedure procedure) { + r.request(this, procedure); + } + + @Override + public void async(WriteInterface r) { + r.request(this, new ProcedureAdapter()); + } + + //@Override + public void incAsync() { + state.incAsync(); + } + + //@Override + public void decAsync() { + state.decAsync(); + } + + public long getCluster(ResourceImpl resource) { + ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id); + return cluster.getClusterId(); + } + + public long getCluster(int id) { + if (clusterTable == null) + System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come"); + return clusterTable.getClusterIdByResourceKeyNoThrow(id); + } + + public ResourceImpl getResource(int id) { + return new ResourceImpl(resourceSupport, id); + } + + public ResourceImpl getResource(int resourceIndex, long clusterId) { + assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex)); + ClusterI proxy = clusterTable.getClusterByClusterId(clusterId); + int key = proxy.getClusterKey(); + int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex); + return new ResourceImpl(resourceSupport, resourceKey); + } + + public ResourceImpl getResource2(int id) { + assert (id != 0); + return new ResourceImpl(resourceSupport, id); + } + + final public int getId(ResourceImpl impl) { + return impl.id; + } + + public static final Charset UTF8 = Charset.forName("utf-8"); + + + + + static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) { + for(TransientGraph g : support.providers) { + if(g.isPending(subject)) return false; + } + return true; + } + + static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) { + for(TransientGraph g : support.providers) { + if(g.isPending(subject, predicate)) return false; + } + return true; + } + + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { + + Callback composite = new Callback() { + + AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); + + @Override + public void run(ReadGraphImpl graph) { + if(ready.decrementAndGet() == 0) { + runnable.run(graph); + } + } + + }; + + for(TransientGraph g : support.providers) { + if(g.isPending(subject)) { + try { + g.load(graph, subject, composite); + } catch (DatabaseException e) { + e.printStackTrace(); + } + } else { + composite.run(graph); + } + } + + composite.run(graph); + + } + + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { + + Callback composite = new Callback() { + + AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); + + @Override + public void run(ReadGraphImpl graph) { + if(ready.decrementAndGet() == 0) { + runnable.run(graph); + } + } + + }; + + for(TransientGraph g : support.providers) { + if(g.isPending(subject, predicate)) { + try { + g.load(graph, subject, predicate, composite); + } catch (DatabaseException e) { + e.printStackTrace(); + } + } else { + composite.run(graph); + } + } + + composite.run(graph); + + } + +// void dumpHeap() { +// +// try { +// ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(), +// "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap( +// "d:/heap" + flushCounter + ".txt", true); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// +// } + + void fireReactionsToSynchronize(ChangeSet cs) { + + // Do not fire empty events + if (cs.isEmpty()) + return; + + ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2()); +// g.state.barrier.inc(); + + try { + + if (!cs.isEmpty()) { + ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs); + for (ChangeListener l : changeListeners2) { + try { + l.graphChanged(e2); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + } + + } finally { + +// g.state.barrier.dec(); +// g.waitAsync(null); + + } + + } + + void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) { + try { + + // Do not fire empty events + if (cs2.isEmpty()) + return; + +// graph.restart(); +// graph.state.barrier.inc(); + + try { + + if (!cs2.isEmpty()) { + + ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2); + for (ChangeListener l : changeListeners2) { + try { + l.graphChanged(e2); +// System.out.println("changelistener " + l); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + } + + } finally { + +// graph.state.barrier.dec(); +// graph.waitAsync(null); + + } + } catch (Throwable t) { + t.printStackTrace(); + + } + } + void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) { + + try { + + // Do not fire empty events + if (cs2.isEmpty()) + return; + + // Do not fire on virtual requests + if(graph.getProvider() != null) + return; + + WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); + + try { + + ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2); + for (ChangeListener l : metadataListeners) { + try { + l.graphChanged(e2); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + + } finally { + + } + + } catch (Throwable t) { + t.printStackTrace(); + } + + } + + + /* + * (non-Javadoc) + * + * @see org.simantics.db.ServiceLocator#getService(java.lang.Class) + */ + @Override + public T getService(Class api) { + T t = peekService(api); + if (t == null) { + if (state.isClosed()) + throw new ServiceNotFoundException(this, api, "Session has been shut down"); + throw new ServiceNotFoundException(this, api); + } + return t; + } + + /** + * @return + */ + protected abstract ServerInformation getCachedServerInformation(); + + private Class serviceKey1 = null; + private Class serviceKey2 = null; + private Object service1 = null; + private Object service2 = null; + + /* + * (non-Javadoc) + * + * @see + * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class) + */ + @SuppressWarnings("unchecked") + @Override + public synchronized T peekService(Class api) { + + if(serviceKey1 == api) { + return (T)service1; + } else if (serviceKey2 == api) { + // Promote this key + Object result = service2; + service2 = service1; + serviceKey2 = serviceKey1; + service1 = result; + serviceKey1 = api; + return (T)result; + } + + if (Layer0.class == api) + return (T) L0; + if (ServerInformation.class == api) + return (T) getCachedServerInformation(); + else if (WriteGraphImpl.class == api) + return (T) writeState.getGraph(); + else if (ClusterBuilder.class == api) + return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); + else if (ClusterBuilderFactory.class == api) + return (T)new ClusterBuilderFactoryImpl(this); + + service2 = service1; + serviceKey2 = serviceKey1; + + service1 = serviceLocator.peekService(api); + serviceKey1 = api; + + return (T)service1; + + } + + /* + * (non-Javadoc) + * + * @see + * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class) + */ + @Override + public boolean hasService(Class api) { + return serviceLocator.hasService(api); + } + + /** + * @param api the api that must be implemented by the specified service + * @param service the service implementation + */ + @Override + public void registerService(Class api, T service) { + if(Layer0.class == api) { + L0 = (Layer0)service; + return; + } + serviceLocator.registerService(api, service); + if (TransactionPolicySupport.class == api) { + transactionPolicy = (TransactionPolicySupport)service; + state.resetTransactionPolicy(); + } + if (api == serviceKey1) + service1 = service; + else if (api == serviceKey2) + service2 = service; + } + + // ---------------- + // SessionMonitor + // ---------------- + + void fireSessionVariableChange(String variable) { + for (MonitorHandler h : monitorHandlers) { + MonitorContext ctx = monitorContexts.getLeft(h); + assert ctx != null; + // SafeRunner functionality repeated here to avoid dependency. + try { + h.valuesChanged(ctx); + } catch (Exception e) { + Logger.defaultLogError("monitor handler notification produced the following exception", e); + } catch (LinkageError e) { + Logger.defaultLogError("monitor handler notification produced a linkage error", e); + } + } + } + + + class ResourceSerializerImpl implements ResourceSerializer { + + public long createRandomAccessId(int id) throws DatabaseException { + if(id < 0) + return id; + int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id); + long cluster = getCluster(id); + if (0 == cluster) + return 0; // Better to return 0 then invalid id. + long result = ClusterTraitsBase.createResourceId(cluster, index); + return result; + } + + @Override + public long getRandomAccessId(Resource resource) throws DatabaseException { + + ResourceImpl resourceImpl = (ResourceImpl) resource; + return createRandomAccessId(resourceImpl.id); + + } + + @Override + public int getTransientId(Resource resource) throws DatabaseException { + + ResourceImpl resourceImpl = (ResourceImpl) resource; + return resourceImpl.id; + + } + + @Override + public String createRandomAccessId(Resource resource) + throws InvalidResourceReferenceException { + + if(resource == null) throw new IllegalArgumentException(); + + ResourceImpl resourceImpl = (ResourceImpl) resource; + + if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0"; + + int r; + try { + r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id); + } catch (DatabaseException e1) { + throw new InvalidResourceReferenceException(e1); + } + try { + // Serialize as '_' + return "" + r + "_" + getCluster(resourceImpl); + } catch (Throwable e) { + e.printStackTrace(); + throw new InvalidResourceReferenceException(e); + } finally { + } + } + + public int getTransientId(long serialized) throws DatabaseException { + if (serialized <= 0) + return (int)serialized; + int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized); + long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized); + ClusterI cluster = clusterTranslator.getClusterByClusterId(c); + if (cluster == null) + throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized); + int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index); + return key; + } + + @Override + public Resource getResource(long randomAccessId) throws DatabaseException { + return getResourceByKey(getTransientId(randomAccessId)); + } + + @Override + public Resource getResource(int transientId) throws DatabaseException { + return getResourceByKey(transientId); + } + + @Override + public Resource getResource(String randomAccessId) + throws InvalidResourceReferenceException { + try { + int i = randomAccessId.indexOf('_'); + if (i == -1) + throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '" + + randomAccessId + "'"); + int r = Integer.parseInt(randomAccessId.substring(0, i)); + if(r < 0) return getResourceByKey(r); + long c = Long.parseLong(randomAccessId.substring(i + 1)); + ClusterI cluster = clusterTranslator.getClusterByClusterId(c); + int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r); + if (cluster.hasResource(key, clusterTranslator)) + return getResourceByKey(key); + } catch (InvalidResourceReferenceException e) { + throw e; + } catch (NumberFormatException e) { + throw new InvalidResourceReferenceException(e); + } catch (Throwable e) { + e.printStackTrace(); + throw new InvalidResourceReferenceException(e); + } finally { + } + throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId); + } + + @Override + public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException { + try { + return true; + } catch (Throwable e) { + e.printStackTrace(); + throw new InvalidResourceReferenceException(e); + } finally { + } + } + } + + // Copied from old SessionImpl + + void check() { + if (state.isClosed()) + throw new Error("Session closed."); + } + + + + public ResourceImpl getNewResource(long clusterId) { + ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId); + int newId; + try { + newId = cluster.createResource(clusterTranslator); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + return null; + } + return new ResourceImpl(resourceSupport, newId); + } + + public ResourceImpl getNewResource(Resource clusterSet) + throws DatabaseException { + long resourceId = clusterSet.getResourceId(); + Long clusterId = clusterSetsSupport.get(resourceId); + if (null == clusterId) + throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); + if (Constants.NewClusterId == clusterId) { + clusterId = getService(ClusteringSupport.class).createCluster(); + if ((serviceMode & SERVICE_MODE_CREATE) > 0) + createdClusters.add(clusterId); + clusterSetsSupport.put(resourceId, clusterId); + return getNewResource(clusterId); + } else { + ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId); + ResourceImpl result; + if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { + clusterId = getService(ClusteringSupport.class).createCluster(); + if ((serviceMode & SERVICE_MODE_CREATE) > 0) + createdClusters.add(clusterId); + clusterSetsSupport.put(resourceId, clusterId); + return getNewResource(clusterId); + } else { + result = getNewResource(clusterId); + int resultKey = querySupport.getId(result); + long resultCluster = querySupport.getClusterId(resultKey); + if (clusterId != resultCluster) + clusterSetsSupport.put(resourceId, resultCluster); + return result; + } + } + } + + public void getNewClusterSet(Resource clusterSet) + throws DatabaseException { + if (DEBUG) + System.out.println("new cluster set=" + clusterSet); + long resourceId = clusterSet.getResourceId(); + if (clusterSetsSupport.containsKey(resourceId)) + throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); + clusterSetsSupport.put(resourceId, Constants.NewClusterId); + } + public boolean containsClusterSet(Resource clusterSet) + throws ServiceException { + long resourceId = clusterSet.getResourceId(); + return clusterSetsSupport.containsKey(resourceId); + } + public Resource setDefaultClusterSet4NewResource(Resource clusterSet) { + Resource r = defaultClusterSet; + defaultClusterSet = clusterSet; + return r; + } + void printDiagnostics() { + + if (DIAGNOSTICS) { + + final DataContainer totalMem = new DataContainer(0L); + final DataContainer residentClusters = new DataContainer(0L); + for(ClusterI cluster : clusterTable.getClusters()) { + try { + if (cluster.isLoaded() && !cluster.isEmpty()) { + residentClusters.set(residentClusters.get() + 1); + totalMem.set(totalMem.get() + cluster.getUsedSpace()); + } + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + } + + System.out.println("--------------------------------"); + System.out.println("Cluster information:"); + System.out.println("-amount of resident clusters=" + residentClusters.get()); + System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB"); + + for(ClusterI cluster : clusterTable.getClusters()) { + System.out.print("Cluster " + cluster.getClusterId() + " ["); + try { + if (!cluster.isLoaded()) + System.out.println("not loaded]"); + else if (cluster.isEmpty()) + System.out.println("is empty]"); + else { + System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator)); + System.out.print(",references=" + cluster.getReferenceCount()); + System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]"); + } + } catch (DatabaseException e) { + Logger.defaultLogError(e); + System.out.println("is corrupted]"); + } + } + + queryProvider2.printDiagnostics(); + System.out.println("--------------------------------"); + } + + } + + public void fireStartReadTransaction() { + + if (DIAGNOSTICS) + System.out.println("StartReadTransaction"); + + for (SessionEventListener listener : eventListeners) { + try { + listener.readTransactionStarted(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + public void fireFinishReadTransaction() { + + if (DIAGNOSTICS) + System.out.println("FinishReadTransaction"); + + for (SessionEventListener listener : eventListeners) { + try { + listener.readTransactionFinished(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + public void fireStartWriteTransaction() { + + if (DIAGNOSTICS) + System.out.println("StartWriteTransaction"); + + for (SessionEventListener listener : eventListeners) { + try { + listener.writeTransactionStarted(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + } + + public void fireFinishWriteTransaction() { + + if (DIAGNOSTICS) + System.out.println("FinishWriteTransaction"); + + for (SessionEventListener listener : eventListeners) { + try { + listener.writeTransactionFinished(); + } catch (Throwable t) { + t.printStackTrace(); + } + } + + Indexing.resetDependenciesIndexingDisabled(); + + } + + Resource deserialize(final long resourceId, final long clusterId) throws IOException { + throw new Error("Not supported at the moment."); + } + + State getState() { + return state; + } + + ClusterTable getClusterTable() { + return clusterTable; + } + + public GraphSession getGraphSession() { + return graphSession; + } + + public QueryProcessor getQueryProvider2() { + return queryProvider2; + } + + ClientChangesImpl getClientChanges() { + return clientChanges; + } + + boolean getWriteOnly() { + return writeOnly; + } + + static int counter = 0; + + public void onClusterLoaded(long clusterId) { + + clusterTable.updateSize(); + + } + + + + + /* + * Implementation of the interface RequestProcessor + */ + + @Override + public T syncRequest(final Read request) throws DatabaseException { + + assertNotSession(); + assertAlive(); + + assert(request != null); + + final DataContainer result = new DataContainer(); + final DataContainer exception = new DataContainer(); + + syncRequest(request, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + result.set(t); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + exception.set(t); + } + + }); + + Throwable t = exception.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + } + + return result.get(); + + } + +// @Override +// public T syncRequest(Read request, AsyncListener procedure) throws DatabaseException { +// assertNotSession(); +// return syncRequest(request, (AsyncProcedure)procedure); +// } + + @Override + public T syncRequest(Read request, SyncListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncListener(procedure)); + } + + @Override + public T syncRequest(Read request, final Listener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncListener(procedure)); + } + + @Override + public T syncRequest(final Read request, final SyncProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncProcedure(procedure)); + } + + @Override + public T syncRequest(Read request, Procedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public T syncRequest(final AsyncRead request) throws DatabaseException { + + assertNotSession(); + + assert(request != null); + + final DataContainer result = new DataContainer(); + final DataContainer exception = new DataContainer(); + + syncRequest(request, new AsyncProcedure() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + result.set(t); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + exception.set(t); + } + + }); + + Throwable t = exception.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + } + + return result.get(); + + } + + @Override + public T syncRequest(AsyncRead request, AsyncListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, (AsyncProcedure)procedure); + } + + @Override + public T syncRequest(AsyncRead request, SyncListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncListener(procedure)); + } + + @Override + public T syncRequest(AsyncRead request, Listener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncListener(procedure)); + } + + @Override + public T syncRequest(AsyncRead request, final SyncProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncProcedure(procedure)); + } + + @Override + final public T syncRequest(final AsyncRead request, final Procedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public Collection syncRequest(MultiRead request) throws DatabaseException { + + assertNotSession(); + + assert(request != null); + + final ArrayList result = new ArrayList(); + final DataContainer exception = new DataContainer(); + + syncRequest(request, new AsyncMultiProcedure() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.add(t); + } + } + + @Override + public void finished(AsyncReadGraph graph) { + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + exception.set(t); + } + + + }); + + Throwable t = exception.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + } + + return result; + + } + + @Override + public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, (AsyncMultiProcedure)procedure); + } + + @Override + public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + } + + @Override + public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncMultiListener(procedure)); + } + + @Override + public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + } + + @Override + public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + } + + @Override + final public Collection syncRequest(final AsyncMultiRead request) throws DatabaseException { + + assertNotSession(); + + assert(request != null); + + final ArrayList result = new ArrayList(); + final DataContainer exception = new DataContainer(); + + syncRequest(request, new AsyncMultiProcedure() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.add(t); + } + } + + @Override + public void finished(AsyncReadGraph graph) { + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + exception.set(t); + } + + + }); + + Throwable t = exception.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + } + + return result; + + } + + @Override + public Collection syncRequest(AsyncMultiRead request, AsyncMultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, (AsyncMultiProcedure)procedure); + } + + @Override + public Collection syncRequest(AsyncMultiRead request, SyncMultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + } + + @Override + public Collection syncRequest(AsyncMultiRead request, MultiListener procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncMultiListener(procedure)); + } + + @Override + public Collection syncRequest(AsyncMultiRead request, final SyncMultiProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + } + + @Override + final public Collection syncRequest(final AsyncMultiRead request, final MultiProcedure procedure) throws DatabaseException { + assertNotSession(); + return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + } + + + @Override + public void asyncRequest(Read request) { + + asyncRequest(request, new ProcedureAdapter() { + @Override + public void exception(Throwable t) { + t.printStackTrace(); + } + }); + + } + + @Override + public void asyncRequest(Read request, AsyncListener procedure) { + asyncRequest(request, (AsyncProcedure)procedure); + } + + @Override + public void asyncRequest(Read request, final SyncListener procedure) { + asyncRequest(request, new SyncToAsyncListener(procedure)); + } + + @Override + public void asyncRequest(Read request, final Listener procedure) { + asyncRequest(request, new NoneToAsyncListener(procedure)); + } + + @Override + public void asyncRequest(Read request, SyncProcedure procedure) { + asyncRequest(request, new SyncToAsyncProcedure(procedure)); + } + + @Override + public void asyncRequest(Read request, final Procedure procedure) { + asyncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + final public void asyncRequest(final AsyncRead request) { + + assert(request != null); + + asyncRequest(request, new ProcedureAdapter() { + @Override + public void exception(Throwable t) { + t.printStackTrace(); + } + }); + + } + + @Override + public void asyncRequest(AsyncRead request, AsyncListener procedure) { + scheduleRequest(request, procedure, procedure, null); + } + + @Override + public void asyncRequest(AsyncRead request, final SyncListener procedure) { + asyncRequest(request, new SyncToAsyncListener(procedure)); + } + + @Override + public void asyncRequest(AsyncRead request, final Listener procedure) { + asyncRequest(request, new NoneToAsyncListener(procedure)); + } + + @Override + public void asyncRequest(AsyncRead request, SyncProcedure procedure) { + asyncRequest(request, new SyncToAsyncProcedure(procedure)); + } + + @Override + final public void asyncRequest(final AsyncRead request, final Procedure procedure) { + asyncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public void asyncRequest(MultiRead request) { + + assert(request != null); + + asyncRequest(request, new AsyncMultiProcedureAdapter() { + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + t.printStackTrace(); + } + }); + + } + + @Override + public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { + asyncRequest(request, (AsyncMultiProcedure)procedure); + } + + @Override + public void asyncRequest(MultiRead request, SyncMultiListener procedure) { + asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + } + + @Override + public void asyncRequest(MultiRead request, MultiListener procedure) { + asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + } + + @Override + public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { + asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + } + + @Override + public void asyncRequest(MultiRead request, MultiProcedure procedure) { + asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + } + + @Override + final public void asyncRequest(AsyncMultiRead request) { + + assert(request != null); + + asyncRequest(request, new AsyncMultiProcedureAdapter() { + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + t.printStackTrace(); + } + }); + + } + + @Override + public void asyncRequest(AsyncMultiRead request, AsyncMultiListener procedure) { + asyncRequest(request, (AsyncMultiProcedure)procedure); + } + + @Override + public void asyncRequest(AsyncMultiRead request, SyncMultiListener procedure) { + asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + } + + @Override + public void asyncRequest(AsyncMultiRead request, MultiListener procedure) { + asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + } + + @Override + public void asyncRequest(AsyncMultiRead request, SyncMultiProcedure procedure) { + asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + } + + @Override + final public void asyncRequest(AsyncMultiRead request, final MultiProcedure procedure) { + asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + } + + @Override + public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { + assertNotSession(); + throw new Error("Not implemented!"); + } + + @Override + public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { + throw new Error("Not implemented!"); + } + + @Override + final public void asyncRequest(final ExternalRead request) { + + assert(request != null); + + asyncRequest(request, new ProcedureAdapter() { + @Override + public void exception(Throwable t) { + t.printStackTrace(); + } + }); + + } + + @Override + public void asyncRequest(ExternalRead request, final Listener procedure) { + asyncRequest(request, (Procedure)procedure); + } + + + + void check(Throwable t) throws DatabaseException { + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception", t); + } + } + + void check(DataContainer container) throws DatabaseException { + Throwable t = container.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception", t); + } + } + + + + + + + boolean sameProvider(Write request) { + if(writeState.getGraph().provider != null) { + return writeState.getGraph().provider.equals(request.getProvider()); + } else { + return request.getProvider() == null; + } + } + + boolean plainWrite(WriteGraphImpl graph) { + if(graph == null) return false; + if(graph.writeSupport.writeOnly()) return false; + return true; + } + + + public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); + private void assertNotSession() throws DatabaseException { + Thread current = Thread.currentThread(); + if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); + } + + void assertAlive() { + if (!state.isAlive()) + throw new RuntimeDatabaseException("Session has been shut down."); + } + + public InputStream getValueStream(ReadGraphImpl graph, Resource resource) { + return querySupport.getValueStream(graph, querySupport.getId(resource)); + } + + public byte[] getValue(ReadGraphImpl graph, Resource resource) { + return querySupport.getValue(graph, querySupport.getId(resource)); + } + + void acquire(Semaphore semaphore, T request) throws DatabaseException { + acquire(semaphore, request, null); + } + + private void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException { + assertAlive(); + try { + long tryCount = 0; + // Loop until the semaphore is acquired reporting requests that take a long time. + while (true) { + + if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) { + // Acquired OK. + return; + } else { + if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) { + ++tryCount; + long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT + .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD) + * tryCount; + System.err.println("DB client request '" + request + "' is taking long to execute, so far " + + (waitTime) + " ms. (procedure=" + procedure + ")"); + } + } + + assertAlive(); + + } + } catch (InterruptedException e) { + e.printStackTrace(); + // FIXME: Should perhaps do something else in this case ?? + } + } + + + +// @Override +// public boolean holdOnToTransactionAfterCancel() { +// return transactionPolicy.holdOnToTransactionAfterCancel(); +// } +// +// @Override +// public boolean holdOnToTransactionAfterCommit() { +// return transactionPolicy.holdOnToTransactionAfterCommit(); +// } +// +// @Override +// public boolean holdOnToTransactionAfterRead() { +// return transactionPolicy.holdOnToTransactionAfterRead(); +// } +// +// @Override +// public void onRelinquish() { +// transactionPolicy.onRelinquish(); +// } +// +// @Override +// public void onRelinquishDone() { +// transactionPolicy.onRelinquishDone(); +// } +// +// @Override +// public void onRelinquishError() { +// transactionPolicy.onRelinquishError(); +// } + + + @Override + public Session getSession() { + return this; + } + + + protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph); + protected abstract ResourceImpl getNewResource() throws DatabaseException; + + public void ceased(int thread) { + + requestManager.ceased(thread); + + } + + public int getAmountOfQueryThreads() { + // This must be a power of two + return 1; +// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); + } + + public Resource getResourceByKey(int key) throws ResourceNotFoundException { + + return new ResourceImpl(resourceSupport, key); + + } + + public void acquireWriteOnly() { + writeOnly = true; + } + + public void releaseWriteOnly(ReadGraphImpl graph) { + writeOnly = false; + queryProvider2.releaseWrite(graph); + } + + public void handleUpdatesAndMetadata(WriteGraphImpl writer) { + + long start = System.nanoTime(); + + while(dirtyPrimitives) { + dirtyPrimitives = false; + getQueryProvider2().performDirtyUpdates(writer); + getQueryProvider2().performScheduledUpdates(writer); + } + + fireMetadataListeners(writer, clientChanges); + + if(DebugPolicy.PERFORMANCE_DATA) { + long end = System.nanoTime(); + System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start)); + } + + } + + void removeTemporaryData() { + File platform = Platform.getLocation().toFile(); + File tempFiles = new File(platform, "tempFiles"); + File temp = new File(tempFiles, "db"); + if(!temp.exists()) + return; + try { + Files.walkFileTree(temp.toPath(), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + Files.delete(file); + } catch (IOException e) { + Logger.defaultLogError(e); + } + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { + Logger.defaultLogError(e); + } + } + + public void handleCreatedClusters() { + if ((serviceMode & SERVICE_MODE_CREATE) > 0) { + createdClusters.forEach(new TLongProcedure() { + + @Override + public boolean execute(long value) { + ClusterImpl cluster = clusterTable.getClusterByClusterId(value); + cluster.setImmutable(true, clusterTranslator); + return true; + } + }); + } + createdClusters.clear(); + } + + @Override + public Object getModificationCounter() { + return queryProvider2.modificationCounter; + } + + @Override + public void markUndoPoint() { + state.setCombine(false); + } + +}