1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169 import org.slf4j.LoggerFactory;
171 import gnu.trove.procedure.TLongProcedure;
172 import gnu.trove.set.hash.TLongHashSet;
/**
 * Abstract {@link Session} implementation that also implements
 * {@link WriteRequestScheduleSupport}: the constructor registers the session
 * services with {@code serviceLocator}, and the {@code scheduleRequest}
 * overloads queue write requests on {@code requestManager}.
 */
175 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
177 private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
179 protected static final boolean DEBUG = false;
181 private static final boolean DIAGNOSTICS = false;
// Set to a TransactionPolicyRelease and registered as the TransactionPolicySupport service by the constructor.
183 private TransactionPolicySupport transactionPolicy;
// Set to a ClusterControlImpl and registered as the ClusterControl service by the constructor.
185 final private ClusterControl clusterControl;
187 final protected BuiltinSupportImpl builtinSupport;
188 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
189 final protected ClusterSetsSupport clusterSetsSupport;
190 final protected LifecycleSupportImpl lifecycleSupport;
192 protected QuerySupportImpl querySupport;
193 protected ResourceSupportImpl resourceSupport;
194 protected WriteSupport writeSupport;
195 public ClusterTranslator clusterTranslator;
197 boolean dirtyPrimitives = false;
// Bit flags tested against serviceMode: with SERVICE_MODE_CREATE set, getNewResourceCluster()
// records new cluster ids in createdClusters; without SERVICE_MODE_ALLOW,
// WriteOnlySupport.claim(...) rejects statements whose subject lives in an immutable
// cluster (the root library is exempt).
199 public static final int SERVICE_MODE_CREATE = 2;
200 public static final int SERVICE_MODE_ALLOW = 1;
201 public int serviceMode = 0;
202 public boolean createdImmutableClusters = false;
// Ids of clusters handed out by getNewResourceCluster() while SERVICE_MODE_CREATE was set.
203 public TLongHashSet createdClusters = new TLongHashSet();
208 * The service locator maintained by the workbench. These services are
209 * initialized by the workbench during the <code>init</code> method.
211 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
213 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
// Listener registries; copy-on-write lists, so they can be iterated without external locking.
215 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
217 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
219 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
221 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
223 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
225 final protected State state = new State();
227 protected GraphSession graphSession = null;
// Assigned from the constructor argument of the same name.
229 protected SessionManager sessionManagerImpl = null;
231 protected UserAuthenticationAgent authAgent = null;
233 protected UserAuthenticator authenticator = null;
235 protected Resource user = null;
237 protected ClusterStream clusterStream = null;
// Receives the SessionTask instances created by the scheduleRequest(...) overloads.
239 protected SessionRequestManager requestManager = null;
// Created in the constructor on top of StaticSessionProperties.virtualGraphStoragePath.
241 public ClusterTable clusterTable = null;
243 public QueryProcessor queryProvider2 = null;
// Disposed and replaced with a fresh ClientChangesImpl at the start of each scheduled write task.
245 ClientChangesImpl clientChanges = null;
247 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
249 // protected long newClusterId = Constants.NullClusterId;
251 protected int flushCounter = 0;
// Passed to clusterTable.getNewResourceCluster(...) by getNewResourceCluster().
253 protected boolean writeOnly = false;
// State of the write request being executed; assigned inside the scheduled write tasks.
255 WriteState<?> writeState = null;
// Assigned while a DelayedWrite is being performed and reset to null before its commit phase.
256 WriteStateBase<?> delayedWriteState = null;
257 protected Resource defaultClusterSet = null; // If not null then used for newResource().
/**
 * Creates the session, wires up its support objects and registers every
 * service implementation with {@code serviceLocator}.
 *
 * @param sessionManagerImpl stored in the field of the same name
 * @param authAgent not used by the visible code: both its null check and its
 *        assignment are commented out
 */
259 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
261 // if (authAgent == null)
262 // throw new IllegalArgumentException("null authentication agent");
// Storage location shared by the cluster table, the virtual graph support and the cluster sets support below.
264 File t = StaticSessionProperties.virtualGraphStoragePath;
267 this.clusterTable = new ClusterTable(this, t);
268 this.builtinSupport = new BuiltinSupportImpl(this);
269 this.sessionManagerImpl = sessionManagerImpl;
271 // this.authAgent = authAgent;
// Services that are only reachable through the service locator (no field keeps a reference).
273 serviceLocator.registerService(Session.class, this);
274 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
275 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
276 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
277 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
278 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
279 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
280 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
281 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
282 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
283 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
284 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
285 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
286 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
287 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
288 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
289 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
290 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
291 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
292 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
293 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
294 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
295 ServiceActivityUpdaterForWriteTransactions.register(this);
// Services that are also kept in fields; the same virtual graph support instance is
// registered under two service interfaces.
297 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
298 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
299 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
300 this.lifecycleSupport = new LifecycleSupportImpl(this);
301 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
302 this.transactionPolicy = new TransactionPolicyRelease();
303 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
304 this.clusterControl = new ClusterControlImpl(this);
305 serviceLocator.registerService(ClusterControl.class, clusterControl);
306 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
// Read and write directories both point at the same storage path.
307 this.clusterSetsSupport.setReadDirectory(t.toPath());
308 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
309 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
/**
 * Returns the root library resource as reported by {@code queryProvider2}.
 */
