1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
// Compile-time switch for debug output; disabled.
176 protected static final boolean DEBUG = false;
// Compile-time switch for diagnostics; disabled.
178 private static final boolean DIAGNOSTICS = false;
// Transaction policy service; the constructor installs a TransactionPolicyRelease and registers it.
180 private TransactionPolicySupport transactionPolicy;
// Cluster control service; created by the constructor as a ClusterControlImpl and registered.
182 final private ClusterControl clusterControl;
// Support objects created by the constructor.
184 final protected BuiltinSupportImpl builtinSupport;
185 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186 final protected ClusterSetsSupport clusterSetsSupport;
187 final protected LifecycleSupportImpl lifecycleSupport;
// Not assigned by the constructor shown in this file section.
189 protected QuerySupportImpl querySupport;
190 protected ResourceSupportImpl resourceSupport;
// Write support handed to WriteGraphImpl.create() for ordinary write requests.
191 protected WriteSupport writeSupport;
// Passed to cluster operations (addRelation, setValue, createResource, ...).
192 public ClusterTranslator clusterTranslator;
194 boolean dirtyPrimitives = false;
// Bit flags tested against serviceMode with '&'.
// SERVICE_MODE_CREATE: getNewResourceCluster() records the id of every new cluster in createdClusters.
196 public static final int SERVICE_MODE_CREATE = 2;
// SERVICE_MODE_ALLOW: WriteOnlySupport.claim() skips its immutable-cluster check.
197 public static final int SERVICE_MODE_ALLOW = 1;
// Current combination of the SERVICE_MODE_* flags; 0 means neither is set.
198 public int serviceMode = 0;
199 public boolean createdImmutableClusters = false;
// Ids of clusters created while SERVICE_MODE_CREATE was set.
200 public TLongHashSet createdClusters = new TLongHashSet();
/**
 * The service locator maintained by the workbench. These services are
 * initialized by the workbench during the <code>init</code> method.
 */
// Registry of session services; populated by the constructor via registerService().
208 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
210 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
// Listener collections.
212 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
214 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
216 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
218 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
220 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
222 final protected State state = new State();
224 protected GraphSession graphSession = null;
// Set from the constructor argument.
226 protected SessionManager sessionManagerImpl = null;
// NOTE(review): the constructor's assignment of authAgent is commented out, so it is not set there.
228 protected UserAuthenticationAgent authAgent = null;
230 protected UserAuthenticator authenticator = null;
232 protected Resource user = null;
// Flushed with reallyFlush() when a delayed write is committed.
234 protected ClusterStream clusterStream = null;
// Receives the write tasks built by the scheduleRequest() overloads.
236 protected SessionRequestManager requestManager = null;
// Created by the constructor.
238 public ClusterTable clusterTable = null;
240 public QueryProcessor queryProvider2 = null;
// Replaced with a fresh ClientChangesImpl at the start of every write task, after disposing the previous one.
242 ClientChangesImpl clientChanges = null;
244 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
246 // protected long newClusterId = Constants.NullClusterId;
248 protected int flushCounter = 0;
// Passed to ClusterTable.getNewResourceCluster() when a new cluster is requested.
250 protected boolean writeOnly = false;
// State of the write currently being executed; assigned by the scheduled write tasks.
252 WriteState<?> writeState = null;
// State of the perform phase of a delayed write; reset to null before the commit phase.
253 WriteStateBase<?> delayedWriteState = null;
254 protected Resource defaultClusterSet = null; // If not null then used for newResource().
/**
 * Creates the session object, builds its support objects and registers all
 * session services in the service locator.
 *
 * NOTE(review): this listing has lost some short lines (for example closing
 * braces); verify against the repository copy before editing.
 *
 * @param sessionManagerImpl stored in the field of the same name
 * @param authAgent not used by the statements shown here (its null check and
 *        assignment are commented out)
 */
