/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package fi.vtt.simantics.procore.internal; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ChangeSet; import org.simantics.db.DevelopmentKeys; import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; import org.simantics.db.MonitorHandler; import org.simantics.db.Resource; import org.simantics.db.ResourceSerializer; import org.simantics.db.Session; import org.simantics.db.SessionManager; import org.simantics.db.SessionVariables; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteGraph; import org.simantics.db.authentication.UserAuthenticationAgent; import org.simantics.db.authentication.UserAuthenticator; import org.simantics.db.common.Indexing; import org.simantics.db.common.TransactionPolicyRelease; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure; import org.simantics.db.common.service.ServiceActivityMonitorImpl; import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions; import org.simantics.db.common.utils.Logger; import org.simantics.db.event.ChangeEvent; import org.simantics.db.event.ChangeListener; import org.simantics.db.event.SessionEventListener; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.ClusterSetExistException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.ImmutableException; import org.simantics.db.exception.InvalidResourceReferenceException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ServiceNotFoundException; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; import org.simantics.db.impl.ClusterTranslator; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.TransientGraph; import org.simantics.db.impl.VirtualGraphImpl; import org.simantics.db.impl.graph.DelayedWriteGraph; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QueryProcessor.SessionRead; import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.impl.service.QueryDebug; import org.simantics.db.impl.support.VirtualGraphServerSupport; import org.simantics.db.impl.support.WriteRequestScheduleSupport; import org.simantics.db.procedure.AsyncListener; import org.simantics.db.procedure.AsyncMultiListener; import org.simantics.db.procedure.AsyncMultiProcedure; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.procedure.Listener; import org.simantics.db.procedure.ListenerBase; import org.simantics.db.procedure.MultiListener; import org.simantics.db.procedure.MultiProcedure; import org.simantics.db.procedure.Procedure; import org.simantics.db.procedure.SyncListener; import org.simantics.db.procedure.SyncMultiListener; import org.simantics.db.procedure.SyncMultiProcedure; import org.simantics.db.procedure.SyncProcedure; import org.simantics.db.procore.cluster.ClusterImpl; import org.simantics.db.procore.cluster.ClusterTraits; import org.simantics.db.procore.protocol.Constants; import org.simantics.db.procore.protocol.DebugPolicy; import org.simantics.db.request.AsyncMultiRead; import org.simantics.db.request.AsyncRead; import org.simantics.db.request.DelayedWrite; import org.simantics.db.request.DelayedWriteResult; import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.Read; import org.simantics.db.request.ReadInterface; import org.simantics.db.request.UndoTraits; import org.simantics.db.request.Write; import org.simantics.db.request.WriteInterface; import org.simantics.db.request.WriteOnly; import org.simantics.db.request.WriteOnlyResult; import org.simantics.db.request.WriteResult; import org.simantics.db.request.WriteTraits; import org.simantics.db.service.ByteReader; import org.simantics.db.service.ClusterBuilder; import org.simantics.db.service.ClusterBuilderFactory; import org.simantics.db.service.ClusterControl; import org.simantics.db.service.ClusterSetsSupport; import org.simantics.db.service.ClusterUID; import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; import org.simantics.db.service.ManagementSupport; import org.simantics.db.service.QueryControl; import org.simantics.db.service.SerialisationSupport; import org.simantics.db.service.ServerInformation; import org.simantics.db.service.ServiceActivityMonitor; import org.simantics.db.service.SessionEventSupport; import org.simantics.db.service.SessionMonitorSupport; import org.simantics.db.service.SessionUserSupport; import org.simantics.db.service.StatementSupport; import org.simantics.db.service.TransactionPolicySupport; import org.simantics.db.service.TransactionSupport; import org.simantics.db.service.TransferableGraphSupport; import org.simantics.db.service.UndoRedoSupport; import org.simantics.db.service.VirtualGraphSupport; import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.datastructures.Callback; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; private TransactionPolicySupport transactionPolicy; final private ClusterControl clusterControl; final protected BuiltinSupportImpl builtinSupport; final protected VirtualGraphServerSupportImpl virtualGraphServerSupport; final protected ClusterSetsSupport clusterSetsSupport; final protected LifecycleSupportImpl lifecycleSupport; protected QuerySupportImpl querySupport; protected ResourceSupportImpl resourceSupport; protected WriteSupport writeSupport; public ClusterTranslator clusterTranslator; boolean dirtyPrimitives = false; public static final int SERVICE_MODE_CREATE = 2; public static final int SERVICE_MODE_ALLOW = 1; public int serviceMode = 0; public boolean createdImmutableClusters = false; public TLongHashSet createdClusters = new TLongHashSet(); private Layer0 L0; /** * The service locator maintained by the workbench. These services are * initialized during workbench during the init method. */ final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl(); final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl(); final CopyOnWriteArrayList changeListeners2 = new CopyOnWriteArrayList(); final CopyOnWriteArrayList metadataListeners = new CopyOnWriteArrayList(); final CopyOnWriteArrayList eventListeners = new CopyOnWriteArrayList(); final HashSet sessionThreads = new HashSet(); final BijectionMap monitorContexts = new BijectionMap(); final protected State state = new State(); protected GraphSession graphSession = null; protected SessionManager sessionManagerImpl = null; protected UserAuthenticationAgent authAgent = null; protected UserAuthenticator authenticator = null; protected Resource user = null; protected ClusterStream clusterStream = null; protected SessionRequestManager requestManager = null; public ClusterTable clusterTable = null; public QueryProcessor queryProvider2 = null; ClientChangesImpl clientChanges = null; MonitorHandler[] monitorHandlers = new MonitorHandler[0]; // protected long newClusterId = Constants.NullClusterId; protected int flushCounter = 0; protected boolean writeOnly = false; WriteState writeState = null; WriteStateBase delayedWriteState = null; protected Resource defaultClusterSet = null; // If not null then used for newResource(). public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) { // if (authAgent == null) // throw new IllegalArgumentException("null authentication agent"); File t = StaticSessionProperties.virtualGraphStoragePath; if (null == t) t = new File("."); this.clusterTable = new ClusterTable(this, t); this.builtinSupport = new BuiltinSupportImpl(this); this.sessionManagerImpl = sessionManagerImpl; this.user = null; // this.authAgent = authAgent; serviceLocator.registerService(Session.class, this); serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this)); serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this)); serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this)); serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this)); serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this)); serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this)); serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this)); serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this)); serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this)); serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this)); serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this)); serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this)); serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this)); serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this)); serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this)); serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this)); serviceLocator.registerService(XSupport.class, new XSupportImpl(this)); serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl()); serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport); serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport); this.lifecycleSupport = new LifecycleSupportImpl(this); serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport); this.transactionPolicy = new TransactionPolicyRelease(); serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy); this.clusterControl = new ClusterControlImpl(this); serviceLocator.registerService(ClusterControl.class, clusterControl); this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs. this.clusterSetsSupport.updateReadAndWriteDirectories(t.toPath(), t.toPath()); serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport); } /* * * Session interface */ @Override public Resource getRootLibrary() { return queryProvider2.getRootLibraryResource(); } void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException { if (!graphSession.dbSession.refreshEnabled()) return; try { getClusterTable().refresh(csid, this, clusterUID); } catch (Throwable t) { Logger.defaultLogError("Refesh failed.", t); } } static final class TaskHelper { private final String name; private Object result; final Semaphore sema = new Semaphore(0); private Throwable throwable = null; final Callback callback = new Callback() { @Override public void run(DatabaseException e) { synchronized (TaskHelper.this) { throwable = e; } } }; final Procedure proc = new Procedure() { @Override public void execute(Object result) { callback.run(null); } @Override public void exception(Throwable t) { if (t instanceof DatabaseException) callback.run((DatabaseException)t); else callback.run(new DatabaseException("" + name + "operation failed.", t)); } }; final WriteTraits writeTraits = new WriteTraits() { @Override public UndoTraits getUndoTraits() { return null; } @Override public VirtualGraph getProvider() { return null; } }; TaskHelper(String name) { this.name = name; } T getResult() { return (T)result; } void setResult(Object result) { this.result = result; } // Throwable throwableGet() { // return throwable; // } synchronized void throwableSet(Throwable t) { throwable = t; } // void throwableSet(String t) { // throwable = new InternalException("" + name + " operation failed. " + t); // } synchronized void throwableCheck() throws DatabaseException { if (null != throwable) if (throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException("Undo operation failed.", throwable); } void throw_(String message) throws DatabaseException { throw new DatabaseException("" + name + " operation failed. " + message); } } private ListenerBase getListenerBase(Object procedure) { if (procedure instanceof ListenerBase) return (ListenerBase) procedure; else return null; } public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } /* (non-Javadoc) * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { assert (request != null); if(Development.DEVELOPMENT) { try { if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) System.err.println("schedule write '" + request + "'"); } catch (Throwable t) { Logger.defaultLogError(t); } } int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleWrite(new SessionTask(request, thread, thread) { @Override public void run(int thread) { if(Development.DEVELOPMENT) { try { if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) System.err.println("perform write '" + request + "'"); } catch (Throwable t) { Logger.defaultLogError(t); } } ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); try { flushCounter = 0; clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); writeState = new WriteState(writer, request, notify, new Procedure() { @Override public void execute(Object result) { if(callback != null) callback.run(null); } @Override public void exception(Throwable t) { if(callback != null) callback.run((DatabaseException)t); } }); assert (null != writer); // writer.state.barrier.inc(); try { request.perform(writer); assert (null != writer); } catch (Throwable t) { if (!(t instanceof CancelTransactionException)) Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); writeState.except(t); } finally { // writer.state.barrier.dec(); // writer.waitAsync(request); } assert(!queryProvider2.dirty); } catch (Throwable e) { // Log it first, just to be safe that the error is always logged. if (!(e instanceof CancelTransactionException)) Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); // writeState.getGraph().state.barrier.dec(); // writeState.getGraph().waitAsync(request); writeState.except(e); // try { // // Callback is client code, we have to be prepared for it to throw unexpected exceptions. // // All we can do here is to log those, can't really pass them anywhere. // if (callback != null) { // if(e instanceof DatabaseException) callback.run((DatabaseException)e); // else callback.run(new DatabaseException(e)); // } // } catch (Throwable e2) { // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); // } // boolean empty = clusterStream.reallyFlush(); // int callerThread = -1; // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this); // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1); // try { // // Send all local changes to server so it can calculate correct reverse change set. // // This will call clusterStream.accept(). // state.cancelCommit(context, clusterStream); // if (!empty) { // if (!context.isOk()) // this is a blocking operation // throw new InternalException("Cancel failed. This should never happen. Contact application support."); // getQueryProvider2().performDirtyUpdates(writeState.getGraph()); // } // state.cancelCommit2(context, clusterStream); // } catch (DatabaseException e3) { // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3); // } finally { // clusterStream.setOff(false); // clientChanges = new ClientChangesImpl(SessionImplSocket.this); // } } finally { fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } task.finish(); if(Development.DEVELOPMENT) { try { if(Development.getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN)) System.err.println("finish write '" + request + "'"); } catch (Throwable t) { Logger.defaultLogError(t); } } } }, combine); } public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify) { scheduleRequest(request, procedure, notify, null); } /* (non-Javadoc) * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override public void scheduleRequest(final WriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { assert (request != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleWrite(new SessionTask(request, thread) { @Override public void run(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); try { WriteState writeStateT = new WriteState(writer, request, notify, procedure); writeState = writeStateT; assert (null != writer); // writer.state.barrier.inc(); writeStateT.setResult(request.perform(writer)); assert (null != writer); // writer.state.barrier.dec(); // writer.waitAsync(null); } catch (Throwable e) { // writer.state.barrier.dec(); // writer.waitAsync(null); writeState.except(e); // state.stopWriteTransaction(clusterStream); // // } catch (Throwable e) { // // Log it first, just to be safe that the error is always logged. // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); // // try { // // Callback is client code, we have to be prepared for it to throw unexpected exceptions. // // All we can do here is to log those, can't really pass them anywhere. // if (procedure != null) { // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); // else procedure.exception(new DatabaseException(e)); // } // } catch (Throwable e2) { // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); // } // // clientChanges = new ClientChangesImpl(SessionImplSocket.this); // // state.stopWriteTransaction(clusterStream); } finally { fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } // if(notify != null) notify.release(); task.finish(); } }, combine); } public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } /* (non-Javadoc) * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); assert (request != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleWrite(new SessionTask(request, thread) { @Override public void run(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @Override public void execute(Object result) { if (callback != null) callback.run(null); } @Override public void exception(Throwable t) { if (callback != null) { if (t instanceof DatabaseException) callback.run((DatabaseException) t); else callback.run(new DatabaseException(t)); } else Logger.defaultLogError("Unhandled exception", t); } }; final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); delayedWriteState = new WriteStateBase(request, notify, stateProcedure); DelayedWriteGraph dwg = null; // newGraph.state.barrier.inc(); try { dwg = new DelayedWriteGraph(newGraph); request.perform(dwg); } catch (Throwable e) { delayedWriteState.except(e); total.finish(); return; } finally { // newGraph.state.barrier.dec(); // newGraph.waitAsync(request); fireSessionVariableChange(SessionVariables.QUEUED_READS); } delayedWriteState = null; ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); VirtualGraph vg = getProvider(request.getProvider()); WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); writeState = new WriteState(writer, request, notify, stateProcedure); assert (null != writer); // writer.state.barrier.inc(); try { // Cannot cancel dwg.commit(writer, request); if(defaultClusterSet != null) { XSupport xs = getService(XSupport.class); defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet); ClusteringSupport cs = getService(ClusteringSupport.class); clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet)); } // This makes clusters available from server clusterStream.reallyFlush(); releaseWriteOnly(writer); handleUpdatesAndMetadata(writer); } catch (ServiceException e) { // writer.state.barrier.dec(); // writer.waitAsync(null); // These shall be requested from server clusterTable.removeWriteOnlyClusters(); // This makes clusters available from server clusterStream.reallyFlush(); releaseWriteOnly(writer); writeState.except(e); } finally { // Debugging & Profiling fireSessionVariableChange(SessionVariables.QUEUED_WRITES); task2.finish(); total.finish(); } } }, combine); } public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify) { scheduleRequest(request, procedure, notify, null); } /* (non-Javadoc) * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override public void scheduleRequest(final DelayedWriteResult request, final Procedure procedure, final Semaphore notify, Boolean combine) { throw new Error("Not implemented"); } protected ClusterImpl getNewResourceCluster() throws DatabaseException { ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly); if((serviceMode & SERVICE_MODE_CREATE) > 0) { createdClusters.add(cluster.clusterId); } return cluster; } class WriteOnlySupport implements WriteSupport { ClusterStream stream; ClusterImpl currentCluster; public WriteOnlySupport() { this.stream = clusterStream; } @Override public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException { claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id ); } @Override public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException { ClusterImpl cluster = clusterTable.getClusterByResourceKey(s); if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) if(s != queryProvider2.getRootLibrary()) throw new ImmutableException("Trying to modify immutable resource key=" + s); try { maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator)); } catch (DatabaseException e) { Logger.defaultLogError(e); //throw e; } clientChanges.invalidate(s); if (cluster.isWriteOnly()) return; queryProvider2.updateStatements(s, p); } @Override public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { claimValue(provider, ((ResourceImpl)resource).id, value, value.length); } @Override public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) { ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); try { maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator)); } catch (DatabaseException e) { Logger.defaultLogError(e); } clientChanges.invalidate(rid); if (cluster.isWriteOnly()) return; queryProvider2.updateValue(rid); } @Override public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { if(amount < 65536) { claimValue(provider,resource, reader.readBytes(null, amount)); return; } byte[] bytes = new byte[65536]; int rid = ((ResourceImpl)resource).id; ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid); try { int left = amount; while(left > 0) { int block = Math.min(left, 65536); reader.readBytes(bytes, block); maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator)); left -= block; } } catch (DatabaseException e) { Logger.defaultLogError(e); } clientChanges.invalidate(rid); if (cluster.isWriteOnly()) return; queryProvider2.updateValue(rid); } private void maintainCluster(ClusterImpl before, ClusterI after_) { if(after_ != null && after_ != before) { ClusterImpl after = (ClusterImpl)after_; if(currentCluster == before) currentCluster = after; clusterTable.replaceCluster(after); } } public int createResourceKey(int foreignCounter) throws DatabaseException { if(currentCluster == null) currentCluster = getNewResourceCluster(); if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); newCluster.foreignLookup = new byte[foreignCounter]; currentCluster = newCluster; if (DEBUG) System.err.println("foreignLookup " + currentCluster + " " + foreignCounter); } return currentCluster.createResource(clusterTranslator); } @Override public Resource createResource(VirtualGraph provider) throws DatabaseException { if(currentCluster == null) { if (null != defaultClusterSet) { ResourceImpl result = getNewResource(defaultClusterSet); currentCluster = clusterTable.getClusterByResourceKey(result.id); return result; } else { currentCluster = getNewResourceCluster(); } } if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { if (null != defaultClusterSet) { ResourceImpl result = getNewResource(defaultClusterSet); currentCluster = clusterTable.getClusterByResourceKey(result.id); return result; } else { currentCluster = getNewResourceCluster(); } } return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator)); } @Override public Resource createResource(VirtualGraph provider, long clusterId) throws DatabaseException { return getNewResource(clusterId); } @Override public Resource createResource(VirtualGraph provider, Resource clusterSet) throws DatabaseException { return getNewResource(clusterSet); } @Override public void createClusterSet(VirtualGraph provider, Resource clusterSet) throws DatabaseException { getNewClusterSet(clusterSet); } @Override public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet) throws ServiceException { return containsClusterSet(clusterSet); } public void selectCluster(long cluster) { currentCluster = clusterTable.getClusterByClusterId(cluster); long setResourceId = clusterSetsSupport.getSet(cluster); clusterSetsSupport.put(setResourceId, cluster); } @Override public Resource setDefaultClusterSet(Resource clusterSet) throws ServiceException { Resource result = setDefaultClusterSet4NewResource(clusterSet); if(clusterSet != null) { long id = clusterSetsSupport.get(clusterSet.getResourceId()); currentCluster = clusterTable.getClusterByClusterId(id); return result; } else { currentCluster = null; return null; } } @Override public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException { provider = getProvider(provider); if (null == provider) { int key = ((ResourceImpl)resource).id; ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key); // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0) // if(key != queryProvider2.getRootLibrary()) // throw new ImmutableException("Trying to modify immutable resource key=" + key); // try { // cluster.removeValue(key, clusterTranslator); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // return; // } clusterTable.writeOnlyInvalidate(cluster); try { int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key); clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION); clusterTranslator.removeValue(cluster); } catch (DatabaseException e) { Logger.defaultLogError(e); } queryProvider2.invalidateResource(key); clientChanges.invalidate(key); } else { ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); queryProvider2.updateValue(querySupport.getId(resource)); clientChanges.claimValue(resource); } } @Override public void flush(boolean intermediate) { throw new UnsupportedOperationException(); } @Override public void flushCluster() { clusterTable.flushCluster(graphSession); if(defaultClusterSet != null) { clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId); } currentCluster = null; } @Override public void flushCluster(Resource r) { throw new UnsupportedOperationException("flushCluster resource " + r); } @Override public void gc() { } @Override public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { int s = ((ResourceImpl)subject).id; int p = ((ResourceImpl)predicate).id; int o = ((ResourceImpl)object).id; provider = getProvider(provider); if (null == provider) { ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s); clusterTable.writeOnlyInvalidate(cluster); try { int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s); int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p); int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o); ClusterI pc = clusterTable.getClusterProxyByResourceKey(p); ClusterI oc = clusterTable.getClusterProxyByResourceKey(o); clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION); clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION); clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION); clusterTranslator.removeStatement(cluster); queryProvider2.invalidateResource(s); clientChanges.invalidate(s); } catch (DatabaseException e) { Logger.defaultLogError(e); } return true; } else { ((VirtualGraphImpl)provider).deny(s, p, o); queryProvider2.invalidateResource(s); clientChanges.invalidate(s); return true; } } @Override public void setValue(VirtualGraph provider, Resource resource, byte[] value) { throw new UnsupportedOperationException(); } @Override public boolean writeOnly() { return true; } @Override public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { writeSupport.performWriteRequest(graph, request); } @Override public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void addMetadata(Metadata data) throws ServiceException { writeSupport.addMetadata(data); } @Override public T getMetadata(Class clazz) throws ServiceException { return writeSupport.getMetadata(clazz); } @Override public TreeMap getMetadata() { return writeSupport.getMetadata(); } @Override public void commitDone(WriteTraits writeTraits, long csid) { writeSupport.commitDone(writeTraits, csid); } @Override public void clearUndoList(WriteTraits writeTraits) { writeSupport.clearUndoList(writeTraits); } @Override public int clearMetadata() { return writeSupport.clearMetadata(); } @Override public void startUndo() { writeSupport.startUndo(); } } class VirtualWriteOnlySupport implements WriteSupport { // @Override // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate, // Resource object) { // throw new UnsupportedOperationException(); // } @Override public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { TransientGraph impl = (TransientGraph)provider; impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); clientChanges.claim(subject, predicate, object); } @Override public void claim(VirtualGraph provider, int subject, int predicate, int object) { TransientGraph impl = (TransientGraph)provider; impl.claim(subject, predicate, object); getQueryProvider2().updateStatements(subject, predicate); clientChanges.claim(subject, predicate, object); } @Override public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException { claimValue(provider, ((ResourceImpl)resource).id, value, value.length); } @Override public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) { ((VirtualGraphImpl)provider).claimValue(resource, value, length); getQueryProvider2().updateValue(resource); clientChanges.claimValue(resource); } @Override public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException { byte[] value = reader.readBytes(null, amount); claimValue(provider, resource, value); } @Override public Resource createResource(VirtualGraph provider) { TransientGraph impl = (TransientGraph)provider; return impl.getResource(impl.newResource(false)); } @Override public Resource createResource(VirtualGraph provider, long clusterId) { throw new UnsupportedOperationException(); } @Override public Resource createResource(VirtualGraph provider, Resource clusterSet) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void createClusterSet(VirtualGraph provider, Resource clusterSet) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet) throws ServiceException { throw new UnsupportedOperationException(); } @Override public Resource setDefaultClusterSet(Resource clusterSet) throws ServiceException { return null; } @Override public void denyValue(VirtualGraph provider, Resource resource) { ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id); getQueryProvider2().updateValue(querySupport.getId(resource)); // NOTE: this only keeps track of value changes by-resource. clientChanges.claimValue(resource); } @Override public void flush(boolean intermediate) { throw new UnsupportedOperationException(); } @Override public void flushCluster() { throw new UnsupportedOperationException(); } @Override public void flushCluster(Resource r) { throw new UnsupportedOperationException("Resource " + r); } @Override public void gc() { } @Override public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) { TransientGraph impl = (TransientGraph) provider; impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object)); getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate)); clientChanges.deny(subject, predicate, object); return true; } @Override public void setValue(VirtualGraph provider, Resource resource, byte[] value) { throw new UnsupportedOperationException(); } @Override public boolean writeOnly() { return true; } @Override public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public T performWriteRequest(WriteGraph graph, WriteResult request) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException { throw new UnsupportedOperationException(); } @Override public void addMetadata(Metadata data) throws ServiceException { throw new UnsupportedOperationException(); } @Override public T getMetadata(Class clazz) throws ServiceException { throw new UnsupportedOperationException(); } @Override public TreeMap getMetadata() { throw new UnsupportedOperationException(); } @Override public void commitDone(WriteTraits writeTraits, long csid) { } @Override public void clearUndoList(WriteTraits writeTraits) { } @Override public int clearMetadata() { return 0; } @Override public void startUndo() { } } private void performWriteOnly(WriteOnlyResult request, Semaphore notify, Procedure callback) { try { int thread = request.hashCode() & queryProvider2.THREAD_MASK; fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider()); WriteState writeStateT = new WriteState(writer, request, notify, callback); writeState = writeStateT; assert (null != writer); // writer.state.barrier.inc(); long start = System.nanoTime(); T result = request.perform(writer); long duration = System.nanoTime() - start; if (DEBUG) { System.err.println("################"); System.err.println("WriteOnly duration " + 1e-9*duration); } writeStateT.setResult(result); // This makes clusters available from server clusterStream.reallyFlush(); // This will trigger query updates releaseWriteOnly(writer); assert (null != writer); handleUpdatesAndMetadata(writer); } catch (CancelTransactionException e) { releaseWriteOnly(writeState.getGraph()); clusterTable.removeWriteOnlyClusters(); state.stopWriteTransaction(clusterStream); } catch (Throwable e) { e.printStackTrace(); releaseWriteOnly(writeState.getGraph()); clusterTable.removeWriteOnlyClusters(); if (callback != null) callback.exception(new DatabaseException(e)); state.stopWriteTransaction(clusterStream); Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); } finally { fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } } public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @Override public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { assertAlive(); assert (request != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleWrite(new SessionTask(request, thread, thread) { @Override public void run(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); try { fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); VirtualGraph vg = getProvider(request.getProvider()); WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport(); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); writeState = new WriteState(writer, request, notify, new Procedure() { @Override public void execute(Object result) { if(callback != null) callback.run(null); } @Override public void exception(Throwable t) { if(callback != null) callback.run((DatabaseException)t); } }); assert (null != writer); // writer.state.barrier.inc(); try { request.perform(writer); } catch (Throwable e) { // writer.state.barrier.dec(); // writer.waitAsync(null); releaseWriteOnly(writer); clusterTable.removeWriteOnlyClusters(); if(!(e instanceof CancelTransactionException)) { if (callback != null) callback.run(new DatabaseException(e)); } writeState.except(e); return; } // This makes clusters available from server boolean empty = clusterStream.reallyFlush(); // This was needed to make WO requests call metadata listeners. // NOTE: the calling event does not contain clientChanges information. if (!empty && clientChanges.isEmpty()) clientChanges.setNotEmpty(true); releaseWriteOnly(writer); assert (null != writer); } finally { fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } task.finish(); } }, combine); } public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } /* (non-Javadoc) * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override public void scheduleRequest(final WriteOnlyResult request, final Procedure callback, final Semaphore notify, Boolean combine) { assert (request != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleWrite(new SessionTask(request, thread) { @Override public void run(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); performWriteOnly(request, notify, callback); task.finish(); } }, combine); } public void scheduleRequest(final Read request, final AsyncProcedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { assert (request != null); assert (procedure != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { @Override public void run(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { if (listener != null) { try { newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable t) { procedure.exception(graph, t); if(throwable != null) { throwable.set(t); } else { // ErrorLogger.defaultLogError("Unhandled exception", t); } } @Override public void execute(AsyncReadGraph graph, T t) { if(result != null) result.set(t); procedure.execute(graph, t); } }, listener); } catch (Throwable t) { // This is handled by the AsyncProcedure //Logger.defaultLogError("Internal error", t); } } else { try { // newGraph.state.barrier.inc(); T t = request.perform(newGraph); try { if(result != null) result.set(t); procedure.execute(newGraph, t); } catch (Throwable th) { if(throwable != null) { throwable.set(th); } else { Logger.defaultLogError("Unhandled exception", th); } } } catch (Throwable t) { if (DEBUG) t.printStackTrace(); if(throwable != null) { throwable.set(t); } else { Logger.defaultLogError("Unhandled exception", t); } try { procedure.exception(newGraph, t); } catch (Throwable t2) { if(throwable != null) { throwable.set(t2); } else { Logger.defaultLogError("Unhandled exception", t2); } } } // newGraph.state.barrier.dec(); // newGraph.waitAsync(request); } } finally { fireSessionVariableChange(SessionVariables.QUEUED_READS); } } }); } public void scheduleRequest(final AsyncRead request, final AsyncProcedure procedure, final ListenerBase listener, final Semaphore notify) { assert (request != null); assert (procedure != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { @Override public void run(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { if (listener != null) { newGraph.processor.query(newGraph, request, null, procedure, listener); // newGraph.waitAsync(request); } else { final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( procedure, "request"); try { // newGraph.state.barrier.inc(); request.perform(newGraph, wrapper); // newGraph.waitAsync(request); } catch (Throwable t) { wrapper.exception(newGraph, t); // newGraph.waitAsync(request); } } } finally { fireSessionVariableChange(SessionVariables.QUEUED_READS); } } }); } public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { assert (request != null); assert (procedure != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; int sync = notify != null ? thread : -1; requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { @Override public void run(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { if (listener != null) { newGraph.processor.query(newGraph, request, null, procedure, listener); // newGraph.waitAsync(request); } else { final ResultCallWrappedQueryProcedure4 wrapper = new ResultCallWrappedQueryProcedure4(procedure); try { request.perform(newGraph, wrapper); } catch (Throwable t) { t.printStackTrace(); } } } finally { fireSessionVariableChange(SessionVariables.QUEUED_READS); } } }); } public void scheduleRequest(final ExternalRead request, final Procedure procedure, final Semaphore notify, final DataContainer throwable, final DataContainer result) { assert (request != null); assert (procedure != null); int thread = request.hashCode() & queryProvider2.THREAD_MASK; requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { @Override public void run(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { if (listener != null) { newGraph.processor.query(newGraph, request, null, new Procedure() { @Override public void exception(Throwable t) { procedure.exception(t); if(throwable != null) { throwable.set(t); } } @Override public void execute(T t) { if(result != null) result.set(t); procedure.execute(t); } }, listener); // newGraph.waitAsync(request); } else { // newGraph.state.barrier.inc(); request.register(newGraph, new Listener() { @Override public void exception(Throwable t) { if(throwable != null) throwable.set(t); procedure.exception(t); // newGraph.state.barrier.dec(); } @Override public void execute(T t) { if(result != null) result.set(t); procedure.execute(t); // newGraph.state.barrier.dec(); } @Override public boolean isDisposed() { return true; } }); // newGraph.waitAsync(request); } } finally { fireSessionVariableChange(SessionVariables.QUEUED_READS); } } }); } @Override public void asyncRequest(final Read request, final AsyncProcedure procedure) { scheduleRequest(request, procedure, null, null, null); } @Override public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); DataContainer container = new DataContainer(); DataContainer result = new DataContainer(); scheduleRequest(request, procedure, notify, container, result); acquire(notify, request); Throwable throwable = container.get(); if(throwable != null) { if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException("Unexpected exception", throwable); } return result.get(); } @Override public T syncRequest(AsyncRead request, final AsyncProcedure procedure) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exceptionContainer = new DataContainer(); final DataContainer resultContainer = new DataContainer(); scheduleRequest(request, new AsyncProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable throwable) { exceptionContainer.set(throwable); procedure.exception(graph, throwable); } @Override public void execute(AsyncReadGraph graph, T result) { resultContainer.set(result); procedure.execute(graph, result); } }, getListenerBase(procedure), notify); acquire(notify, request, procedure); Throwable throwable = exceptionContainer.get(); if (throwable != null) { if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException("Unexpected exception", throwable); } return resultContainer.get(); } @Override public Collection syncRequest(AsyncMultiRead request, final AsyncMultiProcedure procedure) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exceptionContainer = new DataContainer(); scheduleRequest(request, new AsyncMultiProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable throwable) { exceptionContainer.set(throwable); procedure.exception(graph, throwable); } @Override public void execute(AsyncReadGraph graph, T result) { procedure.execute(graph, result); } @Override public void finished(AsyncReadGraph graph) { procedure.finished(graph); } }, notify); acquire(notify, request, procedure); Throwable throwable = exceptionContainer.get(); if (throwable != null) { if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException("Unexpected exception", throwable); } // TODO: implement return value System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)"); return null; } @Override public T syncRequest(final ExternalRead request) throws DatabaseException { return syncRequest(request, new ProcedureAdapter()); // assert(request != null); // // final DataContainer result = new DataContainer(); // final DataContainer exception = new DataContainer(); // // syncRequest(request, new Procedure() { // // @Override // public void execute(T t) { // result.set(t); // } // // @Override // public void exception(Throwable t) { // exception.set(t); // } // // }); // // Throwable t = exception.get(); // if(t != null) { // if(t instanceof DatabaseException) throw (DatabaseException)t; // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t); // } // // return result.get(); } @Override public T syncRequest(ExternalRead request, final Listener procedure) throws DatabaseException { return syncRequest(request, (Procedure)procedure); } // @Override // public T syncRequest(Read request, AsyncProcedure procedure) throws DatabaseException { // assertNotSession(); // Semaphore notify = new Semaphore(0); // DataContainer container = new DataContainer(); // DataContainer result = new DataContainer(); // scheduleRequest(request, procedure, notify, container, result); // acquire(notify, request); // Throwable throwable = container.get(); // if(throwable != null) { // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; // else throw new DatabaseException("Unexpected exception", throwable); // } // return result.get(); // } @Override public T syncRequest(ExternalRead request, final Procedure procedure) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer container = new DataContainer(); final DataContainer result = new DataContainer(); scheduleRequest(request, procedure, notify, container, result); acquire(notify, request); Throwable throwable = container.get(); if (throwable != null) { if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException("Unexpected exception", throwable); } return result.get(); } @Override public void syncRequest(Write request) throws DatabaseException { assertNotSession(); assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); scheduleRequest(request, new Callback() { @Override public void run(DatabaseException e) { exception.set(e); } }, notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @Override public T syncRequest(WriteResult request) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); final DataContainer result = new DataContainer(); scheduleRequest(request, new Procedure() { @Override public void exception(Throwable t) { exception.set(t); } @Override public void execute(T t) { result.set(t); } }, notify); acquire(notify, request); if(exception.get() != null) { Throwable t = exception.get(); if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException(t); } return result.get(); } @Override public T syncRequest(WriteOnlyResult request) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); final DataContainer result = new DataContainer(); scheduleRequest(request, new Procedure() { @Override public void exception(Throwable t) { exception.set(t); } @Override public void execute(T t) { result.set(t); } }, notify); acquire(notify, request); if(exception.get() != null) { Throwable t = exception.get(); if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException(t); } return result.get(); } @Override public T syncRequest(DelayedWriteResult request) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); final DataContainer result = new DataContainer(); scheduleRequest(request, new Procedure() { @Override public void exception(Throwable t) { exception.set(t); } @Override public void execute(T t) { result.set(t); } }, notify); acquire(notify, request); if(exception.get() != null) { Throwable t = exception.get(); if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException(t); } return result.get(); } @Override public void syncRequest(DelayedWrite request) throws DatabaseException { assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); scheduleRequest(request, new Callback() { @Override public void run(DatabaseException e) { exception.set(e); } }, notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @Override public void syncRequest(WriteOnly request) throws DatabaseException { assertNotSession(); assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); scheduleRequest(request, new Callback() { @Override public void run(DatabaseException e) { exception.set(e); } }, notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } /* * * GraphRequestProcessor interface */ @Override public void asyncRequest(final AsyncRead request, final AsyncProcedure procedure) { scheduleRequest(request, procedure, null, null); } @Override public void asyncRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure) { scheduleRequest(request, procedure, null); } @Override public void asyncRequest(final ExternalRead request, final Procedure procedure) { scheduleRequest(request, procedure, null, null, null); } @Override public void asyncRequest(final Write request, final Callback callback) { scheduleRequest(request, callback, null); } @Override public void asyncRequest(final WriteResult request, final Procedure procedure) { scheduleRequest(request, procedure, null); } @Override public void asyncRequest(final WriteOnlyResult request, final Procedure procedure) { scheduleRequest(request, procedure, null); } @Override public void asyncRequest(final DelayedWriteResult request, final Procedure procedure) { scheduleRequest(request, procedure, null); } @Override public void asyncRequest(final DelayedWrite request, final Callback callback) { scheduleRequest(request, callback, null); } @Override public void asyncRequest(final Write r) { asyncRequest(r, null); } @Override public void asyncRequest(final DelayedWrite r) { asyncRequest(r, null); } @Override public void asyncRequest(final WriteOnly request, final Callback callback) { scheduleRequest(request, callback, null); } @Override public void asyncRequest(final WriteOnly request) { asyncRequest(request, null); } @Override public void async(ReadInterface r, AsyncProcedure procedure) { r.request(this, procedure); } @Override public void async(ReadInterface r, Procedure procedure) { r.request(this, procedure); } @Override public void async(ReadInterface r, SyncProcedure procedure) { r.request(this, procedure); } @Override public void async(ReadInterface r, AsyncListener procedure) { r.request(this, procedure); } @Override public void async(ReadInterface r, Listener procedure) { r.request(this, procedure); } @Override public void async(ReadInterface r, SyncListener procedure) { r.request(this, procedure); } @Override public T sync(ReadInterface r) throws DatabaseException { return r.request(this); } @Override public T sync(WriteInterface r) throws DatabaseException { return r.request(this); } @Override public void async(WriteInterface r, Procedure procedure) { r.request(this, procedure); } @Override public void async(WriteInterface r) { r.request(this, new ProcedureAdapter()); } //@Override public void incAsync() { state.incAsync(); } //@Override public void decAsync() { state.decAsync(); } public long getCluster(ResourceImpl resource) { ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id); return cluster.getClusterId(); } public long getCluster(int id) { if (clusterTable == null) System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come"); return clusterTable.getClusterIdByResourceKeyNoThrow(id); } public ResourceImpl getResource(int id) { return new ResourceImpl(resourceSupport, id); } public ResourceImpl getResource(int resourceIndex, long clusterId) { assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex)); ClusterI proxy = clusterTable.getClusterByClusterId(clusterId); int key = proxy.getClusterKey(); int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex); return new ResourceImpl(resourceSupport, resourceKey); } public ResourceImpl getResource2(int id) { assert (id != 0); return new ResourceImpl(resourceSupport, id); } final public int getId(ResourceImpl impl) { return impl.id; } public static final Charset UTF8 = Charset.forName("utf-8"); static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) { for(TransientGraph g : support.providers) { if(g.isPending(subject)) return false; } return true; } static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) { for(TransientGraph g : support.providers) { if(g.isPending(subject, predicate)) return false; } return true; } static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { Callback composite = new Callback() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override public void run(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { runnable.run(graph); } } }; for(TransientGraph g : support.providers) { if(g.isPending(subject)) { try { g.load(graph, subject, composite); } catch (DatabaseException e) { e.printStackTrace(); } } else { composite.run(graph); } } composite.run(graph); } static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { Callback composite = new Callback() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override public void run(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { runnable.run(graph); } } }; for(TransientGraph g : support.providers) { if(g.isPending(subject, predicate)) { try { g.load(graph, subject, predicate, composite); } catch (DatabaseException e) { e.printStackTrace(); } } else { composite.run(graph); } } composite.run(graph); } // void dumpHeap() { // // try { // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(), // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap( // "d:/heap" + flushCounter + ".txt", true); // } catch (IOException e) { // e.printStackTrace(); // } // // } void fireReactionsToSynchronize(ChangeSet cs) { // Do not fire empty events if (cs.isEmpty()) return; ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2()); // g.state.barrier.inc(); try { if (!cs.isEmpty()) { ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs); for (ChangeListener l : changeListeners2) { try { l.graphChanged(e2); } catch (Exception ex) { ex.printStackTrace(); } } } } finally { // g.state.barrier.dec(); // g.waitAsync(null); } } void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) { try { // Do not fire empty events if (cs2.isEmpty()) return; // graph.restart(); // graph.state.barrier.inc(); try { if (!cs2.isEmpty()) { ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2); for (ChangeListener l : changeListeners2) { try { l.graphChanged(e2); // System.out.println("changelistener " + l); } catch (Exception ex) { ex.printStackTrace(); } } } } finally { // graph.state.barrier.dec(); // graph.waitAsync(null); } } catch (Throwable t) { t.printStackTrace(); } } void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) { try { // Do not fire empty events if (cs2.isEmpty()) return; // Do not fire on virtual requests if(graph.getProvider() != null) return; WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); try { ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2); for (ChangeListener l : metadataListeners) { try { l.graphChanged(e2); } catch (Throwable ex) { ex.printStackTrace(); } } } finally { } } catch (Throwable t) { t.printStackTrace(); } } /* * (non-Javadoc) * * @see org.simantics.db.ServiceLocator#getService(java.lang.Class) */ @Override public T getService(Class api) { T t = peekService(api); if (t == null) { if (state.isClosed()) throw new ServiceNotFoundException(this, api, "Session has been shut down"); throw new ServiceNotFoundException(this, api); } return t; } /** * @return */ protected abstract ServerInformation getCachedServerInformation(); private Class serviceKey1 = null; private Class serviceKey2 = null; private Object service1 = null; private Object service2 = null; /* * (non-Javadoc) * * @see * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class) */ @SuppressWarnings("unchecked") @Override public synchronized T peekService(Class api) { if(serviceKey1 == api) { return (T)service1; } else if (serviceKey2 == api) { // Promote this key Object result = service2; service2 = service1; serviceKey2 = serviceKey1; service1 = result; serviceKey1 = api; return (T)result; } if (Layer0.class == api) return (T) L0; if (ServerInformation.class == api) return (T) getCachedServerInformation(); else if (WriteGraphImpl.class == api) return (T) writeState.getGraph(); else if (ClusterBuilder.class == api) return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); else if (ClusterBuilderFactory.class == api) return (T)new ClusterBuilderFactoryImpl(this); service2 = service1; serviceKey2 = serviceKey1; service1 = serviceLocator.peekService(api); serviceKey1 = api; return (T)service1; } /* * (non-Javadoc) * * @see * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class) */ @Override public boolean hasService(Class api) { return serviceLocator.hasService(api); } /** * @param api the api that must be implemented by the specified service * @param service the service implementation */ @Override public void registerService(Class api, T service) { if(Layer0.class == api) { L0 = (Layer0)service; return; } serviceLocator.registerService(api, service); if (TransactionPolicySupport.class == api) { transactionPolicy = (TransactionPolicySupport)service; state.resetTransactionPolicy(); } if (api == serviceKey1) service1 = service; else if (api == serviceKey2) service2 = service; } // ---------------- // SessionMonitor // ---------------- void fireSessionVariableChange(String variable) { for (MonitorHandler h : monitorHandlers) { MonitorContext ctx = monitorContexts.getLeft(h); assert ctx != null; // SafeRunner functionality repeated here to avoid dependency. try { h.valuesChanged(ctx); } catch (Exception e) { Logger.defaultLogError("monitor handler notification produced the following exception", e); } catch (LinkageError e) { Logger.defaultLogError("monitor handler notification produced a linkage error", e); } } } class ResourceSerializerImpl implements ResourceSerializer { public long createRandomAccessId(int id) throws DatabaseException { if(id < 0) return id; int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id); long cluster = getCluster(id); if (0 == cluster) return 0; // Better to return 0 then invalid id. long result = ClusterTraitsBase.createResourceId(cluster, index); return result; } @Override public long getRandomAccessId(Resource resource) throws DatabaseException { ResourceImpl resourceImpl = (ResourceImpl) resource; return createRandomAccessId(resourceImpl.id); } @Override public int getTransientId(Resource resource) throws DatabaseException { ResourceImpl resourceImpl = (ResourceImpl) resource; return resourceImpl.id; } @Override public String createRandomAccessId(Resource resource) throws InvalidResourceReferenceException { if(resource == null) throw new IllegalArgumentException(); ResourceImpl resourceImpl = (ResourceImpl) resource; if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0"; int r; try { r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id); } catch (DatabaseException e1) { throw new InvalidResourceReferenceException(e1); } try { // Serialize as '_' return "" + r + "_" + getCluster(resourceImpl); } catch (Throwable e) { e.printStackTrace(); throw new InvalidResourceReferenceException(e); } finally { } } public int getTransientId(long serialized) throws DatabaseException { if (serialized <= 0) return (int)serialized; int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized); long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized); ClusterI cluster = clusterTranslator.getClusterByClusterId(c); if (cluster == null) throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized); int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index); return key; } @Override public Resource getResource(long randomAccessId) throws DatabaseException { return getResourceByKey(getTransientId(randomAccessId)); } @Override public Resource getResource(int transientId) throws DatabaseException { return getResourceByKey(transientId); } @Override public Resource getResource(String randomAccessId) throws InvalidResourceReferenceException { try { int i = randomAccessId.indexOf('_'); if (i == -1) throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '" + randomAccessId + "'"); int r = Integer.parseInt(randomAccessId.substring(0, i)); if(r < 0) return getResourceByKey(r); long c = Long.parseLong(randomAccessId.substring(i + 1)); ClusterI cluster = clusterTranslator.getClusterByClusterId(c); int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r); if (cluster.hasResource(key, clusterTranslator)) return getResourceByKey(key); } catch (InvalidResourceReferenceException e) { throw e; } catch (NumberFormatException e) { throw new InvalidResourceReferenceException(e); } catch (Throwable e) { e.printStackTrace(); throw new InvalidResourceReferenceException(e); } finally { } throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId); } @Override public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException { try { return true; } catch (Throwable e) { e.printStackTrace(); throw new InvalidResourceReferenceException(e); } finally { } } } // Copied from old SessionImpl void check() { if (state.isClosed()) throw new Error("Session closed."); } public ResourceImpl getNewResource(long clusterId) { ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId); int newId; try { newId = cluster.createResource(clusterTranslator); } catch (DatabaseException e) { Logger.defaultLogError(e); return null; } return new ResourceImpl(resourceSupport, newId); } public ResourceImpl getNewResource(Resource clusterSet) throws DatabaseException { long resourceId = clusterSet.getResourceId(); Long clusterId = clusterSetsSupport.get(resourceId); if (null == clusterId) throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet); if (Constants.NewClusterId == clusterId) { clusterId = getService(ClusteringSupport.class).createCluster(); if ((serviceMode & SERVICE_MODE_CREATE) > 0) createdClusters.add(clusterId); clusterSetsSupport.put(resourceId, clusterId); return getNewResource(clusterId); } else { ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId); ResourceImpl result; if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) { clusterId = getService(ClusteringSupport.class).createCluster(); if ((serviceMode & SERVICE_MODE_CREATE) > 0) createdClusters.add(clusterId); clusterSetsSupport.put(resourceId, clusterId); return getNewResource(clusterId); } else { result = getNewResource(clusterId); int resultKey = querySupport.getId(result); long resultCluster = querySupport.getClusterId(resultKey); if (clusterId != resultCluster) clusterSetsSupport.put(resourceId, resultCluster); return result; } } } public void getNewClusterSet(Resource clusterSet) throws DatabaseException { if (DEBUG) System.out.println("new cluster set=" + clusterSet); long resourceId = clusterSet.getResourceId(); if (clusterSetsSupport.containsKey(resourceId)) throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet); clusterSetsSupport.put(resourceId, Constants.NewClusterId); } public boolean containsClusterSet(Resource clusterSet) throws ServiceException { long resourceId = clusterSet.getResourceId(); return clusterSetsSupport.containsKey(resourceId); } public Resource setDefaultClusterSet4NewResource(Resource clusterSet) { Resource r = defaultClusterSet; defaultClusterSet = clusterSet; return r; } void printDiagnostics() { if (DIAGNOSTICS) { final DataContainer totalMem = new DataContainer(0L); final DataContainer residentClusters = new DataContainer(0L); for(ClusterI cluster : clusterTable.getClusters()) { try { if (cluster.isLoaded() && !cluster.isEmpty()) { residentClusters.set(residentClusters.get() + 1); totalMem.set(totalMem.get() + cluster.getUsedSpace()); } } catch (DatabaseException e) { Logger.defaultLogError(e); } } System.out.println("--------------------------------"); System.out.println("Cluster information:"); System.out.println("-amount of resident clusters=" + residentClusters.get()); System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB"); for(ClusterI cluster : clusterTable.getClusters()) { System.out.print("Cluster " + cluster.getClusterId() + " ["); try { if (!cluster.isLoaded()) System.out.println("not loaded]"); else if (cluster.isEmpty()) System.out.println("is empty]"); else { System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator)); System.out.print(",references=" + cluster.getReferenceCount()); System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]"); } } catch (DatabaseException e) { Logger.defaultLogError(e); System.out.println("is corrupted]"); } } queryProvider2.printDiagnostics(); System.out.println("--------------------------------"); } } public void fireStartReadTransaction() { if (DIAGNOSTICS) System.out.println("StartReadTransaction"); for (SessionEventListener listener : eventListeners) { try { listener.readTransactionStarted(); } catch (Throwable t) { t.printStackTrace(); } } } public void fireFinishReadTransaction() { if (DIAGNOSTICS) System.out.println("FinishReadTransaction"); for (SessionEventListener listener : eventListeners) { try { listener.readTransactionFinished(); } catch (Throwable t) { t.printStackTrace(); } } } public void fireStartWriteTransaction() { if (DIAGNOSTICS) System.out.println("StartWriteTransaction"); for (SessionEventListener listener : eventListeners) { try { listener.writeTransactionStarted(); } catch (Throwable t) { t.printStackTrace(); } } } public void fireFinishWriteTransaction() { if (DIAGNOSTICS) System.out.println("FinishWriteTransaction"); for (SessionEventListener listener : eventListeners) { try { listener.writeTransactionFinished(); } catch (Throwable t) { t.printStackTrace(); } } Indexing.resetDependenciesIndexingDisabled(); } Resource deserialize(final long resourceId, final long clusterId) throws IOException { throw new Error("Not supported at the moment."); } State getState() { return state; } ClusterTable getClusterTable() { return clusterTable; } public GraphSession getGraphSession() { return graphSession; } public QueryProcessor getQueryProvider2() { return queryProvider2; } ClientChangesImpl getClientChanges() { return clientChanges; } boolean getWriteOnly() { return writeOnly; } static int counter = 0; public void onClusterLoaded(long clusterId) { clusterTable.updateSize(); } /* * Implementation of the interface RequestProcessor */ @Override public T syncRequest(final Read request) throws DatabaseException { assertNotSession(); assertAlive(); assert(request != null); final DataContainer result = new DataContainer(); final DataContainer exception = new DataContainer(); syncRequest(request, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T t) { result.set(t); } @Override public void exception(AsyncReadGraph graph, Throwable t) { exception.set(t); } }); Throwable t = exception.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result.get(); } // @Override // public T syncRequest(Read request, AsyncListener procedure) throws DatabaseException { // assertNotSession(); // return syncRequest(request, (AsyncProcedure)procedure); // } @Override public T syncRequest(Read request, SyncListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncListener(procedure)); } @Override public T syncRequest(Read request, final Listener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncListener(procedure)); } @Override public T syncRequest(final Read request, final SyncProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncProcedure(procedure)); } @Override public T syncRequest(Read request, Procedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncProcedure(procedure)); } @Override public T syncRequest(final AsyncRead request) throws DatabaseException { assertNotSession(); assert(request != null); final DataContainer result = new DataContainer(); final DataContainer exception = new DataContainer(); syncRequest(request, new AsyncProcedure() { @Override public void execute(AsyncReadGraph graph, T t) { result.set(t); } @Override public void exception(AsyncReadGraph graph, Throwable t) { exception.set(t); } }); Throwable t = exception.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result.get(); } @Override public T syncRequest(AsyncRead request, AsyncListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, (AsyncProcedure)procedure); } @Override public T syncRequest(AsyncRead request, SyncListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncListener(procedure)); } @Override public T syncRequest(AsyncRead request, Listener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncListener(procedure)); } @Override public T syncRequest(AsyncRead request, final SyncProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncProcedure(procedure)); } @Override final public T syncRequest(final AsyncRead request, final Procedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncProcedure(procedure)); } @Override public Collection syncRequest(MultiRead request) throws DatabaseException { assertNotSession(); assert(request != null); final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); syncRequest(request, new AsyncMultiProcedure() { @Override public void execute(AsyncReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override public void finished(AsyncReadGraph graph) { } @Override public void exception(AsyncReadGraph graph, Throwable t) { exception.set(t); } }); Throwable t = exception.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; } @Override public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, (AsyncMultiProcedure)procedure); } @Override public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } @Override final public Collection syncRequest(final AsyncMultiRead request) throws DatabaseException { assertNotSession(); assert(request != null); final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); syncRequest(request, new AsyncMultiProcedure() { @Override public void execute(AsyncReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override public void finished(AsyncReadGraph graph) { } @Override public void exception(AsyncReadGraph graph, Throwable t) { exception.set(t); } }); Throwable t = exception.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; } @Override public Collection syncRequest(AsyncMultiRead request, AsyncMultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, (AsyncMultiProcedure)procedure); } @Override public Collection syncRequest(AsyncMultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncMultiListener(procedure)); } @Override public Collection syncRequest(AsyncMultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncMultiListener(procedure)); } @Override public Collection syncRequest(AsyncMultiRead request, final SyncMultiProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); } @Override final public Collection syncRequest(final AsyncMultiRead request, final MultiProcedure procedure) throws DatabaseException { assertNotSession(); return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } @Override public void asyncRequest(Read request) { asyncRequest(request, new ProcedureAdapter() { @Override public void exception(Throwable t) { t.printStackTrace(); } }); } @Override public void asyncRequest(Read request, AsyncListener procedure) { asyncRequest(request, (AsyncProcedure)procedure); } @Override public void asyncRequest(Read request, final SyncListener procedure) { asyncRequest(request, new SyncToAsyncListener(procedure)); } @Override public void asyncRequest(Read request, final Listener procedure) { asyncRequest(request, new NoneToAsyncListener(procedure)); } @Override public void asyncRequest(Read request, SyncProcedure procedure) { asyncRequest(request, new SyncToAsyncProcedure(procedure)); } @Override public void asyncRequest(Read request, final Procedure procedure) { asyncRequest(request, new NoneToAsyncProcedure(procedure)); } @Override final public void asyncRequest(final AsyncRead request) { assert(request != null); asyncRequest(request, new ProcedureAdapter() { @Override public void exception(Throwable t) { t.printStackTrace(); } }); } @Override public void asyncRequest(AsyncRead request, AsyncListener procedure) { scheduleRequest(request, procedure, procedure, null); } @Override public void asyncRequest(AsyncRead request, final SyncListener procedure) { asyncRequest(request, new SyncToAsyncListener(procedure)); } @Override public void asyncRequest(AsyncRead request, final Listener procedure) { asyncRequest(request, new NoneToAsyncListener(procedure)); } @Override public void asyncRequest(AsyncRead request, SyncProcedure procedure) { asyncRequest(request, new SyncToAsyncProcedure(procedure)); } @Override final public void asyncRequest(final AsyncRead request, final Procedure procedure) { asyncRequest(request, new NoneToAsyncProcedure(procedure)); } @Override public void asyncRequest(MultiRead request) { assert(request != null); asyncRequest(request, new AsyncMultiProcedureAdapter() { @Override public void exception(AsyncReadGraph graph, Throwable t) { t.printStackTrace(); } }); } @Override public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { asyncRequest(request, (AsyncMultiProcedure)procedure); } @Override public void asyncRequest(MultiRead request, SyncMultiListener procedure) { asyncRequest(request, new SyncToAsyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, MultiListener procedure) { asyncRequest(request, new NoneToAsyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); } @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } @Override final public void asyncRequest(AsyncMultiRead request) { assert(request != null); asyncRequest(request, new AsyncMultiProcedureAdapter() { @Override public void exception(AsyncReadGraph graph, Throwable t) { t.printStackTrace(); } }); } @Override public void asyncRequest(AsyncMultiRead request, AsyncMultiListener procedure) { asyncRequest(request, (AsyncMultiProcedure)procedure); } @Override public void asyncRequest(AsyncMultiRead request, SyncMultiListener procedure) { asyncRequest(request, new SyncToAsyncMultiListener(procedure)); } @Override public void asyncRequest(AsyncMultiRead request, MultiListener procedure) { asyncRequest(request, new NoneToAsyncMultiListener(procedure)); } @Override public void asyncRequest(AsyncMultiRead request, SyncMultiProcedure procedure) { asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); } @Override final public void asyncRequest(AsyncMultiRead request, final MultiProcedure procedure) { asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } @Override public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { assertNotSession(); throw new Error("Not implemented!"); } @Override public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { throw new Error("Not implemented!"); } @Override final public void asyncRequest(final ExternalRead request) { assert(request != null); asyncRequest(request, new ProcedureAdapter() { @Override public void exception(Throwable t) { t.printStackTrace(); } }); } @Override public void asyncRequest(ExternalRead request, final Listener procedure) { asyncRequest(request, (Procedure)procedure); } void check(Throwable t) throws DatabaseException { if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception", t); } } void check(DataContainer container) throws DatabaseException { Throwable t = container.get(); if(t != null) { if(t instanceof DatabaseException) throw (DatabaseException)t; else throw new DatabaseException("Unexpected exception", t); } } boolean sameProvider(Write request) { if(writeState.getGraph().provider != null) { return writeState.getGraph().provider.equals(request.getProvider()); } else { return request.getProvider() == null; } } boolean plainWrite(WriteGraphImpl graph) { if(graph == null) return false; if(graph.writeSupport.writeOnly()) return false; return true; } public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); private void assertNotSession() throws DatabaseException { Thread current = Thread.currentThread(); if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); } void assertAlive() { if (!state.isAlive()) throw new RuntimeDatabaseException("Session has been shut down."); } public InputStream getValueStream(ReadGraphImpl graph, Resource resource) { return querySupport.getValueStream(graph, querySupport.getId(resource)); } public byte[] getValue(ReadGraphImpl graph, Resource resource) { return querySupport.getValue(graph, querySupport.getId(resource)); } void acquire(Semaphore semaphore, T request) throws DatabaseException { acquire(semaphore, request, null); } private void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException { assertAlive(); try { long tryCount = 0; // Loop until the semaphore is acquired reporting requests that take a long time. while (true) { if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) { // Acquired OK. return; } else { if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) { ++tryCount; long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD) * tryCount; System.err.println("DB client request '" + request + "' is taking long to execute, so far " + (waitTime) + " ms. (procedure=" + procedure + ")"); } } assertAlive(); } } catch (InterruptedException e) { e.printStackTrace(); // FIXME: Should perhaps do something else in this case ?? } } // @Override // public boolean holdOnToTransactionAfterCancel() { // return transactionPolicy.holdOnToTransactionAfterCancel(); // } // // @Override // public boolean holdOnToTransactionAfterCommit() { // return transactionPolicy.holdOnToTransactionAfterCommit(); // } // // @Override // public boolean holdOnToTransactionAfterRead() { // return transactionPolicy.holdOnToTransactionAfterRead(); // } // // @Override // public void onRelinquish() { // transactionPolicy.onRelinquish(); // } // // @Override // public void onRelinquishDone() { // transactionPolicy.onRelinquishDone(); // } // // @Override // public void onRelinquishError() { // transactionPolicy.onRelinquishError(); // } @Override public Session getSession() { return this; } protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph); protected abstract ResourceImpl getNewResource() throws DatabaseException; public void ceased(int thread) { requestManager.ceased(thread); } public int getAmountOfQueryThreads() { // This must be a power of two return 1; // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); } public Resource getResourceByKey(int key) throws ResourceNotFoundException { return new ResourceImpl(resourceSupport, key); } public void acquireWriteOnly() { writeOnly = true; } public void releaseWriteOnly(ReadGraphImpl graph) { writeOnly = false; queryProvider2.releaseWrite(graph); } public void handleUpdatesAndMetadata(WriteGraphImpl writer) { long start = System.nanoTime(); while(dirtyPrimitives) { dirtyPrimitives = false; getQueryProvider2().performDirtyUpdates(writer); getQueryProvider2().performScheduledUpdates(writer); } fireMetadataListeners(writer, clientChanges); if(DebugPolicy.PERFORMANCE_DATA) { long end = System.nanoTime(); System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start)); } } void removeTemporaryData() { File platform = Platform.getLocation().toFile(); File tempFiles = new File(platform, "tempFiles"); File temp = new File(tempFiles, "db"); if(!temp.exists()) return; try { Files.walkFileTree(temp.toPath(), new SimpleFileVisitor() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { try { Files.delete(file); } catch (IOException e) { Logger.defaultLogError(e); } return FileVisitResult.CONTINUE; } }); } catch (IOException e) { Logger.defaultLogError(e); } } public void handleCreatedClusters() { if ((serviceMode & SERVICE_MODE_CREATE) > 0) { createdClusters.forEach(new TLongProcedure() { @Override public boolean execute(long value) { ClusterImpl cluster = clusterTable.getClusterByClusterId(value); cluster.setImmutable(true, clusterTranslator); return true; } }); } createdClusters.clear(); } @Override public Object getModificationCounter() { return queryProvider2.modificationCounter; } @Override public void markUndoPoint() { state.setCombine(false); } }