319 public Resource getRootLibrary() {
320 return queryProvider2.getRootLibraryResource();
/**
 * Asks the cluster table to refresh the given clusters for change set {@code csid}.
 * Any {@link Throwable} raised by the refresh is logged through
 * {@code Logger.defaultLogError} rather than propagated.
 *
 * @param thread not used by the visible code
 * @param clusterUID clusters to refresh, forwarded to the cluster table
 * @param csid change set id, forwarded to the cluster table
 */
323 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
// Guard on the database session's refresh setting.
// NOTE(review): the statement controlled by this check is not visible in this excerpt.
324 if (!graphSession.dbSession.refreshEnabled())
327 getClusterTable().refresh(csid, this, clusterUID);
328 } catch (Throwable t) {
// NOTE(review): the log message below is spelled "Refesh".
329 Logger.defaultLogError("Refesh failed.", t);
/**
 * Holder for the outcome of a named task: a result object, a failure
 * ({@code throwable}) and a semaphore, plus {@code callback}/{@code proc}
 * adapters that can be handed to asynchronous requests.
 * The task name is used as the prefix of every failure message built here.
 */
333 static final class TaskHelper {
334 private final String name;
335 private Object result;
336 final Semaphore sema = new Semaphore(0);
337 private Throwable throwable = null;
// Completion callback; its body runs under the TaskHelper monitor, the same monitor that
// the synchronized throwableSet/throwableCheck methods use.
338 final Consumer<DatabaseException> callback = e -> {
339 synchronized (TaskHelper.this) {
// Procedure adapter: success is reported as callback.accept(null); a failure is passed on
// as a DatabaseException, wrapping any other Throwable.
343 final Procedure<Object> proc = new Procedure<Object>() {
345 public void execute(Object result) {
346 callback.accept(null);
349 public void exception(Throwable t) {
350 if (t instanceof DatabaseException)
351 callback.accept((DatabaseException)t);
// Fixed: the message lacked the space between the task name and "operation failed."
// (compare throw_ below).
353 callback.accept(new DatabaseException("" + name + " operation failed.", t));
356 final WriteTraits writeTraits = new WriteTraits() {};
357 TaskHelper(String name) {
360 @SuppressWarnings("unchecked")
364 void setResult(Object result) {
365 this.result = result;
367 // Throwable throwableGet() {
370 synchronized void throwableSet(Throwable t) {
373 // void throwableSet(String t) {
374 // throwable = new InternalException("" + name + " operation failed. " + t);
/**
 * Rethrows the stored failure, if any: a DatabaseException is thrown as is,
 * anything else is wrapped in a DatabaseException.
 */
376 synchronized void throwableCheck()
377 throws DatabaseException {
378 if (null != throwable)
379 if (throwable instanceof DatabaseException)
380 throw (DatabaseException)throwable;
// Fixed: the message used a hard-coded "Undo" prefix instead of this helper's task name.
382 throw new DatabaseException("" + name + " operation failed.", throwable);
/**
 * Unconditionally throws a DatabaseException whose message combines the task
 * name with {@code message}.
 */
384 void throw_(String message)
385 throws DatabaseException {
386 throw new DatabaseException("" + name + " operation failed. " + message);
/**
 * Returns {@code procedure} cast to {@link ListenerBase} when it is one.
 * NOTE(review): the fallback for other objects is not visible in this excerpt.
 */
392 private ListenerBase getListenerBase(Object procedure) {
393 if (procedure instanceof ListenerBase)
394 return (ListenerBase) procedure;
/**
 * Convenience overload for {@link Write} requests: delegates to the
 * four-argument variant with {@code combine} set to {@code null}.
 * The type parameter {@code T} does not appear in the parameter list.
 */
399 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
400 scheduleRequest(request, callback, notify, null);
404 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Queues a {@link Write} request on {@code requestManager}. When the task runs it
 * replaces {@code clientChanges}, creates a {@link WriteGraphImpl} for the request's
 * provider, installs a new {@code writeState} and performs the request; failures are
 * handed to {@code writeState.except(...)}.
 *
 * @param request the write to perform; must not be null (checked with an assert only)
 * @param callback optional; receives {@code null} on success or the failure as a DatabaseException
 * @param notify handed to the WriteState created for the request
 * @param combine not used by the visible code
 */
407 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
409 assert (request != null);
// Optional development-time tracing of scheduled writes; tracing failures are only logged.
411 if(Development.DEVELOPMENT) {
413 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
414 System.err.println("schedule write '" + request + "'");
415 } catch (Throwable t) {
416 Logger.defaultLogError(t);
420 requestManager.scheduleWrite(new SessionTask(null) {
423 public void run0(int thread) {
425 if(Development.DEVELOPMENT) {
427 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
428 System.err.println("perform write '" + request + "'");
429 } catch (Throwable t) {
430 Logger.defaultLogError(t);
434 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
436 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the transaction with a fresh client change set.
441 Disposable.safeDispose(clientChanges);
442 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
444 VirtualGraph vg = getProvider(request.getProvider());
446 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
447 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450 public void execute(Object result) {
451 if(callback != null) callback.accept(null);
455 public void exception(Throwable t) {
// Fixed: writeState.except(...) is called with arbitrary Throwables in the catch blocks
// below; assuming they are forwarded here unchanged, a blind cast to DatabaseException
// would fail with ClassCastException. Wrap non-database failures instead, as the
// DelayedWrite variant of scheduleRequest does.
456 if(callback != null) callback.accept(t instanceof DatabaseException ? (DatabaseException)t : new DatabaseException(t));
461 assert (null != writer);
462 // writer.state.barrier.inc();
465 request.perform(writer);
466 assert (null != writer);
467 } catch (Throwable t) {
// Cancellation is an expected outcome and is not logged as an error.
468 if (!(t instanceof CancelTransactionException))
469 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
470 writeState.except(t);
472 // writer.state.barrier.dec();
473 // writer.waitAsync(request);
477 assert(!queryProvider2.cache.dirty);
479 } catch (Throwable e) {
481 // Log it first, just to be safe that the error is always logged.
482 if (!(e instanceof CancelTransactionException))
483 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
485 // writeState.getGraph().state.barrier.dec();
486 // writeState.getGraph().waitAsync(request);
488 writeState.except(e);
491 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
492 // // All we can do here is to log those, can't really pass them anywhere.
493 // if (callback != null) {
494 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
495 // else callback.run(new DatabaseException(e));
497 // } catch (Throwable e2) {
498 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
500 // boolean empty = clusterStream.reallyFlush();
501 // int callerThread = -1;
502 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
503 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
505 // // Send all local changes to server so it can calculate correct reverse change set.
506 // // This will call clusterStream.accept().
507 // state.cancelCommit(context, clusterStream);
509 // if (!context.isOk()) // this is a blocking operation
510 // throw new InternalException("Cancel failed. This should never happen.");
511 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
513 // state.cancelCommit2(context, clusterStream);
514 // } catch (DatabaseException e3) {
515 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
517 // clusterStream.setOff(false);
518 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
523 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
529 if(Development.DEVELOPMENT) {
531 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
532 System.err.println("finish write '" + request + "'");
533 } catch (Throwable t) {
534 Logger.defaultLogError(t);
/**
 * Convenience overload for {@link WriteResult} requests: delegates to the
 * four-argument variant with {@code combine} set to {@code null}.
 */
544 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
545 scheduleRequest(request, procedure, notify, null);
549 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Queues a {@link WriteResult} request on {@code requestManager}. When the task runs
 * it replaces {@code clientChanges}, creates a {@link WriteGraphImpl} for the request's
 * provider, installs a typed {@code WriteState} built around {@code procedure} and
 * stores the value returned by {@code request.perform(...)} as the state's result.
 * Any Throwable is handed to {@code writeState.except(...)}.
 *
 * @param request the write to perform; must not be null (checked with an assert only)
 * @param procedure handed to the WriteState created for the request
 * @param notify handed to the WriteState created for the request
 * @param combine not used by the visible code
 */
552 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
554 assert (request != null);
556 requestManager.scheduleWrite(new SessionTask(null) {
559 public void run0(int thread) {
561 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
563 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the transaction with a fresh client change set.
566 Disposable.safeDispose(clientChanges);
567 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
569 VirtualGraph vg = getProvider(request.getProvider());
570 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The typed local is kept so that setResult(...) can take a T; the field only sees WriteState<?>.
573 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
574 writeState = writeStateT;
576 assert (null != writer);
577 // writer.state.barrier.inc();
578 writeStateT.setResult(request.perform(writer));
579 assert (null != writer);
581 // writer.state.barrier.dec();
582 // writer.waitAsync(null);
584 } catch (Throwable e) {
586 // writer.state.barrier.dec();
587 // writer.waitAsync(null);
589 writeState.except(e);
591 // state.stopWriteTransaction(clusterStream);
593 // } catch (Throwable e) {
594 // // Log it first, just to be safe that the error is always logged.
595 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
598 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
599 // // All we can do here is to log those, can't really pass them anywhere.
600 // if (procedure != null) {
601 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
602 // else procedure.exception(new DatabaseException(e));
604 // } catch (Throwable e2) {
605 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
608 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
610 // state.stopWriteTransaction(clusterStream);
613 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
616 // if(notify != null) notify.release();
/**
 * Convenience overload for {@link DelayedWrite} requests: delegates to the
 * four-argument variant with {@code combine} set to {@code null}.
 * The type parameter {@code T} does not appear in the parameter list.
 */
626 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
627 scheduleRequest(request, callback, notify, null);
631 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Queues a {@link DelayedWrite} request on {@code requestManager}. The task works in
 * two phases: the request is first performed against a {@link DelayedWriteGraph}
 * built on a read graph, and the collected changes are then committed through a
 * write-only {@link WriteGraphImpl}.
 *
 * @param request the delayed write to perform; must not be null (checked with an assert only)
 * @param callback optional; receives {@code null} on success or the failure as a
 *        DatabaseException (other Throwables are wrapped)
 * @param notify handed to the write states created for the request
 * @param combine not used by the visible code
 */
634 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
636 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
638 assert (request != null);
640 requestManager.scheduleWrite(new SessionTask(null) {
643 public void run0(int thread) {
// The perform phase is reported through the QUEUED_READS session variable.
644 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Shared by both phases: adapts the outcome to the optional Consumer callback.
646 Procedure<Object> stateProcedure = new Procedure<Object>() {
648 public void execute(Object result) {
649 if (callback != null)
650 callback.accept(null);
653 public void exception(Throwable t) {
654 if (callback != null) {
655 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
656 else callback.accept(new DatabaseException(t));
658 Logger.defaultLogError("Unhandled exception", t);
// Phase 1: run the request against a DelayedWriteGraph.
662 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
663 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
664 DelayedWriteGraph dwg = null;
665 // newGraph.state.barrier.inc();
668 dwg = new DelayedWriteGraph(newGraph);
669 request.perform(dwg);
670 } catch (Throwable e) {
671 delayedWriteState.except(e);
676 // newGraph.state.barrier.dec();
677 // newGraph.waitAsync(request);
678 fireSessionVariableChange(SessionVariables.QUEUED_READS);
681 delayedWriteState = null;
// Phase 2: commit the delayed changes.
683 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
684 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
687 Disposable.safeDispose(clientChanges);
688 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
692 VirtualGraph vg = getProvider(request.getProvider());
// This local shadows the writeSupport field: the commit uses a write-only support,
// the virtual flavour when a virtual graph provider is present.
693 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
695 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
697 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
699 assert (null != writer);
700 // writer.state.barrier.inc();
704 dwg.commit(writer, request);
// After the commit, convert the default cluster set from its delayed form and
// record its cluster in clusterSetsSupport.
706 if(defaultClusterSet != null) {
707 XSupport xs = getService(XSupport.class);
708 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
709 ClusteringSupport cs = getService(ClusteringSupport.class);
710 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
713 // This makes clusters available from server
714 clusterStream.reallyFlush();
716 releaseWriteOnly(writer);
717 handleUpdatesAndMetadata(writer);
// Failure path: drop the write-only clusters, flush, release and report the exception.
719 } catch (ServiceException e) {
720 // writer.state.barrier.dec();
721 // writer.waitAsync(null);
723 // These shall be requested from server
724 clusterTable.removeWriteOnlyClusters();
725 // This makes clusters available from server
726 clusterStream.reallyFlush();
728 releaseWriteOnly(writer);
729 writeState.except(e);
731 // Debugging & Profiling
732 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/**
 * Convenience overload for {@link DelayedWriteResult} requests: delegates to the
 * four-argument variant with {@code combine} set to {@code null}, which
 * currently always throws.
 */
742 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
743 scheduleRequest(request, procedure, notify, null);
747 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Not supported: always throws {@code Error("Not implemented")} without touching
 * any of its arguments.
 */
750 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
751 throw new Error("Not implemented");
/**
 * Obtains a cluster for new resources from {@code clusterTable}, passing along the
 * current {@code writeOnly} flag. While the SERVICE_MODE_CREATE bit of
 * {@code serviceMode} is set, the cluster's id is also recorded in {@code createdClusters}.
 *
 * @throws DatabaseException declared for the cluster table lookup
 */
754 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
755 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
756 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
757 createdClusters.add(cluster.clusterId);
/**
 * {@link WriteSupport} used for the commit phase of delayed writes when no virtual
 * graph provider is involved. Being an inner class, it works directly on the
 * enclosing session's cluster table, client changes and query provider.
 */
762 class WriteOnlySupport implements WriteSupport {
764 ClusterStream stream;
// Cluster that createResource/createResourceKey currently allocate from; null until first use.
765 ClusterImpl currentCluster;
// Captures the session's clusterStream at construction time.
767 public WriteOnlySupport() {
768 this.stream = clusterStream;
/**
 * Resource-based variant: casts the three resources to {@link ResourceImpl} and
 * delegates to the int-key overload with their ids.
 */
772 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
773 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
/**
 * Adds the statement (s, p, o) to the cluster that owns the subject key.
 *
 * @throws ServiceException declared by the signature; {@link ImmutableException} is thrown
 *         when the cluster is immutable, SERVICE_MODE_ALLOW is not set and the subject
 *         is not the root library
 */
777 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
779 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
781 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
782 if(s != queryProvider2.getRootLibrary())
783 throw new ImmutableException("Trying to modify immutable resource key=" + s);
// addRelation may hand back a different cluster object; maintainCluster registers it.
786 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
787 } catch (DatabaseException e) {
// A failed addRelation is logged, not propagated.
788 Logger.defaultLogError(e);
792 clientChanges.invalidate(s);
// NOTE(review): the statement controlled by this check is not visible in this excerpt.
794 if (cluster.isWriteOnly())
796 queryProvider2.updateStatements(s, p);
/**
 * Resource-based variant: delegates to the int-key overload with the resource's id
 * and the full length of {@code value}.
 */
801 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
802 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
/**
 * Stores the first {@code length} bytes of {@code value} as the value of resource key
 * {@code rid} in the owning cluster. A DatabaseException from the cluster is logged,
 * not propagated.
 */
806 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
808 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// setValue may hand back a different cluster object; maintainCluster registers it.
810 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
811 } catch (DatabaseException e) {
812 Logger.defaultLogError(e);
815 clientChanges.invalidate(rid);
// NOTE(review): the statement controlled by this check is not visible in this excerpt.
817 if (cluster.isWriteOnly())
819 queryProvider2.updateValue(rid);
/**
 * Stores {@code amount} bytes read from {@code reader} as the value of {@code resource}.
 * Two paths are visible: one reads everything at once and delegates to the byte[]
 * overload, the other copies the data in blocks of at most 65536 bytes via
 * {@code modiValueEx}.
 * NOTE(review): the condition choosing between the paths and the loop bookkeeping
 * are not visible in this excerpt.
 */
824 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
827 claimValue(provider,resource, reader.readBytes(null, amount));
// Reusable transfer buffer for the block-wise path.
831 byte[] bytes = new byte[65536];
833 int rid = ((ResourceImpl)resource).id;
834 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
838 int block = Math.min(left, 65536);
839 reader.readBytes(bytes, block);
// amount-left is the offset of this block within the value.
840 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
843 } catch (DatabaseException e) {
// A failed block write is logged, not propagated.
844 Logger.defaultLogError(e);
847 clientChanges.invalidate(rid);
// NOTE(review): the statement controlled by this check is not visible in this excerpt.
849 if (cluster.isWriteOnly())
851 queryProvider2.updateValue(rid);
/**
 * Follows up on a cluster operation that may have returned a replacement cluster:
 * when {@code after_} is non-null and not the same object as {@code before}, it is
 * registered with the cluster table, and {@code currentCluster} is redirected to it
 * if it pointed at {@code before}.
 */
855 private void maintainCluster(ClusterImpl before, ClusterI after_) {
856 if(after_ != null && after_ != before) {
857 ClusterImpl after = (ClusterImpl)after_;
858 if(currentCluster == before) {
859 currentCluster = after;
861 clusterTable.replaceCluster(after);
/**
 * Creates a new resource in {@code currentCluster} and returns its key, first
 * obtaining a cluster if none is selected yet. When the current cluster holds exactly
 * {@code ClusterTable.CLUSTER_FILL_SIZE} resources, a new {@link ClusterWriteOnly}
 * cluster with a {@code foreignLookup} array of {@code foreignCounter} bytes takes over.
 */
865 public int createResourceKey(int foreignCounter) throws DatabaseException {
866 if(currentCluster == null) {
867 currentCluster = getNewResourceCluster();
// Note: this overload compares with ==, whereas createResource(VirtualGraph) uses >=.
869 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
870 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
871 newCluster.foreignLookup = new byte[foreignCounter];
872 currentCluster = newCluster;
874 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
876 return currentCluster.createResource(clusterTranslator);
/**
 * Creates a new resource in {@code currentCluster}. A cluster is selected first when
 * none is current, and again once the current one holds at least
 * {@code ClusterTable.CLUSTER_FILL_SIZE} resources. In both cases two selection paths
 * are visible: via a resource allocated from {@code defaultClusterSet} when that is
 * set, or via {@code getNewResourceCluster()}.
 */
880 public Resource createResource(VirtualGraph provider) throws DatabaseException {
881 if(currentCluster == null) {
882 if (null != defaultClusterSet) {
// Allocate from the default cluster set and adopt the cluster that resource landed in.
883 ResourceImpl result = getNewResource(defaultClusterSet);
884 currentCluster = clusterTable.getClusterByResourceKey(result.id);
887 currentCluster = getNewResourceCluster();
890 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
891 if (null != defaultClusterSet) {
892 ResourceImpl result = getNewResource(defaultClusterSet);
893 currentCluster = clusterTable.getClusterByResourceKey(result.id);
896 currentCluster = getNewResourceCluster();
899 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
/** Delegates to {@code getNewResource(clusterId)}; {@code provider} is not used. */
903 public Resource createResource(VirtualGraph provider, long clusterId)
904 throws DatabaseException {
905 return getNewResource(clusterId);
/** Delegates to {@code getNewResource(clusterSet)}; {@code provider} is not used. */
909 public Resource createResource(VirtualGraph provider, Resource clusterSet)
910 throws DatabaseException {
911 return getNewResource(clusterSet);
/** Delegates to {@code getNewClusterSet(clusterSet)}; {@code provider} is not used. */
915 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
916 throws DatabaseException {
917 getNewClusterSet(clusterSet);
/** Returns the result of {@code containsClusterSet(clusterSet)}; {@code dummy} is not used. */
921 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
922 throws ServiceException {
923 return containsClusterSet(clusterSet);
/**
 * Makes the cluster with id {@code cluster} the current cluster and records it in
 * {@code clusterSetsSupport} under the set id that {@code getSet(cluster)} reports.
 */
926 public void selectCluster(long cluster) {
927 currentCluster = clusterTable.getClusterByClusterId(cluster);
928 long setResourceId = clusterSetsSupport.getSet(cluster);
929 clusterSetsSupport.put(setResourceId, cluster);
/**
 * Sets the default cluster set through {@code setDefaultClusterSet4NewResource} and
 * updates {@code currentCluster}: for a non-null set the cluster id is read from
 * {@code clusterSetsSupport} and resolved through the cluster table; the other visible
 * assignment clears {@code currentCluster}.
 * NOTE(review): the else keyword and the return statement are not visible in this excerpt.
 */
933 public Resource setDefaultClusterSet(Resource clusterSet)
934 throws ServiceException {
935 Resource result = setDefaultClusterSet4NewResource(clusterSet);
936 if(clusterSet != null) {
937 long id = clusterSetsSupport.get(clusterSet.getResourceId());
938 currentCluster = clusterTable.getClusterByClusterId(id);
941 currentCluster = null;
/**
 * Removes the value of {@code resource}.
 * <p>
 * When {@code getProvider(provider)} yields null, the removal is recorded against the
 * resource's cluster through {@code clusterTranslator}; the statements that cast
 * {@code provider} to {@code VirtualGraphImpl} apply the removal to a virtual graph instead.
 * NOTE(review): the try/else keywords delimiting these paths are not visible in this excerpt.
 */
947 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
// Resolve the effective provider; null selects the cluster-based path.
949 provider = getProvider(provider);
950 if (null == provider) {
951 int key = ((ResourceImpl)resource).id;
955 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
956 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
957 // if(key != queryProvider2.getRootLibrary())
958 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
961 // cluster.removeValue(key, clusterTranslator);
962 // } catch (DatabaseException e) {
963 // Logger.defaultLogError(e);
967 clusterTable.writeOnlyInvalidate(cluster);
970 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
// Register the resource with DELETE_OPERATION, then emit the value removal.
971 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
972 clusterTranslator.removeValue(cluster);
973 } catch (DatabaseException e) {
974 Logger.defaultLogError(e);
977 queryProvider2.invalidateResource(key);
978 clientChanges.invalidate(key);
// Virtual graph path.
981 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
982 queryProvider2.updateValue(querySupport.getId(resource));
983 clientChanges.claimValue(resource);
/** Not supported by this write support; always throws {@code UnsupportedOperationException}. */
990 public void flush(boolean intermediate) {
991 throw new UnsupportedOperationException();
/**
 * Flushes the cluster through {@code clusterTable.flushCluster(graphSession)}, maps the
 * default cluster set (when one is set) to {@code Constants.NewClusterId} in
 * {@code clusterSetsSupport}, and clears {@code currentCluster}.
 */
995 public void flushCluster() {
996 clusterTable.flushCluster(graphSession);
997 if(defaultClusterSet != null) {
998 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1000 currentCluster = null;
/** Not supported; always throws {@code UnsupportedOperationException} naming the resource. */
1004 public void flushCluster(Resource r) {
1005 throw new UnsupportedOperationException("flushCluster resource " + r);
/**
 * Removes the statement (subject, predicate, object).
 * <p>
 * When {@code getProvider(provider)} yields null, the removal is recorded against the
 * subject's cluster through {@code clusterTranslator}; the statements that cast
 * {@code provider} to {@code VirtualGraphImpl} deny the statement in a virtual graph instead.
 * Both visible paths invalidate the subject in the query provider and in the client changes.
 * NOTE(review): the rest of the signature, the try/else keywords and the return statements
 * are not visible in this excerpt.
 */
1013 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1016 int s = ((ResourceImpl)subject).id;
1017 int p = ((ResourceImpl)predicate).id;
1018 int o = ((ResourceImpl)object).id;
// Resolve the effective provider; null selects the cluster-based path.
1020 provider = getProvider(provider);
1021 if (null == provider) {
1023 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1024 clusterTable.writeOnlyInvalidate(cluster);
1028 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1030 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1031 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
// Register subject (REMOVE_OPERATION), predicate and object (NULL_OPERATION), then emit the removal.
1033 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1034 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1036 clusterTranslator.removeStatement(cluster);
1038 queryProvider2.invalidateResource(s);
1039 clientChanges.invalidate(s);
1041 } catch (DatabaseException e) {
1043 Logger.defaultLogError(e);
// Virtual graph path.
1051 ((VirtualGraphImpl)provider).deny(s, p, o);
1052 queryProvider2.invalidateResource(s);
1053 clientChanges.invalidate(s);
/** Not supported by this write support; always throws {@code UnsupportedOperationException}. */
1062 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1063 throw new UnsupportedOperationException();
/** Reports whether this support is write-only. NOTE(review): the returned value is not visible in this excerpt. */
1067 public boolean writeOnly() {
/** Delegates the {@code Write} request to the outer {@code writeSupport}. */
1072 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1073 writeSupport.performWriteRequest(graph, request);
/** Not supported for {@code WriteResult} requests; always throws {@code UnsupportedOperationException}. */
1077 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1078 throw new UnsupportedOperationException();
/** Not supported for {@code WriteOnly} requests; always throws {@code UnsupportedOperationException}. */
1082 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1083 throw new UnsupportedOperationException();
/** Delegates to {@code writeSupport.addMetadata(data)}. */
1087 public <T> void addMetadata(Metadata data) throws ServiceException {
1088 writeSupport.addMetadata(data);
/** Returns {@code writeSupport.getMetadata(clazz)}. */
1092 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1093 return writeSupport.getMetadata(clazz);
/** Returns {@code writeSupport.getMetadata()}. */
1097 public TreeMap<String, byte[]> getMetadata() {
1098 return writeSupport.getMetadata();
/** Delegates to {@code writeSupport.commitDone(writeTraits, csid)}. */
1102 public void commitDone(WriteTraits writeTraits, long csid) {
1103 writeSupport.commitDone(writeTraits, csid);
/** Delegates to {@code writeSupport.clearUndoList(writeTraits)}. */
1107 public void clearUndoList(WriteTraits writeTraits) {
1108 writeSupport.clearUndoList(writeTraits);
/** Returns {@code writeSupport.clearMetadata()}. */
1111 public int clearMetadata() {
1112 return writeSupport.clearMetadata();
/** Delegates to {@code writeSupport.startUndo()}. */
1116 public void startUndo() {
1117 writeSupport.startUndo();
/**
 * {@code WriteSupport} used for write-only requests that target a virtual graph.
 * <p>
 * Statement and value changes are applied to the supplied provider (cast to
 * {@code TransientGraph} / {@code VirtualGraphImpl}), pushed to the query provider and
 * recorded in {@code clientChanges}. Cluster, flush, nested-request and metadata operations
 * throw {@code UnsupportedOperationException}.
 * NOTE(review): several method bodies (setDefaultClusterSet, writeOnly, commitDone,
 * clearUndoList, clearMetadata, startUndo) are not visible in this excerpt.
 */
1121 class VirtualWriteOnlySupport implements WriteSupport {
1124 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1125 // Resource object) {
1126 // throw new UnsupportedOperationException();
// Adds a statement given as resources: claim in the transient graph, update queries, record the change.
1130 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1132 TransientGraph impl = (TransientGraph)provider;
1133 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1134 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1135 clientChanges.claim(subject, predicate, object);
// Same as above for statements given as integer keys.
1140 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1142 TransientGraph impl = (TransientGraph)provider;
1143 impl.claim(subject, predicate, object);
1144 getQueryProvider2().updateStatements(subject, predicate);
1145 clientChanges.claim(subject, predicate, object);
// Writes the whole array as the value of the resource.
1150 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1151 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Writes 'length' bytes of 'value' to the virtual graph, updates queries, records the change.
1155 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1156 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1157 getQueryProvider2().updateValue(resource);
1158 clientChanges.claimValue(resource);
// Reads 'amount' bytes from the reader in one call and delegates to the byte[] overload.
1162 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1163 byte[] value = reader.readBytes(null, amount);
1164 claimValue(provider, resource, value);
// Allocates a new resource in the transient graph.
1168 public Resource createResource(VirtualGraph provider) {
1169 TransientGraph impl = (TransientGraph)provider;
1170 return impl.getResource(impl.newResource(false));
// Cluster-specific creation is not supported here.
1174 public Resource createResource(VirtualGraph provider, long clusterId) {
1175 throw new UnsupportedOperationException();
1179 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1180 throws DatabaseException {
1181 throw new UnsupportedOperationException();
1185 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1186 throws DatabaseException {
1187 throw new UnsupportedOperationException();
1191 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1192 throws ServiceException {
1193 throw new UnsupportedOperationException();
// NOTE(review): body not visible in this excerpt.
1197 public Resource setDefaultClusterSet(Resource clusterSet)
1198 throws ServiceException {
// Removes the value from the virtual graph, updates queries, records the change.
1203 public void denyValue(VirtualGraph provider, Resource resource) {
1204 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1205 getQueryProvider2().updateValue(querySupport.getId(resource));
1206 // NOTE: this only keeps track of value changes by-resource.
1207 clientChanges.claimValue(resource);
1211 public void flush(boolean intermediate) {
1212 throw new UnsupportedOperationException();
1216 public void flushCluster() {
1217 throw new UnsupportedOperationException();
1221 public void flushCluster(Resource r) {
1222 throw new UnsupportedOperationException("Resource " + r);
// Denies the statement in the transient graph, updates queries, records the change.
// NOTE(review): the return statement is not visible in this excerpt.
1230 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1231 TransientGraph impl = (TransientGraph) provider;
1232 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1233 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1234 clientChanges.deny(subject, predicate, object);
1239 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1240 throw new UnsupportedOperationException();
// NOTE(review): returned value not visible in this excerpt.
1244 public boolean writeOnly() {
// Nested write requests are not supported.
1249 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1250 throw new UnsupportedOperationException();
1254 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1255 throw new UnsupportedOperationException();
1259 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1260 throw new UnsupportedOperationException();
// Metadata access is not supported.
1264 public <T> void addMetadata(Metadata data) throws ServiceException {
1265 throw new UnsupportedOperationException();
1269 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1270 throw new UnsupportedOperationException();
1274 public TreeMap<String, byte[]> getMetadata() {
1275 throw new UnsupportedOperationException();
// NOTE(review): the bodies of the four methods below are not visible in this excerpt.
1279 public void commitDone(WriteTraits writeTraits, long csid) {
1283 public void clearUndoList(WriteTraits writeTraits) {
1287 public int clearMetadata() {
1292 public void startUndo() {
/**
 * Runs a {@code WriteOnlyResult} request.
 * <p>
 * Visible steps: replace {@code clientChanges} with a fresh instance, choose
 * {@code VirtualWriteOnlySupport} when the request names a provider and
 * {@code WriteOnlySupport} otherwise, create the write graph and {@code WriteState},
 * perform the request, store its result, flush the cluster stream, release the writer and
 * handle updates and metadata. The visible catch blocks release the writer, remove
 * write-only clusters and stop the write transaction.
 * NOTE(review): try/finally keywords are not visible in this excerpt.
 */
1296 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1302 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start this request with an empty change record.
1305 Disposable.safeDispose(clientChanges);
1306 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1312 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1314 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1315 writeState = writeStateT;
1317 assert (null != writer);
1318 // writer.state.barrier.inc();
// Time the request itself.
1319 long start = System.nanoTime();
1320 T result = request.perform(writer);
1321 long duration = System.nanoTime() - start;
// NOTE(review): timing output goes to stderr; any guard around it is not visible in this excerpt.
1323 System.err.println("################");
1324 System.err.println("WriteOnly duration " + 1e-9*duration);
1326 writeStateT.setResult(result);
1328 // This makes clusters available from server
1329 clusterStream.reallyFlush();
1331 // This will trigger query updates
1332 releaseWriteOnly(writer);
1333 assert (null != writer);
1335 handleUpdatesAndMetadata(writer);
// Cancellation: clean up; no callback.exception call is visible in this branch.
1337 } catch (CancelTransactionException e) {
1339 releaseWriteOnly(writeState.getGraph());
1341 clusterTable.removeWriteOnlyClusters();
1342 state.stopWriteTransaction(clusterStream);
// Any other failure: clean up, report to the callback wrapped in a DatabaseException, and log.
1344 } catch (Throwable e) {
1346 e.printStackTrace();
1348 releaseWriteOnly(writeState.getGraph());
1350 clusterTable.removeWriteOnlyClusters();
1352 if (callback != null)
1353 callback.exception(new DatabaseException(e));
1355 state.stopWriteTransaction(clusterStream);
1356 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/** Schedules a {@code WriteOnly} request; forwards to the four-argument overload with {@code combine} set to null. */
1366 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1367 scheduleRequest(request, callback, notify, null);
/**
 * Queues a {@code WriteOnly} request on {@code requestManager} as a write task.
 * <p>
 * The task resets {@code clientChanges}, picks {@code VirtualWriteOnlySupport} or
 * {@code WriteOnlySupport} depending on whether {@code getProvider(request.getProvider())}
 * is non-null, performs the request and flushes the cluster stream. {@code callback}, when
 * non-null, receives null on success or a {@code DatabaseException} on failure.
 * NOTE(review): {@code combine} is not used in the visible lines; try/finally keywords are
 * not visible in this excerpt.
 */
1371 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375 assert (request != null);
1377 requestManager.scheduleWrite(new SessionTask(null) {
1380 public void run0(int thread) {
1382 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1386 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start this request with an empty change record.
1389 Disposable.safeDispose(clientChanges);
1390 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1394 VirtualGraph vg = getProvider(request.getProvider());
1395 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1397 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// Adapter turning the Procedure protocol of WriteState into the Consumer-style callback.
1399 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1402 public void execute(Object result) {
1403 if(callback != null) callback.accept(null);
1407 public void exception(Throwable t) {
// NOTE(review): unchecked cast - throws ClassCastException if t is not a DatabaseException.
1408 if(callback != null) callback.accept((DatabaseException)t);
1413 assert (null != writer);
1414 // writer.state.barrier.inc();
1418 request.perform(writer);
1420 } catch (Throwable e) {
1422 // writer.state.barrier.dec();
1423 // writer.waitAsync(null);
1425 releaseWriteOnly(writer);
1427 clusterTable.removeWriteOnlyClusters();
// Cancellation is not reported to the callback through this path.
1429 if(!(e instanceof CancelTransactionException)) {
1430 if (callback != null)
1431 callback.accept(new DatabaseException(e));
1434 writeState.except(e);
1440 // This makes clusters available from server
1441 boolean empty = clusterStream.reallyFlush();
1442 // This was needed to make WO requests call metadata listeners.
1443 // NOTE: the calling event does not contain clientChanges information.
1444 if (!empty && clientChanges.isEmpty())
1445 clientChanges.setNotEmpty(true);
1446 releaseWriteOnly(writer);
1447 assert (null != writer);
1450 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/** Schedules a {@code WriteOnlyResult} request; forwards to the four-argument overload with {@code combine} set to null. */
1463 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1464 scheduleRequest(request, callback, notify, null);
1468 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues a WriteOnlyResult request as a write task; the task starts a ThreadLogger entry
// and runs performWriteOnly(request, notify, callback).
// NOTE(review): 'combine' is not used in the visible lines.
1471 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1473 assert (request != null);
1475 requestManager.scheduleWrite(new SessionTask(null) {
1478 public void run0(int thread) {
1480 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1482 performWriteOnly(request, notify, callback);
/**
 * Queues a {@code Read} request on {@code requestManager} as a read task.
 * <p>
 * With a listener (derived from {@code procedure} by {@code getListenerBase}) the request
 * goes through {@code QueryCache.runnerReadEntry} with a wrapping {@code AsyncProcedure};
 * the other visible path performs the request directly on a new {@code ReadGraphImpl}.
 * On both paths a non-null {@code result} container receives the value and
 * {@code procedure} is notified.
 * NOTE(review): the statements executed inside the {@code if(throwable != null)} checks and
 * the try/else keywords are not visible in this excerpt.
 */
1492 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1494 assert (request != null);
1495 assert (procedure != null);
1497 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1499 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1502 public void run0(int thread) {
1504 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1506 ListenerBase listener = getListenerBase(procedure);
1508 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1510 // This is never synced but increase to prevent it from visiting 0
1511 newGraph.asyncBarrier.inc();
// Listener path: run through the query cache.
1515 if (listener != null) {
// Wrapper that forwards to 'procedure' and also fills the optional containers.
1519 AsyncProcedure ap = new AsyncProcedure<T>() {
1522 public void exception(AsyncReadGraph graph, Throwable t) {
1523 procedure.exception(graph, t);
1524 if(throwable != null) {
1527 // ErrorLogger.defaultLogError("Unhandled exception", t);
1532 public void execute(AsyncReadGraph graph, T t) {
1533 if(result != null) result.set(t);
1534 procedure.execute(graph, t);
1539 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1541 } catch (Throwable t) {
1542 // This is handled by the AsyncProcedure
1543 //Logger.defaultLogError("Internal error", t);
1550 // newGraph.state.barrier.inc();
// Direct path: perform the request and deliver the value.
1552 T t = request.perform(newGraph);
1556 if(result != null) result.set(t);
1557 procedure.execute(newGraph, t);
1559 } catch (Throwable th) {
1561 if(throwable != null) {
1564 Logger.defaultLogError("Unhandled exception", th);
1569 } catch (Throwable t) {
1572 t.printStackTrace();
1574 if(throwable != null) {
1577 Logger.defaultLogError("Unhandled exception", t);
1582 procedure.exception(newGraph, t);
// Guards against the procedure's own exception handler throwing.
1584 } catch (Throwable t2) {
1586 if(throwable != null) {
1589 Logger.defaultLogError("Unhandled exception", t2);
1596 // newGraph.state.barrier.dec();
1597 // newGraph.waitAsync(request);
1603 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an {@code AsyncRead} request on {@code requestManager} as a read task.
 * <p>
 * With a non-null {@code listener} the request goes through
 * {@code QueryCacheBase.resultAsyncReadEntry}; the other visible path performs the request
 * directly with a {@code BlockingAsyncProcedure} wrapped around {@code procedure}.
 * A {@code DatabaseException} on either path is logged.
 * NOTE(review): try/else keywords are not visible in this excerpt.
 */
1613 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1615 assert (request != null);
1616 assert (procedure != null);
// NOTE(review): this local is not referenced in the visible lines (run0 declares its own 'thread').
1618 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1620 requestManager.scheduleRead(new SessionRead(null, notify) {
1623 public void run0(int thread) {
1625 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1627 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1631 if (listener != null) {
1634 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1635 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1636 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1637 } catch (DatabaseException e) {
1638 Logger.defaultLogError(e);
1643 // final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1644 // procedure, "request");
1646 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1650 request.perform(newGraph, wrap);
1653 } catch (DatabaseException e) {
1655 Logger.defaultLogError(e);
1663 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues a {@code MultiRead} request on {@code requestManager} as a read task.
 * <p>
 * With a listener (derived from {@code procedure}) the request is issued through
 * {@code newGraph.processor.query}; the other visible path performs it directly with a
 * {@code ResultCallWrappedSyncQueryProcedure} around {@code procedure}.
 * NOTE(review): try/else keywords are not visible in this excerpt.
 */
1673 public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1675 assert (request != null);
1676 assert (procedure != null);
1678 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// NOTE(review): 'sync' is not referenced in the visible lines.
1680 int sync = notify != null ? thread : -1;
1682 requestManager.scheduleRead(new SessionRead(null, notify) {
1685 public void run0(int thread) {
1687 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1689 ListenerBase listener = getListenerBase(procedure);
1691 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1695 if (listener != null) {
1697 newGraph.processor.query(newGraph, request, null, procedure, listener);
1699 // newGraph.waitAsync(request);
1703 final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1707 request.perform(newGraph, wrapper);
1709 } catch (Throwable t) {
// The visible handling only prints the stack trace.
1711 t.printStackTrace();
1719 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an {@code AsyncMultiRead} request on {@code requestManager} as a read task.
 * <p>
 * With a listener (derived from {@code procedure}) the request is issued through
 * {@code newGraph.processor.query}; the other visible path performs it directly with a
 * {@code ResultCallWrappedQueryProcedure4} around {@code procedure}.
 * NOTE(review): try/else keywords are not visible in this excerpt.
 */
1729 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1731 assert (request != null);
1732 assert (procedure != null);
1734 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// NOTE(review): 'sync' is not referenced in the visible lines.
1736 int sync = notify != null ? thread : -1;
1738 requestManager.scheduleRead(new SessionRead(null, notify) {
1741 public void run0(int thread) {
1743 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1745 ListenerBase listener = getListenerBase(procedure);
1747 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1751 if (listener != null) {
1753 newGraph.processor.query(newGraph, request, null, procedure, listener);
1755 // newGraph.waitAsync(request);
1759 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1763 request.perform(newGraph, wrapper);
1765 } catch (Throwable t) {
// The visible handling only prints the stack trace.
1767 t.printStackTrace();
1775 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an {@code ExternalRead} request on {@code requestManager} as a read task.
 * <p>
 * With a listener (derived from {@code procedure}) the request goes through
 * {@code QueryCacheBase.resultExternalReadEntry}; the other visible path registers an
 * anonymous {@code Listener} with the request that fills the optional {@code result} /
 * {@code throwable} containers and forwards to {@code procedure}.
 * NOTE(review): try/else keywords and the body of {@code isDisposed} are not visible here.
 */
1785 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1787 assert (request != null);
1788 assert (procedure != null);
// NOTE(review): this local is not referenced in the visible lines (run0 declares its own 'thread').
1790 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1792 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1795 public void run0(int thread) {
1797 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1799 ListenerBase listener = getListenerBase(procedure);
1801 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1805 if (listener != null) {
1808 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1809 } catch (DatabaseException e) {
1810 Logger.defaultLogError(e);
1815 // newGraph.state.barrier.inc();
1817 request.register(newGraph, new Listener<T>() {
1820 public void exception(Throwable t) {
1821 if(throwable != null) throwable.set(t);
1822 procedure.exception(t);
1823 // newGraph.state.barrier.dec();
1827 public void execute(T t) {
1828 if(result != null) result.set(t);
1829 procedure.execute(t);
1830 // newGraph.state.barrier.dec();
1834 public boolean isDisposed() {
1840 // newGraph.waitAsync(request);
1846 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/** Schedules the {@code Read} request without a semaphore or result/exception containers. */
1858 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1860 scheduleRequest(request, procedure, null, null, null);
/**
 * Schedules the {@code Read} request and waits on a semaphore via {@code acquire}.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
1865 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
// Starts with zero permits; acquire(...) below waits on it.
1867 Semaphore notify = new Semaphore(0);
1868 DataContainer<Throwable> container = new DataContainer<Throwable>();
1869 DataContainer<T> result = new DataContainer<T>();
1870 scheduleRequest(request, procedure, notify, container, result);
1871 acquire(notify, request);
1872 Throwable throwable = container.get();
1873 if(throwable != null) {
1874 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1875 else throw new DatabaseException("Unexpected exception", throwable);
1877 return result.get();
/**
 * Schedules the {@code AsyncRead} request and waits on a semaphore via {@code acquire}.
 * The supplied {@code procedure} is wrapped so that the result or exception is captured
 * locally before being forwarded to it.
 *
 * @return the captured result
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
1881 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1883 Semaphore notify = new Semaphore(0);
1884 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1885 final DataContainer<T> resultContainer = new DataContainer<T>();
1886 scheduleRequest(request, new AsyncProcedure<T>() {
1888 public void exception(AsyncReadGraph graph, Throwable throwable) {
1889 exceptionContainer.set(throwable);
1890 procedure.exception(graph, throwable);
1893 public void execute(AsyncReadGraph graph, T result) {
1894 resultContainer.set(result);
1895 procedure.execute(graph, result);
// The listener is derived from the caller's procedure, not from the wrapper.
1897 }, getListenerBase(procedure), notify);
1898 acquire(notify, request, procedure);
1899 Throwable throwable = exceptionContainer.get();
1900 if (throwable != null) {
1901 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1902 else throw new DatabaseException("Unexpected exception", throwable);
1904 return resultContainer.get();
/**
 * Schedules the {@code AsyncMultiRead} request and waits on a semaphore via {@code acquire}.
 * The supplied {@code procedure} is wrapped so that an exception is captured locally; results
 * and the finished notification are forwarded unchanged.
 * NOTE(review): results are not collected in the visible lines and the code itself prints a
 * TODO about the return value; the return statement is not visible in this excerpt.
 *
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
1909 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1911 Semaphore notify = new Semaphore(0);
1912 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1913 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1915 public void exception(AsyncReadGraph graph, Throwable throwable) {
1916 exceptionContainer.set(throwable);
1917 procedure.exception(graph, throwable);
1920 public void execute(AsyncReadGraph graph, T result) {
1921 procedure.execute(graph, result);
1924 public void finished(AsyncReadGraph graph) {
1925 procedure.finished(graph);
1928 acquire(notify, request, procedure);
1929 Throwable throwable = exceptionContainer.get();
1930 if (throwable != null) {
1931 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1932 else throw new DatabaseException("Unexpected exception", throwable);
1934 // TODO: implement return value
1935 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
/**
 * Performs the {@code ExternalRead} request synchronously by delegating to the
 * {@code Procedure} overload with a new {@code ProcedureAdapter}.
 * The commented-out lines below are an earlier inline implementation.
 */
1940 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1941 return syncRequest(request, new ProcedureAdapter<T>());
1944 // assert(request != null);
1946 // final DataContainer<T> result = new DataContainer<T>();
1947 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1949 // syncRequest(request, new Procedure<T>() {
1952 // public void execute(T t) {
1957 // public void exception(Throwable t) {
1958 // exception.set(t);
1963 // Throwable t = exception.get();
1965 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1966 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1969 // return result.get();
/** Forwards to the {@code Procedure} overload; the cast selects that overload for the listener. */
1974 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1975 return syncRequest(request, (Procedure<T>)procedure);
1980 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1981 // assertNotSession();
1982 // Semaphore notify = new Semaphore(0);
1983 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1984 // DataContainer<T> result = new DataContainer<T>();
1985 // scheduleRequest(request, procedure, notify, container, result);
1986 // acquire(notify, request);
1987 // Throwable throwable = container.get();
1988 // if(throwable != null) {
1989 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1990 // else throw new DatabaseException("Unexpected exception", throwable);
1992 // return result.get();
/**
 * Schedules the {@code ExternalRead} request and waits on a semaphore via {@code acquire}.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
1997 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1999 Semaphore notify = new Semaphore(0);
2000 final DataContainer<Throwable> container = new DataContainer<Throwable>();
2001 final DataContainer<T> result = new DataContainer<T>();
2002 scheduleRequest(request, procedure, notify, container, result);
2003 acquire(notify, request);
2004 Throwable throwable = container.get();
2005 if (throwable != null) {
2006 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2007 else throw new DatabaseException("Unexpected exception", throwable);
2009 return result.get();
/**
 * Schedules the {@code Write} request, waits on a semaphore via {@code acquire}, and
 * rethrows the {@code DatabaseException} handed to the callback, if any.
 */
2013 public void syncRequest(Write request) throws DatabaseException {
2016 Semaphore notify = new Semaphore(0);
2017 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2018 scheduleRequest(request, e -> exception.set(e), notify);
2019 acquire(notify, request);
2020 if(exception.get() != null) throw exception.get();
/**
 * Schedules the {@code WriteResult} request and waits on a semaphore via {@code acquire}.
 * NOTE(review): the bodies of the anonymous procedure's methods are not visible in this
 * excerpt; presumably they fill the {@code exception} and {@code result} containers.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
2024 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
2026 Semaphore notify = new Semaphore(0);
2027 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2028 final DataContainer<T> result = new DataContainer<T>();
2029 scheduleRequest(request, new Procedure<T>() {
2032 public void exception(Throwable t) {
2037 public void execute(T t) {
2042 acquire(notify, request);
2043 if(exception.get() != null) {
2044 Throwable t = exception.get();
2045 if(t instanceof DatabaseException) throw (DatabaseException)t;
2046 else throw new DatabaseException(t);
2048 return result.get();
/**
 * Schedules the {@code WriteOnlyResult} request and waits on a semaphore via {@code acquire}.
 * NOTE(review): the bodies of the anonymous procedure's methods are not visible in this
 * excerpt; presumably they fill the {@code exception} and {@code result} containers.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
2052 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2054 Semaphore notify = new Semaphore(0);
2055 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2056 final DataContainer<T> result = new DataContainer<T>();
2057 scheduleRequest(request, new Procedure<T>() {
2060 public void exception(Throwable t) {
2065 public void execute(T t) {
2070 acquire(notify, request);
2071 if(exception.get() != null) {
2072 Throwable t = exception.get();
2073 if(t instanceof DatabaseException) throw (DatabaseException)t;
2074 else throw new DatabaseException(t);
2076 return result.get();
/**
 * Schedules the {@code DelayedWriteResult} request and waits on a semaphore via {@code acquire}.
 * NOTE(review): the bodies of the anonymous procedure's methods are not visible in this
 * excerpt; presumably they fill the {@code exception} and {@code result} containers.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the captured throwable, rethrown as-is when it already is a
 *         {@code DatabaseException}, otherwise wrapped
 */
2080 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2082 Semaphore notify = new Semaphore(0);
2083 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2084 final DataContainer<T> result = new DataContainer<T>();
2085 scheduleRequest(request, new Procedure<T>() {
2088 public void exception(Throwable t) {
2093 public void execute(T t) {
2098 acquire(notify, request);
2099 if(exception.get() != null) {
2100 Throwable t = exception.get();
2101 if(t instanceof DatabaseException) throw (DatabaseException)t;
2102 else throw new DatabaseException(t);
2104 return result.get();
/**
 * Schedules the {@code DelayedWrite} request, waits on a semaphore via {@code acquire}, and
 * rethrows the {@code DatabaseException} handed to the callback, if any.
 */
2108 public void syncRequest(DelayedWrite request) throws DatabaseException {
2110 Semaphore notify = new Semaphore(0);
2111 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2112 scheduleRequest(request, e -> exception.set(e), notify);
2113 acquire(notify, request);
2114 if(exception.get() != null) throw exception.get();
/**
 * Schedules the {@code WriteOnly} request, waits on a semaphore via {@code acquire}, and
 * rethrows the {@code DatabaseException} handed to the callback, if any.
 */
2118 public void syncRequest(WriteOnly request) throws DatabaseException {
2121 Semaphore notify = new Semaphore(0);
2122 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2123 scheduleRequest(request, e -> exception.set(e), notify);
2124 acquire(notify, request);
2125 if(exception.get() != null) throw exception.get();
2130 * GraphRequestProcessor interface
/** Schedules the {@code AsyncRead} request with no listener and no semaphore. */
2134 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2136 scheduleRequest(request, procedure, null, null);
/** Schedules the {@code AsyncMultiRead} request with no semaphore. */
2141 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2143 scheduleRequest(request, procedure, null);
/** Schedules the {@code ExternalRead} request without a semaphore or result/exception containers. */
2148 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2150 scheduleRequest(request, procedure, null, null, null);
/** Schedules the {@code Write} request with the given callback and no semaphore. */
2155 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2157 scheduleRequest(request, callback, null);
/** Schedules the {@code WriteResult} request with the given procedure and no semaphore. */
2162 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2164 scheduleRequest(request, procedure, null);
/** Schedules the {@code WriteOnlyResult} request with the given procedure and no semaphore. */
2169 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2171 scheduleRequest(request, procedure, null);
/** Schedules the {@code DelayedWriteResult} request with the given procedure and no semaphore. */
2176 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2178 scheduleRequest(request, procedure, null);
/** Schedules the {@code DelayedWrite} request with the given callback and no semaphore. */
2183 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2185 scheduleRequest(request, callback, null);
/** Schedules the {@code Write} request without a completion callback. */
2190 public void asyncRequest(final Write r) {
2191 asyncRequest(r, null);
/** Schedules the {@code DelayedWrite} request without a completion callback. */
2195 public void asyncRequest(final DelayedWrite r) {
2196 asyncRequest(r, null);
/** Schedules the {@code WriteOnly} request with the given callback and no semaphore. */
2200 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2202 scheduleRequest(request, callback, null);
/** Schedules the {@code WriteOnly} request without a completion callback. */
2207 public void asyncRequest(final WriteOnly request) {
2209 asyncRequest(request, null);
/** Lets the read interface issue itself on this session with an {@code AsyncProcedure}. */
2214 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2215 r.request(this, procedure);
/** Lets the read interface issue itself on this session with a {@code Procedure}. */
2219 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2220 r.request(this, procedure);
/** Lets the read interface issue itself on this session with a {@code SyncProcedure}. */
2224 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2225 r.request(this, procedure);
/** Lets the read interface issue itself on this session with an {@code AsyncListener}. */
2229 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2230 r.request(this, procedure);
2234 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2235 r.request(this, procedure);
2239 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2240 r.request(this, procedure);
2244 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2245 return r.request(this);
2249 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2250 return r.request(this);
2254 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2255 r.request(this, procedure);
2259 public <T> void async(WriteInterface<T> r) {
2260 r.request(this, new ProcedureAdapter<T>());
2264 public void incAsync() {
2269 public void decAsync() {
2273 public long getCluster(ResourceImpl resource) {
2274 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2275 return cluster.getClusterId();
2278 public long getCluster(int id) {
2279 if (clusterTable == null)
2280 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2281 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2284 public ResourceImpl getResource(int id) {
2285 return new ResourceImpl(resourceSupport, id);
2288 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2289 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2290 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2291 int key = proxy.getClusterKey();
2292 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2293 return new ResourceImpl(resourceSupport, resourceKey);
2296 public ResourceImpl getResource2(int id) {
2298 return new ResourceImpl(resourceSupport, id);
2301 final public int getId(ResourceImpl impl) {
2305 public static final Charset UTF8 = Charset.forName("utf-8");
2310 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2311 for(TransientGraph g : support.providers) {
2312 if(g.isPending(subject)) return false;
2317 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2318 for(TransientGraph g : support.providers) {
2319 if(g.isPending(subject, predicate)) return false;
2324 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2326 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2328 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2331 public void accept(ReadGraphImpl graph) {
2332 if(ready.decrementAndGet() == 0) {
2333 runnable.accept(graph);
2339 for(TransientGraph g : support.providers) {
2340 if(g.isPending(subject)) {
2342 g.load(graph, subject, composite);
2343 } catch (DatabaseException e) {
2344 e.printStackTrace();
2347 composite.accept(graph);
2351 composite.accept(graph);
2355 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2357 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2359 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2362 public void accept(ReadGraphImpl graph) {
2363 if(ready.decrementAndGet() == 0) {
2364 runnable.accept(graph);
2370 for(TransientGraph g : support.providers) {
2371 if(g.isPending(subject, predicate)) {
2373 g.load(graph, subject, predicate, composite);
2374 } catch (DatabaseException e) {
2375 e.printStackTrace();
2378 composite.accept(graph);
2382 composite.accept(graph);
2386 // void dumpHeap() {
2389 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2390 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2391 // "d:/heap" + flushCounter + ".txt", true);
2392 // } catch (IOException e) {
2393 // e.printStackTrace();
2398 void fireReactionsToSynchronize(ChangeSet cs) {
2400 // Do not fire empty events
2404 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2405 // g.state.barrier.inc();
2409 if (!cs.isEmpty()) {
2410 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2411 for (ChangeListener l : changeListeners2) {
2414 } catch (Exception ex) {
2415 ex.printStackTrace();
2422 // g.state.barrier.dec();
2423 // g.waitAsync(null);
2429 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2432 // Do not fire empty events
2437 // graph.state.barrier.inc();
2441 if (!cs2.isEmpty()) {
2443 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2444 for (ChangeListener l : changeListeners2) {
2447 // System.out.println("changelistener " + l);
2448 } catch (Exception ex) {
2449 ex.printStackTrace();
2457 // graph.state.barrier.dec();
2458 // graph.waitAsync(null);
2461 } catch (Throwable t) {
2462 t.printStackTrace();
2466 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2470 // Do not fire empty events
2474 // Do not fire on virtual requests
2475 if(graph.getProvider() != null)
2478 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2482 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2483 for (ChangeListener l : metadataListeners) {
2486 } catch (Throwable ex) {
2487 LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2495 } catch (Throwable t) {
2496 LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2505 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2508 public <T> T getService(Class<T> api) {
2509 T t = peekService(api);
2511 if (state.isClosed())
2512 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2513 throw new ServiceNotFoundException(this, api);
2521 protected abstract ServerInformation getCachedServerInformation();
2523 private Class<?> serviceKey1 = null;
2524 private Class<?> serviceKey2 = null;
2525 private Object service1 = null;
2526 private Object service2 = null;
2532 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2534 @SuppressWarnings("unchecked")
2536 public synchronized <T> T peekService(Class<T> api) {
2538 if(serviceKey1 == api) {
2540 } else if (serviceKey2 == api) {
2542 Object result = service2;
2543 service2 = service1;
2544 serviceKey2 = serviceKey1;
2550 if (Layer0.class == api)
2552 if (ServerInformation.class == api)
2553 return (T) getCachedServerInformation();
2554 else if (WriteGraphImpl.class == api)
2555 return (T) writeState.getGraph();
2556 else if (ClusterBuilder.class == api)
2557 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2558 else if (ClusterBuilderFactory.class == api)
2559 return (T)new ClusterBuilderFactoryImpl(this);
2561 service2 = service1;
2562 serviceKey2 = serviceKey1;
2564 service1 = serviceLocator.peekService(api);
2575 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2578 public boolean hasService(Class<?> api) {
2579 return serviceLocator.hasService(api);
2583 * @param api the api that must be implemented by the specified service
2584 * @param service the service implementation
2587 public <T> void registerService(Class<T> api, T service) {
2588 if(Layer0.class == api) {
2589 L0 = (Layer0)service;
2592 serviceLocator.registerService(api, service);
2593 if (TransactionPolicySupport.class == api) {
2594 transactionPolicy = (TransactionPolicySupport)service;
2595 state.resetTransactionPolicy();
2597 if (api == serviceKey1)
2599 else if (api == serviceKey2)
2607 void fireSessionVariableChange(String variable) {
2608 for (MonitorHandler h : monitorHandlers) {
2609 MonitorContext ctx = monitorContexts.getLeft(h);
2611 // SafeRunner functionality repeated here to avoid dependency.
2613 h.valuesChanged(ctx);
2614 } catch (Exception e) {
2615 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2616 } catch (LinkageError e) {
2617 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2623 class ResourceSerializerImpl implements ResourceSerializer {
2625 public long createRandomAccessId(int id) throws DatabaseException {
2628 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2629 long cluster = getCluster(id);
2631 return 0; // Better to return 0 then invalid id.
2632 long result = ClusterTraitsBase.createResourceId(cluster, index);
2637 public long getRandomAccessId(Resource resource) throws DatabaseException {
2639 ResourceImpl resourceImpl = (ResourceImpl) resource;
2640 return createRandomAccessId(resourceImpl.id);
2645 public int getTransientId(Resource resource) throws DatabaseException {
2647 ResourceImpl resourceImpl = (ResourceImpl) resource;
2648 return resourceImpl.id;
2653 public String createRandomAccessId(Resource resource)
2654 throws InvalidResourceReferenceException {
2656 if(resource == null) throw new IllegalArgumentException();
2658 ResourceImpl resourceImpl = (ResourceImpl) resource;
2660 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2664 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2665 } catch (DatabaseException e1) {
2666 throw new InvalidResourceReferenceException(e1);
2669 // Serialize as '<resource index>_<cluster id>'
2670 return "" + r + "_" + getCluster(resourceImpl);
2671 } catch (Throwable e) {
2672 e.printStackTrace();
2673 throw new InvalidResourceReferenceException(e);
2678 public int getTransientId(long serialized) throws DatabaseException {
2679 if (serialized <= 0)
2680 return (int)serialized;
2681 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2682 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2683 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2684 if (cluster == null)
2685 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2686 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2691 public Resource getResource(long randomAccessId) throws DatabaseException {
2692 return getResourceByKey(getTransientId(randomAccessId));
2696 public Resource getResource(int transientId) throws DatabaseException {
2697 return getResourceByKey(transientId);
2701 public Resource getResource(String randomAccessId)
2702 throws InvalidResourceReferenceException {
2704 int i = randomAccessId.indexOf('_');
2706 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2707 + randomAccessId + "'");
2708 int r = Integer.parseInt(randomAccessId.substring(0, i));
2709 if(r < 0) return getResourceByKey(r);
2710 long c = Long.parseLong(randomAccessId.substring(i + 1));
2711 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2712 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2713 if (cluster.hasResource(key, clusterTranslator))
2714 return getResourceByKey(key);
2715 } catch (InvalidResourceReferenceException e) {
2717 } catch (NumberFormatException e) {
2718 throw new InvalidResourceReferenceException(e);
2719 } catch (Throwable e) {
2720 e.printStackTrace();
2721 throw new InvalidResourceReferenceException(e);
2724 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2728 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2731 } catch (Throwable e) {
2732 e.printStackTrace();
2733 throw new InvalidResourceReferenceException(e);
2739 // Copied from old SessionImpl
2742 if (state.isClosed())
2743 throw new Error("Session closed.");
2748 public ResourceImpl getNewResource(long clusterId) {
2749 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2752 newId = cluster.createResource(clusterTranslator);
2753 } catch (DatabaseException e) {
2754 Logger.defaultLogError(e);
2757 return new ResourceImpl(resourceSupport, newId);
2760 public ResourceImpl getNewResource(Resource clusterSet)
2761 throws DatabaseException {
2762 long resourceId = clusterSet.getResourceId();
2763 Long clusterId = clusterSetsSupport.get(resourceId);
2764 if (null == clusterId)
2765 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2766 if (Constants.NewClusterId == clusterId) {
2767 clusterId = getService(ClusteringSupport.class).createCluster();
2768 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2769 createdClusters.add(clusterId);
2770 clusterSetsSupport.put(resourceId, clusterId);
2771 return getNewResource(clusterId);
2773 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2774 ResourceImpl result;
2775 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2776 clusterId = getService(ClusteringSupport.class).createCluster();
2777 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2778 createdClusters.add(clusterId);
2779 clusterSetsSupport.put(resourceId, clusterId);
2780 return getNewResource(clusterId);
2782 result = getNewResource(clusterId);
2783 int resultKey = querySupport.getId(result);
2784 long resultCluster = querySupport.getClusterId(resultKey);
2785 if (clusterId != resultCluster)
2786 clusterSetsSupport.put(resourceId, resultCluster);
2792 public void getNewClusterSet(Resource clusterSet)
2793 throws DatabaseException {
2795 System.out.println("new cluster set=" + clusterSet);
2796 long resourceId = clusterSet.getResourceId();
2797 if (clusterSetsSupport.containsKey(resourceId))
2798 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2799 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2801 public boolean containsClusterSet(Resource clusterSet)
2802 throws ServiceException {
2803 long resourceId = clusterSet.getResourceId();
2804 return clusterSetsSupport.containsKey(resourceId);
2806 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2807 Resource r = defaultClusterSet;
2808 defaultClusterSet = clusterSet;
2811 void printDiagnostics() {
2815 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2816 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2817 for(ClusterI cluster : clusterTable.getClusters()) {
2819 if (cluster.isLoaded() && !cluster.isEmpty()) {
2820 residentClusters.set(residentClusters.get() + 1);
2821 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2823 } catch (DatabaseException e) {
2824 Logger.defaultLogError(e);
2828 System.out.println("--------------------------------");
2829 System.out.println("Cluster information:");
2830 System.out.println("-amount of resident clusters=" + residentClusters.get());
2831 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2833 for(ClusterI cluster : clusterTable.getClusters()) {
2834 System.out.print("Cluster " + cluster.getClusterId() + " [");
2836 if (!cluster.isLoaded())
2837 System.out.println("not loaded]");
2838 else if (cluster.isEmpty())
2839 System.out.println("is empty]");
2841 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2842 System.out.print(",references=" + cluster.getReferenceCount());
2843 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2845 } catch (DatabaseException e) {
2846 Logger.defaultLogError(e);
2847 System.out.println("is corrupted]");
2851 queryProvider2.printDiagnostics();
2852 System.out.println("--------------------------------");
2857 public void fireStartReadTransaction() {
2860 System.out.println("StartReadTransaction");
2862 for (SessionEventListener listener : eventListeners) {
2864 listener.readTransactionStarted();
2865 } catch (Throwable t) {
2866 t.printStackTrace();
2872 public void fireFinishReadTransaction() {
2875 System.out.println("FinishReadTransaction");
2877 for (SessionEventListener listener : eventListeners) {
2879 listener.readTransactionFinished();
2880 } catch (Throwable t) {
2881 t.printStackTrace();
2887 public void fireStartWriteTransaction() {
2890 System.out.println("StartWriteTransaction");
2892 for (SessionEventListener listener : eventListeners) {
2894 listener.writeTransactionStarted();
2895 } catch (Throwable t) {
2896 t.printStackTrace();
2902 public void fireFinishWriteTransaction() {
2905 System.out.println("FinishWriteTransaction");
2907 for (SessionEventListener listener : eventListeners) {
2909 listener.writeTransactionFinished();
2910 } catch (Throwable t) {
2911 t.printStackTrace();
2915 Indexing.resetDependenciesIndexingDisabled();
2919 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2920 throw new Error("Not supported at the moment.");
2927 ClusterTable getClusterTable() {
2928 return clusterTable;
2931 public GraphSession getGraphSession() {
2932 return graphSession;
2935 public QueryProcessor getQueryProvider2() {
2936 return queryProvider2;
2939 ClientChangesImpl getClientChanges() {
2940 return clientChanges;
2943 boolean getWriteOnly() {
2947 static int counter = 0;
2949 public void onClusterLoaded(long clusterId) {
2951 clusterTable.updateSize();
2959 * Implementation of the interface RequestProcessor
2963 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2968 assert(request != null);
2970 final DataContainer<T> result = new DataContainer<T>();
2971 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2973 syncRequest(request, new AsyncProcedure<T>() {
2976 public void execute(AsyncReadGraph graph, T t) {
2981 public void exception(AsyncReadGraph graph, Throwable t) {
2987 Throwable t = exception.get();
2989 if(t instanceof DatabaseException) throw (DatabaseException)t;
2990 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2993 return result.get();
2998 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2999 // assertNotSession();
3000 // return syncRequest(request, (AsyncProcedure<T>)procedure);
3004 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3006 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3010 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3012 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3016 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3018 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3022 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3024 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3028 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3032 assert(request != null);
3034 final DataContainer<T> result = new DataContainer<T>();
3035 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3037 syncRequest(request, new AsyncProcedure<T>() {
3040 public void execute(AsyncReadGraph graph, T t) {
3045 public void exception(AsyncReadGraph graph, Throwable t) {
3051 Throwable t = exception.get();
3053 if(t instanceof DatabaseException) throw (DatabaseException)t;
3054 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3057 return result.get();
3062 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3064 return syncRequest(request, (AsyncProcedure<T>)procedure);
3068 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3070 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3074 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3076 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3080 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3082 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3086 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3088 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3092 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3096 assert(request != null);
3098 final ArrayList<T> result = new ArrayList<T>();
3099 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3101 syncRequest(request, new SyncMultiProcedure<T>() {
3104 public void execute(ReadGraph graph, T t) {
3105 synchronized(result) {
3111 public void finished(ReadGraph graph) {
3115 public void exception(ReadGraph graph, Throwable t) {
3122 Throwable t = exception.get();
3124 if(t instanceof DatabaseException) throw (DatabaseException)t;
3125 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3133 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3135 throw new Error("Not implemented!");
3139 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3141 return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3145 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3147 return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3151 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3153 return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3157 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3161 assert(request != null);
3163 final ArrayList<T> result = new ArrayList<T>();
3164 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3166 syncRequest(request, new AsyncMultiProcedure<T>() {
3169 public void execute(AsyncReadGraph graph, T t) {
3170 synchronized(result) {
3176 public void finished(AsyncReadGraph graph) {
3180 public void exception(AsyncReadGraph graph, Throwable t) {
3187 Throwable t = exception.get();
3189 if (t instanceof DatabaseException)
3190 throw (DatabaseException) t;
3192 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3200 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3202 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3206 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3208 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3212 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3214 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3218 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3220 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3224 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3226 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3231 public <T> void asyncRequest(Read<T> request) {
3233 asyncRequest(request, new ProcedureAdapter<T>() {
3235 public void exception(Throwable t) {
3236 t.printStackTrace();
3243 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3244 asyncRequest(request, (AsyncProcedure<T>)procedure);
3248 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3249 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3253 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3254 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3258 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3259 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3263 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3264 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3268 final public <T> void asyncRequest(final AsyncRead<T> request) {
3270 assert(request != null);
3272 asyncRequest(request, new ProcedureAdapter<T>() {
3274 public void exception(Throwable t) {
3275 t.printStackTrace();
3282 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3283 scheduleRequest(request, procedure, procedure, null);
3287 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3288 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3292 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3293 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3297 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3298 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3302 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3303 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3307 public <T> void asyncRequest(MultiRead<T> request) {
3309 assert(request != null);
3311 asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3313 public void exception(ReadGraph graph, Throwable t) {
3314 t.printStackTrace();
3321 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3322 asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3326 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3327 asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3331 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3332 scheduleRequest(request, procedure, null);
3336 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3337 asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3341 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3343 assert(request != null);
3345 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3347 public void exception(AsyncReadGraph graph, Throwable t) {
3348 t.printStackTrace();
3355 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3356 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3360 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3361 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3365 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3366 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3370 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3371 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3375 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3376 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3380 final public <T> void asyncRequest(final ExternalRead<T> request) {
3382 assert(request != null);
3384 asyncRequest(request, new ProcedureAdapter<T>() {
3386 public void exception(Throwable t) {
3387 t.printStackTrace();
3394 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3395 asyncRequest(request, (Procedure<T>)procedure);
3398 boolean sameProvider(Write request) {
3399 if(writeState.getGraph().provider != null) {
3400 return writeState.getGraph().provider.equals(request.getProvider());
3402 return request.getProvider() == null;
3406 boolean plainWrite(WriteGraphImpl graph) {
3407 if(graph == null) return false;
3408 if(graph.writeSupport.writeOnly()) return false;
3413 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3415 private void assertNotSession() throws DatabaseException {
3416 Thread current = Thread.currentThread();
3417 if (sessionThreads.contains(current))
3418 throw new ServiceException("Caller is already inside a transaction.");
3421 void assertAlive() {
3422 if (!state.isAlive())
3423 throw new RuntimeDatabaseException("Session has been shut down.");
3426 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3427 return querySupport.getValueStream(graph, querySupport.getId(resource));
3430 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3431 return querySupport.getValue(graph, querySupport.getId(resource));
3434 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3435 acquire(semaphore, request, null);
3438 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3442 // Loop until the semaphore is acquired reporting requests that take a long time.
3445 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3449 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3451 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3452 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3454 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3455 + (waitTime) + " ms. (procedure=" + procedure + ")");
3462 } catch (InterruptedException e) {
3463 e.printStackTrace();
3464 // FIXME: Should perhaps do something else in this case ??
3471 // public boolean holdOnToTransactionAfterCancel() {
3472 // return transactionPolicy.holdOnToTransactionAfterCancel();
3476 // public boolean holdOnToTransactionAfterCommit() {
3477 // return transactionPolicy.holdOnToTransactionAfterCommit();
3481 // public boolean holdOnToTransactionAfterRead() {
3482 // return transactionPolicy.holdOnToTransactionAfterRead();
3486 // public void onRelinquish() {
3487 // transactionPolicy.onRelinquish();
3491 // public void onRelinquishDone() {
3492 // transactionPolicy.onRelinquishDone();
3496 // public void onRelinquishError() {
3497 // transactionPolicy.onRelinquishError();
3502 public Session getSession() {
3507 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3508 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3510 public void ceased(int thread) {
3512 requestManager.ceased(thread);
3516 public int getAmountOfQueryThreads() {
3517 // This must be a power of two
3519 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3522 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3524 return new ResourceImpl(resourceSupport, key);
3528 public void acquireWriteOnly() {
3532 public void releaseWriteOnly(ReadGraphImpl graph) {
3534 queryProvider2.releaseWrite(graph);
3537 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3539 long start = System.nanoTime();
3541 while(dirtyPrimitives) {
3542 dirtyPrimitives = false;
3543 getQueryProvider2().performDirtyUpdates(writer);
3544 getQueryProvider2().performScheduledUpdates(writer);
3547 fireMetadataListeners(writer, clientChanges);
3549 if(DebugPolicy.PERFORMANCE_DATA) {
3550 long end = System.nanoTime();
3551 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3556 void removeTemporaryData() {
3557 File platform = Platform.getLocation().toFile();
3558 File tempFiles = new File(platform, "tempFiles");
3559 File temp = new File(tempFiles, "db");
3563 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3565 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3568 } catch (IOException e) {
3569 Logger.defaultLogError(e);
3571 return FileVisitResult.CONTINUE;
3574 } catch (IOException e) {
3575 Logger.defaultLogError(e);
3579 public void handleCreatedClusters() {
3580 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3581 createdClusters.forEach(new TLongProcedure() {
3584 public boolean execute(long value) {
3585 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3586 cluster.setImmutable(true, clusterTranslator);
3591 createdClusters.clear();
3595 public Object getModificationCounter() {
3596 return queryProvider2.modificationCounter;
3600 public void markUndoPoint() {
3601 state.setCombine(false);