256 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
258 // if (authAgent == null)
259 // throw new IllegalArgumentException("null authentication agent");
// Storage directory shared by the cluster table, the virtual graph support and the cluster sets support.
261 File t = StaticSessionProperties.virtualGraphStoragePath;
264 this.clusterTable = new ClusterTable(this, t);
265 this.builtinSupport = new BuiltinSupportImpl(this);
266 this.sessionManagerImpl = sessionManagerImpl;
268 // this.authAgent = authAgent;
// Services that do not need to be kept in a field.
270 serviceLocator.registerService(Session.class, this);
271 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292 ServiceActivityUpdaterForWriteTransactions.register(this);
// Services that are also kept in fields; the virtual graph support is registered under two interfaces.
294 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297 this.lifecycleSupport = new LifecycleSupportImpl(this);
298 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299 this.transactionPolicy = new TransactionPolicyRelease();
300 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301 this.clusterControl = new ClusterControlImpl(this);
302 serviceLocator.registerService(ClusterControl.class, clusterControl);
303 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304 this.clusterSetsSupport.setReadDirectory(t.toPath());
305 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
316 public Resource getRootLibrary() {
317 return queryProvider2.getRootLibraryResource();
320 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321 if (!graphSession.dbSession.refreshEnabled())
324 getClusterTable().refresh(csid, this, clusterUID);
325 } catch (Throwable t) {
326 Logger.defaultLogError("Refesh failed.", t);
330 static final class TaskHelper {
331 private final String name;
332 private Object result;
333 final Semaphore sema = new Semaphore(0);
334 private Throwable throwable = null;
335 final Consumer<DatabaseException> callback = e -> {
336 synchronized (TaskHelper.this) {
340 final Procedure<Object> proc = new Procedure<Object>() {
342 public void execute(Object result) {
343 callback.accept(null);
346 public void exception(Throwable t) {
347 if (t instanceof DatabaseException)
348 callback.accept((DatabaseException)t);
350 callback.accept(new DatabaseException("" + name + "operation failed.", t));
353 final WriteTraits writeTraits = new WriteTraits() {};
354 TaskHelper(String name) {
357 @SuppressWarnings("unchecked")
361 void setResult(Object result) {
362 this.result = result;
364 // Throwable throwableGet() {
367 synchronized void throwableSet(Throwable t) {
370 // void throwableSet(String t) {
371 // throwable = new InternalException("" + name + " operation failed. " + t);
373 synchronized void throwableCheck()
374 throws DatabaseException {
375 if (null != throwable)
376 if (throwable instanceof DatabaseException)
377 throw (DatabaseException)throwable;
379 throw new DatabaseException("Undo operation failed.", throwable);
381 void throw_(String message)
382 throws DatabaseException {
383 throw new DatabaseException("" + name + " operation failed. " + message);
389 private ListenerBase getListenerBase(Object procedure) {
390 if (procedure instanceof ListenerBase)
391 return (ListenerBase) procedure;
396 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397 scheduleRequest(request, callback, notify, null);
/*
 * (non-Javadoc)
 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.Write, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
 */
/**
 * Schedules a write request for execution by the request manager.
 * <p>
 * The request runs later inside a SessionTask: a fresh client change set and
 * a WriteGraphImpl are created, the request is performed against that graph,
 * and failures are reported through the current write state.
 *
 * NOTE(review): this listing has lost some short lines (for example try and
 * finally openers and closing braces); verify against the repository copy
 * before editing.
 *
 * @param request the write request; must not be null (checked by assertion only)
 * @param callback receives null on success or the failure; may be null
 * @param notify passed to the WriteState
 * @param combine not used by the statements shown here
 */
404 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
406 assert (request != null);
// Optional development-time trace of the scheduling; errors while tracing are only logged.
408 if(Development.DEVELOPMENT) {
410 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411 System.err.println("schedule write '" + request + "'");
412 } catch (Throwable t) {
413 Logger.defaultLogError(t);
// The write itself is wrapped in a task and handed to the request manager.
417 requestManager.scheduleWrite(new SessionTask(null) {
420 public void run(int thread) {
422 if(Development.DEVELOPMENT) {
424 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
425 System.err.println("perform write '" + request + "'");
426 } catch (Throwable t) {
427 Logger.defaultLogError(t);
431 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
433 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Each write starts with a fresh client change set; the previous one is disposed.
438 Disposable.safeDispose(clientChanges);
439 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
441 VirtualGraph vg = getProvider(request.getProvider());
443 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The anonymous procedure adapts the WriteState outcome to the optional callback.
444 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
447 public void execute(Object result) {
448 if(callback != null) callback.accept(null);
452 public void exception(Throwable t) {
// NOTE(review): unchecked cast - a Throwable that is not a DatabaseException would raise
// ClassCastException here. The DelayedWrite overload wraps such throwables in a new
// DatabaseException instead. Confirm what WriteState.except() passes on.
453 if(callback != null) callback.accept((DatabaseException)t);
458 assert (null != writer);
459 // writer.state.barrier.inc();
// Run the client request; anything but a cancellation is logged, then reported to the write state.
462 request.perform(writer);
463 assert (null != writer);
464 } catch (Throwable t) {
465 if (!(t instanceof CancelTransactionException))
466 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
467 writeState.except(t);
469 // writer.state.barrier.dec();
470 // writer.waitAsync(request);
474 assert(!queryProvider2.cache.dirty);
476 } catch (Throwable e) {
478 // Log it first, just to be safe that the error is always logged.
479 if (!(e instanceof CancelTransactionException))
480 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
482 // writeState.getGraph().state.barrier.dec();
483 // writeState.getGraph().waitAsync(request);
485 writeState.except(e);
488 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
489 // // All we can do here is to log those, can't really pass them anywhere.
490 // if (callback != null) {
491 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
492 // else callback.run(new DatabaseException(e));
494 // } catch (Throwable e2) {
495 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
497 // boolean empty = clusterStream.reallyFlush();
498 // int callerThread = -1;
499 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
500 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
502 // // Send all local changes to server so it can calculate correct reverse change set.
503 // // This will call clusterStream.accept().
504 // state.cancelCommit(context, clusterStream);
506 // if (!context.isOk()) // this is a blocking operation
507 // throw new InternalException("Cancel failed. This should never happen.");
508 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
510 // state.cancelCommit2(context, clusterStream);
511 // } catch (DatabaseException e3) {
512 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
514 // clusterStream.setOff(false);
515 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
520 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Optional development-time trace of the completion.
526 if(Development.DEVELOPMENT) {
528 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
529 System.err.println("finish write '" + request + "'");
530 } catch (Throwable t) {
531 Logger.defaultLogError(t);
541 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
542 scheduleRequest(request, procedure, notify, null);
/*
 * (non-Javadoc)
 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
 */
/**
 * Schedules a result-producing write request for execution by the request manager.
 * <p>
 * Inside the scheduled task a fresh client change set and a WriteGraphImpl
 * are created; the value returned by the request is stored in the write
 * state, and a failure is reported through the write state.
 *
 * NOTE(review): this listing has lost some short lines (for example try and
 * finally openers and closing braces); verify against the repository copy
 * before editing.
 *
 * @param request the write request; must not be null (checked by assertion only)
 * @param procedure handed to the WriteState as its result procedure
 * @param notify passed to the WriteState
 * @param combine not used by the statements shown here
 */
549 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
551 assert (request != null);
553 requestManager.scheduleWrite(new SessionTask(null) {
556 public void run(int thread) {
558 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
560 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Each write starts with a fresh client change set; the previous one is disposed.
563 Disposable.safeDispose(clientChanges);
564 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
566 VirtualGraph vg = getProvider(request.getProvider());
567 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// Keep a typed reference so the request's result can be stored without a cast.
570 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571 writeState = writeStateT;
573 assert (null != writer);
574 // writer.state.barrier.inc();
575 writeStateT.setResult(request.perform(writer));
576 assert (null != writer);
578 // writer.state.barrier.dec();
579 // writer.waitAsync(null);
581 } catch (Throwable e) {
583 // writer.state.barrier.dec();
584 // writer.waitAsync(null);
// Failures are reported through the write state.
586 writeState.except(e);
588 // state.stopWriteTransaction(clusterStream);
590 // } catch (Throwable e) {
591 // // Log it first, just to be safe that the error is always logged.
592 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
595 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 // // All we can do here is to log those, can't really pass them anywhere.
597 // if (procedure != null) {
598 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 // else procedure.exception(new DatabaseException(e));
601 // } catch (Throwable e2) {
602 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
605 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
607 // state.stopWriteTransaction(clusterStream);
610 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
613 // if(notify != null) notify.release();
623 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624 scheduleRequest(request, callback, notify, null);
/*
 * (non-Javadoc)
 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.DelayedWrite, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
 */
/**
 * Schedules a delayed write request for execution by the request manager.
 * <p>
 * The scheduled task first performs the request against a DelayedWriteGraph
 * built on a read graph, and then commits that graph through a write-only
 * WriteGraphImpl.
 *
 * NOTE(review): this listing has lost some short lines (for example try and
 * finally openers, returns and closing braces); verify against the
 * repository copy before editing.
 *
 * @param request the delayed write request; must not be null (checked by assertion only)
 * @param callback receives null on success or the failure; may be null
 * @param notify passed to the write states
 * @param combine not used by the statements shown here
 */
631 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
633 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
635 assert (request != null);
637 requestManager.scheduleWrite(new SessionTask(null) {
640 public void run(int thread) {
641 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Shared by both phases: adapts the state outcome to the optional callback.
643 Procedure<Object> stateProcedure = new Procedure<Object>() {
645 public void execute(Object result) {
646 if (callback != null)
647 callback.accept(null);
650 public void exception(Throwable t) {
651 if (callback != null) {
// Throwables that are not DatabaseExceptions are wrapped before reaching the callback.
652 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653 else callback.accept(new DatabaseException(t));
655 Logger.defaultLogError("Unhandled exception", t);
// Perform phase: run the request against a delayed graph backed by a read graph.
659 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661 DelayedWriteGraph dwg = null;
662 // newGraph.state.barrier.inc();
665 dwg = new DelayedWriteGraph(newGraph);
666 request.perform(dwg);
667 } catch (Throwable e) {
668 delayedWriteState.except(e);
673 // newGraph.state.barrier.dec();
674 // newGraph.waitAsync(request);
675 fireSessionVariableChange(SessionVariables.QUEUED_READS);
678 delayedWriteState = null;
// Commit phase: replay the delayed graph into a write-only graph.
680 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
684 Disposable.safeDispose(clientChanges);
685 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
689 VirtualGraph vg = getProvider(request.getProvider());
// This local shadows the session field: a virtual graph provider selects VirtualWriteOnlySupport.
690 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
692 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
694 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
696 assert (null != writer);
697 // writer.state.barrier.inc();
701 dwg.commit(writer, request);
// Convert the default cluster set from a delayed resource and register its cluster.
703 if(defaultClusterSet != null) {
704 XSupport xs = getService(XSupport.class);
705 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706 ClusteringSupport cs = getService(ClusteringSupport.class);
707 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
710 // This makes clusters available from server
711 clusterStream.reallyFlush();
713 releaseWriteOnly(writer);
714 handleUpdatesAndMetadata(writer);
// On a service failure: drop write-only clusters, flush, release, then report the failure.
716 } catch (ServiceException e) {
717 // writer.state.barrier.dec();
718 // writer.waitAsync(null);
720 // These shall be requested from server
721 clusterTable.removeWriteOnlyClusters();
722 // This makes clusters available from server
723 clusterStream.reallyFlush();
725 releaseWriteOnly(writer);
726 writeState.except(e);
728 // Debugging & Profiling
729 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
739 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740 scheduleRequest(request, procedure, notify, null);
/*
 * (non-Javadoc)
 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
 */
747 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748 throw new Error("Not implemented");
751 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754 createdClusters.add(cluster.clusterId);
759 class WriteOnlySupport implements WriteSupport {
// Cluster stream taken from the enclosing session when this support object is constructed.
761 ClusterStream stream;
// Cluster that new resources are allocated from; null until one is selected or allocated.
762 ClusterImpl currentCluster;
/** Creates a support object bound to the session's current cluster stream. */
public WriteOnlySupport() {
    stream = clusterStream;
}
769 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
/**
 * Adds the statement (s, p, o), given as resource keys, to the cluster that
 * owns the subject, then invalidates the subject in the client change set.
 *
 * NOTE(review): this listing has lost some short lines (for example the try
 * opener, closing braces and possibly a return); verify against the
 * repository copy before editing.
 *
 * @param provider not used by the statements shown here
 * @param s subject resource key
 * @param p predicate resource key
 * @param o object resource key
 * @throws ServiceException declared; an ImmutableException is thrown for a
 *         protected immutable subject
 */
774 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
776 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
// Immutable clusters may only be modified with SERVICE_MODE_ALLOW set, or when the subject is the root library.
778 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779 if(s != queryProvider2.getRootLibrary())
780 throw new ImmutableException("Trying to modify immutable resource key=" + s);
// The result of addRelation is passed to maintainCluster(), which swaps in a replacement cluster instance.
783 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784 } catch (DatabaseException e) {
// A failed addRelation is logged here.
785 Logger.defaultLogError(e);
789 clientChanges.invalidate(s);
791 if (cluster.isWriteOnly())
793 queryProvider2.updateStatements(s, p);
798 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
803 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
805 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
807 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808 } catch (DatabaseException e) {
809 Logger.defaultLogError(e);
812 clientChanges.invalidate(rid);
814 if (cluster.isWriteOnly())
816 queryProvider2.updateValue(rid);
/**
 * Sets the value of a resource from a ByteReader.
 * <p>
 * The statements shown pass the reader's bytes either to the byte-array
 * overload or, in blocks of at most 65536 bytes, to the owning cluster via
 * modiValueEx.
 *
 * NOTE(review): this listing has lost some short lines (the size check, the
 * loop header and its bookkeeping, closing braces); verify against the
 * repository copy before editing.
 *
 * @param provider passed on to the byte-array overload
 * @param resource target resource; must be a ResourceImpl
 * @param reader source of the value bytes
 * @param amount total number of bytes to read
 * @throws DatabaseException declared for callers
 */
821 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
824 claimValue(provider,resource, reader.readBytes(null, amount));
// Transfer buffer for the block-wise path.
828 byte[] bytes = new byte[65536];
830 int rid = ((ResourceImpl)resource).id;
831 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// amount-left is the write offset of the current block within the value.
835 int block = Math.min(left, 65536);
836 reader.readBytes(bytes, block);
// NOTE(review): if modiValueEx returns a replacement cluster, the local 'cluster' still refers
// to the old instance on the next block - confirm this is intended.
837 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
840 } catch (DatabaseException e) {
841 Logger.defaultLogError(e);
844 clientChanges.invalidate(rid);
846 if (cluster.isWriteOnly())
848 queryProvider2.updateValue(rid);
852 private void maintainCluster(ClusterImpl before, ClusterI after_) {
853 if(after_ != null && after_ != before) {
854 ClusterImpl after = (ClusterImpl)after_;
855 if(currentCluster == before) {
856 currentCluster = after;
858 clusterTable.replaceCluster(after);
/**
 * Creates a new resource in the current cluster and returns its key,
 * allocating a cluster first when there is none or the current one is full.
 *
 * NOTE(review): this listing has lost some short lines (closing braces and
 * possibly a condition before the println); verify against the repository
 * copy before editing.
 *
 * @param foreignCounter size of the foreignLookup array given to a newly
 *        allocated cluster
 * @return the key of the created resource
 * @throws DatabaseException if cluster allocation or resource creation fails
 */
862 public int createResourceKey(int foreignCounter) throws DatabaseException {
863 if(currentCluster == null) {
864 currentCluster = getNewResourceCluster();
// NOTE(review): this uses '==' against CLUSTER_FILL_SIZE while createResource(VirtualGraph) uses '>=' - confirm.
866 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
// The new cluster is cast to ClusterWriteOnly so its foreignLookup can be sized.
867 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868 newCluster.foreignLookup = new byte[foreignCounter];
869 currentCluster = newCluster;
871 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
873 return currentCluster.createResource(clusterTranslator);
/**
 * Creates a new resource, choosing the cluster to allocate from.
 * <p>
 * When there is no current cluster, or the current one has reached
 * CLUSTER_FILL_SIZE resources, the statements shown pick a cluster either
 * through the default cluster set or through getNewResourceCluster().
 *
 * NOTE(review): this listing has lost two short lines in each branch
 * (probably including else and possibly a return); verify against the
 * repository copy before editing.
 *
 * @param provider not used by the statements shown here
 * @return the new resource
 * @throws DatabaseException if cluster allocation or resource creation fails
 */
877 public Resource createResource(VirtualGraph provider) throws DatabaseException {
878 if(currentCluster == null) {
879 if (null != defaultClusterSet) {
// Allocate through the default cluster set and adopt the cluster of the resulting resource.
880 ResourceImpl result = getNewResource(defaultClusterSet);
881 currentCluster = clusterTable.getClusterByResourceKey(result.id);
884 currentCluster = getNewResourceCluster();
// The same selection is made when the current cluster is full.
887 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888 if (null != defaultClusterSet) {
889 ResourceImpl result = getNewResource(defaultClusterSet);
890 currentCluster = clusterTable.getClusterByResourceKey(result.id);
893 currentCluster = getNewResourceCluster();
896 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
900 public Resource createResource(VirtualGraph provider, long clusterId)
901 throws DatabaseException {
902 return getNewResource(clusterId);
906 public Resource createResource(VirtualGraph provider, Resource clusterSet)
907 throws DatabaseException {
908 return getNewResource(clusterSet);
912 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913 throws DatabaseException {
914 getNewClusterSet(clusterSet);
918 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919 throws ServiceException {
920 return containsClusterSet(clusterSet);
923 public void selectCluster(long cluster) {
924 currentCluster = clusterTable.getClusterByClusterId(cluster);
925 long setResourceId = clusterSetsSupport.getSet(cluster);
926 clusterSetsSupport.put(setResourceId, cluster);
// Changes the default cluster set through setDefaultClusterSet4NewResource and
// re-targets currentCluster accordingly.
930 public Resource setDefaultClusterSet(Resource clusterSet)
931 throws ServiceException {
932 Resource result = setDefaultClusterSet4NewResource(clusterSet);
// Non-null set: the cluster id registered for the set's resource id becomes the current cluster.
933 if(clusterSet != null) {
934 long id = clusterSetsSupport.get(clusterSet.getResourceId());
935 currentCluster = clusterTable.getClusterByClusterId(id);
// Presumably the else branch (branch keyword not visible here): no current cluster.
938 currentCluster = null;
// Removes the value of a resource, either from the persistent cluster stream or from a virtual graph.
944 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
946 provider = getProvider(provider);
// getProvider returned null: operate on the cluster that owns the resource key.
947 if (null == provider) {
948 int key = ((ResourceImpl)resource).id;
952 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 // if(key != queryProvider2.getRootLibrary())
955 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
958 // cluster.removeValue(key, clusterTranslator);
959 // } catch (DatabaseException e) {
960 // Logger.defaultLogError(e);
964 clusterTable.writeOnlyInvalidate(cluster);
// Record a DELETE_OPERATION for the resource index in the cluster translator, then remove the value.
967 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969 clusterTranslator.removeValue(cluster);
// A DatabaseException is only logged here.
970 } catch (DatabaseException e) {
971 Logger.defaultLogError(e);
974 queryProvider2.invalidateResource(key);
975 clientChanges.invalidate(key);
// Virtual graph path: deny the value in the provider and update the query state.
// The change is registered through clientChanges.claimValue.
978 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979 queryProvider2.updateValue(querySupport.getId(resource));
980 clientChanges.claimValue(resource);
987 public void flush(boolean intermediate) {
988 throw new UnsupportedOperationException();
992 public void flushCluster() {
993 clusterTable.flushCluster(graphSession);
994 if(defaultClusterSet != null) {
995 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
997 currentCluster = null;
1001 public void flushCluster(Resource r) {
1002 throw new UnsupportedOperationException("flushCluster resource " + r);
// Removes the statement (subject, predicate, object), either from the persistent cluster
// stream or from a virtual graph. The rest of the signature and the return statements
// are not visible in this excerpt.
1010 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1013 int s = ((ResourceImpl)subject).id;
1014 int p = ((ResourceImpl)predicate).id;
1015 int o = ((ResourceImpl)object).id;
1017 provider = getProvider(provider);
// getProvider returned null: operate on the subject's cluster.
1018 if (null == provider) {
1020 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021 clusterTable.writeOnlyInvalidate(cluster);
1025 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1027 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1028 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
// Three index entries are queued (subject with REMOVE_OPERATION, predicate and object with
// NULL_OPERATION) before the removal itself is issued.
1030 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1031 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033 clusterTranslator.removeStatement(cluster);
1035 queryProvider2.invalidateResource(s);
1036 clientChanges.invalidate(s);
// A DatabaseException is logged here.
1038 } catch (DatabaseException e) {
1040 Logger.defaultLogError(e);
// Virtual graph path: deny the triple in the provider and invalidate the subject.
1048 ((VirtualGraphImpl)provider).deny(s, p, o);
1049 queryProvider2.invalidateResource(s);
1050 clientChanges.invalidate(s);
1059 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1060 throw new UnsupportedOperationException();
// NOTE(review): the body is not visible in this excerpt; presumably reports whether this
// support is a write-only one - verify against the full file.
1064 public boolean writeOnly() {
1069 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1070 writeSupport.performWriteRequest(graph, request);
1074 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1075 throw new UnsupportedOperationException();
1079 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1080 throw new UnsupportedOperationException();
1084 public <T> void addMetadata(Metadata data) throws ServiceException {
1085 writeSupport.addMetadata(data);
1089 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1090 return writeSupport.getMetadata(clazz);
1094 public TreeMap<String, byte[]> getMetadata() {
1095 return writeSupport.getMetadata();
1099 public void commitDone(WriteTraits writeTraits, long csid) {
1100 writeSupport.commitDone(writeTraits, csid);
1104 public void clearUndoList(WriteTraits writeTraits) {
1105 writeSupport.clearUndoList(writeTraits);
1108 public int clearMetadata() {
1109 return writeSupport.clearMetadata();
1113 public void startUndo() {
1114 writeSupport.startUndo();
// WriteSupport implementation whose operations act on the given virtual graph provider
// (cast to TransientGraph / VirtualGraphImpl). Cluster, flush, metadata and
// performWriteRequest operations throw UnsupportedOperationException.
1118 class VirtualWriteOnlySupport implements WriteSupport {
1121 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1122 // Resource object) {
1123 // throw new UnsupportedOperationException();
// Claims a statement in the transient graph, updates the statement queries for
// (subject, predicate) and records the claim in clientChanges.
1127 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1129 TransientGraph impl = (TransientGraph)provider;
1130 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1131 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1132 clientChanges.claim(subject, predicate, object);
// Same as above for raw integer resource keys.
1137 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1139 TransientGraph impl = (TransientGraph)provider;
1140 impl.claim(subject, predicate, object);
1141 getQueryProvider2().updateStatements(subject, predicate);
1142 clientChanges.claim(subject, predicate, object);
// Resolves the resource key and passes the whole array on to the int-keyed overload.
1147 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1148 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Stores the value in the virtual graph, updates the value query and records the change.
1152 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1153 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1154 getQueryProvider2().updateValue(resource);
1155 clientChanges.claimValue(resource);
// Reads `amount` bytes from the reader in one call and delegates to the byte[] overload.
1159 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1160 byte[] value = reader.readBytes(null, amount);
1161 claimValue(provider, resource, value);
// Creates a resource in the transient graph via newResource(false).
1165 public Resource createResource(VirtualGraph provider) {
1166 TransientGraph impl = (TransientGraph)provider;
1167 return impl.getResource(impl.newResource(false));
1171 public Resource createResource(VirtualGraph provider, long clusterId) {
1172 throw new UnsupportedOperationException();
1176 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1177 throws DatabaseException {
1178 throw new UnsupportedOperationException();
1182 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1183 throws DatabaseException {
1184 throw new UnsupportedOperationException();
1188 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1189 throws ServiceException {
1190 throw new UnsupportedOperationException();
// NOTE(review): body not visible in this excerpt.
1194 public Resource setDefaultClusterSet(Resource clusterSet)
1195 throws ServiceException {
// Removes the value from the virtual graph and updates the value query.
1200 public void denyValue(VirtualGraph provider, Resource resource) {
1201 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1202 getQueryProvider2().updateValue(querySupport.getId(resource));
1203 // NOTE: this only keeps track of value changes by-resource.
1204 clientChanges.claimValue(resource);
1208 public void flush(boolean intermediate) {
1209 throw new UnsupportedOperationException();
1213 public void flushCluster() {
1214 throw new UnsupportedOperationException();
1218 public void flushCluster(Resource r) {
1219 throw new UnsupportedOperationException("Resource " + r);
// Denies the statement in the transient graph, updates the statement queries and records
// the denial in clientChanges. The return statement is not visible in this excerpt.
1227 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1228 TransientGraph impl = (TransientGraph) provider;
1229 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1230 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1231 clientChanges.deny(subject, predicate, object);
1236 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1237 throw new UnsupportedOperationException();
// NOTE(review): body not visible in this excerpt.
1241 public boolean writeOnly() {
1246 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1247 throw new UnsupportedOperationException();
1251 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1252 throw new UnsupportedOperationException();
1256 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1257 throw new UnsupportedOperationException();
1261 public <T> void addMetadata(Metadata data) throws ServiceException {
1262 throw new UnsupportedOperationException();
1266 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1267 throw new UnsupportedOperationException();
1271 public TreeMap<String, byte[]> getMetadata() {
1272 throw new UnsupportedOperationException();
// NOTE(review): the bodies of the four methods below are not visible in this excerpt.
1276 public void commitDone(WriteTraits writeTraits, long csid) {
1280 public void clearUndoList(WriteTraits writeTraits) {
1284 public int clearMetadata() {
1289 public void startUndo() {
// Executes a WriteOnlyResult request in the calling thread: resets clientChanges, builds a
// write graph, performs the request, flushes the cluster stream and releases the writer.
// NOTE(review): the try/finally scaffolding is not visible in this excerpt.
1293 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1297 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1299 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start from a fresh change set; the previous one is disposed.
1302 Disposable.safeDispose(clientChanges);
1303 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// A request with a provider writes through VirtualWriteOnlySupport, otherwise through WriteOnlySupport.
1307 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1309 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1311 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1312 writeState = writeStateT;
1314 assert (null != writer);
1315 // writer.state.barrier.inc();
// Time the request body. NOTE(review): the timing is printed straight to System.err.
1316 long start = System.nanoTime();
1317 T result = request.perform(writer);
1318 long duration = System.nanoTime() - start;
1320 System.err.println("################");
1321 System.err.println("WriteOnly duration " + 1e-9*duration);
1323 writeStateT.setResult(result);
1325 // This makes clusters available from server
1326 clusterStream.reallyFlush();
1328 // This will trigger query updates
1329 releaseWriteOnly(writer);
// NOTE(review): this assert comes after writer has already been used above.
1330 assert (null != writer);
1332 handleUpdatesAndMetadata(writer);
// Cancellation: release the graph, drop write-only clusters and stop the write transaction.
1334 } catch (CancelTransactionException e) {
1336 releaseWriteOnly(writeState.getGraph());
1338 clusterTable.removeWriteOnlyClusters();
1339 state.stopWriteTransaction(clusterStream);
// Any other failure: same cleanup, plus the callback receives the cause wrapped in a DatabaseException.
1341 } catch (Throwable e) {
// NOTE(review): printStackTrace duplicates the Logger call further down.
1343 e.printStackTrace();
1345 releaseWriteOnly(writeState.getGraph());
1347 clusterTable.removeWriteOnlyClusters();
1349 if (callback != null)
1350 callback.exception(new DatabaseException(e));
1352 state.stopWriteTransaction(clusterStream);
1353 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1357 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1363 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1364 scheduleRequest(request, callback, notify, null);
// Queues a WriteOnly request as a write task on requestManager. The combine argument is
// not referenced in the visible lines.
// NOTE(review): the try/finally scaffolding is not visible in this excerpt.
1368 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1372 assert (request != null);
1374 requestManager.scheduleWrite(new SessionTask(null) {
1377 public void run(int thread) {
1379 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1383 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start from a fresh change set; the previous one is disposed.
1386 Disposable.safeDispose(clientChanges);
1387 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// A resolved virtual graph selects VirtualWriteOnlySupport, otherwise WriteOnlySupport is used.
1391 VirtualGraph vg = getProvider(request.getProvider());
1392 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1394 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The write state adapts the Consumer callback to a Procedure: null on success, the exception on failure.
1396 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1399 public void execute(Object result) {
1400 if(callback != null) callback.accept(null);
// NOTE(review): unchecked cast - a Throwable that is not a DatabaseException would raise ClassCastException here.
1404 public void exception(Throwable t) {
1405 if(callback != null) callback.accept((DatabaseException)t);
1410 assert (null != writer);
1411 // writer.state.barrier.inc();
1415 request.perform(writer);
// Failure: release the writer and drop write-only clusters; anything but a
// CancelTransactionException is reported to the callback wrapped in a DatabaseException.
1417 } catch (Throwable e) {
1419 // writer.state.barrier.dec();
1420 // writer.waitAsync(null);
1422 releaseWriteOnly(writer);
1424 clusterTable.removeWriteOnlyClusters();
1426 if(!(e instanceof CancelTransactionException)) {
1427 if (callback != null)
1428 callback.accept(new DatabaseException(e));
1431 writeState.except(e);
1437 // This makes clusters available from server
1438 boolean empty = clusterStream.reallyFlush();
1439 // This was needed to make WO requests call metadata listeners.
1440 // NOTE: the calling event does not contain clientChanges information.
1441 if (!empty && clientChanges.isEmpty())
1442 clientChanges.setNotEmpty(true);
1443 releaseWriteOnly(writer);
// NOTE(review): this assert comes after writer has already been used above.
1444 assert (null != writer);
1447 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1460 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1461 scheduleRequest(request, callback, notify, null);
1465 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues a WriteOnlyResult request as a write task; the task hands the work to performWriteOnly.
// The combine argument is not referenced in the visible lines.
1468 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1470 assert (request != null);
1472 requestManager.scheduleWrite(new SessionTask(null) {
1475 public void run(int thread) {
// NOTE(review): the code that finishes this task is not visible in this excerpt.
1477 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1479 performWriteOnly(request, notify, callback);
// Queues a synchronous-style Read request as a read task. Results and failures are
// reported to the procedure; the optional throwable and result containers are also filled.
// NOTE(review): several short lines (try / else / container.set calls) are not visible here.
1489 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1491 assert (request != null);
1492 assert (procedure != null);
1494 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1496 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1499 public void run(int thread) {
1501 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1503 ListenerBase listener = getListenerBase(procedure);
1505 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request runs through the query cache, wrapped in a forwarding procedure.
1509 if (listener != null) {
1513 AsyncProcedure ap = new AsyncProcedure<T>() {
1516 public void exception(AsyncReadGraph graph, Throwable t) {
1517 procedure.exception(graph, t);
1518 if(throwable != null) {
1521 // ErrorLogger.defaultLogError("Unhandled exception", t);
1526 public void execute(AsyncReadGraph graph, T t) {
1527 if(result != null) result.set(t);
1528 procedure.execute(graph, t);
1533 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1535 } catch (Throwable t) {
1536 // This is handled by the AsyncProcedure
1537 //Logger.defaultLogError("Internal error", t);
1544 // newGraph.state.barrier.inc();
// Without a listener the request is performed directly on the new graph.
1546 T t = request.perform(newGraph);
1550 if(result != null) result.set(t);
1551 procedure.execute(newGraph, t);
// Failure raised while delivering the result.
1553 } catch (Throwable th) {
1555 if(throwable != null) {
1558 Logger.defaultLogError("Unhandled exception", th);
// Failure of the request itself. NOTE(review): printStackTrace duplicates the logging below.
1563 } catch (Throwable t) {
1566 t.printStackTrace();
1568 if(throwable != null) {
1571 Logger.defaultLogError("Unhandled exception", t);
1576 procedure.exception(newGraph, t);
// Failure raised by the procedure's own exception handler.
1578 } catch (Throwable t2) {
1580 if(throwable != null) {
1583 Logger.defaultLogError("Unhandled exception", t2);
1590 // newGraph.state.barrier.dec();
1591 // newGraph.waitAsync(request);
1597 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an AsyncRead request as a read task.
// NOTE(review): the outer local `thread` is computed but not used in the visible lines.
1607 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1609 assert (request != null);
1610 assert (procedure != null);
1612 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1614 requestManager.scheduleRead(new SessionRead(null, notify) {
1617 public void run(int thread) {
1619 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1621 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request is resolved through the query cache; a DatabaseException is logged.
1625 if (listener != null) {
1628 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1629 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1630 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1631 } catch (DatabaseException e) {
1632 Logger.defaultLogError(e);
1637 // final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1638 // procedure, "request");
// Without a listener the request is performed directly with a BlockingAsyncProcedure wrapper.
1640 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1644 request.perform(newGraph, wrap);
1647 } catch (DatabaseException e) {
1649 Logger.defaultLogError(e);
1657 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues a MultiRead request as a read task.
// NOTE(review): the outer locals `thread` and `sync` are not used in the visible lines.
1667 public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1669 assert (request != null);
1670 assert (procedure != null);
1672 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1674 int sync = notify != null ? thread : -1;
1676 requestManager.scheduleRead(new SessionRead(null, notify) {
1679 public void run(int thread) {
1681 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1683 ListenerBase listener = getListenerBase(procedure);
1685 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request goes through the query processor.
1689 if (listener != null) {
1691 newGraph.processor.query(newGraph, request, null, procedure, listener);
1693 // newGraph.waitAsync(request);
// Without a listener the request is performed directly through a wrapper procedure.
1697 final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1701 request.perform(newGraph, wrapper);
// NOTE(review): the failure is printed with printStackTrace in the visible lines.
1703 } catch (Throwable t) {
1705 t.printStackTrace();
1713 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an AsyncMultiRead request as a read task.
// NOTE(review): the outer locals `thread` and `sync` are not used in the visible lines.
1723 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1725 assert (request != null);
1726 assert (procedure != null);
1728 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1730 int sync = notify != null ? thread : -1;
1732 requestManager.scheduleRead(new SessionRead(null, notify) {
1735 public void run(int thread) {
1737 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1739 ListenerBase listener = getListenerBase(procedure);
1741 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request goes through the query processor.
1745 if (listener != null) {
1747 newGraph.processor.query(newGraph, request, null, procedure, listener);
1749 // newGraph.waitAsync(request);
// Without a listener the request is performed directly through a wrapper procedure.
1753 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1757 request.perform(newGraph, wrapper);
// NOTE(review): the failure is printed with printStackTrace in the visible lines.
1759 } catch (Throwable t) {
1761 t.printStackTrace();
1769 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an ExternalRead request as a read task. The optional throwable and result
// containers receive the outcome alongside the procedure.
// NOTE(review): the outer local `thread` is not used in the visible lines.
1779 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1781 assert (request != null);
1782 assert (procedure != null);
1784 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1786 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1789 public void run(int thread) {
1791 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1793 ListenerBase listener = getListenerBase(procedure);
1795 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request is resolved through the query cache; a DatabaseException is logged.
1799 if (listener != null) {
1802 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1803 } catch (DatabaseException e) {
1804 Logger.defaultLogError(e);
1809 // newGraph.state.barrier.inc();
// Without a listener a one-off Listener is registered that forwards to the procedure.
1811 request.register(newGraph, new Listener<T>() {
1814 public void exception(Throwable t) {
1815 if(throwable != null) throwable.set(t);
1816 procedure.exception(t);
1817 // newGraph.state.barrier.dec();
1821 public void execute(T t) {
1822 if(result != null) result.set(t);
1823 procedure.execute(t);
1824 // newGraph.state.barrier.dec();
// NOTE(review): the returned value is not visible in this excerpt.
1828 public boolean isDisposed() {
1834 // newGraph.waitAsync(request);
1840 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Schedules the read without a semaphore or result containers (all three passed as null).
1852 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1854 scheduleRequest(request, procedure, null, null, null);
// Schedules the read with a fresh semaphore plus throwable/result containers, waits in
// acquire(notify, request), then rethrows a captured failure or returns the result.
1859 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1861 Semaphore notify = new Semaphore(0);
1862 DataContainer<Throwable> container = new DataContainer<Throwable>();
1863 DataContainer<T> result = new DataContainer<T>();
1864 scheduleRequest(request, procedure, notify, container, result);
1865 acquire(notify, request);
1866 Throwable throwable = container.get();
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
1867 if(throwable != null) {
1868 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1869 else throw new DatabaseException("Unexpected exception", throwable);
1871 return result.get();
// Schedules the async read with a capturing procedure, waits in acquire(...), then
// rethrows a captured failure or returns the captured result.
1875 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1877 Semaphore notify = new Semaphore(0);
1878 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1879 final DataContainer<T> resultContainer = new DataContainer<T>();
// The wrapper records the outcome before forwarding it to the caller's procedure.
1880 scheduleRequest(request, new AsyncProcedure<T>() {
1882 public void exception(AsyncReadGraph graph, Throwable throwable) {
1883 exceptionContainer.set(throwable);
1884 procedure.exception(graph, throwable);
1887 public void execute(AsyncReadGraph graph, T result) {
1888 resultContainer.set(result);
1889 procedure.execute(graph, result);
1891 }, getListenerBase(procedure), notify);
1892 acquire(notify, request, procedure);
1893 Throwable throwable = exceptionContainer.get();
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
1894 if (throwable != null) {
1895 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1896 else throw new DatabaseException("Unexpected exception", throwable);
1898 return resultContainer.get();
// Schedules the async multi-read with a forwarding procedure that also captures a failure,
// waits in acquire(...), then rethrows the captured failure.
// NOTE(review): results are not collected; the return statement is not visible in this excerpt.
1903 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1905 Semaphore notify = new Semaphore(0);
1906 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1907 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1909 public void exception(AsyncReadGraph graph, Throwable throwable) {
1910 exceptionContainer.set(throwable);
1911 procedure.exception(graph, throwable);
1914 public void execute(AsyncReadGraph graph, T result) {
1915 procedure.execute(graph, result);
1918 public void finished(AsyncReadGraph graph) {
1919 procedure.finished(graph);
1922 acquire(notify, request, procedure);
1923 Throwable throwable = exceptionContainer.get();
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
1924 if (throwable != null) {
1925 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1926 else throw new DatabaseException("Unexpected exception", throwable);
1928 // TODO: implement return value
// NOTE(review): this reminder is printed to System.err on every successful call.
1929 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
// Runs the external read synchronously with a ProcedureAdapter as the procedure.
// The commented-out lines below are an earlier inline implementation.
1934 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1935 return syncRequest(request, new ProcedureAdapter<T>());
1938 // assert(request != null);
1940 // final DataContainer<T> result = new DataContainer<T>();
1941 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1943 // syncRequest(request, new Procedure<T>() {
1946 // public void execute(T t) {
1951 // public void exception(Throwable t) {
1952 // exception.set(t);
1957 // Throwable t = exception.get();
1959 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1960 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1963 // return result.get();
1968 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1969 return syncRequest(request, (Procedure<T>)procedure);
1974 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1975 // assertNotSession();
1976 // Semaphore notify = new Semaphore(0);
1977 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1978 // DataContainer<T> result = new DataContainer<T>();
1979 // scheduleRequest(request, procedure, notify, container, result);
1980 // acquire(notify, request);
1981 // Throwable throwable = container.get();
1982 // if(throwable != null) {
1983 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1984 // else throw new DatabaseException("Unexpected exception", throwable);
1986 // return result.get();
// Schedules the external read with a fresh semaphore plus throwable/result containers,
// waits in acquire(notify, request), then rethrows a captured failure or returns the result.
1991 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1993 Semaphore notify = new Semaphore(0);
1994 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1995 final DataContainer<T> result = new DataContainer<T>();
1996 scheduleRequest(request, procedure, notify, container, result);
1997 acquire(notify, request);
1998 Throwable throwable = container.get();
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
1999 if (throwable != null) {
2000 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2001 else throw new DatabaseException("Unexpected exception", throwable);
2003 return result.get();
// Schedules the write with a callback that stores a reported DatabaseException, waits in
// acquire(notify, request) and rethrows the stored exception, if any.
2007 public void syncRequest(Write request) throws DatabaseException {
2010 Semaphore notify = new Semaphore(0);
2011 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2012 scheduleRequest(request, e -> exception.set(e), notify);
2013 acquire(notify, request);
2014 if(exception.get() != null) throw exception.get();
// Schedules the write, waits in acquire(notify, request), then rethrows a captured failure
// or returns the captured result.
// NOTE(review): the bodies of the anonymous Procedure callbacks are not visible in this excerpt.
2018 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
2020 Semaphore notify = new Semaphore(0);
2021 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2022 final DataContainer<T> result = new DataContainer<T>();
2023 scheduleRequest(request, new Procedure<T>() {
2026 public void exception(Throwable t) {
2031 public void execute(T t) {
2036 acquire(notify, request);
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
2037 if(exception.get() != null) {
2038 Throwable t = exception.get();
2039 if(t instanceof DatabaseException) throw (DatabaseException)t;
2040 else throw new DatabaseException(t);
2042 return result.get();
// Schedules the write-only request, waits in acquire(notify, request), then rethrows a
// captured failure or returns the captured result.
// NOTE(review): the bodies of the anonymous Procedure callbacks are not visible in this excerpt.
2046 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2048 Semaphore notify = new Semaphore(0);
2049 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2050 final DataContainer<T> result = new DataContainer<T>();
2051 scheduleRequest(request, new Procedure<T>() {
2054 public void exception(Throwable t) {
2059 public void execute(T t) {
2064 acquire(notify, request);
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
2065 if(exception.get() != null) {
2066 Throwable t = exception.get();
2067 if(t instanceof DatabaseException) throw (DatabaseException)t;
2068 else throw new DatabaseException(t);
2070 return result.get();
// Schedules the delayed write, waits in acquire(notify, request), then rethrows a captured
// failure or returns the captured result.
// NOTE(review): the bodies of the anonymous Procedure callbacks are not visible in this excerpt.
2074 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2076 Semaphore notify = new Semaphore(0);
2077 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2078 final DataContainer<T> result = new DataContainer<T>();
2079 scheduleRequest(request, new Procedure<T>() {
2082 public void exception(Throwable t) {
2087 public void execute(T t) {
2092 acquire(notify, request);
// DatabaseExceptions are rethrown as-is, anything else is wrapped.
2093 if(exception.get() != null) {
2094 Throwable t = exception.get();
2095 if(t instanceof DatabaseException) throw (DatabaseException)t;
2096 else throw new DatabaseException(t);
2098 return result.get();
// Schedules the delayed write with a callback that stores a reported DatabaseException,
// waits in acquire(notify, request) and rethrows the stored exception, if any.
2102 public void syncRequest(DelayedWrite request) throws DatabaseException {
2104 Semaphore notify = new Semaphore(0);
2105 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2106 scheduleRequest(request, e -> exception.set(e), notify);
2107 acquire(notify, request);
2108 if(exception.get() != null) throw exception.get();
// Schedules the write-only request with a callback that stores a reported DatabaseException,
// waits in acquire(notify, request) and rethrows the stored exception, if any.
2112 public void syncRequest(WriteOnly request) throws DatabaseException {
2115 Semaphore notify = new Semaphore(0);
2116 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2117 scheduleRequest(request, e -> exception.set(e), notify);
2118 acquire(notify, request);
2119 if(exception.get() != null) throw exception.get();
2124 * GraphRequestProcessor interface
// Schedules the async read with null for both the listener and the semaphore.
2128 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2130 scheduleRequest(request, procedure, null, null);
// Schedules the async multi-read with a null semaphore.
2135 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2137 scheduleRequest(request, procedure, null);
// Schedules the external read with null for the semaphore and both data containers.
2142 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2144 scheduleRequest(request, procedure, null, null, null);
// Schedules the write with the given callback and a null third argument
// (presumably the notification semaphore - the target overload is not visible here).
2149 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2151 scheduleRequest(request, callback, null);
// Schedules the write with the given procedure and a null third argument
// (presumably the notification semaphore - the target overload is not visible here).
2156 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2158 scheduleRequest(request, procedure, null);
// Schedules the write-only request with the given procedure and a null semaphore.
2163 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2165 scheduleRequest(request, procedure, null);
// Schedules the delayed write with the given procedure and a null third argument
// (presumably the notification semaphore - the target overload is not visible here).
2170 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2172 scheduleRequest(request, procedure, null);
// Schedules the delayed write with the given callback and a null third argument
// (presumably the notification semaphore - the target overload is not visible here).
2177 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2179 scheduleRequest(request, callback, null);
2184 public void asyncRequest(final Write r) {
2185 asyncRequest(r, null);
2189 public void asyncRequest(final DelayedWrite r) {
2190 asyncRequest(r, null);
// Schedules the write-only request with the given callback and a null semaphore.
2194 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2196 scheduleRequest(request, callback, null);
// Fire-and-forget variant: forwards to the two-argument overload with a null callback.
2201 public void asyncRequest(final WriteOnly request) {
2203 asyncRequest(request, null);
2208 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2209 r.request(this, procedure);
2213 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2214 r.request(this, procedure);
2218 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2219 r.request(this, procedure);
2223 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2224 r.request(this, procedure);
2228 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2229 r.request(this, procedure);
2233 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2234 r.request(this, procedure);
// Lets the ReadInterface issue itself against this session and returns whatever r.request(this) returns.
2238 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2239 return r.request(this);
// Lets the WriteInterface issue itself against this session and returns whatever r.request(this) returns.
2243 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2244 return r.request(this);
// Lets the WriteInterface issue itself against this session, reporting to the given Procedure.
2248 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2249 r.request(this, procedure);
// Lets the WriteInterface issue itself against this session; a fresh ProcedureAdapter is supplied
// as the procedure because the caller did not provide one.
2253 public <T> void async(WriteInterface<T> r) {
2254 r.request(this, new ProcedureAdapter<T>());
// NOTE(review): the body of incAsync is not visible in this listing; confirm its behavior against the full source.
2258 public void incAsync() {
// NOTE(review): the body of decAsync is not visible in this listing; confirm its behavior against the full source.
2263 public void decAsync() {
// Returns the cluster id of the cluster that clusterTable resolves for the resource's key (resource.id).
2267 public long getCluster(ResourceImpl resource) {
2268 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2269 return cluster.getClusterId();
// Returns the cluster id for a resource key via clusterTable.getClusterIdByResourceKeyNoThrow.
// NOTE(review): the null check below only prints a diagnostic; the following line still
// dereferences clusterTable, so a null table ends in a NullPointerException after the message.
2272 public long getCluster(int id) {
2273 if (clusterTable == null)
2274 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2275 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
// Wraps a resource key in a new ResourceImpl bound to this session's resourceSupport; no lookup or validation is done here.
2278 public ResourceImpl getResource(int id) {
2279 return new ResourceImpl(resourceSupport, id);
// Builds a ResourceImpl from a (resource index, cluster id) pair.
// The cluster id is resolved to a cluster through clusterTable, the cluster's key is combined with
// the index by ClusterTraitsBase.createResourceKeyNoThrow, and the resulting key is wrapped.
2282 public ResourceImpl getResource(int resourceIndex, long clusterId) {
// Only checked when assertions are enabled.
2283 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2284 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2285 int key = proxy.getClusterKey();
2286 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2287 return new ResourceImpl(resourceSupport, resourceKey);
// Wraps a resource key in a new ResourceImpl bound to resourceSupport.
// NOTE(review): one line between the header and the return is not visible in this listing, so any
// difference from getResource(int) cannot be confirmed from here.
2290 public ResourceImpl getResource2(int id) {
2292 return new ResourceImpl(resourceSupport, id);
// NOTE(review): the body of getId is not visible in this listing; confirm what id it returns against the full source.
2295 final public int getId(ResourceImpl impl) {
// Shared Charset instance obtained by name lookup for "utf-8".
2299 public static final Charset UTF8 = Charset.forName("utf-8");
// Checks every TransientGraph in support.providers and answers false as soon as one of them reports
// the subject as pending. The result for the no-pending case is on a line not visible in this listing.
2304 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2305 for(TransientGraph g : support.providers) {
2306 if(g.isPending(subject)) return false;
// Same check as the single-argument-subject variant, but pending state is queried per (subject, predicate) pair.
// The result for the no-pending case is on a line not visible in this listing.
2311 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2312 for(TransientGraph g : support.providers) {
2313 if(g.isPending(subject, predicate)) return false;
// Asks every provider that still has the subject pending to load it, and runs `runnable` once all
// providers have reported back.
// Completion is tracked with a countdown that starts at providers.size() + 1: one tick per provider
// plus one extra tick issued by this method after the loop.
2318 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2320 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2322 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2325 public void accept(ReadGraphImpl graph) {
// The caller's continuation runs exactly when the countdown reaches zero.
2326 if(ready.decrementAndGet() == 0) {
2327 runnable.accept(graph);
2333 for(TransientGraph g : support.providers) {
2334 if(g.isPending(subject)) {
// The provider is given the composite as its completion callback.
2336 g.load(graph, subject, composite);
2337 } catch (DatabaseException e) {
// NOTE(review): the failure is only printed. The lines right after this are not visible, so it is
// unclear whether the countdown is ticked for a failed provider; if not, `runnable` never runs.
2338 e.printStackTrace();
// Presumably the branch for providers with nothing pending (the `else` line is not visible) — confirm.
2341 composite.accept(graph);
// Presumably the extra tick that matches the "+ 1" in the initial count — confirm it sits after the loop.
2345 composite.accept(graph);
// (subject, predicate) variant: asks every provider that still has the pair pending to load it, and
// runs `runnable` once all providers have reported back.
// Completion is tracked with a countdown that starts at providers.size() + 1: one tick per provider
// plus one extra tick issued by this method after the loop.
2349 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2351 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2353 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2356 public void accept(ReadGraphImpl graph) {
// The caller's continuation runs exactly when the countdown reaches zero.
2357 if(ready.decrementAndGet() == 0) {
2358 runnable.accept(graph);
2364 for(TransientGraph g : support.providers) {
2365 if(g.isPending(subject, predicate)) {
// The provider is given the composite as its completion callback.
2367 g.load(graph, subject, predicate, composite);
2368 } catch (DatabaseException e) {
// NOTE(review): the failure is only printed. The lines right after this are not visible, so it is
// unclear whether the countdown is ticked for a failed provider; if not, `runnable` never runs.
2369 e.printStackTrace();
// Presumably the branch for providers with nothing pending (the `else` line is not visible) — confirm.
2372 composite.accept(graph);
// Presumably the extra tick that matches the "+ 1" in the initial count — confirm it sits after the loop.
2376 composite.accept(graph);
2380 // void dumpHeap() {
2383 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2384 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2385 // "d:/heap" + flushCounter + ".txt", true);
2386 // } catch (IOException e) {
2387 // e.printStackTrace();
// Notifies the listeners in changeListeners2 about a change set, using a ReadGraphImpl created
// from getQueryProvider2(). An empty change set produces no ChangeEvent.
2392 void fireReactionsToSynchronize(ChangeSet cs) {
2394 // Do not fire empty events
2398 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2399 // g.state.barrier.inc();
2403 if (!cs.isEmpty()) {
// The event carries the graph's session, the graph itself, a null third argument and the change set.
2404 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2405 for (ChangeListener l : changeListeners2) {
// The listener invocation itself is on a line not visible in this listing. An Exception from one
// listener is printed here; presumably the loop then continues with the next listener — confirm.
2408 } catch (Exception ex) {
2409 ex.printStackTrace();
2416 // g.state.barrier.dec();
2417 // g.waitAsync(null);
// Notifies the listeners in changeListeners2 about a committed change set, using the caller's graph.
// An empty change set produces no ChangeEvent.
2423 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2426 // Do not fire empty events
2431 // graph.state.barrier.inc();
2435 if (!cs2.isEmpty()) {
// The event carries the graph's session, the graph itself, a null third argument and the change set.
2437 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2438 for (ChangeListener l : changeListeners2) {
2441 // System.out.println("changelistener " + l);
// The listener invocation itself is on a line not visible in this listing; an Exception from a
// listener is printed rather than propagated.
2442 } catch (Exception ex) {
2443 ex.printStackTrace();
2451 // graph.state.barrier.dec();
2452 // graph.waitAsync(null);
// Outer safety net: any Throwable from the visible section is printed instead of reaching the caller.
2455 } catch (Throwable t) {
2456 t.printStackTrace();
// Notifies the listeners in metadataListeners about a change set produced by a write graph.
// Listeners receive a ChangeEvent built around a separate WriteGraphImpl ("reactionGraph") created
// from the writer's processor and this session's writeSupport.
2460 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2464 // Do not fire empty events
2468 // Do not fire on virtual requests
// A non-null provider marks the request as virtual; the statement guarded by this `if` is not
// visible in this listing (presumably an early return — confirm).
2469 if(graph.getProvider() != null)
2472 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
// The event carries the writer's session, the reaction graph, the original writer and the change set.
2476 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2477 for (ChangeListener l : metadataListeners) {
// The listener invocation itself is on a line not visible in this listing; any Throwable from a
// listener is printed rather than propagated.
2480 } catch (Throwable ex) {
2481 ex.printStackTrace();
// Outer safety net: any Throwable from the visible section is printed instead of reaching the caller.
2489 } catch (Throwable t) {
2490 t.printStackTrace();
2499 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
// Resolves a service through peekService and fails with ServiceNotFoundException when it cannot be
// provided. A closed session gets the more specific "Session has been shut down" message.
// The check on `t` and the successful return are on lines not visible in this listing.
2502 public <T> T getService(Class<T> api) {
2503 T t = peekService(api);
2505 if (state.isClosed())
2506 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2507 throw new ServiceNotFoundException(this, api);
// Supplied by subclasses; peekService returns this value when ServerInformation.class is requested.
2515 protected abstract ServerInformation getCachedServerInformation();
// Two-slot cache of recently requested services used by peekService: serviceKeyN holds the requested
// API class and serviceN the instance found for it. registerService also inspects the keys.
2517 private Class<?> serviceKey1 = null;
2518 private Class<?> serviceKey2 = null;
2519 private Object service1 = null;
2520 private Object service2 = null;
2526 * org.simantics.db.ServiceLocator#peekService(java.lang.Class)
// Looks up a service for the given API class.
// Order of resolution as far as visible here: the two cache slots (compared by class identity),
// then a set of classes answered directly by this session, and finally serviceLocator.peekService,
// whose result is stored in slot 1 after the previous slot 1 entry has been moved to slot 2.
// The method is synchronized, so the cache fields are only read and written under this object's monitor.
2528 @SuppressWarnings("unchecked")
2530 public synchronized <T> T peekService(Class<T> api) {
// Hit in slot 1; the return statement is on a line not visible in this listing.
2532 if(serviceKey1 == api) {
2534 } else if (serviceKey2 == api) {
// Hit in slot 2: keep the hit in a local and move the slot 1 entry down to slot 2. The lines that
// presumably install the hit into slot 1 and return it are not visible — confirm.
2536 Object result = service2;
2537 service2 = service1;
2538 serviceKey2 = serviceKey1;
// Classes answered by the session itself; these results are not put into the cache in the visible lines.
2544 if (Layer0.class == api)
2546 if (ServerInformation.class == api)
2547 return (T) getCachedServerInformation();
2548 else if (WriteGraphImpl.class == api)
2549 return (T) writeState.getGraph();
2550 else if (ClusterBuilder.class == api)
// A new builder is created per call, bound to the write support of the current write graph.
2551 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2552 else if (ClusterBuilderFactory.class == api)
2553 return (T)new ClusterBuilderFactoryImpl(this);
// Fallback: demote slot 1 to slot 2 and fill slot 1 from the underlying service locator.
2555 service2 = service1;
2556 serviceKey2 = serviceKey1;
2558 service1 = serviceLocator.peekService(api);
2569 * org.simantics.db.ServiceLocator#hasService(java.lang.Class)
// Delegates to serviceLocator.hasService. The classes that peekService answers directly are not
// considered by this visible code.
2572 public boolean hasService(Class<?> api) {
2573 return serviceLocator.hasService(api);
2577 * @param api the api that must be implemented by the specified service
2578 * @param service the service implementation
// Registers a service implementation for an API class.
// Layer0 is stored in the L0 field; other services go to serviceLocator.registerService. Registering
// a TransactionPolicySupport additionally updates transactionPolicy and resets the policy in `state`.
2581 public <T> void registerService(Class<T> api, T service) {
2582 if(Layer0.class == api) {
// Lines following this assignment are not visible; presumably the method returns here — confirm.
2583 L0 = (Layer0)service;
2586 serviceLocator.registerService(api, service);
2587 if (TransactionPolicySupport.class == api) {
2588 transactionPolicy = (TransactionPolicySupport)service;
2589 state.resetTransactionPolicy();
// The peekService cache is checked for the re-registered API; the statements that update the
// matching slot are on lines not visible in this listing.
2591 if (api == serviceKey1)
2593 else if (api == serviceKey2)
// Calls valuesChanged on every registered MonitorHandler with the MonitorContext that
// monitorContexts maps to it. Exceptions and LinkageErrors from a handler are logged, not propagated.
// NOTE(review): the `variable` argument is not used in any line visible in this listing.
2601 void fireSessionVariableChange(String variable) {
2602 for (MonitorHandler h : monitorHandlers) {
2603 MonitorContext ctx = monitorContexts.getLeft(h);
2605 // SafeRunner functionality repeated here to avoid dependency.
2607 h.valuesChanged(ctx);
2608 } catch (Exception e) {
2609 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2610 } catch (LinkageError e) {
2611 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
// ResourceSerializer implementation backed by the enclosing session.
// Converts between transient resource keys (int), numeric random access ids (long) and the textual
// form "<resource index>_<cluster id>". Several short lines of this class are not visible in this
// listing, so comments below describe only what the visible lines establish.
2617 class ResourceSerializerImpl implements ResourceSerializer {
// Builds a numeric random access id from a transient key: the resource index is taken from the key,
// the cluster id comes from the session's getCluster(int), and both are combined by
// ClusterTraitsBase.createResourceId. Under a condition on lines not visible here, 0 is returned instead.
2619 public long createRandomAccessId(int id) throws DatabaseException {
2622 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2623 long cluster = getCluster(id);
2625 return 0; // Better to return 0 than invalid id.
2626 long result = ClusterTraitsBase.createResourceId(cluster, index);
// Numeric random access id of a Resource; the resource must be a ResourceImpl (unchecked cast).
2631 public long getRandomAccessId(Resource resource) throws DatabaseException {
2633 ResourceImpl resourceImpl = (ResourceImpl) resource;
2634 return createRandomAccessId(resourceImpl.id);
// Transient key of a Resource: the `id` field of the ResourceImpl (unchecked cast).
2639 public int getTransientId(Resource resource) throws DatabaseException {
2641 ResourceImpl resourceImpl = (ResourceImpl) resource;
2642 return resourceImpl.id;
// Textual random access id of a Resource.
// Negative keys are serialized as "<key>_0"; other keys as "<resource index>_<cluster id>".
// A null resource is rejected with IllegalArgumentException; other failures are printed and wrapped
// in InvalidResourceReferenceException.
2647 public String createRandomAccessId(Resource resource)
2648 throws InvalidResourceReferenceException {
2650 if(resource == null) throw new IllegalArgumentException();
2652 ResourceImpl resourceImpl = (ResourceImpl) resource;
2654 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2658 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2659 } catch (DatabaseException e1) {
2660 throw new InvalidResourceReferenceException(e1);
2663 // Serialize as '<resource index>_<cluster id>'
2664 return "" + r + "_" + getCluster(resourceImpl);
2665 } catch (Throwable e) {
2666 e.printStackTrace();
2667 throw new InvalidResourceReferenceException(e);
// Converts a numeric random access id back to a transient key.
// Values <= 0 are returned as-is (narrowed to int). Otherwise the index and cluster id are split out
// of the id, the cluster is looked up through clusterTranslator, and a key is created from the
// cluster key and the index. A cluster that cannot be found results in a DatabaseException.
2672 public int getTransientId(long serialized) throws DatabaseException {
2673 if (serialized <= 0)
2674 return (int)serialized;
2675 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2676 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2677 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2678 if (cluster == null)
2679 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2680 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
// Resource for a numeric random access id: convert to a transient key, then getResourceByKey.
2685 public Resource getResource(long randomAccessId) throws DatabaseException {
2686 return getResourceByKey(getTransientId(randomAccessId));
// Resource for a transient key, via the session's getResourceByKey.
2690 public Resource getResource(int transientId) throws DatabaseException {
2691 return getResourceByKey(transientId);
// Parses the textual form "<resource index>_<cluster id>" and returns the matching Resource.
// A negative first component is treated directly as a key. Otherwise the cluster is looked up and
// the resource is returned only if the cluster reports that it has the key; if not, the method ends
// in the "Resource does not exist" exception. Parse errors and other failures are wrapped in
// InvalidResourceReferenceException.
2695 public Resource getResource(String randomAccessId)
2696 throws InvalidResourceReferenceException {
2698 int i = randomAccessId.indexOf('_');
// The condition guarding this throw is on a line not visible here (presumably "no separator found").
2700 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2701 + randomAccessId + "'");
2702 int r = Integer.parseInt(randomAccessId.substring(0, i));
2703 if(r < 0) return getResourceByKey(r);
2704 long c = Long.parseLong(randomAccessId.substring(i + 1));
// NOTE(review): unlike getTransientId(long), the looked-up cluster is not null-checked in the
// visible lines; a null here would surface through the catch (Throwable) below.
2705 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2706 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2707 if (cluster.hasResource(key, clusterTranslator))
2708 return getResourceByKey(key);
2709 } catch (InvalidResourceReferenceException e) {
2711 } catch (NumberFormatException e) {
2712 throw new InvalidResourceReferenceException(e);
2713 } catch (Throwable e) {
2714 e.printStackTrace();
2715 throw new InvalidResourceReferenceException(e);
2718 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
// NOTE(review): the guarded statements of this method are not visible in this listing; only the
// failure path (print, then wrap in InvalidResourceReferenceException) can be seen.
2722 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2725 } catch (Throwable e) {
2726 e.printStackTrace();
2727 throw new InvalidResourceReferenceException(e);
2733 // Copied from old SessionImpl
2736 if (state.isClosed())
2737 throw new Error("Session closed.");
// Creates a new resource in the cluster with the given id and wraps its key in a ResourceImpl.
// The cluster lookup uses getClusterByClusterIdOrThrow, so an unknown cluster id fails there.
2742 public ResourceImpl getNewResource(long clusterId) {
2743 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2746 newId = cluster.createResource(clusterTranslator);
2747 } catch (DatabaseException e) {
// NOTE(review): a creation failure is logged; what happens next is on lines not visible in this
// listing, so it is unclear which key the ResourceImpl below is built from in that case.
2748 Logger.defaultLogError(e);
2751 return new ResourceImpl(resourceSupport, newId);
// Creates a new resource inside the given cluster set.
// clusterSetsSupport maps the cluster set's resource id to the cluster currently used for new
// resources. A fresh cluster is created (and recorded as the set's current cluster) when the set has
// no cluster yet or when the current one has reached ClusterTable.CLUSTER_FILL_SIZE resources.
// Throws ClusterSetExistException when the cluster set is not registered.
2754 public ResourceImpl getNewResource(Resource clusterSet)
2755 throws DatabaseException {
2756 long resourceId = clusterSet.getResourceId();
2757 Long clusterId = clusterSetsSupport.get(resourceId);
2758 if (null == clusterId)
2759 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
// Constants.NewClusterId marks a set that was registered but has no cluster allocated yet
// (getNewClusterSet stores this value).
2760 if (Constants.NewClusterId == clusterId) {
2761 clusterId = getService(ClusteringSupport.class).createCluster();
// In SERVICE_MODE_CREATE the new cluster is remembered in createdClusters.
2762 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2763 createdClusters.add(clusterId);
2764 clusterSetsSupport.put(resourceId, clusterId);
2765 return getNewResource(clusterId);
2767 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2768 ResourceImpl result;
// Current cluster is full: switch the set to a newly created cluster.
2769 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2770 clusterId = getService(ClusteringSupport.class).createCluster();
2771 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2772 createdClusters.add(clusterId);
2773 clusterSetsSupport.put(resourceId, clusterId);
2774 return getNewResource(clusterId);
// Otherwise allocate in the current cluster; if the resource ended up in a different cluster than
// requested, record that cluster as the set's current one.
2776 result = getNewResource(clusterId);
2777 int resultKey = querySupport.getId(result);
2778 long resultCluster = querySupport.getClusterId(resultKey);
2779 if (clusterId != resultCluster)
2780 clusterSetsSupport.put(resourceId, resultCluster);
// Registers a new cluster set for the given resource by mapping its resource id to
// Constants.NewClusterId in clusterSetsSupport.
// Throws ClusterSetExistException when the resource id is already registered.
2786 public void getNewClusterSet(Resource clusterSet)
2787 throws DatabaseException {
// NOTE(review): a line before this print is not visible; presumably a debug guard — confirm.
2789 System.out.println("new cluster set=" + clusterSet);
2790 long resourceId = clusterSet.getResourceId();
2791 if (clusterSetsSupport.containsKey(resourceId))
2792 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2793 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
// Tells whether clusterSetsSupport has an entry for the resource id of the given cluster set resource.
2795 public boolean containsClusterSet(Resource clusterSet)
2796 throws ServiceException {
2797 long resourceId = clusterSet.getResourceId();
2798 return clusterSetsSupport.containsKey(resourceId);
// Replaces defaultClusterSet with the given resource. The previous value is kept in `r`;
// the return statement is not visible in this listing (presumably it returns `r` — confirm).
2800 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2801 Resource r = defaultClusterSet;
2802 defaultClusterSet = clusterSet;
// Prints cluster statistics to System.out and then asks queryProvider2 to print its own diagnostics.
// First pass: count clusters that are loaded and non-empty and sum their used space.
// Second pass: print one line per cluster with its state or its resource/reference/memory figures.
2805 void printDiagnostics() {
2809 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2810 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2811 for(ClusterI cluster : clusterTable.getClusters()) {
2813 if (cluster.isLoaded() && !cluster.isEmpty()) {
2814 residentClusters.set(residentClusters.get() + 1);
2815 totalMem.set(totalMem.get() + cluster.getUsedSpace());
// A cluster that fails during the summary pass is logged and does not contribute to the totals.
2817 } catch (DatabaseException e) {
2818 Logger.defaultLogError(e);
2822 System.out.println("--------------------------------");
2823 System.out.println("Cluster information:");
2824 System.out.println("-amount of resident clusters=" + residentClusters.get());
// Used space is divided by 1024 for display in kB.
2825 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2827 for(ClusterI cluster : clusterTable.getClusters()) {
2828 System.out.print("Cluster " + cluster.getClusterId() + " [");
2830 if (!cluster.isLoaded())
2831 System.out.println("not loaded]");
2832 else if (cluster.isEmpty())
2833 System.out.println("is empty]");
2835 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2836 System.out.print(",references=" + cluster.getReferenceCount());
2837 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
// A failing cluster is logged and its line is closed with "is corrupted]".
2839 } catch (DatabaseException e) {
2840 Logger.defaultLogError(e);
2841 System.out.println("is corrupted]");
2845 queryProvider2.printDiagnostics();
2846 System.out.println("--------------------------------");
// Calls readTransactionStarted on every SessionEventListener in eventListeners.
// Any Throwable from a listener is printed, not propagated.
2851 public void fireStartReadTransaction() {
// The condition guarding this trace output is on a line not visible in this listing.
2854 System.out.println("StartReadTransaction");
2856 for (SessionEventListener listener : eventListeners) {
2858 listener.readTransactionStarted();
2859 } catch (Throwable t) {
2860 t.printStackTrace();
// Calls readTransactionFinished on every SessionEventListener in eventListeners.
// Any Throwable from a listener is printed, not propagated.
2866 public void fireFinishReadTransaction() {
// The condition guarding this trace output is on a line not visible in this listing.
2869 System.out.println("FinishReadTransaction");
2871 for (SessionEventListener listener : eventListeners) {
2873 listener.readTransactionFinished();
2874 } catch (Throwable t) {
2875 t.printStackTrace();
// Calls writeTransactionStarted on every SessionEventListener in eventListeners.
// Any Throwable from a listener is printed, not propagated.
2881 public void fireStartWriteTransaction() {
// The condition guarding this trace output is on a line not visible in this listing.
2884 System.out.println("StartWriteTransaction");
2886 for (SessionEventListener listener : eventListeners) {
2888 listener.writeTransactionStarted();
2889 } catch (Throwable t) {
2890 t.printStackTrace();
// Calls writeTransactionFinished on every SessionEventListener in eventListeners, then resets the
// "dependencies indexing disabled" state through Indexing.
// Any Throwable from a listener is printed, not propagated.
2896 public void fireFinishWriteTransaction() {
// The condition guarding this trace output is on a line not visible in this listing.
2899 System.out.println("FinishWriteTransaction");
2901 for (SessionEventListener listener : eventListeners) {
2903 listener.writeTransactionFinished();
2904 } catch (Throwable t) {
2905 t.printStackTrace();
2909 Indexing.resetDependenciesIndexingDisabled();
// Not supported: always throws java.lang.Error, regardless of the arguments.
2913 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2914 throw new Error("Not supported at the moment.");
// Accessor for the session's clusterTable field.
2921 ClusterTable getClusterTable() {
2922 return clusterTable;
// Accessor for the session's graphSession field.
2925 public GraphSession getGraphSession() {
2926 return graphSession;
// Accessor for the session's queryProvider2 field (the QueryProcessor).
2929 public QueryProcessor getQueryProvider2() {
2930 return queryProvider2;
// Accessor for the session's clientChanges field.
2933 ClientChangesImpl getClientChanges() {
2934 return clientChanges;
// NOTE(review): the body of getWriteOnly is not visible in this listing; confirm what it returns against the full source.
2937 boolean getWriteOnly() {
// Class-wide mutable counter. NOTE(review): no visible line in this listing reads or writes it.
2941 static int counter = 0;
// Callback for a loaded cluster: asks clusterTable to update its size. The clusterId argument is
// not used in the visible lines (one line before the call is not visible in this listing).
2943 public void onClusterLoaded(long clusterId) {
2945 clusterTable.updateSize();
2953 * Implementation of the interface RequestProcessor
// Runs a Read request and returns its result.
// The request is forwarded to the (Read, AsyncProcedure) overload with a procedure that captures
// the value or the failure in local DataContainers (the capturing statements are on lines not
// visible in this listing). A captured DatabaseException is rethrown as-is; any other captured
// Throwable is wrapped in a DatabaseException.
2957 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
// Only checked when assertions are enabled.
2962 assert(request != null);
2964 final DataContainer<T> result = new DataContainer<T>();
2965 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2967 syncRequest(request, new AsyncProcedure<T>() {
2970 public void execute(AsyncReadGraph graph, T t) {
2975 public void exception(AsyncReadGraph graph, Throwable t) {
2981 Throwable t = exception.get();
2983 if(t instanceof DatabaseException) throw (DatabaseException)t;
2984 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2987 return result.get();
2992 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2993 // assertNotSession();
2994 // return syncRequest(request, (AsyncProcedure<T>)procedure);
// Adapts the SyncListener with SyncToAsyncListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
2998 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3000 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
// Adapts the Listener with NoneToAsyncListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3004 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3006 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
// Adapts the SyncProcedure with SyncToAsyncProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3010 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3012 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
// Adapts the Procedure with NoneToAsyncProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3016 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3018 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Runs an AsyncRead request and returns its result.
// The request is forwarded to the (AsyncRead, AsyncProcedure) overload with a procedure that
// captures the value or the failure in local DataContainers (the capturing statements are on lines
// not visible in this listing). A captured DatabaseException is rethrown as-is; any other captured
// Throwable is wrapped in a DatabaseException.
// NOTE(review): the wrapper message names "syncRequest(Read)" although this is the AsyncRead variant.
3022 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
// Only checked when assertions are enabled.
3026 assert(request != null);
3028 final DataContainer<T> result = new DataContainer<T>();
3029 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3031 syncRequest(request, new AsyncProcedure<T>() {
3034 public void execute(AsyncReadGraph graph, T t) {
3039 public void exception(AsyncReadGraph graph, Throwable t) {
3045 Throwable t = exception.get();
3047 if(t instanceof DatabaseException) throw (DatabaseException)t;
3048 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3051 return result.get();
// Forwards to the AsyncProcedure overload; the cast selects that overload for the AsyncListener.
// One line between the header and the return is not visible in this listing.
3056 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3058 return syncRequest(request, (AsyncProcedure<T>)procedure);
// Adapts the SyncListener with SyncToAsyncListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3062 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3064 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
// Adapts the Listener with NoneToAsyncListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3068 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3070 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
// Adapts the SyncProcedure with SyncToAsyncProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3074 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3076 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
// Adapts the Procedure with NoneToAsyncProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3080 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3082 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Runs a MultiRead request and collects its results into a list.
// The request is forwarded with an anonymous SyncMultiProcedure; execute() works on the result list
// under synchronized(result). A captured DatabaseException is rethrown as-is, anything else wrapped.
// NOTE(review): the only visible overload accepting (MultiRead, SyncMultiProcedure) throws
// Error("Not implemented!"), so as far as this listing shows this method cannot complete normally — confirm.
3086 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
// Only checked when assertions are enabled.
3090 assert(request != null);
3092 final ArrayList<T> result = new ArrayList<T>();
3093 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3095 syncRequest(request, new SyncMultiProcedure<T>() {
3098 public void execute(ReadGraph graph, T t) {
3099 synchronized(result) {
3105 public void finished(ReadGraph graph) {
3109 public void exception(ReadGraph graph, Throwable t) {
3116 Throwable t = exception.get();
3118 if(t instanceof DatabaseException) throw (DatabaseException)t;
3119 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Not implemented: the visible body throws java.lang.Error. One line between the header and the
// throw is not visible in this listing.
3127 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3129 throw new Error("Not implemented!");
// Forwards to the SyncMultiProcedure overload; the cast selects that overload for the SyncMultiListener.
// One line between the header and the return is not visible in this listing.
3133 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3135 return syncRequest(request, (SyncMultiProcedure<T>)procedure);
// Adapts the MultiListener with NoneToSyncMultiListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3139 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3141 return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
// Adapts the MultiProcedure with NoneToSyncMultiProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3145 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3147 return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
// Runs an AsyncMultiRead request and collects its results into a list.
// The request is forwarded to the (AsyncMultiRead, AsyncMultiProcedure) overload with an anonymous
// procedure; execute() works on the result list under synchronized(result). A captured
// DatabaseException is rethrown as-is; any other captured Throwable is wrapped.
// NOTE(review): the wrapper message names "syncRequest(Read)" although this is the AsyncMultiRead variant.
3151 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
// Only checked when assertions are enabled.
3155 assert(request != null);
3157 final ArrayList<T> result = new ArrayList<T>();
3158 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3160 syncRequest(request, new AsyncMultiProcedure<T>() {
3163 public void execute(AsyncReadGraph graph, T t) {
3164 synchronized(result) {
3170 public void finished(AsyncReadGraph graph) {
3174 public void exception(AsyncReadGraph graph, Throwable t) {
3181 Throwable t = exception.get();
3183 if(t instanceof DatabaseException) throw (DatabaseException)t;
3184 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Forwards to the AsyncMultiProcedure overload; the cast selects that overload for the AsyncMultiListener.
// One line between the header and the return is not visible in this listing.
3192 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3194 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
// Adapts the SyncMultiListener with SyncToAsyncMultiListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3198 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3200 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
// Adapts the MultiListener with NoneToAsyncMultiListener and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3204 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3206 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
// Adapts the SyncMultiProcedure with SyncToAsyncMultiProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3210 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3212 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
// Adapts the MultiProcedure with NoneToAsyncMultiProcedure and forwards to another syncRequest overload.
// One line between the header and the return is not visible in this listing.
3216 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3218 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Issues a Read without a caller-supplied procedure. A ProcedureAdapter is used whose exception()
// prints the stack trace, so failures are at least visible on stderr.
3223 public <T> void asyncRequest(Read<T> request) {
3225 asyncRequest(request, new ProcedureAdapter<T>() {
3227 public void exception(Throwable t) {
3228 t.printStackTrace();
// Forwards to the AsyncProcedure overload; the cast selects that overload for the AsyncListener.
3235 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3236 asyncRequest(request, (AsyncProcedure<T>)procedure);
// Adapts the SyncListener with SyncToAsyncListener and forwards to another asyncRequest overload.
3240 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3241 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
// Adapts the Listener with NoneToAsyncListener and forwards to another asyncRequest overload.
3245 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3246 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
// Adapts the SyncProcedure with SyncToAsyncProcedure and forwards to another asyncRequest overload.
3250 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3251 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
// Adapts the Procedure with NoneToAsyncProcedure and forwards to another asyncRequest overload.
3255 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3256 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Issues an AsyncRead without a caller-supplied procedure. A ProcedureAdapter is used whose
// exception() prints the stack trace.
3260 final public <T> void asyncRequest(final AsyncRead<T> request) {
// Only checked when assertions are enabled.
3262 assert(request != null);
3264 asyncRequest(request, new ProcedureAdapter<T>() {
3266 public void exception(Throwable t) {
3267 t.printStackTrace();
// Hands the AsyncRead to scheduleRequest; the listener is passed twice (second and third argument)
// and the last argument is null.
3274 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3275 scheduleRequest(request, procedure, procedure, null);
// Adapts the SyncListener with SyncToAsyncListener and forwards to another asyncRequest overload.
3279 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3280 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
// Adapts the Listener with NoneToAsyncListener and forwards to another asyncRequest overload.
3284 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3285 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
// Adapts the SyncProcedure with SyncToAsyncProcedure and forwards to another asyncRequest overload.
3289 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3290 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
// Adapts the Procedure with NoneToAsyncProcedure and forwards to another asyncRequest overload.
3294 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3295 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Issues a MultiRead without a caller-supplied procedure. A SyncMultiProcedureAdapter is used whose
// exception() prints the stack trace.
3299 public <T> void asyncRequest(MultiRead<T> request) {
// Only checked when assertions are enabled.
3301 assert(request != null);
3303 asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3305 public void exception(ReadGraph graph, Throwable t) {
3306 t.printStackTrace();
// Forwards to the SyncMultiProcedure overload; the cast selects that overload for the SyncMultiListener.
3313 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3314 asyncRequest(request, (SyncMultiProcedure<T>)procedure);
// Adapts the MultiListener with NoneToSyncMultiListener and forwards to another asyncRequest overload.
3318 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3319 asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
// Hands the MultiRead and its SyncMultiProcedure to scheduleRequest (third argument null).
3323 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3324 scheduleRequest(request, procedure, null);
// Adapts the MultiProcedure with NoneToSyncMultiProcedure and forwards to another asyncRequest overload.
3328 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3329 asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
// Issues an AsyncMultiRead without a caller-supplied procedure. An AsyncMultiProcedureAdapter is
// used whose exception() prints the stack trace.
3333 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
// Only checked when assertions are enabled.
3335 assert(request != null);
3337 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3339 public void exception(AsyncReadGraph graph, Throwable t) {
3340 t.printStackTrace();
// Forwards to the AsyncMultiProcedure overload; the cast selects that overload for the AsyncMultiListener.
3347 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3348 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
// Adapts the SyncMultiListener with SyncToAsyncMultiListener and forwards to another asyncRequest overload.
3352 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3353 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
// Adapts the MultiListener with NoneToAsyncMultiListener and forwards to another asyncRequest overload.
3357 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3358 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
// Adapts the SyncMultiProcedure with SyncToAsyncMultiProcedure and forwards to another asyncRequest overload.
3362 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3363 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
// Adapts the MultiProcedure with NoneToAsyncMultiProcedure and forwards to another asyncRequest overload.
3367 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3368 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Issues an ExternalRead without a caller-supplied procedure. A ProcedureAdapter is used whose
// exception() prints the stack trace.
3372 final public <T> void asyncRequest(final ExternalRead<T> request) {
// Only checked when assertions are enabled.
3374 assert(request != null);
3376 asyncRequest(request, new ProcedureAdapter<T>() {
3378 public void exception(Throwable t) {
3379 t.printStackTrace();
// Forwards to the (ExternalRead, Procedure) overload; the cast selects that overload for the Listener.
3386 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3387 asyncRequest(request, (Procedure<T>)procedure);
// Rethrows a Throwable as a DatabaseException: DatabaseExceptions pass through unchanged, anything
// else is wrapped with the message "Unexpected exception".
// One line before the instanceof test is not visible in this listing (presumably a null check — confirm).
3392 void check(Throwable t) throws DatabaseException {
3394 if(t instanceof DatabaseException) throw (DatabaseException)t;
3395 else throw new DatabaseException("Unexpected exception", t);
// Same as check(Throwable), applied to the Throwable stored in the container.
// One line before the instanceof test is not visible in this listing (presumably a null check — confirm).
3399 void check(DataContainer<Throwable> container) throws DatabaseException {
3400 Throwable t = container.get();
3402 if(t instanceof DatabaseException) throw (DatabaseException)t;
3403 else throw new DatabaseException("Unexpected exception", t);
// Tells whether the request targets the same provider as the current write graph:
// equal providers when the write graph has one, otherwise the request must have none either.
// NOTE(review): writeState.getGraph() is dereferenced without a null check in the visible lines.
3412 boolean sameProvider(Write request) {
3413 if(writeState.getGraph().provider != null) {
3414 return writeState.getGraph().provider.equals(request.getProvider());
3416 return request.getProvider() == null;
// Answers false for a null graph and for a graph whose writeSupport is write-only.
// The result for the remaining case is on a line not visible in this listing.
3420 boolean plainWrite(WriteGraphImpl graph) {
3421 if(graph == null) return false;
3422 if(graph.writeSupport.writeOnly()) return false;
// Shared ThreadGroup named "Session Thread Group".
3427 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
// Throws DatabaseException when the calling thread is one of sessionThreads, i.e. the caller is
// already running inside a transaction.
3428 private void assertNotSession() throws DatabaseException {
3429 Thread current = Thread.currentThread();
3430 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
// Throws RuntimeDatabaseException when the session state reports that it is no longer alive.
3433 void assertAlive() {
3434 if (!state.isAlive())
3435 throw new RuntimeDatabaseException("Session has been shut down.");
// Returns the value stream of a resource; the resource is translated to its id by querySupport first.
3438 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3439 return querySupport.getValueStream(graph, querySupport.getId(resource));
// Returns the value bytes of a resource; the resource is translated to its id by querySupport first.
3442 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3443 return querySupport.getValue(graph, querySupport.getId(resource));
// Convenience overload: acquires the semaphore with no procedure to mention in long-wait reports.
3446 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3447 acquire(semaphore, request, null);
// Waits for the semaphore in slices of DebugPolicy.LONG_EXECUTION_REPORT_PERIOD. When
// DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS is set, a warning naming the request and procedure is
// written to System.err. The loop construct itself is on lines not visible in this listing.
3450 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3454 // Loop until the semaphore is acquired reporting requests that take a long time.
3457 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3461 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
// The reported wait time is derived from the report period converted to milliseconds; the rest
// of the expression is on a line not visible here.
3463 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3464 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3466 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3467 + (waitTime) + " ms. (procedure=" + procedure + ")");
3474 } catch (InterruptedException e) {
// NOTE(review): the interruption is only printed; the visible lines do not restore the thread's
// interrupt status (Thread.currentThread().interrupt()).
3475 e.printStackTrace();
3476 // FIXME: Should perhaps do something else in this case ??
3483 // public boolean holdOnToTransactionAfterCancel() {
3484 // return transactionPolicy.holdOnToTransactionAfterCancel();
3488 // public boolean holdOnToTransactionAfterCommit() {
3489 // return transactionPolicy.holdOnToTransactionAfterCommit();
3493 // public boolean holdOnToTransactionAfterRead() {
3494 // return transactionPolicy.holdOnToTransactionAfterRead();
3498 // public void onRelinquish() {
3499 // transactionPolicy.onRelinquish();
3503 // public void onRelinquishDone() {
3504 // transactionPolicy.onRelinquishDone();
3508 // public void onRelinquishError() {
3509 // transactionPolicy.onRelinquishError();
// NOTE(review): the body of getSession is not visible in this listing; confirm what it returns against the full source.
3514 public Session getSession() {
// Subclass hooks. getProvider maps a VirtualGraph argument to a VirtualGraph (mapping rules are
// defined by the subclass); getNewResource() creates a resource without an explicit cluster argument.
3519 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3520 protected abstract ResourceImpl getNewResource() throws DatabaseException;
// Forwards the "ceased" notification for the given thread index to requestManager.
// One line before the call is not visible in this listing.
3522 public void ceased(int thread) {
3524 requestManager.ceased(thread);
// Returns the number of query threads. The active return statement is on a line not visible in this
// listing; per the comment below, the value is required to be a power of two.
3528 public int getAmountOfQueryThreads() {
3529 // This must be a power of two
3531 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
// Wraps a resource key in a new ResourceImpl bound to resourceSupport.
// NOTE(review): ResourceNotFoundException is declared, but no visible line throws it (one line
// before the return is not visible in this listing).
3534 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3536 return new ResourceImpl(resourceSupport, key);
// NOTE(review): the body of acquireWriteOnly is not visible in this listing; confirm its behavior against the full source.
3540 public void acquireWriteOnly() {
// Releases write access for the given graph through queryProvider2.releaseWrite.
// One line before the call is not visible in this listing.
3544 public void releaseWriteOnly(ReadGraphImpl graph) {
3546 queryProvider2.releaseWrite(graph);
// Brings queries up to date after a write and then notifies metadata listeners.
// Dirty and scheduled query updates are repeated for as long as dirtyPrimitives gets set again
// during a round. With DebugPolicy.PERFORMANCE_DATA the elapsed time in seconds is printed to stderr.
3549 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3551 long start = System.nanoTime();
3553 while(dirtyPrimitives) {
// Clear the flag before updating, so that changes made by the updates trigger another round.
3554 dirtyPrimitives = false;
3555 getQueryProvider2().performDirtyUpdates(writer);
3556 getQueryProvider2().performScheduledUpdates(writer);
3559 fireMetadataListeners(writer, clientChanges);
3561 if(DebugPolicy.PERFORMANCE_DATA) {
3562 long end = System.nanoTime();
// 1e-9 converts the nanosecond difference to seconds.
3563 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
// Walks the directory <platform location>/tempFiles/db with a file visitor.
// The per-file action is on a line not visible in this listing (presumably a delete — confirm);
// an IOException for a single file is logged and the walk continues, and an IOException from the
// walk itself is logged as well.
3568 void removeTemporaryData() {
3569 File platform = Platform.getLocation().toFile();
3570 File tempFiles = new File(platform, "tempFiles");
3571 File temp = new File(tempFiles, "db");
3575 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3577 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3580 } catch (IOException e) {
3581 Logger.defaultLogError(e);
3583 return FileVisitResult.CONTINUE;
3586 } catch (IOException e) {
3587 Logger.defaultLogError(e);
// In SERVICE_MODE_CREATE, marks every cluster recorded in createdClusters as immutable and then
// clears the record. getNewResource(Resource) is the visible producer of createdClusters entries.
3591 public void handleCreatedClusters() {
3592 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3593 createdClusters.forEach(new TLongProcedure() {
3596 public boolean execute(long value) {
// NOTE(review): the looked-up cluster is not null-checked in the visible lines.
3597 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3598 cluster.setImmutable(true, clusterTranslator);
3603 createdClusters.clear();
// Exposes queryProvider2.modificationCounter as an opaque Object.
3607 public Object getModificationCounter() {
3608 return queryProvider2.modificationCounter;
// Marks an undo point by switching off the "combine" flag on the session state.
// NOTE(review): presumably this stops the next change from being merged into the previous one — confirm.
3612 public void markUndoPoint() {
3613 state.setCombine(false);