1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryProcessor;
93 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
94 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
95 import org.simantics.db.impl.service.QueryDebug;
96 import org.simantics.db.impl.support.VirtualGraphServerSupport;
97 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
98 import org.simantics.db.procedure.AsyncListener;
99 import org.simantics.db.procedure.AsyncMultiListener;
100 import org.simantics.db.procedure.AsyncMultiProcedure;
101 import org.simantics.db.procedure.AsyncProcedure;
102 import org.simantics.db.procedure.Listener;
103 import org.simantics.db.procedure.ListenerBase;
104 import org.simantics.db.procedure.MultiListener;
105 import org.simantics.db.procedure.MultiProcedure;
106 import org.simantics.db.procedure.Procedure;
107 import org.simantics.db.procedure.SyncListener;
108 import org.simantics.db.procedure.SyncMultiListener;
109 import org.simantics.db.procedure.SyncMultiProcedure;
110 import org.simantics.db.procedure.SyncProcedure;
111 import org.simantics.db.procore.cluster.ClusterImpl;
112 import org.simantics.db.procore.cluster.ClusterTraits;
113 import org.simantics.db.procore.protocol.Constants;
114 import org.simantics.db.procore.protocol.DebugPolicy;
115 import org.simantics.db.request.AsyncMultiRead;
116 import org.simantics.db.request.AsyncRead;
117 import org.simantics.db.request.DelayedWrite;
118 import org.simantics.db.request.DelayedWriteResult;
119 import org.simantics.db.request.ExternalRead;
120 import org.simantics.db.request.MultiRead;
121 import org.simantics.db.request.Read;
122 import org.simantics.db.request.ReadInterface;
123 import org.simantics.db.request.Write;
124 import org.simantics.db.request.WriteInterface;
125 import org.simantics.db.request.WriteOnly;
126 import org.simantics.db.request.WriteOnlyResult;
127 import org.simantics.db.request.WriteResult;
128 import org.simantics.db.request.WriteTraits;
129 import org.simantics.db.service.ByteReader;
130 import org.simantics.db.service.ClusterBuilder;
131 import org.simantics.db.service.ClusterBuilderFactory;
132 import org.simantics.db.service.ClusterControl;
133 import org.simantics.db.service.ClusterSetsSupport;
134 import org.simantics.db.service.ClusterUID;
135 import org.simantics.db.service.ClusteringSupport;
136 import org.simantics.db.service.CollectionSupport;
137 import org.simantics.db.service.DebugSupport;
138 import org.simantics.db.service.DirectQuerySupport;
139 import org.simantics.db.service.GraphChangeListenerSupport;
140 import org.simantics.db.service.InitSupport;
141 import org.simantics.db.service.LifecycleSupport;
142 import org.simantics.db.service.ManagementSupport;
143 import org.simantics.db.service.QueryControl;
144 import org.simantics.db.service.SerialisationSupport;
145 import org.simantics.db.service.ServerInformation;
146 import org.simantics.db.service.ServiceActivityMonitor;
147 import org.simantics.db.service.SessionEventSupport;
148 import org.simantics.db.service.SessionMonitorSupport;
149 import org.simantics.db.service.SessionUserSupport;
150 import org.simantics.db.service.StatementSupport;
151 import org.simantics.db.service.TransactionPolicySupport;
152 import org.simantics.db.service.TransactionSupport;
153 import org.simantics.db.service.TransferableGraphSupport;
154 import org.simantics.db.service.UndoRedoSupport;
155 import org.simantics.db.service.VirtualGraphSupport;
156 import org.simantics.db.service.XSupport;
157 import org.simantics.layer0.Layer0;
158 import org.simantics.utils.DataContainer;
159 import org.simantics.utils.Development;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
// Compile-time debug switches; both are hard-wired to false here.
169 protected static final boolean DEBUG = false;
171 private static final boolean DIAGNOSTICS = false;
// Assigned a TransactionPolicyRelease in the constructor and registered as the TransactionPolicySupport service.
173 private TransactionPolicySupport transactionPolicy;
// Support objects created in the constructor.
175 final private ClusterControl clusterControl;
177 final protected BuiltinSupportImpl builtinSupport;
178 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179 final protected ClusterSetsSupport clusterSetsSupport;
180 final protected LifecycleSupportImpl lifecycleSupport;
// Not assigned in the constructor lines visible here; presumably set up during later initialization — TODO confirm.
182 protected QuerySupportImpl querySupport;
183 protected ResourceSupportImpl resourceSupport;
184 protected WriteSupport writeSupport;
185 public ClusterTranslator clusterTranslator;
187 boolean dirtyPrimitives = false;
// Bit flags tested against serviceMode (see getNewResourceCluster() and WriteOnlySupport.claim()).
189 public static final int SERVICE_MODE_CREATE = 2;
190 public static final int SERVICE_MODE_ALLOW = 1;
// Bit mask of the SERVICE_MODE_* flags; 0 means neither flag is set.
191 public int serviceMode = 0;
192 public boolean createdImmutableClusters = false;
// Ids of clusters handed out by getNewResourceCluster() while the SERVICE_MODE_CREATE bit is set.
193 public TLongHashSet createdClusters = new TLongHashSet();
198 * The service locator maintained by the workbench. These services are
199 * initialized by the workbench during the <code>init</code> method.
// Registry of session services; populated in the constructor via registerService(...).
201 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
203 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
// Listener collections; copy-on-write lists, so iteration does not need external locking.
205 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
207 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
209 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
211 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
213 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
215 final protected State state = new State();
217 protected GraphSession graphSession = null;
// Set from the constructor argument of the same name.
219 protected SessionManager sessionManagerImpl = null;
// NOTE(review): the constructor's assignment of authAgent is commented out, so this stays null unless set elsewhere.
221 protected UserAuthenticationAgent authAgent = null;
223 protected UserAuthenticator authenticator = null;
225 protected Resource user = null;
227 protected ClusterStream clusterStream = null;
// Receives the write tasks created by the scheduleRequest(...) methods.
229 protected SessionRequestManager requestManager = null;
// Created in the constructor.
231 public ClusterTable clusterTable = null;
233 public QueryProcessor queryProvider2 = null;
// Disposed and re-created at the start of every scheduled write task.
235 ClientChangesImpl clientChanges = null;
237 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
239 // protected long newClusterId = Constants.NullClusterId;
241 protected int flushCounter = 0;
// Passed to clusterTable.getNewResourceCluster(...) by getNewResourceCluster().
243 protected boolean writeOnly = false;
// State of the write task currently being run; replaced by each scheduled write task.
245 WriteState<?> writeState = null;
// Set while a DelayedWrite is being performed and reset to null before its commit phase.
246 WriteStateBase<?> delayedWriteState = null;
247 protected Resource defaultClusterSet = null; // If not null then used for newResource().
// Creates the session: builds the cluster table and support objects and registers
// the session services with serviceLocator.
//
// sessionManagerImpl is stored in the field of the same name.
// authAgent is currently unused: both the null check and the assignment are commented out.
249 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
251 // if (authAgent == null)
252 // throw new IllegalArgumentException("null authentication agent");
// Storage location shared by the cluster table, the virtual graph support and the cluster sets support below.
254 File t = StaticSessionProperties.virtualGraphStoragePath;
257 this.clusterTable = new ClusterTable(this, t);
258 this.builtinSupport = new BuiltinSupportImpl(this);
259 this.sessionManagerImpl = sessionManagerImpl;
261 // this.authAgent = authAgent;
// Register the session itself and the stateless/per-session service implementations.
263 serviceLocator.registerService(Session.class, this);
264 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285 ServiceActivityUpdaterForWriteTransactions.register(this);
// The same virtual graph support instance is registered under two service interfaces.
287 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290 this.lifecycleSupport = new LifecycleSupportImpl(this);
291 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292 this.transactionPolicy = new TransactionPolicyRelease();
293 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294 this.clusterControl = new ClusterControlImpl(this);
295 serviceLocator.registerService(ClusterControl.class, clusterControl);
296 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297 this.clusterSetsSupport.setReadDirectory(t.toPath());
298 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
// Returns the root library resource as reported by the query processor (queryProvider2).
309 public Resource getRootLibrary() {
310 return queryProvider2.getRootLibraryResource();
313 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314 if (!graphSession.dbSession.refreshEnabled())
317 getClusterTable().refresh(csid, this, clusterUID);
318 } catch (Throwable t) {
319 Logger.defaultLogError("Refesh failed.", t);
323 static final class TaskHelper {
324 private final String name;
325 private Object result;
326 final Semaphore sema = new Semaphore(0);
327 private Throwable throwable = null;
328 final Consumer<DatabaseException> callback = e -> {
329 synchronized (TaskHelper.this) {
333 final Procedure<Object> proc = new Procedure<Object>() {
335 public void execute(Object result) {
336 callback.accept(null);
339 public void exception(Throwable t) {
340 if (t instanceof DatabaseException)
341 callback.accept((DatabaseException)t);
343 callback.accept(new DatabaseException("" + name + "operation failed.", t));
346 final WriteTraits writeTraits = new WriteTraits() {};
347 TaskHelper(String name) {
350 @SuppressWarnings("unchecked")
354 void setResult(Object result) {
355 this.result = result;
357 // Throwable throwableGet() {
360 synchronized void throwableSet(Throwable t) {
363 // void throwableSet(String t) {
364 // throwable = new InternalException("" + name + " operation failed. " + t);
366 synchronized void throwableCheck()
367 throws DatabaseException {
368 if (null != throwable)
369 if (throwable instanceof DatabaseException)
370 throw (DatabaseException)throwable;
372 throw new DatabaseException("Undo operation failed.", throwable);
374 void throw_(String message)
375 throws DatabaseException {
376 throw new DatabaseException("" + name + " operation failed. " + message);
// Returns the given procedure as a ListenerBase when it implements that interface.
// NOTE(review): the fall-through branch is not visible in this listing; presumably it yields null — confirm.
382 private ListenerBase getListenerBase(Object procedure) {
383 if (procedure instanceof ListenerBase)
384 return (ListenerBase) procedure;
// Convenience overload: schedules the write with a null 'combine' argument.
389 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
390 scheduleRequest(request, callback, notify, null);
394 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.Write, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues 'request' as a write task on requestManager. When the task runs it
// re-creates clientChanges, builds a WriteGraphImpl for the request's provider,
// installs a fresh WriteState in 'writeState' and calls request.perform(writer).
// Outcome is reported to 'callback' (if non-null) through the WriteState's procedure.
// 'combine' is not referenced in the lines visible here.
397 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399 assert (request != null);
// Optional development-time trace of the scheduling step.
401 if(Development.DEVELOPMENT) {
403 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
404 System.err.println("schedule write '" + request + "'");
405 } catch (Throwable t) {
406 Logger.defaultLogError(t);
// Thread index derived from the request's hash code, masked by THREAD_MASK.
410 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
415 public void run(int thread) {
417 if(Development.DEVELOPMENT) {
419 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
420 System.err.println("perform write '" + request + "'");
421 } catch (Throwable t) {
422 Logger.defaultLogError(t);
426 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the transaction with a fresh change set.
433 Disposable.safeDispose(clientChanges);
434 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436 VirtualGraph vg = getProvider(request.getProvider());
438 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442 public void execute(Object result) {
443 if(callback != null) callback.accept(null);
447 public void exception(Throwable t) {
// NOTE(review): unchecked cast — a non-DatabaseException Throwable would raise ClassCastException here;
// the DelayedWrite variant of this method wraps such throwables in a new DatabaseException instead.
448 if(callback != null) callback.accept((DatabaseException)t);
453 assert (null != writer);
454 // writer.state.barrier.inc();
457 request.perform(writer);
458 assert (null != writer);
459 } catch (Throwable t) {
// Cancellation is an expected outcome and is not logged as an error.
460 if (!(t instanceof CancelTransactionException))
461 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462 writeState.except(t);
464 // writer.state.barrier.dec();
465 // writer.waitAsync(request);
469 assert(!queryProvider2.dirty);
471 } catch (Throwable e) {
473 // Log it first, just to be safe that the error is always logged.
474 if (!(e instanceof CancelTransactionException))
475 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477 // writeState.getGraph().state.barrier.dec();
478 // writeState.getGraph().waitAsync(request);
480 writeState.except(e);
483 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 // // All we can do here is to log those, can't really pass them anywhere.
485 // if (callback != null) {
486 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 // else callback.run(new DatabaseException(e));
489 // } catch (Throwable e2) {
490 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 // boolean empty = clusterStream.reallyFlush();
493 // int callerThread = -1;
494 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 // // Send all local changes to server so it can calculate correct reverse change set.
498 // // This will call clusterStream.accept().
499 // state.cancelCommit(context, clusterStream);
501 // if (!context.isOk()) // this is a blocking operation
502 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 // state.cancelCommit2(context, clusterStream);
506 // } catch (DatabaseException e3) {
507 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 // clusterStream.setOff(false);
510 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
515 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Optional development-time trace of task completion.
521 if(Development.DEVELOPMENT) {
523 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524 System.err.println("finish write '" + request + "'");
525 } catch (Throwable t) {
526 Logger.defaultLogError(t);
// Convenience overload: schedules the result-returning write with a null 'combine' argument.
536 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537 scheduleRequest(request, procedure, notify, null);
541 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues a result-returning write as a task on requestManager. When the task runs it
// re-creates clientChanges, builds a WriteGraphImpl, installs a WriteState<T> in
// 'writeState' and stores the value returned by request.perform(writer) as its result.
// A Throwable from the perform step is passed to writeState.except(...).
// 'combine' is not referenced in the lines visible here.
544 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546 assert (request != null);
// Thread index derived from the request's hash code, masked by THREAD_MASK.
548 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550 requestManager.scheduleWrite(new SessionTask(request, thread) {
553 public void run(int thread) {
555 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the transaction with a fresh change set.
560 Disposable.safeDispose(clientChanges);
561 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
563 VirtualGraph vg = getProvider(request.getProvider());
564 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// Keep a typed local reference so the result can be set without a cast.
567 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
568 writeState = writeStateT;
570 assert (null != writer);
571 // writer.state.barrier.inc();
572 writeStateT.setResult(request.perform(writer));
573 assert (null != writer);
575 // writer.state.barrier.dec();
576 // writer.waitAsync(null);
578 } catch (Throwable e) {
580 // writer.state.barrier.dec();
581 // writer.waitAsync(null);
583 writeState.except(e);
585 // state.stopWriteTransaction(clusterStream);
587 // } catch (Throwable e) {
588 // // Log it first, just to be safe that the error is always logged.
589 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
592 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
593 // // All we can do here is to log those, can't really pass them anywhere.
594 // if (procedure != null) {
595 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
596 // else procedure.exception(new DatabaseException(e));
598 // } catch (Throwable e2) {
599 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
602 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
604 // state.stopWriteTransaction(clusterStream);
607 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
610 // if(notify != null) notify.release();
// Convenience overload: schedules the delayed write with a null 'combine' argument.
620 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
621 scheduleRequest(request, callback, notify, null);
625 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.DelayedWrite, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues a DelayedWrite as a task on requestManager. The task runs in two phases:
//  1. the request is performed against a DelayedWriteGraph built on a read graph,
//     with 'delayedWriteState' installed for the duration;
//  2. the collected changes are committed through a write-only WriteGraphImpl
//     (dwg.commit), clusters are flushed, and updates/metadata are handled.
// 'callback' (if non-null) receives null on success or a DatabaseException on failure.
// 'combine' is not referenced in the lines visible here.
628 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
630 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
632 assert (request != null);
// Thread index derived from the request's hash code, masked by THREAD_MASK.
634 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
636 requestManager.scheduleWrite(new SessionTask(request, thread) {
639 public void run(int thread) {
640 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Shared completion procedure, used by both the delayed state and the commit-phase WriteState.
642 Procedure<Object> stateProcedure = new Procedure<Object>() {
644 public void execute(Object result) {
645 if (callback != null)
646 callback.accept(null);
649 public void exception(Throwable t) {
650 if (callback != null) {
// Non-DatabaseException throwables are wrapped before being handed to the callback.
651 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
652 else callback.accept(new DatabaseException(t));
654 Logger.defaultLogError("Unhandled exception", t);
// Phase 1: perform the request against a delayed write graph.
658 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
659 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
660 DelayedWriteGraph dwg = null;
661 // newGraph.state.barrier.inc();
664 dwg = new DelayedWriteGraph(newGraph);
665 request.perform(dwg);
666 } catch (Throwable e) {
667 delayedWriteState.except(e);
672 // newGraph.state.barrier.dec();
673 // newGraph.waitAsync(request);
674 fireSessionVariableChange(SessionVariables.QUEUED_READS);
677 delayedWriteState = null;
// Phase 2: commit the delayed changes through a write-only graph.
679 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
680 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
683 Disposable.safeDispose(clientChanges);
684 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
688 VirtualGraph vg = getProvider(request.getProvider());
// Local write support shadows the session field: virtual variant when a virtual graph is targeted.
689 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
691 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
693 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
695 assert (null != writer);
696 // writer.state.barrier.inc();
700 dwg.commit(writer, request);
// After commit, replace a delayed default cluster set resource with the real one and record its cluster.
702 if(defaultClusterSet != null) {
703 XSupport xs = getService(XSupport.class);
704 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
705 ClusteringSupport cs = getService(ClusteringSupport.class);
706 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
709 // This makes clusters available from server
710 clusterStream.reallyFlush();
712 releaseWriteOnly(writer);
713 handleUpdatesAndMetadata(writer);
715 } catch (ServiceException e) {
716 // writer.state.barrier.dec();
717 // writer.waitAsync(null);
719 // These shall be requested from server
720 clusterTable.removeWriteOnlyClusters();
721 // This makes clusters available from server
722 clusterStream.reallyFlush();
724 releaseWriteOnly(writer);
725 writeState.except(e);
727 // Debugging & Profiling
728 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Convenience overload: delegates with a null 'combine' argument (the delegate currently always throws).
738 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
739 scheduleRequest(request, procedure, notify, null);
743 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Not supported: result-returning delayed writes always fail with java.lang.Error("Not implemented").
746 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
747 throw new Error("Not implemented");
// Obtains a cluster for new resources from the cluster table and, when the
// SERVICE_MODE_CREATE bit is set in serviceMode, records its id in createdClusters.
// NOTE(review): the return statement is not visible in this listing; presumably the obtained cluster is returned — confirm.
750 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
751 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
752 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
753 createdClusters.add(cluster.clusterId);
758 class WriteOnlySupport implements WriteSupport {
760 ClusterStream stream;
761 ClusterImpl currentCluster;
// Captures the enclosing session's current clusterStream at construction time.
763 public WriteOnlySupport() {
764 this.stream = clusterStream;
// Resource-based overload: unwraps the three ResourceImpl ids and delegates to the int-key claim.
// All three arguments are cast to ResourceImpl, so other Resource implementations are not accepted.
768 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
769 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
// Adds the statement (s, p, o) to the cluster that owns subject key s.
// 'provider' is not referenced in the lines visible here.
773 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
775 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
// Immutable clusters may only be modified when the SERVICE_MODE_ALLOW bit is set;
// the root library resource is exempt from this check.
777 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
778 if(s != queryProvider2.getRootLibrary())
779 throw new ImmutableException("Trying to modify immutable resource key=" + s);
// addRelation may return a replacement cluster object; maintainCluster installs it.
782 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
783 } catch (DatabaseException e) {
// NOTE(review): the failure is logged; what follows the log call is not visible in this listing.
784 Logger.defaultLogError(e);
788 clientChanges.invalidate(s);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update — confirm.
790 if (cluster.isWriteOnly())
792 queryProvider2.updateStatements(s, p);
// Resource-based overload: delegates to the int-key claimValue using the whole byte array.
// 'resource' is cast to ResourceImpl; 'value' must be non-null because its length is read.
797 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
798 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Sets the value of resource key 'rid' to the first 'length' bytes of 'value'
// in the owning cluster, then invalidates the resource in clientChanges.
// 'provider' is not referenced in the lines visible here.
802 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
804 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// setValue may return a replacement cluster object; maintainCluster installs it.
806 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
807 } catch (DatabaseException e) {
// NOTE(review): the failure is logged; what follows the log call is not visible in this listing.
808 Logger.defaultLogError(e);
811 clientChanges.invalidate(rid);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update — confirm.
813 if (cluster.isWriteOnly())
815 queryProvider2.updateValue(rid);
// Sets the value of 'resource' from 'amount' bytes supplied by 'reader'.
// Two paths are visible: a direct one that reads all bytes at once and delegates to the
// byte[] overload, and a chunked one that writes the value in blocks of at most 65536 bytes.
// NOTE(review): the condition choosing between the paths and the loop header are not visible in this listing.
820 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
823 claimValue(provider,resource, reader.readBytes(null, amount));
// Reusable transfer buffer for the chunked path.
827 byte[] bytes = new byte[65536];
829 int rid = ((ResourceImpl)resource).id;
830 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// 'left' is the number of bytes still to write; amount-left is the offset of the current block.
834 int block = Math.min(left, 65536);
835 reader.readBytes(bytes, block);
836 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
839 } catch (DatabaseException e) {
840 Logger.defaultLogError(e);
843 clientChanges.invalidate(rid);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update — confirm.
845 if (cluster.isWriteOnly())
847 queryProvider2.updateValue(rid);
// Handles a cluster operation that returned a different cluster object than the one
// it was invoked on: the new object is installed in clusterTable, and currentCluster
// is re-pointed when it referred to the old object. A null or identical result is a no-op.
851 private void maintainCluster(ClusterImpl before, ClusterI after_) {
852 if(after_ != null && after_ != before) {
// The replacement is assumed to be a ClusterImpl (unchecked downcast).
853 ClusterImpl after = (ClusterImpl)after_;
854 if(currentCluster == before) {
855 currentCluster = after;
857 clusterTable.replaceCluster(after);
// Creates a new resource in currentCluster and returns its key. A cluster is
// obtained first when none is selected, and a new write-only cluster with a
// foreignLookup table of 'foreignCounter' bytes replaces a full current cluster.
861 public int createResourceKey(int foreignCounter) throws DatabaseException {
862 if(currentCluster == null) {
863 currentCluster = getNewResourceCluster();
// NOTE(review): this uses '==' against CLUSTER_FILL_SIZE while createResource(VirtualGraph) uses '>=' — confirm the difference is intended.
865 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
// The new cluster is assumed to be a ClusterWriteOnly (unchecked downcast).
866 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
867 newCluster.foreignLookup = new byte[foreignCounter];
868 currentCluster = newCluster;
// NOTE(review): diagnostic print to stderr; any guard around it is not visible in this listing.
870 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
872 return currentCluster.createResource(clusterTranslator);
// Creates a new resource in currentCluster and wraps its key in a ResourceImpl.
// When no cluster is selected, or the current one has reached CLUSTER_FILL_SIZE,
// a cluster is chosen first: via a resource created in defaultClusterSet when one
// is set, otherwise via getNewResourceCluster().
// 'provider' is not referenced in the lines visible here.
876 public Resource createResource(VirtualGraph provider) throws DatabaseException {
877 if(currentCluster == null) {
878 if (null != defaultClusterSet) {
// The resource obtained here is only used to locate its cluster.
879 ResourceImpl result = getNewResource(defaultClusterSet);
880 currentCluster = clusterTable.getClusterByResourceKey(result.id);
883 currentCluster = getNewResourceCluster();
886 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
887 if (null != defaultClusterSet) {
888 ResourceImpl result = getNewResource(defaultClusterSet);
889 currentCluster = clusterTable.getClusterByResourceKey(result.id);
892 currentCluster = getNewResourceCluster();
895 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
// Creates a resource in the cluster with the given id by delegating to the
// session's getNewResource(long). 'provider' is not used.
899 public Resource createResource(VirtualGraph provider, long clusterId)
900 throws DatabaseException {
901 return getNewResource(clusterId);
// Creates a resource for the given cluster set by delegating to getNewResource(clusterSet).
// The provider argument is not used.
905 public Resource createResource(VirtualGraph provider, Resource clusterSet)
906 throws DatabaseException {
907 return getNewResource(clusterSet);
// Creates a cluster set for the given resource by delegating to getNewClusterSet(clusterSet).
// The provider argument is not used.
911 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
912 throws DatabaseException {
913 getNewClusterSet(clusterSet);
// Returns the result of containsClusterSet(clusterSet); the VirtualGraph argument is ignored.
917 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
918 throws ServiceException {
919 return containsClusterSet(clusterSet);
// Makes the cluster with the given cluster id the current cluster.
922 public void selectCluster(long cluster) {
923 currentCluster = clusterTable.getClusterByClusterId(cluster);
// The set resource id reported for the cluster by clusterSetsSupport.getSet is stored back
// together with the cluster id using clusterSetsSupport.put.
924 long setResourceId = clusterSetsSupport.getSet(cluster);
925 clusterSetsSupport.put(setResourceId, cluster);
// Changes the default cluster set through setDefaultClusterSet4NewResource and re-targets currentCluster.
929 public Resource setDefaultClusterSet(Resource clusterSet)
930 throws ServiceException {
// NOTE(review): result is presumably what this method returns (the return statement is not among these lines).
931 Resource result = setDefaultClusterSet4NewResource(clusterSet);
// Non-null set: the cluster id registered in clusterSetsSupport for the set's resource id becomes current.
932 if(clusterSet != null) {
933 long id = clusterSetsSupport.get(clusterSet.getResourceId());
934 currentCluster = clusterTable.getClusterByClusterId(id);
// NOTE(review): presumably the null-clusterSet branch, leaving no current cluster selected.
937 currentCluster = null;
// Removes the value of the given resource, either from the cluster data or from a virtual graph.
943 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
// getProvider(provider) resolves the effective virtual graph; a null result selects the cluster path.
945 provider = getProvider(provider);
946 if (null == provider) {
947 int key = ((ResourceImpl)resource).id;
951 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
// The immutability check and the direct cluster.removeValue call below are disabled (commented out).
952 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
953 // if(key != queryProvider2.getRootLibrary())
954 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
957 // cluster.removeValue(key, clusterTranslator);
958 // } catch (DatabaseException e) {
959 // Logger.defaultLogError(e);
963 clusterTable.writeOnlyInvalidate(cluster);
// The removal is recorded through clusterTranslator: the resource index with DELETE_OPERATION,
// followed by removeValue for the cluster.
966 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
967 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
968 clusterTranslator.removeValue(cluster);
// A DatabaseException is logged rather than propagated.
969 } catch (DatabaseException e) {
970 Logger.defaultLogError(e);
// Queries and client change tracking are told that the resource changed.
973 queryProvider2.invalidateResource(key);
974 clientChanges.invalidate(key);
// NOTE(review): presumably the non-null provider branch; the value is denied in the virtual graph,
// the value query is updated and the change is recorded in clientChanges.
977 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
978 queryProvider2.updateValue(querySupport.getId(resource));
979 clientChanges.claimValue(resource);
// Not supported here: always throws UnsupportedOperationException.
986 public void flush(boolean intermediate) {
987 throw new UnsupportedOperationException();
// Flushes through clusterTable.flushCluster(graphSession) and clears the current cluster selection.
991 public void flushCluster() {
992 clusterTable.flushCluster(graphSession);
// With a default cluster set, its resource id is mapped to Constants.NewClusterId in clusterSetsSupport.
993 if(defaultClusterSet != null) {
994 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
996 currentCluster = null;
// Per-resource flushing is not supported: throws UnsupportedOperationException whose message names the resource.
1000 public void flushCluster(Resource r) {
1001 throw new UnsupportedOperationException("flushCluster resource " + r);
// Removes the statement (subject, predicate, object), either from the cluster data or from a virtual graph.
1009 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
// Resource keys are read directly from the ResourceImpl ids.
1012 int s = ((ResourceImpl)subject).id;
1013 int p = ((ResourceImpl)predicate).id;
1014 int o = ((ResourceImpl)object).id;
// A null result from getProvider selects the cluster path.
1016 provider = getProvider(provider);
1017 if (null == provider) {
1019 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1020 clusterTable.writeOnlyInvalidate(cluster);
1024 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1027 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
// The subject is recorded by its resource index with REMOVE_OPERATION; predicate and object are recorded
// by key with the UID of their own clusters and NULL_OPERATION, then removeStatement is issued.
1029 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1030 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1031 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032 clusterTranslator.removeStatement(cluster);
1034 queryProvider2.invalidateResource(s);
1035 clientChanges.invalidate(s);
// A DatabaseException is logged rather than propagated.
1037 } catch (DatabaseException e) {
1039 Logger.defaultLogError(e);
// NOTE(review): presumably the non-null provider branch; the statement is denied in the virtual graph
// and the subject is invalidated for queries and client changes.
1047 ((VirtualGraphImpl)provider).deny(s, p, o);
1048 queryProvider2.invalidateResource(s);
1049 clientChanges.invalidate(s);
// Not supported here: always throws UnsupportedOperationException.
1058 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1059 throw new UnsupportedOperationException();
1063 public boolean writeOnly() {
// Runs a Write request by delegating to writeSupport.performWriteRequest.
1068 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1069 writeSupport.performWriteRequest(graph, request);
// WriteResult requests are not supported here: always throws UnsupportedOperationException.
1073 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1074 throw new UnsupportedOperationException();
// WriteOnly requests are not supported here: always throws UnsupportedOperationException.
1078 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1079 throw new UnsupportedOperationException();
// Forwards the metadata object to writeSupport.addMetadata. The type parameter T is not used in the signature.
1083 public <T> void addMetadata(Metadata data) throws ServiceException {
1084 writeSupport.addMetadata(data);
// Returns the metadata of the requested class as provided by writeSupport.getMetadata(clazz).
1088 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1089 return writeSupport.getMetadata(clazz);
// Returns the String-to-byte[] metadata map provided by writeSupport.getMetadata().
1093 public TreeMap<String, byte[]> getMetadata() {
1094 return writeSupport.getMetadata();
// Forwards the commit notification (write traits and change set id) to writeSupport.commitDone.
1098 public void commitDone(WriteTraits writeTraits, long csid) {
1099 writeSupport.commitDone(writeTraits, csid);
// Forwards to writeSupport.clearUndoList with the given write traits.
1103 public void clearUndoList(WriteTraits writeTraits) {
1104 writeSupport.clearUndoList(writeTraits);
// Returns the int result of writeSupport.clearMetadata().
1107 public int clearMetadata() {
1108 return writeSupport.clearMetadata();
// Forwards to writeSupport.startUndo().
1112 public void startUndo() {
1113 writeSupport.startUndo();
// WriteSupport implementation whose claim/deny operations act on the VirtualGraph passed as provider
// (cast to TransientGraph or VirtualGraphImpl). Cluster, flush, request and metadata operations
// throw UnsupportedOperationException.
1117 class VirtualWriteOnlySupport implements WriteSupport {
1120 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1121 // Resource object) {
1122 // throw new UnsupportedOperationException();
// Claims a statement in the transient graph, updates the statements query for (subject, predicate)
// and records the claim in clientChanges.
1126 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1128 TransientGraph impl = (TransientGraph)provider;
1129 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1130 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1131 clientChanges.claim(subject, predicate, object);
// Same as above for arguments that are already int resource keys.
1136 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1138 TransientGraph impl = (TransientGraph)provider;
1139 impl.claim(subject, predicate, object);
1140 getQueryProvider2().updateStatements(subject, predicate);
1141 clientChanges.claim(subject, predicate, object);
// Claims the whole byte array as the value of the resource (length = value.length).
1146 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1147 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Stores the value in the virtual graph, updates the value query and records the change.
1151 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1152 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1153 getQueryProvider2().updateValue(resource);
1154 clientChanges.claimValue(resource);
// Reads amount bytes from the reader in one call and claims them as the value.
1158 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1159 byte[] value = reader.readBytes(null, amount);
1160 claimValue(provider, resource, value);
// Allocates a new resource in the transient graph (newResource(false)) and returns its Resource.
1164 public Resource createResource(VirtualGraph provider) {
1165 TransientGraph impl = (TransientGraph)provider;
1166 return impl.getResource(impl.newResource(false));
// Cluster-directed creation is not supported.
1170 public Resource createResource(VirtualGraph provider, long clusterId) {
1171 throw new UnsupportedOperationException();
// Cluster-set-directed creation is not supported.
1175 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1176 throws DatabaseException {
1177 throw new UnsupportedOperationException();
// Cluster sets cannot be created through this support.
1181 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1182 throws DatabaseException {
1183 throw new UnsupportedOperationException();
// Cluster set queries are not supported.
1187 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1188 throws ServiceException {
1189 throw new UnsupportedOperationException();
1193 public Resource setDefaultClusterSet(Resource clusterSet)
1194 throws ServiceException {
// Removes the value of the resource from the virtual graph and updates the value query.
1199 public void denyValue(VirtualGraph provider, Resource resource) {
1200 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1201 getQueryProvider2().updateValue(querySupport.getId(resource));
1202 // NOTE: this only keeps track of value changes by-resource.
1203 clientChanges.claimValue(resource);
// Flushing is not supported.
1207 public void flush(boolean intermediate) {
1208 throw new UnsupportedOperationException();
// Cluster flushing is not supported.
1212 public void flushCluster() {
1213 throw new UnsupportedOperationException();
// Per-resource cluster flushing is not supported; the message names the resource.
1217 public void flushCluster(Resource r) {
1218 throw new UnsupportedOperationException("Resource " + r);
// Denies the statement in the transient graph, updates the statements query for (subject, predicate)
// and records the denial in clientChanges.
1226 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1227 TransientGraph impl = (TransientGraph) provider;
1228 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1229 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1230 clientChanges.deny(subject, predicate, object);
// Direct value setting is not supported.
1235 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1236 throw new UnsupportedOperationException();
1240 public boolean writeOnly() {
// Nested write requests of any kind are not supported.
1245 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1246 throw new UnsupportedOperationException();
1250 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1251 throw new UnsupportedOperationException();
1255 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1256 throw new UnsupportedOperationException();
// Metadata access is not supported.
1260 public <T> void addMetadata(Metadata data) throws ServiceException {
1261 throw new UnsupportedOperationException();
1265 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1266 throw new UnsupportedOperationException();
1270 public TreeMap<String, byte[]> getMetadata() {
1271 throw new UnsupportedOperationException();
1275 public void commitDone(WriteTraits writeTraits, long csid) {
1279 public void clearUndoList(WriteTraits writeTraits) {
1283 public int clearMetadata() {
1288 public void startUndo() {
// Executes a WriteOnlyResult request: sets up a fresh change set and write graph, performs the request,
// flushes the cluster stream and releases the write-only transaction.
// request: the request to perform; notify and callback are handed to the WriteState created below.
1292 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1296 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1298 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// The previous client change set is disposed and replaced with an empty one.
1301 Disposable.safeDispose(clientChanges);
1302 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// A request with a provider writes through VirtualWriteOnlySupport, otherwise through WriteOnlySupport.
// NOTE(review): the provider is used as returned by the request here, while scheduleRequest(WriteOnly, ...)
// passes it through getProvider(...) first; confirm the difference is intended.
1306 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1308 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1310 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1311 writeState = writeStateT;
1313 assert (null != writer);
1314 // writer.state.barrier.inc();
// The request is timed with System.nanoTime and the duration (in seconds) is printed to stderr.
1315 long start = System.nanoTime();
1316 T result = request.perform(writer);
1317 long duration = System.nanoTime() - start;
1319 System.err.println("################");
1320 System.err.println("WriteOnly duration " + 1e-9*duration);
1322 writeStateT.setResult(result);
1324 // This makes clusters available from server
1325 clusterStream.reallyFlush();
1327 // This will trigger query updates
1328 releaseWriteOnly(writer);
1329 assert (null != writer);
1331 handleUpdatesAndMetadata(writer);
// Cancellation: release the graph, drop the write-only clusters and stop the write transaction.
1333 } catch (CancelTransactionException e) {
1335 releaseWriteOnly(writeState.getGraph());
1337 clusterTable.removeWriteOnlyClusters();
1338 state.stopWriteTransaction(clusterStream);
// Any other failure: same clean-up, plus the callback (when given) receives the error wrapped in a
// DatabaseException and the error is logged.
1340 } catch (Throwable e) {
1342 e.printStackTrace();
1344 releaseWriteOnly(writeState.getGraph());
1346 clusterTable.removeWriteOnlyClusters();
1348 if (callback != null)
1349 callback.exception(new DatabaseException(e));
1351 state.stopWriteTransaction(clusterStream);
1352 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1356 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Schedules a WriteOnly request; delegates to the four-argument overload with combine = null.
1362 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1363 scheduleRequest(request, callback, notify, null);
// Queues a WriteOnly request as a SessionTask on requestManager.scheduleWrite.
// callback receives null on success or a DatabaseException on failure; it may be null.
// The combine argument is not referenced by the statements below.
1367 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1371 assert (request != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1373 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1375 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1378 public void run(int thread) {
1380 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1384 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// The previous client change set is disposed and replaced with an empty one.
1387 Disposable.safeDispose(clientChanges);
1388 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// A resolved virtual graph selects VirtualWriteOnlySupport, otherwise WriteOnlySupport is used.
1392 VirtualGraph vg = getProvider(request.getProvider());
1393 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1395 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The WriteState is given a Procedure that adapts its outcome to the Consumer callback.
1397 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1400 public void execute(Object result) {
1401 if(callback != null) callback.accept(null);
1405 public void exception(Throwable t) {
// NOTE(review): unchecked cast; a Throwable that is not a DatabaseException would raise ClassCastException here.
1406 if(callback != null) callback.accept((DatabaseException)t);
1411 assert (null != writer);
1412 // writer.state.barrier.inc();
1416 request.perform(writer);
// Failure: release the graph and drop the write-only clusters; anything other than a
// CancelTransactionException is reported to the callback wrapped in a DatabaseException.
1418 } catch (Throwable e) {
1420 // writer.state.barrier.dec();
1421 // writer.waitAsync(null);
1423 releaseWriteOnly(writer);
1425 clusterTable.removeWriteOnlyClusters();
1427 if(!(e instanceof CancelTransactionException)) {
1428 if (callback != null)
1429 callback.accept(new DatabaseException(e));
1432 writeState.except(e);
1438 // This makes clusters available from server
1439 boolean empty = clusterStream.reallyFlush();
1440 // This was needed to make WO requests call metadata listeners.
1441 // NOTE: the calling event does not contain clientChanges information.
1442 if (!empty && clientChanges.isEmpty())
1443 clientChanges.setNotEmpty(true);
1444 releaseWriteOnly(writer);
1445 assert (null != writer);
1448 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Schedules a WriteOnlyResult request; delegates to the four-argument overload with combine = null.
1461 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1462 scheduleRequest(request, callback, notify, null);
1466 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues a WriteOnlyResult request as a SessionTask whose run method calls performWriteOnly.
// The combine argument is not referenced by the statements below.
1469 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1471 assert (request != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1473 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1475 requestManager.scheduleWrite(new SessionTask(request, thread) {
1478 public void run(int thread) {
1480 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1482 performWriteOnly(request, notify, callback);
// Queues a Read request as a SessionRead on requestManager.scheduleRead.
// procedure receives the outcome; throwable and result are optional containers (null-checked before use)
// through which a synchronous caller can collect the error or the value.
1492 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1494 assert (request != null);
1495 assert (procedure != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1497 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1499 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1502 public void run(int thread) {
1504 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1506 ListenerBase listener = getListenerBase(procedure);
1508 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request goes through the query processor (queryRead); the wrapper procedure
// forwards both outcomes to the caller's procedure and stores the value in result when given.
1512 if (listener != null) {
1515 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1518 public void exception(AsyncReadGraph graph, Throwable t) {
1519 procedure.exception(graph, t);
1520 if(throwable != null) {
1523 // ErrorLogger.defaultLogError("Unhandled exception", t);
1528 public void execute(AsyncReadGraph graph, T t) {
1529 if(result != null) result.set(t);
1530 procedure.execute(graph, t);
1534 } catch (Throwable t) {
1535 // This is handled by the AsyncProcedure
1536 //Logger.defaultLogError("Internal error", t);
1543 // newGraph.state.barrier.inc();
// NOTE(review): presumably the no-listener path; the request is performed directly on the new graph
// and the value is stored and delivered.
1545 T t = request.perform(newGraph);
1549 if(result != null) result.set(t);
1550 procedure.execute(newGraph, t);
// Errors raised while delivering the result.
1552 } catch (Throwable th) {
1554 if(throwable != null) {
1557 Logger.defaultLogError("Unhandled exception", th);
// Errors raised by the request itself are printed and then reported through procedure.exception.
1562 } catch (Throwable t) {
1565 t.printStackTrace();
1567 if(throwable != null) {
1570 Logger.defaultLogError("Unhandled exception", t);
1575 procedure.exception(newGraph, t);
// Errors raised by the exception handler of the procedure.
1577 } catch (Throwable t2) {
1579 if(throwable != null) {
1582 Logger.defaultLogError("Unhandled exception", t2);
1589 // newGraph.state.barrier.dec();
1590 // newGraph.waitAsync(request);
1596 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an AsyncRead request as a SessionRead on requestManager.scheduleRead.
1606 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1608 assert (request != null);
1609 assert (procedure != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1611 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1613 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1616 public void run(int thread) {
1618 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1620 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request is run through the query processor.
1624 if (listener != null) {
1626 newGraph.processor.query(newGraph, request, null, procedure, listener);
1628 // newGraph.waitAsync(request);
// NOTE(review): presumably the no-listener path; the request is performed directly with the procedure
// wrapped in a ResultCallWrappedSingleQueryProcedure4.
1632 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1633 procedure, "request");
1637 // newGraph.state.barrier.inc();
1639 request.perform(newGraph, wrapper);
1641 // newGraph.waitAsync(request);
// A Throwable from perform is routed to the wrapper's exception method.
1643 } catch (Throwable t) {
1645 wrapper.exception(newGraph, t);
1646 // newGraph.waitAsync(request);
1655 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an AsyncMultiRead request as a SessionRead on requestManager.scheduleRead.
1665 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1667 assert (request != null);
1668 assert (procedure != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1670 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// The last SessionRead argument is the thread index when a semaphore was supplied, otherwise -1.
1672 int sync = notify != null ? thread : -1;
1674 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1677 public void run(int thread) {
1679 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1681 ListenerBase listener = getListenerBase(procedure);
1683 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request is run through the query processor.
1687 if (listener != null) {
1689 newGraph.processor.query(newGraph, request, null, procedure, listener);
1691 // newGraph.waitAsync(request);
// NOTE(review): presumably the no-listener path; the request is performed directly with a wrapped procedure.
1695 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1699 request.perform(newGraph, wrapper);
// A Throwable from perform is only printed here.
1701 } catch (Throwable t) {
1703 t.printStackTrace();
1711 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Queues an ExternalRead request as a SessionRead on requestManager.scheduleRead.
// throwable and result are optional containers (null-checked before use) for the error or the value.
1721 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1723 assert (request != null);
1724 assert (procedure != null);
// The thread index is the request's hash code masked with THREAD_MASK.
1726 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1728 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1731 public void run(int thread) {
1733 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1735 ListenerBase listener = getListenerBase(procedure);
1737 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
// With a listener the request is run through the query processor; the wrapper forwards both
// outcomes to the caller's procedure and stores the value in result when given.
1741 if (listener != null) {
1743 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1746 public void exception(Throwable t) {
1747 procedure.exception(t);
1748 if(throwable != null) {
1754 public void execute(T t) {
1755 if(result != null) result.set(t);
1756 procedure.execute(t);
1761 // newGraph.waitAsync(request);
1765 // newGraph.state.barrier.inc();
// NOTE(review): presumably the no-listener path; the request is registered with an anonymous Listener
// that stores the outcome in the containers before forwarding it to the procedure.
1767 request.register(newGraph, new Listener<T>() {
1770 public void exception(Throwable t) {
1771 if(throwable != null) throwable.set(t);
1772 procedure.exception(t);
1773 // newGraph.state.barrier.dec();
1777 public void execute(T t) {
1778 if(result != null) result.set(t);
1779 procedure.execute(t);
1780 // newGraph.state.barrier.dec();
1784 public boolean isDisposed() {
1790 // newGraph.waitAsync(request);
1796 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Schedules the Read request without a semaphore and without error/result containers (all null).
1808 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1810 scheduleRequest(request, procedure, null, null, null);
// Runs a Read request synchronously and returns the value placed in the result container.
// Throws the DatabaseException captured during the request, or wraps any other Throwable in one.
1815 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
// A zero-permit semaphore is passed to the scheduled task and to acquire(...).
1817 Semaphore notify = new Semaphore(0);
1818 DataContainer<Throwable> container = new DataContainer<Throwable>();
1819 DataContainer<T> result = new DataContainer<T>();
1820 scheduleRequest(request, procedure, notify, container, result);
1821 acquire(notify, request);
1822 Throwable throwable = container.get();
1823 if(throwable != null) {
1824 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1825 else throw new DatabaseException("Unexpected exception", throwable);
1827 return result.get();
// Runs an AsyncRead request synchronously and returns the value captured from execute.
// Throws the DatabaseException captured from exception, or wraps any other Throwable in one.
1831 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1833 Semaphore notify = new Semaphore(0);
1834 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1835 final DataContainer<T> resultContainer = new DataContainer<T>();
// The caller's procedure is wrapped so that the outcome is captured before being forwarded.
1836 scheduleRequest(request, new AsyncProcedure<T>() {
1838 public void exception(AsyncReadGraph graph, Throwable throwable) {
1839 exceptionContainer.set(throwable);
1840 procedure.exception(graph, throwable);
1843 public void execute(AsyncReadGraph graph, T result) {
1844 resultContainer.set(result);
1845 procedure.execute(graph, result);
// The listener is derived from the original procedure, not from the wrapper.
1847 }, getListenerBase(procedure), notify);
1848 acquire(notify, request, procedure);
1849 Throwable throwable = exceptionContainer.get();
1850 if (throwable != null) {
1851 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1852 else throw new DatabaseException("Unexpected exception", throwable);
1854 return resultContainer.get();
// Runs an AsyncMultiRead request synchronously, forwarding every callback to the given procedure.
// Throws the DatabaseException captured from exception, or wraps any other Throwable in one.
// No result collection is assembled by the statements below; a TODO message is printed to stderr instead.
1859 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1861 Semaphore notify = new Semaphore(0);
1862 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
// The wrapper only adds capture of the exception; execute and finished are passed straight through.
1863 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1865 public void exception(AsyncReadGraph graph, Throwable throwable) {
1866 exceptionContainer.set(throwable);
1867 procedure.exception(graph, throwable);
1870 public void execute(AsyncReadGraph graph, T result) {
1871 procedure.execute(graph, result);
1874 public void finished(AsyncReadGraph graph) {
1875 procedure.finished(graph);
1878 acquire(notify, request, procedure);
1879 Throwable throwable = exceptionContainer.get();
1880 if (throwable != null) {
1881 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1882 else throw new DatabaseException("Unexpected exception", throwable);
1884 // TODO: implement return value
1885 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
// Runs an ExternalRead request synchronously with a ProcedureAdapter as the procedure.
1890 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1891 return syncRequest(request, new ProcedureAdapter<T>());
1894 // assert(request != null);
1896 // final DataContainer<T> result = new DataContainer<T>();
1897 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1899 // syncRequest(request, new Procedure<T>() {
1902 // public void execute(T t) {
1907 // public void exception(Throwable t) {
1908 // exception.set(t);
1913 // Throwable t = exception.get();
1915 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1916 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1919 // return result.get();
// Listener variant: the cast to Procedure<T> selects the syncRequest(ExternalRead, Procedure) overload.
1924 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1925 return syncRequest(request, (Procedure<T>)procedure);
1930 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1931 // assertNotSession();
1932 // Semaphore notify = new Semaphore(0);
1933 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1934 // DataContainer<T> result = new DataContainer<T>();
1935 // scheduleRequest(request, procedure, notify, container, result);
1936 // acquire(notify, request);
1937 // Throwable throwable = container.get();
1938 // if(throwable != null) {
1939 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1940 // else throw new DatabaseException("Unexpected exception", throwable);
1942 // return result.get();
// Runs an ExternalRead request synchronously and returns the value placed in the result container.
// Throws the DatabaseException captured during the request, or wraps any other Throwable in one.
1947 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1949 Semaphore notify = new Semaphore(0);
1950 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1951 final DataContainer<T> result = new DataContainer<T>();
1952 scheduleRequest(request, procedure, notify, container, result);
1953 acquire(notify, request);
1954 Throwable throwable = container.get();
1955 if (throwable != null) {
1956 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1957 else throw new DatabaseException("Unexpected exception", throwable);
1959 return result.get();
// Runs a Write request synchronously; the DatabaseException delivered to the callback, if any, is rethrown.
1963 public void syncRequest(Write request) throws DatabaseException {
1966 Semaphore notify = new Semaphore(0);
1967 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
// The callback stores whatever it is given (null on success) in the container.
1968 scheduleRequest(request, e -> exception.set(e), notify);
1969 acquire(notify, request);
1970 if(exception.get() != null) throw exception.get();
// Runs a WriteResult request synchronously and returns the content of the result container.
// A captured DatabaseException is rethrown as is; any other Throwable is wrapped in a DatabaseException.
1974 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1976 Semaphore notify = new Semaphore(0);
1977 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1978 final DataContainer<T> result = new DataContainer<T>();
// NOTE(review): the anonymous Procedure presumably fills the two containers; confirm against its method bodies.
1979 scheduleRequest(request, new Procedure<T>() {
1982 public void exception(Throwable t) {
1987 public void execute(T t) {
1992 acquire(notify, request);
1993 if(exception.get() != null) {
1994 Throwable t = exception.get();
1995 if(t instanceof DatabaseException) throw (DatabaseException)t;
1996 else throw new DatabaseException(t);
1998 return result.get();
// Runs a WriteOnlyResult request synchronously and returns the content of the result container.
// A captured DatabaseException is rethrown as is; any other Throwable is wrapped in a DatabaseException.
2002 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2004 Semaphore notify = new Semaphore(0);
2005 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2006 final DataContainer<T> result = new DataContainer<T>();
// NOTE(review): the anonymous Procedure presumably fills the two containers; confirm against its method bodies.
2007 scheduleRequest(request, new Procedure<T>() {
2010 public void exception(Throwable t) {
2015 public void execute(T t) {
2020 acquire(notify, request);
2021 if(exception.get() != null) {
2022 Throwable t = exception.get();
2023 if(t instanceof DatabaseException) throw (DatabaseException)t;
2024 else throw new DatabaseException(t);
2026 return result.get();
// Runs a DelayedWriteResult request synchronously and returns the content of the result container.
// A captured DatabaseException is rethrown as is; any other Throwable is wrapped in a DatabaseException.
2030 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2032 Semaphore notify = new Semaphore(0);
2033 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2034 final DataContainer<T> result = new DataContainer<T>();
// NOTE(review): the anonymous Procedure presumably fills the two containers; confirm against its method bodies.
2035 scheduleRequest(request, new Procedure<T>() {
2038 public void exception(Throwable t) {
2043 public void execute(T t) {
2048 acquire(notify, request);
2049 if(exception.get() != null) {
2050 Throwable t = exception.get();
2051 if(t instanceof DatabaseException) throw (DatabaseException)t;
2052 else throw new DatabaseException(t);
2054 return result.get();
// Runs a DelayedWrite request synchronously; the DatabaseException delivered to the callback, if any, is rethrown.
2058 public void syncRequest(DelayedWrite request) throws DatabaseException {
2060 Semaphore notify = new Semaphore(0);
2061 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
// The callback stores whatever it is given (null on success) in the container.
2062 scheduleRequest(request, e -> exception.set(e), notify);
2063 acquire(notify, request);
2064 if(exception.get() != null) throw exception.get();
// Runs a WriteOnly request synchronously; the DatabaseException delivered to the callback, if any, is rethrown.
2068 public void syncRequest(WriteOnly request) throws DatabaseException {
2071 Semaphore notify = new Semaphore(0);
2072 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
// The callback stores whatever it is given (null on success) in the container.
2073 scheduleRequest(request, e -> exception.set(e), notify);
2074 acquire(notify, request);
2075 if(exception.get() != null) throw exception.get();
2080 * GraphRequestProcessor interface
// Schedules the AsyncRead request with no listener and no semaphore (both null).
2084 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2086 scheduleRequest(request, procedure, null, null);
// Schedules the AsyncMultiRead request with no semaphore (null).
2091 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2093 scheduleRequest(request, procedure, null);
// Schedules the ExternalRead request without a semaphore and without error/result containers (all null).
2098 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2100 scheduleRequest(request, procedure, null, null, null);
// Schedules the Write request with the given callback and null as the third (semaphore) argument.
2105 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2107 scheduleRequest(request, callback, null);
// Schedules the WriteResult request with the given procedure and null as the third (semaphore) argument.
2112 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2114 scheduleRequest(request, procedure, null);
// Schedules the WriteOnlyResult request with the given procedure and no semaphore (null).
2119 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2121 scheduleRequest(request, procedure, null);
// Schedules the DelayedWriteResult request with the given procedure and null as the third (semaphore) argument.
2126 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2128 scheduleRequest(request, procedure, null);
// Schedules the DelayedWrite request with the given callback and null as the third (semaphore) argument.
2133 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2135 scheduleRequest(request, callback, null);
// Fire-and-forget Write: delegates to asyncRequest(Write, callback) with a null callback.
2140 public void asyncRequest(final Write r) {
2141 asyncRequest(r, null);
// Fire-and-forget DelayedWrite: delegates to asyncRequest(DelayedWrite, callback) with a null callback.
2145 public void asyncRequest(final DelayedWrite r) {
2146 asyncRequest(r, null);
// Schedules the WriteOnly request with the given callback and no semaphore (null).
2150 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2152 scheduleRequest(request, callback, null);
// Fire-and-forget WriteOnly: delegates to asyncRequest(WriteOnly, callback) with a null callback.
2157 public void asyncRequest(final WriteOnly request) {
2159 asyncRequest(request, null);
// Lets the ReadInterface issue its request against this session with an AsyncProcedure.
2164 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2165 r.request(this, procedure);
// Lets the ReadInterface issue its request against this session with a Procedure.
2169 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2170 r.request(this, procedure);
// Lets the ReadInterface issue its request against this session with a SyncProcedure.
2174 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2175 r.request(this, procedure);
// Lets the ReadInterface issue its request against this session with an AsyncListener.
2179 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2180 r.request(this, procedure);
// Lets the ReadInterface issue its request against this session with a Listener.
2184 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2185 r.request(this, procedure);
// Lets the ReadInterface issue its request against this session with a SyncListener.
2189 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2190 r.request(this, procedure);
// Returns the value of r.request(this) for a ReadInterface; a DatabaseException from it propagates.
2194 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2195 return r.request(this);
// Returns the value of r.request(this) for a WriteInterface; a DatabaseException from it propagates.
2199 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2200 return r.request(this);
// Lets the WriteInterface issue its request against this session with the given Procedure.
2204 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2205 r.request(this, procedure);
// Lets the WriteInterface issue its request against this session with a fresh ProcedureAdapter as the procedure.
2209 public <T> void async(WriteInterface<T> r) {
2210 r.request(this, new ProcedureAdapter<T>());
2214 public void incAsync() {
2219 public void decAsync() {
// Returns the cluster id of the cluster that clusterTable resolves for the resource's key.
2223 public long getCluster(ResourceImpl resource) {
2224 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2225 return cluster.getClusterId();
// Returns the cluster id for the given resource key via clusterTable.getClusterIdByResourceKeyNoThrow.
2228 public long getCluster(int id) {
// NOTE(review): the null check only prints a message; the call on the next statement still dereferences
// clusterTable and would raise NullPointerException in that situation.
2229 if (clusterTable == null)
2230 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2231 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
// Wraps the given resource key in a new ResourceImpl bound to resourceSupport.
2234 public ResourceImpl getResource(int id) {
2235 return new ResourceImpl(resourceSupport, id);
// Builds a ResourceImpl from a resource index and a cluster id.
2238 public ResourceImpl getResource(int resourceIndex, long clusterId) {
// The index is validated only when assertions are enabled.
2239 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
// The cluster key of the cluster with the given id is combined with the index into a resource key.
2240 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2241 int key = proxy.getClusterKey();
2242 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2243 return new ResourceImpl(resourceSupport, resourceKey);
// Wraps the given resource key into a ResourceImpl bound to resourceSupport.
// NOTE(review): one line between the signature and the return is not visible in this extract.
2246 public ResourceImpl getResource2(int id) {
2248 return new ResourceImpl(resourceSupport, id);
// NOTE(review): body not visible in this extract; presumably returns the key stored in the
// given ResourceImpl — confirm.
2251 final public int getId(ResourceImpl impl) {
2255 public static final Charset UTF8 = Charset.forName("utf-8");
2260 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2261 for(TransientGraph g : support.providers) {
2262 if(g.isPending(subject)) return false;
2267 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2268 for(TransientGraph g : support.providers) {
2269 if(g.isPending(subject, predicate)) return false;
// Asks the providers of `support` that still report `subject` as pending to load it, and runs
// `runnable` once the composite callback has been invoked providers.size() + 1 times.
// NOTE(review): the try/else lines are not visible in this extract; the first in-loop
// composite.accept(graph) below is presumably the else branch for providers that are not
// pending — confirm.
2274 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
// Countdown callback: forwards to `runnable` when the counter reaches zero.
2276 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
// One count per provider plus one for the final accept after the loop.
2278 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2281 public void accept(ReadGraphImpl graph) {
2282 if(ready.decrementAndGet() == 0) {
2283 runnable.accept(graph);
2289 for(TransientGraph g : support.providers) {
2290 if(g.isPending(subject)) {
2292 g.load(graph, subject, composite);
2293 } catch (DatabaseException e) {
// NOTE(review): the failure is only printed; if load() fails before invoking `composite`,
// the counter may never reach zero and `runnable` would never run — verify.
2294 e.printStackTrace();
2297 composite.accept(graph);
// Final decrement matching the "+ 1" in the initial count.
2301 composite.accept(graph);
// Same as above for a subject/predicate pair.
// NOTE(review): the same hidden try/else structure and the same swallowed-exception concern apply.
2305 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
// Countdown callback: forwards to `runnable` when the counter reaches zero.
2307 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2309 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2312 public void accept(ReadGraphImpl graph) {
2313 if(ready.decrementAndGet() == 0) {
2314 runnable.accept(graph);
2320 for(TransientGraph g : support.providers) {
2321 if(g.isPending(subject, predicate)) {
2323 g.load(graph, subject, predicate, composite);
2324 } catch (DatabaseException e) {
2325 e.printStackTrace();
2328 composite.accept(graph);
// Final decrement matching the "+ 1" in the initial count.
2332 composite.accept(graph);
2336 // void dumpHeap() {
2339 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2340 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2341 // "d:/heap" + flushCounter + ".txt", true);
2342 // } catch (IOException e) {
2343 // e.printStackTrace();
// Notifies the listeners in changeListeners2 about the given change set, using a fresh read
// graph created on the session's query provider. Exceptions caught in the listener loop are
// printed.
// NOTE(review): the early-exit, try and listener-invocation lines are not visible in this extract.
2348 void fireReactionsToSynchronize(ChangeSet cs) {
2350 // Do not fire empty events
2354 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2355 // g.state.barrier.inc();
2359 if (!cs.isEmpty()) {
// The event is created with a null third argument.
2360 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2361 for (ChangeListener l : changeListeners2) {
2364 } catch (Exception ex) {
2365 ex.printStackTrace();
2372 // g.state.barrier.dec();
2373 // g.waitAsync(null);
// Notifies the listeners in changeListeners2 about the committed change set using the
// caller's read graph. Exceptions caught in the listener loop and any Throwable caught by the
// outer handler are printed.
// NOTE(review): the early-exit, try and listener-invocation lines are not visible in this extract.
2379 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2382 // Do not fire empty events
2387 // graph.state.barrier.inc();
2391 if (!cs2.isEmpty()) {
2393 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2394 for (ChangeListener l : changeListeners2) {
2397 // System.out.println("changelistener " + l);
2398 } catch (Exception ex) {
2399 ex.printStackTrace();
2407 // graph.state.barrier.dec();
2408 // graph.waitAsync(null);
2411 } catch (Throwable t) {
2412 t.printStackTrace();
// Notifies the listeners in metadataListeners about the change set. The event carries a
// separate write graph (reactionGraph) created on the same processor, plus the original graph.
// Throwables caught in the listener loop and by the outer handler are printed.
// NOTE(review): the early-exit statements and the listener invocation are not visible in this extract.
2416 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2420 // Do not fire empty events
2424 // Do not fire on virtual requests
2425 if(graph.getProvider() != null)
2428 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2432 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2433 for (ChangeListener l : metadataListeners) {
2436 } catch (Throwable ex) {
2437 ex.printStackTrace();
2445 } catch (Throwable t) {
2446 t.printStackTrace();
2455 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
// Resolves a service through peekService(api) and throws ServiceNotFoundException when it
// cannot be provided; a closed session gets the more specific "Session has been shut down"
// message.
// NOTE(review): the null check guarding the throws and the final return are not visible in this extract.
2458 public <T> T getService(Class<T> api) {
2459 T t = peekService(api);
2461 if (state.isClosed())
2462 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2463 throw new ServiceNotFoundException(this, api);
// Supplied by subclasses; peekService returns this value when ServerInformation.class is requested.
2471 protected abstract ServerInformation getCachedServerInformation();
// Two-entry cache of recently resolved services, used by peekService and checked by
// registerService: slot 1 (serviceKey1/service1) and slot 2 (serviceKey2/service2).
2473 private Class<?> serviceKey1 = null;
2474 private Class<?> serviceKey2 = null;
2475 private Object service1 = null;
2476 private Object service2 = null;
2482 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
// Looks up a service without throwing when it is missing. Order of resolution as far as it is
// visible here: the two cache slots, a set of API classes handled directly by the session, and
// finally serviceLocator.peekService(api), whose result is stored in slot 1.
// NOTE(review): several short lines (returns, slot swaps) are not visible in this extract.
2484 @SuppressWarnings("unchecked")
2486 public synchronized <T> T peekService(Class<T> api) {
2488 if(serviceKey1 == api) {
2490 } else if (serviceKey2 == api) {
// Hit in slot 2: the old slot-1 entry is moved into slot 2.
2492 Object result = service2;
2493 service2 = service1;
2494 serviceKey2 = serviceKey1;
// API classes answered by the session itself.
2500 if (Layer0.class == api)
2502 if (ServerInformation.class == api)
2503 return (T) getCachedServerInformation();
2504 else if (WriteGraphImpl.class == api)
2505 return (T) writeState.getGraph();
2506 else if (ClusterBuilder.class == api)
2507 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2508 else if (ClusterBuilderFactory.class == api)
2509 return (T)new ClusterBuilderFactoryImpl(this);
// Cache miss: slot 1 is moved into slot 2 before the locator result takes slot 1.
2511 service2 = service1;
2512 serviceKey2 = serviceKey1;
2514 service1 = serviceLocator.peekService(api);
2525 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2528 public boolean hasService(Class<?> api) {
2529 return serviceLocator.hasService(api);
2533 * @param api the api that must be implemented by the specified service
2534 * @param service the service implementation
// Registers a service implementation. Layer0 is stored in the L0 field; other services go to
// serviceLocator. Registering a TransactionPolicySupport also updates transactionPolicy and
// resets the policy on `state`. The cache slots are checked against the registered API.
// NOTE(review): the statements following the Layer0 assignment and the two cache-slot checks
// are not visible in this extract.
2537 public <T> void registerService(Class<T> api, T service) {
2538 if(Layer0.class == api) {
2539 L0 = (Layer0)service;
2542 serviceLocator.registerService(api, service);
2543 if (TransactionPolicySupport.class == api) {
2544 transactionPolicy = (TransactionPolicySupport)service;
2545 state.resetTransactionPolicy();
2547 if (api == serviceKey1)
2549 else if (api == serviceKey2)
// Calls valuesChanged on every registered monitor handler with the context mapped to it in
// monitorContexts. Exceptions and LinkageErrors from a handler are logged so that the
// remaining handlers are still notified.
// NOTE(review): `variable` is not used in the visible lines; some lines are missing from this extract.
2557 void fireSessionVariableChange(String variable) {
2558 for (MonitorHandler h : monitorHandlers) {
2559 MonitorContext ctx = monitorContexts.getLeft(h);
2561 // SafeRunner functionality repeated here to avoid dependency.
2563 h.valuesChanged(ctx);
2564 } catch (Exception e) {
2565 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2566 } catch (LinkageError e) {
2567 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
// Converts between session-local resource keys (transient ids) and serialized forms: a long
// "random access id" built from cluster id and resource index, and a string of the form
// '<resource index>_<cluster id>'.
// NOTE(review): many short lines (guards, try, returns) are not visible in this extract.
2573 class ResourceSerializerImpl implements ResourceSerializer {
// Builds the long random access id for a resource key from its resource index and cluster id.
2575 public long createRandomAccessId(int id) throws DatabaseException {
2578 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2579 long cluster = getCluster(id);
2581 return 0; // Better to return 0 than an invalid id.
2582 long result = ClusterTraitsBase.createResourceId(cluster, index);
// Same as createRandomAccessId(int) for the key stored in the given resource.
2587 public long getRandomAccessId(Resource resource) throws DatabaseException {
2589 ResourceImpl resourceImpl = (ResourceImpl) resource;
2590 return createRandomAccessId(resourceImpl.id);
// Returns the session-local key stored in the given resource.
2595 public int getTransientId(Resource resource) throws DatabaseException {
2597 ResourceImpl resourceImpl = (ResourceImpl) resource;
2598 return resourceImpl.id;
// String form of the random access id. Negative keys are serialized as '<key>_0'.
// A null resource is rejected with IllegalArgumentException; other failures are wrapped in
// InvalidResourceReferenceException.
2603 public String createRandomAccessId(Resource resource)
2604 throws InvalidResourceReferenceException {
2606 if(resource == null) throw new IllegalArgumentException();
2608 ResourceImpl resourceImpl = (ResourceImpl) resource;
2610 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2614 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2615 } catch (DatabaseException e1) {
2616 throw new InvalidResourceReferenceException(e1);
2619 // Serialize as '<resource index>_<cluster id>'
2620 return "" + r + "_" + getCluster(resourceImpl);
2621 } catch (Throwable e) {
2622 e.printStackTrace();
2623 throw new InvalidResourceReferenceException(e);
// Inverse of createRandomAccessId(int): non-positive values are returned as-is (narrowed to
// int); otherwise the cluster is resolved by id and its key is combined with the index.
2628 public int getTransientId(long serialized) throws DatabaseException {
2629 if (serialized <= 0)
2630 return (int)serialized;
2631 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2632 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2633 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2634 if (cluster == null)
2635 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2636 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
// Resolves a resource from its long random access id.
2641 public Resource getResource(long randomAccessId) throws DatabaseException {
2642 return getResourceByKey(getTransientId(randomAccessId));
// Resolves a resource from its session-local key.
2646 public Resource getResource(int transientId) throws DatabaseException {
2647 return getResourceByKey(transientId);
// Parses '<resource index>_<cluster id>'. A negative index is treated directly as a key.
// Otherwise the key is built from the cluster and the resource is returned only if the
// cluster reports it as existing; failing that, InvalidResourceReferenceException is thrown.
// NOTE(review): no null check on `cluster` is visible before getClusterKey(); a null here
// would end up in the catch (Throwable) handler — confirm this is intended.
2651 public Resource getResource(String randomAccessId)
2652 throws InvalidResourceReferenceException {
2654 int i = randomAccessId.indexOf('_');
2656 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2657 + randomAccessId + "'");
2658 int r = Integer.parseInt(randomAccessId.substring(0, i));
2659 if(r < 0) return getResourceByKey(r);
2660 long c = Long.parseLong(randomAccessId.substring(i + 1));
2661 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2662 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2663 if (cluster.hasResource(key, clusterTranslator))
2664 return getResourceByKey(key);
2665 } catch (InvalidResourceReferenceException e) {
2667 } catch (NumberFormatException e) {
2668 throw new InvalidResourceReferenceException(e);
2669 } catch (Throwable e) {
2670 e.printStackTrace();
2671 throw new InvalidResourceReferenceException(e);
2674 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
// NOTE(review): the body of the try block is not visible in this extract.
2678 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2681 } catch (Throwable e) {
2682 e.printStackTrace();
2683 throw new InvalidResourceReferenceException(e);
2689 // Copied from old SessionImpl
2692 if (state.isClosed())
2693 throw new Error("Session closed.");
// Creates a new resource in the cluster with the given id and wraps the new key in a
// ResourceImpl. getClusterByClusterIdOrThrow is used for the lookup; a DatabaseException from
// createResource is logged.
// NOTE(review): the declaration of newId and what happens after the logged failure are not
// visible in this extract.
2698 public ResourceImpl getNewResource(long clusterId) {
2699 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2702 newId = cluster.createResource(clusterTranslator);
2703 } catch (DatabaseException e) {
2704 Logger.defaultLogError(e);
2707 return new ResourceImpl(resourceSupport, newId);
2710 public ResourceImpl getNewResource(Resource clusterSet)
2711 throws DatabaseException {
2712 long resourceId = clusterSet.getResourceId();
2713 Long clusterId = clusterSetsSupport.get(resourceId);
2714 if (null == clusterId)
2715 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2716 if (Constants.NewClusterId == clusterId) {
2717 clusterId = getService(ClusteringSupport.class).createCluster();
2718 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2719 createdClusters.add(clusterId);
2720 clusterSetsSupport.put(resourceId, clusterId);
2721 return getNewResource(clusterId);
2723 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2724 ResourceImpl result;
2725 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2726 clusterId = getService(ClusteringSupport.class).createCluster();
2727 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2728 createdClusters.add(clusterId);
2729 clusterSetsSupport.put(resourceId, clusterId);
2730 return getNewResource(clusterId);
2732 result = getNewResource(clusterId);
2733 int resultKey = querySupport.getId(result);
2734 long resultCluster = querySupport.getClusterId(resultKey);
2735 if (clusterId != resultCluster)
2736 clusterSetsSupport.put(resourceId, resultCluster);
// Registers the given resource as a new cluster set: it is mapped to Constants.NewClusterId in
// clusterSetsSupport. Throws ClusterSetExistException if the resource is already registered.
// NOTE(review): a line before the println is not visible in this extract (possibly a debug guard).
2742 public void getNewClusterSet(Resource clusterSet)
2743 throws DatabaseException {
2745 System.out.println("new cluster set=" + clusterSet);
2746 long resourceId = clusterSet.getResourceId();
2747 if (clusterSetsSupport.containsKey(resourceId))
2748 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2749 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2751 public boolean containsClusterSet(Resource clusterSet)
2752 throws ServiceException {
2753 long resourceId = clusterSet.getResourceId();
2754 return clusterSetsSupport.containsKey(resourceId);
2756 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2757 Resource r = defaultClusterSet;
2758 defaultClusterSet = clusterSet;
// Prints cluster statistics to stdout: a summary (number of loaded, non-empty clusters and
// their total used space in kB), one line per cluster, and then the query provider's own
// diagnostics.
// NOTE(review): the first lines of the body and the try/else lines are not visible in this extract.
2761 void printDiagnostics() {
// Totals accumulated over all loaded, non-empty clusters.
2765 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2766 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2767 for(ClusterI cluster : clusterTable.getClusters()) {
2769 if (cluster.isLoaded() && !cluster.isEmpty()) {
2770 residentClusters.set(residentClusters.get() + 1);
2771 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2773 } catch (DatabaseException e) {
2774 Logger.defaultLogError(e);
2778 System.out.println("--------------------------------");
2779 System.out.println("Cluster information:");
2780 System.out.println("-amount of resident clusters=" + residentClusters.get());
2781 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
// Per-cluster line: "Cluster <id> [<state or counters>]".
2783 for(ClusterI cluster : clusterTable.getClusters()) {
2784 System.out.print("Cluster " + cluster.getClusterId() + " [");
2786 if (!cluster.isLoaded())
2787 System.out.println("not loaded]");
2788 else if (cluster.isEmpty())
2789 System.out.println("is empty]");
2791 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2792 System.out.print(",references=" + cluster.getReferenceCount());
2793 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2795 } catch (DatabaseException e) {
2796 Logger.defaultLogError(e);
2797 System.out.println("is corrupted]");
2801 queryProvider2.printDiagnostics();
2802 System.out.println("--------------------------------");
// Calls readTransactionStarted() on every session event listener; a Throwable from one
// listener is printed and does not stop the others.
// NOTE(review): the lines before the println are not visible (possibly a debug guard).
2807 public void fireStartReadTransaction() {
2810 System.out.println("StartReadTransaction");
2812 for (SessionEventListener listener : eventListeners) {
2814 listener.readTransactionStarted();
2815 } catch (Throwable t) {
2816 t.printStackTrace();
// Calls readTransactionFinished() on every session event listener; a Throwable from one
// listener is printed and does not stop the others.
// NOTE(review): the lines before the println are not visible (possibly a debug guard).
2822 public void fireFinishReadTransaction() {
2825 System.out.println("FinishReadTransaction");
2827 for (SessionEventListener listener : eventListeners) {
2829 listener.readTransactionFinished();
2830 } catch (Throwable t) {
2831 t.printStackTrace();
// Calls writeTransactionStarted() on every session event listener; a Throwable from one
// listener is printed and does not stop the others.
// NOTE(review): the lines before the println are not visible (possibly a debug guard).
2837 public void fireStartWriteTransaction() {
2840 System.out.println("StartWriteTransaction");
2842 for (SessionEventListener listener : eventListeners) {
2844 listener.writeTransactionStarted();
2845 } catch (Throwable t) {
2846 t.printStackTrace();
// Calls writeTransactionFinished() on every session event listener (a Throwable from one
// listener is printed and does not stop the others), then resets the
// "dependencies indexing disabled" flag via Indexing.
// NOTE(review): the lines before the println are not visible (possibly a debug guard).
2852 public void fireFinishWriteTransaction() {
2855 System.out.println("FinishWriteTransaction");
2857 for (SessionEventListener listener : eventListeners) {
2859 listener.writeTransactionFinished();
2860 } catch (Throwable t) {
2861 t.printStackTrace();
2865 Indexing.resetDependenciesIndexingDisabled();
2869 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2870 throw new Error("Not supported at the moment.");
2877 ClusterTable getClusterTable() {
2878 return clusterTable;
2881 public GraphSession getGraphSession() {
2882 return graphSession;
2885 public QueryProcessor getQueryProvider2() {
2886 return queryProvider2;
2889 ClientChangesImpl getClientChanges() {
2890 return clientChanges;
// NOTE(review): body not visible in this extract; presumably reports the session's
// write-only flag — confirm.
2893 boolean getWriteOnly() {
// NOTE(review): no use of this static counter is visible in this extract; it is mutable,
// non-private static state — check whether it is still needed.
2897 static int counter = 0;
// Callback for a loaded cluster: asks the cluster table to update its size bookkeeping.
// NOTE(review): `clusterId` is not used in the visible lines; one line is missing from this extract.
2899 public void onClusterLoaded(long clusterId) {
2901 clusterTable.updateSize();
2909 * Implementation of the interface RequestProcessor
// Synchronous read without a caller-supplied procedure: delegates to
// syncRequest(request, AsyncProcedure) with an inline procedure and two DataContainers
// (result, exception). A captured DatabaseException is rethrown as-is, any other Throwable is
// wrapped in a DatabaseException; otherwise the content of `result` is returned.
// NOTE(review): the callback bodies and the null check around the rethrow are not visible in
// this extract.
2913 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2918 assert(request != null);
2920 final DataContainer<T> result = new DataContainer<T>();
2921 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2923 syncRequest(request, new AsyncProcedure<T>() {
2926 public void execute(AsyncReadGraph graph, T t) {
2931 public void exception(AsyncReadGraph graph, Throwable t) {
2937 Throwable t = exception.get();
2939 if(t instanceof DatabaseException) throw (DatabaseException)t;
2940 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2943 return result.get();
2948 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2949 // assertNotSession();
2950 // return syncRequest(request, (AsyncProcedure<T>)procedure);
// Synchronous Read overloads for the other procedure/listener flavours: each wraps its
// argument in the matching ...ToAsync... adapter and delegates to the overload accepting it.
// NOTE(review): in each method one line between the signature and the return is not visible
// in this extract.
2954 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2956 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2960 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2962 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2966 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2968 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2972 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2974 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Synchronous AsyncRead without a caller-supplied procedure: delegates to
// syncRequest(request, AsyncProcedure) with an inline procedure and two DataContainers
// (result, exception). A captured DatabaseException is rethrown as-is, any other Throwable is
// wrapped in a DatabaseException; otherwise the content of `result` is returned.
// NOTE(review): the callback bodies and the null check around the rethrow are not visible in
// this extract. The wrapped-exception message names syncRequest(Read) although this is the
// AsyncRead overload.
2978 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2982 assert(request != null);
2984 final DataContainer<T> result = new DataContainer<T>();
2985 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2987 syncRequest(request, new AsyncProcedure<T>() {
2990 public void execute(AsyncReadGraph graph, T t) {
2995 public void exception(AsyncReadGraph graph, Throwable t) {
3001 Throwable t = exception.get();
3003 if(t instanceof DatabaseException) throw (DatabaseException)t;
3004 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3007 return result.get();
// Synchronous AsyncRead overloads for the listener/procedure flavours: the AsyncListener is
// passed on cast to AsyncProcedure, the others are wrapped in the matching ...ToAsync...
// adapter before delegating.
// NOTE(review): in each method one line between the signature and the return is not visible
// in this extract.
3012 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3014 return syncRequest(request, (AsyncProcedure<T>)procedure);
3018 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3020 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3024 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3026 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3030 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3032 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3036 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3038 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Synchronous MultiRead without a caller-supplied procedure: delegates to
// syncRequest(request, AsyncMultiProcedure) with an inline procedure that works on the
// `result` list under synchronized(result). A captured DatabaseException is rethrown as-is,
// any other Throwable is wrapped in a DatabaseException.
// NOTE(review): the callback bodies, the null check around the rethrow and the final return
// are not visible in this extract. The wrapped-exception message names syncRequest(Read)
// although this is the MultiRead overload.
3042 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3046 assert(request != null);
3048 final ArrayList<T> result = new ArrayList<T>();
3049 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3051 syncRequest(request, new AsyncMultiProcedure<T>() {
3054 public void execute(AsyncReadGraph graph, T t) {
3055 synchronized(result) {
3061 public void finished(AsyncReadGraph graph) {
3065 public void exception(AsyncReadGraph graph, Throwable t) {
3072 Throwable t = exception.get();
3074 if(t instanceof DatabaseException) throw (DatabaseException)t;
3075 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Synchronous MultiRead overloads for the multi-listener/multi-procedure flavours: the
// AsyncMultiListener is passed on cast to AsyncMultiProcedure, the others are wrapped in the
// matching ...ToAsyncMulti... adapter before delegating.
// NOTE(review): in each method one line between the signature and the return is not visible
// in this extract.
3083 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3085 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3089 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3091 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3095 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3097 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3101 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3103 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3107 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3109 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Synchronous AsyncMultiRead without a caller-supplied procedure: delegates to
// syncRequest(request, AsyncMultiProcedure) with an inline procedure that works on the
// `result` list under synchronized(result). A captured DatabaseException is rethrown as-is,
// any other Throwable is wrapped in a DatabaseException.
// NOTE(review): the callback bodies, the null check around the rethrow and the final return
// are not visible in this extract. The wrapped-exception message names syncRequest(Read)
// although this is the AsyncMultiRead overload.
3113 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3117 assert(request != null);
3119 final ArrayList<T> result = new ArrayList<T>();
3120 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3122 syncRequest(request, new AsyncMultiProcedure<T>() {
3125 public void execute(AsyncReadGraph graph, T t) {
3126 synchronized(result) {
3132 public void finished(AsyncReadGraph graph) {
3136 public void exception(AsyncReadGraph graph, Throwable t) {
3143 Throwable t = exception.get();
3145 if(t instanceof DatabaseException) throw (DatabaseException)t;
3146 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Synchronous AsyncMultiRead overloads for the multi-listener/multi-procedure flavours: the
// AsyncMultiListener is passed on cast to AsyncMultiProcedure, the others are wrapped in the
// matching ...ToAsyncMulti... adapter before delegating.
// NOTE(review): in each method one line between the signature and the return is not visible
// in this extract.
3154 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3156 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3160 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3162 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3166 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3168 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3172 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3174 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3178 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3180 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Asynchronous Read without a caller-supplied procedure: the result is ignored by a
// ProcedureAdapter whose exception callback prints the stack trace.
// NOTE(review): one line after the signature is not visible in this extract.
3185 public <T> void asyncRequest(Read<T> request) {
3187 asyncRequest(request, new ProcedureAdapter<T>() {
3189 public void exception(Throwable t) {
3190 t.printStackTrace();
3197 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3198 asyncRequest(request, (AsyncProcedure<T>)procedure);
3202 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3203 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3207 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3208 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3212 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3213 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3217 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3218 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Asynchronous AsyncRead without a caller-supplied procedure: the result is ignored by a
// ProcedureAdapter whose exception callback prints the stack trace.
3222 final public <T> void asyncRequest(final AsyncRead<T> request) {
3224 assert(request != null);
3226 asyncRequest(request, new ProcedureAdapter<T>() {
3228 public void exception(Throwable t) {
3229 t.printStackTrace();
3236 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3237 scheduleRequest(request, procedure, procedure, null);
3241 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3242 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3246 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3247 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3251 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3252 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3256 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3257 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Asynchronous MultiRead without a caller-supplied procedure: results are ignored by an
// AsyncMultiProcedureAdapter whose exception callback prints the stack trace.
3261 public <T> void asyncRequest(MultiRead<T> request) {
3263 assert(request != null);
3265 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3267 public void exception(AsyncReadGraph graph, Throwable t) {
3268 t.printStackTrace();
3275 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3276 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3280 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3281 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3285 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3286 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3290 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3291 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3295 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3296 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Asynchronous AsyncMultiRead without a caller-supplied procedure: results are ignored by an
// AsyncMultiProcedureAdapter whose exception callback prints the stack trace.
3300 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3302 assert(request != null);
3304 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3306 public void exception(AsyncReadGraph graph, Throwable t) {
3307 t.printStackTrace();
3314 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3315 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3319 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3320 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3324 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3325 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3329 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3330 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3334 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3335 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Not implemented: the visible body throws Error("Not implemented!").
// NOTE(review): the other syncRequest(MultiRead, ...) overloads in this file delegate to this
// method, so they would end up in this Error as well — confirm. One line before the throw is
// not visible in this extract.
3339 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3341 throw new Error("Not implemented!");
3345 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3346 throw new Error("Not implemented!");
// Asynchronous ExternalRead without a caller-supplied procedure: the result is ignored by a
// ProcedureAdapter whose exception callback prints the stack trace.
3350 final public <T> void asyncRequest(final ExternalRead<T> request) {
3352 assert(request != null);
3354 asyncRequest(request, new ProcedureAdapter<T>() {
3356 public void exception(Throwable t) {
3357 t.printStackTrace();
3364 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3365 asyncRequest(request, (Procedure<T>)procedure);
// Rethrows the given Throwable: a DatabaseException as-is, anything else wrapped in a
// DatabaseException("Unexpected exception").
// NOTE(review): the guard line before the rethrow (presumably a null check) is not visible in
// this extract.
3370 void check(Throwable t) throws DatabaseException {
3372 if(t instanceof DatabaseException) throw (DatabaseException)t;
3373 else throw new DatabaseException("Unexpected exception", t);
// Same as above for a Throwable stored in a DataContainer.
3377 void check(DataContainer<Throwable> container) throws DatabaseException {
3378 Throwable t = container.get();
3380 if(t instanceof DatabaseException) throw (DatabaseException)t;
3381 else throw new DatabaseException("Unexpected exception", t);
3390 boolean sameProvider(Write request) {
3391 if(writeState.getGraph().provider != null) {
3392 return writeState.getGraph().provider.equals(request.getProvider());
3394 return request.getProvider() == null;
3398 boolean plainWrite(WriteGraphImpl graph) {
3399 if(graph == null) return false;
3400 if(graph.writeSupport.writeOnly()) return false;
// Thread group named "Session Thread Group"; no use of it is visible in this extract.
3405 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3406 private void assertNotSession() throws DatabaseException {
3407 Thread current = Thread.currentThread();
3408 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3411 void assertAlive() {
3412 if (!state.isAlive())
3413 throw new RuntimeDatabaseException("Session has been shut down.");
3416 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3417 return querySupport.getValueStream(graph, querySupport.getId(resource));
3420 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3421 return querySupport.getValue(graph, querySupport.getId(resource));
3424 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3425 acquire(semaphore, request, null);
// Waits for the semaphore in slices of DebugPolicy.LONG_EXECUTION_REPORT_PERIOD. When
// DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS is set, a message naming the request and the
// procedure is written to stderr.
// NOTE(review): the loop header, the wait-time bookkeeping and the exit path are not visible
// in this extract.
3428 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3432 // Loop until the semaphore is acquired reporting requests that take a long time.
3435 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3439 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3441 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3442 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3444 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3445 + (waitTime) + " ms. (procedure=" + procedure + ")");
3452 } catch (InterruptedException e) {
// NOTE(review): the interruption is only printed here; no visible line restores the thread's
// interrupt flag — confirm whether Thread.currentThread().interrupt() is needed.
3453 e.printStackTrace();
3454 // FIXME: Should perhaps do something else in this case ??
3461 // public boolean holdOnToTransactionAfterCancel() {
3462 // return transactionPolicy.holdOnToTransactionAfterCancel();
3466 // public boolean holdOnToTransactionAfterCommit() {
3467 // return transactionPolicy.holdOnToTransactionAfterCommit();
3471 // public boolean holdOnToTransactionAfterRead() {
3472 // return transactionPolicy.holdOnToTransactionAfterRead();
3476 // public void onRelinquish() {
3477 // transactionPolicy.onRelinquish();
3481 // public void onRelinquishDone() {
3482 // transactionPolicy.onRelinquishDone();
3486 // public void onRelinquishError() {
3487 // transactionPolicy.onRelinquishError();
// NOTE(review): body not visible in this extract; presumably returns this session — confirm.
3492 public Session getSession() {
// Implemented by subclasses. NOTE(review): the contract (how the argument maps to the
// returned VirtualGraph) is not visible in this extract.
3497 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
// Implemented by subclasses: no-argument counterpart of getNewResource(long) and
// getNewResource(Resource). NOTE(review): how the target cluster is chosen is not visible here.
3498 protected abstract ResourceImpl getNewResource() throws DatabaseException;
// Forwards the "ceased" notification for the given thread index to the request manager.
// NOTE(review): one line before the call is not visible in this extract.
3500 public void ceased(int thread) {
3502 requestManager.ceased(thread);
// Number of query threads used by the session.
// NOTE(review): the actual return statement is not visible in this extract; the commented-out
// alternative below derives the value from the number of available processors.
3506 public int getAmountOfQueryThreads() {
3507 // This must be a power of two
3509 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
// Wraps the given resource key in a ResourceImpl bound to resourceSupport.
// NOTE(review): one line before the return is not visible in this extract; no existence check
// that would raise the declared ResourceNotFoundException is visible here.
3512 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3514 return new ResourceImpl(resourceSupport, key);
// NOTE(review): body not visible in this extract; counterpart of releaseWriteOnly(ReadGraphImpl).
3518 public void acquireWriteOnly() {
// Ends write-only mode; the visible part releases the write on the query provider for the
// given graph.
// NOTE(review): one line before the call is not visible in this extract.
3522 public void releaseWriteOnly(ReadGraphImpl graph) {
3524 queryProvider2.releaseWrite(graph);
// Runs the query provider's dirty and scheduled updates for as long as dirtyPrimitives is set,
// then fires the metadata listeners with the current client changes. With
// DebugPolicy.PERFORMANCE_DATA the elapsed time in seconds is printed to stderr.
3527 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3529 long start = System.nanoTime();
// The flag is cleared before each pass; the loop repeats if it has been set again.
3531 while(dirtyPrimitives) {
3532 dirtyPrimitives = false;
3533 getQueryProvider2().performDirtyUpdates(writer);
3534 getQueryProvider2().performScheduledUpdates(writer);
3537 fireMetadataListeners(writer, clientChanges);
3539 if(DebugPolicy.PERFORMANCE_DATA) {
3540 long end = System.nanoTime();
3541 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
// Walks the directory <platform location>/tempFiles/db with a SimpleFileVisitor. IOExceptions
// raised per file and by the walk itself are logged, and the walk continues after a per-file
// failure.
// NOTE(review): the per-file action (presumably deleting the file) and any existence check on
// the directory are not visible in this extract.
3546 void removeTemporaryData() {
3547 File platform = Platform.getLocation().toFile();
3548 File tempFiles = new File(platform, "tempFiles");
3549 File temp = new File(tempFiles, "db");
3553 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3555 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3558 } catch (IOException e) {
3559 Logger.defaultLogError(e);
3561 return FileVisitResult.CONTINUE;
3564 } catch (IOException e) {
3565 Logger.defaultLogError(e);
// In create mode (SERVICE_MODE_CREATE bit set in serviceMode), marks every cluster recorded in
// createdClusters as immutable, then clears the set.
// NOTE(review): the value returned by the TLongProcedure and whether clear() sits inside the
// mode check are not visible in this extract.
3569 public void handleCreatedClusters() {
3570 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3571 createdClusters.forEach(new TLongProcedure() {
3574 public boolean execute(long value) {
3575 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3576 cluster.setImmutable(true, clusterTranslator);
3581 createdClusters.clear();
3585 public Object getModificationCounter() {
3586 return queryProvider2.modificationCounter;
3590 public void markUndoPoint() {
3591 state.setCombine(false);