1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.ExternalValueSupport;
38 import org.simantics.db.Metadata;
39 import org.simantics.db.MonitorContext;
40 import org.simantics.db.MonitorHandler;
41 import org.simantics.db.Resource;
42 import org.simantics.db.ResourceSerializer;
43 import org.simantics.db.Session;
44 import org.simantics.db.SessionManager;
45 import org.simantics.db.SessionVariables;
46 import org.simantics.db.VirtualGraph;
47 import org.simantics.db.WriteGraph;
48 import org.simantics.db.authentication.UserAuthenticationAgent;
49 import org.simantics.db.authentication.UserAuthenticator;
50 import org.simantics.db.common.Indexing;
51 import org.simantics.db.common.TransactionPolicyRelease;
52 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
53 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
62 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
63 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
64 import org.simantics.db.common.utils.Logger;
65 import org.simantics.db.event.ChangeEvent;
66 import org.simantics.db.event.ChangeListener;
67 import org.simantics.db.event.SessionEventListener;
68 import org.simantics.db.exception.CancelTransactionException;
69 import org.simantics.db.exception.ClusterSetExistException;
70 import org.simantics.db.exception.DatabaseException;
71 import org.simantics.db.exception.ImmutableException;
72 import org.simantics.db.exception.InvalidResourceReferenceException;
73 import org.simantics.db.exception.ResourceNotFoundException;
74 import org.simantics.db.exception.RuntimeDatabaseException;
75 import org.simantics.db.exception.ServiceException;
76 import org.simantics.db.exception.ServiceNotFoundException;
77 import org.simantics.db.impl.ClusterBase;
78 import org.simantics.db.impl.ClusterI;
79 import org.simantics.db.impl.ClusterTraitsBase;
80 import org.simantics.db.impl.ClusterTranslator;
81 import org.simantics.db.impl.ResourceImpl;
82 import org.simantics.db.impl.TransientGraph;
83 import org.simantics.db.impl.VirtualGraphImpl;
84 import org.simantics.db.impl.graph.DelayedWriteGraph;
85 import org.simantics.db.impl.graph.ReadGraphImpl;
86 import org.simantics.db.impl.graph.WriteGraphImpl;
87 import org.simantics.db.impl.graph.WriteSupport;
88 import org.simantics.db.impl.internal.RandomAccessValueSupport;
89 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
90 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
91 import org.simantics.db.impl.query.QueryProcessor;
92 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
93 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
94 import org.simantics.db.impl.service.QueryDebug;
95 import org.simantics.db.impl.support.VirtualGraphServerSupport;
96 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
97 import org.simantics.db.procedure.AsyncListener;
98 import org.simantics.db.procedure.AsyncMultiListener;
99 import org.simantics.db.procedure.AsyncMultiProcedure;
100 import org.simantics.db.procedure.AsyncProcedure;
101 import org.simantics.db.procedure.Listener;
102 import org.simantics.db.procedure.ListenerBase;
103 import org.simantics.db.procedure.MultiListener;
104 import org.simantics.db.procedure.MultiProcedure;
105 import org.simantics.db.procedure.Procedure;
106 import org.simantics.db.procedure.SyncListener;
107 import org.simantics.db.procedure.SyncMultiListener;
108 import org.simantics.db.procedure.SyncMultiProcedure;
109 import org.simantics.db.procedure.SyncProcedure;
110 import org.simantics.db.procore.cluster.ClusterImpl;
111 import org.simantics.db.procore.cluster.ClusterTraits;
112 import org.simantics.db.procore.protocol.Constants;
113 import org.simantics.db.procore.protocol.DebugPolicy;
114 import org.simantics.db.request.AsyncMultiRead;
115 import org.simantics.db.request.AsyncRead;
116 import org.simantics.db.request.DelayedWrite;
117 import org.simantics.db.request.DelayedWriteResult;
118 import org.simantics.db.request.ExternalRead;
119 import org.simantics.db.request.MultiRead;
120 import org.simantics.db.request.Read;
121 import org.simantics.db.request.ReadInterface;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168 protected static final boolean DEBUG = false;
170 private static final boolean DIAGNOSTICS = false;
172 private TransactionPolicySupport transactionPolicy;
174 final private ClusterControl clusterControl;
176 final protected BuiltinSupportImpl builtinSupport;
177 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178 final protected ClusterSetsSupport clusterSetsSupport;
179 final protected LifecycleSupportImpl lifecycleSupport;
181 protected QuerySupportImpl querySupport;
182 protected ResourceSupportImpl resourceSupport;
183 protected WriteSupport writeSupport;
184 public ClusterTranslator clusterTranslator;
186 boolean dirtyPrimitives = false;
188 public static final int SERVICE_MODE_CREATE = 2;
189 public static final int SERVICE_MODE_ALLOW = 1;
190 public int serviceMode = 0;
191 public boolean createdImmutableClusters = false;
192 public TLongHashSet createdClusters = new TLongHashSet();
197 * The service locator maintained by the workbench. These services are
198 * initialized during workbench during the <code>init</code> method.
200 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
202 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
204 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
206 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
208 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
210 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
212 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
214 final protected State state = new State();
216 protected GraphSession graphSession = null;
218 protected SessionManager sessionManagerImpl = null;
220 protected UserAuthenticationAgent authAgent = null;
222 protected UserAuthenticator authenticator = null;
224 protected Resource user = null;
226 protected ClusterStream clusterStream = null;
228 protected SessionRequestManager requestManager = null;
230 public ClusterTable clusterTable = null;
232 public QueryProcessor queryProvider2 = null;
234 ClientChangesImpl clientChanges = null;
236 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
238 // protected long newClusterId = Constants.NullClusterId;
240 protected int flushCounter = 0;
242 protected boolean writeOnly = false;
244 WriteState<?> writeState = null;
245 WriteStateBase<?> delayedWriteState = null;
246 protected Resource defaultClusterSet = null; // If not null then used for newResource().
248 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250 // if (authAgent == null)
251 // throw new IllegalArgumentException("null authentication agent");
253 File t = StaticSessionProperties.virtualGraphStoragePath;
256 this.clusterTable = new ClusterTable(this, t);
257 this.builtinSupport = new BuiltinSupportImpl(this);
258 this.sessionManagerImpl = sessionManagerImpl;
260 // this.authAgent = authAgent;
262 serviceLocator.registerService(Session.class, this);
263 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284 ServiceActivityUpdaterForWriteTransactions.register(this);
286 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289 this.lifecycleSupport = new LifecycleSupportImpl(this);
290 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291 this.transactionPolicy = new TransactionPolicyRelease();
292 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293 this.clusterControl = new ClusterControlImpl(this);
294 serviceLocator.registerService(ClusterControl.class, clusterControl);
295 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296 this.clusterSetsSupport.setReadDirectory(t.toPath());
297 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
308 public Resource getRootLibrary() {
309 return queryProvider2.getRootLibraryResource();
312 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313 if (!graphSession.dbSession.refreshEnabled())
316 getClusterTable().refresh(csid, this, clusterUID);
317 } catch (Throwable t) {
318 Logger.defaultLogError("Refesh failed.", t);
322 static final class TaskHelper {
323 private final String name;
324 private Object result;
325 final Semaphore sema = new Semaphore(0);
326 private Throwable throwable = null;
327 final Consumer<DatabaseException> callback = e -> {
328 synchronized (TaskHelper.this) {
332 final Procedure<Object> proc = new Procedure<Object>() {
334 public void execute(Object result) {
335 callback.accept(null);
338 public void exception(Throwable t) {
339 if (t instanceof DatabaseException)
340 callback.accept((DatabaseException)t);
342 callback.accept(new DatabaseException("" + name + "operation failed.", t));
345 final WriteTraits writeTraits = new WriteTraits() {};
346 TaskHelper(String name) {
349 @SuppressWarnings("unchecked")
353 void setResult(Object result) {
354 this.result = result;
356 // Throwable throwableGet() {
359 synchronized void throwableSet(Throwable t) {
362 // void throwableSet(String t) {
363 // throwable = new InternalException("" + name + " operation failed. " + t);
365 synchronized void throwableCheck()
366 throws DatabaseException {
367 if (null != throwable)
368 if (throwable instanceof DatabaseException)
369 throw (DatabaseException)throwable;
371 throw new DatabaseException("Undo operation failed.", throwable);
373 void throw_(String message)
374 throws DatabaseException {
375 throw new DatabaseException("" + name + " operation failed. " + message);
381 private ListenerBase getListenerBase(Object procedure) {
382 if (procedure instanceof ListenerBase)
383 return (ListenerBase) procedure;
388 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
389 scheduleRequest(request, callback, notify, null);
393 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
398 assert (request != null);
400 if(Development.DEVELOPMENT) {
402 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
403 System.err.println("schedule write '" + request + "'");
404 } catch (Throwable t) {
405 Logger.defaultLogError(t);
409 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
411 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414 public void run(int thread) {
416 if(Development.DEVELOPMENT) {
418 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
419 System.err.println("perform write '" + request + "'");
420 } catch (Throwable t) {
421 Logger.defaultLogError(t);
425 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
427 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
432 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
434 VirtualGraph vg = getProvider(request.getProvider());
436 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
437 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440 public void execute(Object result) {
441 if(callback != null) callback.accept(null);
445 public void exception(Throwable t) {
446 if(callback != null) callback.accept((DatabaseException)t);
451 assert (null != writer);
452 // writer.state.barrier.inc();
455 request.perform(writer);
456 assert (null != writer);
457 } catch (Throwable t) {
458 if (!(t instanceof CancelTransactionException))
459 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
460 writeState.except(t);
462 // writer.state.barrier.dec();
463 // writer.waitAsync(request);
467 assert(!queryProvider2.dirty);
469 } catch (Throwable e) {
471 // Log it first, just to be safe that the error is always logged.
472 if (!(e instanceof CancelTransactionException))
473 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
475 // writeState.getGraph().state.barrier.dec();
476 // writeState.getGraph().waitAsync(request);
478 writeState.except(e);
481 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
482 // // All we can do here is to log those, can't really pass them anywhere.
483 // if (callback != null) {
484 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
485 // else callback.run(new DatabaseException(e));
487 // } catch (Throwable e2) {
488 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
490 // boolean empty = clusterStream.reallyFlush();
491 // int callerThread = -1;
492 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
493 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
495 // // Send all local changes to server so it can calculate correct reverse change set.
496 // // This will call clusterStream.accept().
497 // state.cancelCommit(context, clusterStream);
499 // if (!context.isOk()) // this is a blocking operation
500 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
501 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
503 // state.cancelCommit2(context, clusterStream);
504 // } catch (DatabaseException e3) {
505 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
507 // clusterStream.setOff(false);
508 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
513 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
519 if(Development.DEVELOPMENT) {
521 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
522 System.err.println("finish write '" + request + "'");
523 } catch (Throwable t) {
524 Logger.defaultLogError(t);
534 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
535 scheduleRequest(request, procedure, notify, null);
539 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
544 assert (request != null);
546 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
548 requestManager.scheduleWrite(new SessionTask(request, thread) {
551 public void run(int thread) {
553 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
555 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
560 VirtualGraph vg = getProvider(request.getProvider());
561 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
564 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
565 writeState = writeStateT;
567 assert (null != writer);
568 // writer.state.barrier.inc();
569 writeStateT.setResult(request.perform(writer));
570 assert (null != writer);
572 // writer.state.barrier.dec();
573 // writer.waitAsync(null);
575 } catch (Throwable e) {
577 // writer.state.barrier.dec();
578 // writer.waitAsync(null);
580 writeState.except(e);
582 // state.stopWriteTransaction(clusterStream);
584 // } catch (Throwable e) {
585 // // Log it first, just to be safe that the error is always logged.
586 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
589 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
590 // // All we can do here is to log those, can't really pass them anywhere.
591 // if (procedure != null) {
592 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
593 // else procedure.exception(new DatabaseException(e));
595 // } catch (Throwable e2) {
596 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
599 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
601 // state.stopWriteTransaction(clusterStream);
604 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
607 // if(notify != null) notify.release();
617 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
618 scheduleRequest(request, callback, notify, null);
622 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
625 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
627 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
629 assert (request != null);
631 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
633 requestManager.scheduleWrite(new SessionTask(request, thread) {
636 public void run(int thread) {
637 fireSessionVariableChange(SessionVariables.QUEUED_READS);
639 Procedure<Object> stateProcedure = new Procedure<Object>() {
641 public void execute(Object result) {
642 if (callback != null)
643 callback.accept(null);
646 public void exception(Throwable t) {
647 if (callback != null) {
648 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
649 else callback.accept(new DatabaseException(t));
651 Logger.defaultLogError("Unhandled exception", t);
655 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
656 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
657 DelayedWriteGraph dwg = null;
658 // newGraph.state.barrier.inc();
661 dwg = new DelayedWriteGraph(newGraph);
662 request.perform(dwg);
663 } catch (Throwable e) {
664 delayedWriteState.except(e);
669 // newGraph.state.barrier.dec();
670 // newGraph.waitAsync(request);
671 fireSessionVariableChange(SessionVariables.QUEUED_READS);
674 delayedWriteState = null;
676 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
677 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
680 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
684 VirtualGraph vg = getProvider(request.getProvider());
685 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
687 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
689 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
691 assert (null != writer);
692 // writer.state.barrier.inc();
696 dwg.commit(writer, request);
698 if(defaultClusterSet != null) {
699 XSupport xs = getService(XSupport.class);
700 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
701 ClusteringSupport cs = getService(ClusteringSupport.class);
702 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
705 // This makes clusters available from server
706 clusterStream.reallyFlush();
708 releaseWriteOnly(writer);
709 handleUpdatesAndMetadata(writer);
711 } catch (ServiceException e) {
712 // writer.state.barrier.dec();
713 // writer.waitAsync(null);
715 // These shall be requested from server
716 clusterTable.removeWriteOnlyClusters();
717 // This makes clusters available from server
718 clusterStream.reallyFlush();
720 releaseWriteOnly(writer);
721 writeState.except(e);
723 // Debugging & Profiling
724 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
734 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
735 scheduleRequest(request, procedure, notify, null);
739 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
742 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
743 throw new Error("Not implemented");
746 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
747 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
748 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
749 createdClusters.add(cluster.clusterId);
754 class WriteOnlySupport implements WriteSupport {
756 ClusterStream stream;
757 ClusterImpl currentCluster;
759 public WriteOnlySupport() {
760 this.stream = clusterStream;
764 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
765 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
769 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
771 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
773 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
774 if(s != queryProvider2.getRootLibrary())
775 throw new ImmutableException("Trying to modify immutable resource key=" + s);
778 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
779 } catch (DatabaseException e) {
780 Logger.defaultLogError(e);
784 clientChanges.invalidate(s);
786 if (cluster.isWriteOnly())
788 queryProvider2.updateStatements(s, p);
793 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
794 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
798 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
800 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
802 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
803 } catch (DatabaseException e) {
804 Logger.defaultLogError(e);
807 clientChanges.invalidate(rid);
809 if (cluster.isWriteOnly())
811 queryProvider2.updateValue(rid);
816 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
819 claimValue(provider,resource, reader.readBytes(null, amount));
823 byte[] bytes = new byte[65536];
825 int rid = ((ResourceImpl)resource).id;
826 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
830 int block = Math.min(left, 65536);
831 reader.readBytes(bytes, block);
832 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
835 } catch (DatabaseException e) {
836 Logger.defaultLogError(e);
839 clientChanges.invalidate(rid);
841 if (cluster.isWriteOnly())
843 queryProvider2.updateValue(rid);
847 private void maintainCluster(ClusterImpl before, ClusterI after_) {
848 if(after_ != null && after_ != before) {
849 ClusterImpl after = (ClusterImpl)after_;
850 if(currentCluster == before) {
851 currentCluster = after;
853 clusterTable.replaceCluster(after);
857 public int createResourceKey(int foreignCounter) throws DatabaseException {
858 if(currentCluster == null) {
859 currentCluster = getNewResourceCluster();
861 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
862 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
863 newCluster.foreignLookup = new byte[foreignCounter];
864 currentCluster = newCluster;
866 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
868 return currentCluster.createResource(clusterTranslator);
872 public Resource createResource(VirtualGraph provider) throws DatabaseException {
873 if(currentCluster == null) {
874 if (null != defaultClusterSet) {
875 ResourceImpl result = getNewResource(defaultClusterSet);
876 currentCluster = clusterTable.getClusterByResourceKey(result.id);
879 currentCluster = getNewResourceCluster();
882 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
883 if (null != defaultClusterSet) {
884 ResourceImpl result = getNewResource(defaultClusterSet);
885 currentCluster = clusterTable.getClusterByResourceKey(result.id);
888 currentCluster = getNewResourceCluster();
891 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
895 public Resource createResource(VirtualGraph provider, long clusterId)
896 throws DatabaseException {
897 return getNewResource(clusterId);
901 public Resource createResource(VirtualGraph provider, Resource clusterSet)
902 throws DatabaseException {
903 return getNewResource(clusterSet);
907 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
908 throws DatabaseException {
909 getNewClusterSet(clusterSet);
913 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
914 throws ServiceException {
915 return containsClusterSet(clusterSet);
918 public void selectCluster(long cluster) {
919 currentCluster = clusterTable.getClusterByClusterId(cluster);
920 long setResourceId = clusterSetsSupport.getSet(cluster);
921 clusterSetsSupport.put(setResourceId, cluster);
925 public Resource setDefaultClusterSet(Resource clusterSet)
926 throws ServiceException {
927 Resource result = setDefaultClusterSet4NewResource(clusterSet);
928 if(clusterSet != null) {
929 long id = clusterSetsSupport.get(clusterSet.getResourceId());
930 currentCluster = clusterTable.getClusterByClusterId(id);
933 currentCluster = null;
939 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
941 provider = getProvider(provider);
942 if (null == provider) {
943 int key = ((ResourceImpl)resource).id;
947 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
948 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
949 // if(key != queryProvider2.getRootLibrary())
950 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
953 // cluster.removeValue(key, clusterTranslator);
954 // } catch (DatabaseException e) {
955 // Logger.defaultLogError(e);
959 clusterTable.writeOnlyInvalidate(cluster);
962 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
963 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
964 clusterTranslator.removeValue(cluster);
965 } catch (DatabaseException e) {
966 Logger.defaultLogError(e);
969 queryProvider2.invalidateResource(key);
970 clientChanges.invalidate(key);
973 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
974 queryProvider2.updateValue(querySupport.getId(resource));
975 clientChanges.claimValue(resource);
982 public void flush(boolean intermediate) {
983 throw new UnsupportedOperationException();
987 public void flushCluster() {
988 clusterTable.flushCluster(graphSession);
989 if(defaultClusterSet != null) {
990 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
992 currentCluster = null;
996 public void flushCluster(Resource r) {
997 throw new UnsupportedOperationException("flushCluster resource " + r);
1005 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1008 int s = ((ResourceImpl)subject).id;
1009 int p = ((ResourceImpl)predicate).id;
1010 int o = ((ResourceImpl)object).id;
1012 provider = getProvider(provider);
1013 if (null == provider) {
1015 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1016 clusterTable.writeOnlyInvalidate(cluster);
1020 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1021 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1022 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1024 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1025 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1027 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1028 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1029 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1030 clusterTranslator.removeStatement(cluster);
1032 queryProvider2.invalidateResource(s);
1033 clientChanges.invalidate(s);
1035 } catch (DatabaseException e) {
1037 Logger.defaultLogError(e);
1045 ((VirtualGraphImpl)provider).deny(s, p, o);
1046 queryProvider2.invalidateResource(s);
1047 clientChanges.invalidate(s);
1056 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1057 throw new UnsupportedOperationException();
1061 public boolean writeOnly() {
1066 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1067 writeSupport.performWriteRequest(graph, request);
1071 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1072 throw new UnsupportedOperationException();
1076 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1077 throw new UnsupportedOperationException();
1081 public <T> void addMetadata(Metadata data) throws ServiceException {
1082 writeSupport.addMetadata(data);
1086 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1087 return writeSupport.getMetadata(clazz);
1091 public TreeMap<String, byte[]> getMetadata() {
1092 return writeSupport.getMetadata();
1096 public void commitDone(WriteTraits writeTraits, long csid) {
1097 writeSupport.commitDone(writeTraits, csid);
1101 public void clearUndoList(WriteTraits writeTraits) {
1102 writeSupport.clearUndoList(writeTraits);
1105 public int clearMetadata() {
1106 return writeSupport.clearMetadata();
1110 public void startUndo() {
1111 writeSupport.startUndo();
1115 class VirtualWriteOnlySupport implements WriteSupport {
1118 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1119 // Resource object) {
1120 // throw new UnsupportedOperationException();
1124 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1126 TransientGraph impl = (TransientGraph)provider;
1127 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1128 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1129 clientChanges.claim(subject, predicate, object);
1134 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1136 TransientGraph impl = (TransientGraph)provider;
1137 impl.claim(subject, predicate, object);
1138 getQueryProvider2().updateStatements(subject, predicate);
1139 clientChanges.claim(subject, predicate, object);
1144 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1145 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1149 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1150 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1151 getQueryProvider2().updateValue(resource);
1152 clientChanges.claimValue(resource);
1156 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1157 byte[] value = reader.readBytes(null, amount);
1158 claimValue(provider, resource, value);
1162 public Resource createResource(VirtualGraph provider) {
1163 TransientGraph impl = (TransientGraph)provider;
1164 return impl.getResource(impl.newResource(false));
1168 public Resource createResource(VirtualGraph provider, long clusterId) {
1169 throw new UnsupportedOperationException();
1173 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1174 throws DatabaseException {
1175 throw new UnsupportedOperationException();
1179 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1180 throws DatabaseException {
1181 throw new UnsupportedOperationException();
1185 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1186 throws ServiceException {
1187 throw new UnsupportedOperationException();
1191 public Resource setDefaultClusterSet(Resource clusterSet)
1192 throws ServiceException {
1197 public void denyValue(VirtualGraph provider, Resource resource) {
1198 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1199 getQueryProvider2().updateValue(querySupport.getId(resource));
1200 // NOTE: this only keeps track of value changes by-resource.
1201 clientChanges.claimValue(resource);
1205 public void flush(boolean intermediate) {
1206 throw new UnsupportedOperationException();
1210 public void flushCluster() {
1211 throw new UnsupportedOperationException();
1215 public void flushCluster(Resource r) {
1216 throw new UnsupportedOperationException("Resource " + r);
1224 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1225 TransientGraph impl = (TransientGraph) provider;
1226 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1227 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1228 clientChanges.deny(subject, predicate, object);
1233 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1234 throw new UnsupportedOperationException();
1238 public boolean writeOnly() {
1243 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1244 throw new UnsupportedOperationException();
1248 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1249 throw new UnsupportedOperationException();
1253 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1254 throw new UnsupportedOperationException();
1258 public <T> void addMetadata(Metadata data) throws ServiceException {
1259 throw new UnsupportedOperationException();
1263 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1264 throw new UnsupportedOperationException();
1268 public TreeMap<String, byte[]> getMetadata() {
1269 throw new UnsupportedOperationException();
1273 public void commitDone(WriteTraits writeTraits, long csid) {
1277 public void clearUndoList(WriteTraits writeTraits) {
1281 public int clearMetadata() {
1286 public void startUndo() {
1290 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1294 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1296 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1299 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1303 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1305 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1307 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1308 writeState = writeStateT;
1310 assert (null != writer);
1311 // writer.state.barrier.inc();
1312 long start = System.nanoTime();
1313 T result = request.perform(writer);
1314 long duration = System.nanoTime() - start;
1316 System.err.println("################");
1317 System.err.println("WriteOnly duration " + 1e-9*duration);
1319 writeStateT.setResult(result);
1321 // This makes clusters available from server
1322 clusterStream.reallyFlush();
1324 // This will trigger query updates
1325 releaseWriteOnly(writer);
1326 assert (null != writer);
1328 handleUpdatesAndMetadata(writer);
1330 } catch (CancelTransactionException e) {
1332 releaseWriteOnly(writeState.getGraph());
1334 clusterTable.removeWriteOnlyClusters();
1335 state.stopWriteTransaction(clusterStream);
1337 } catch (Throwable e) {
1339 e.printStackTrace();
1341 releaseWriteOnly(writeState.getGraph());
1343 clusterTable.removeWriteOnlyClusters();
1345 if (callback != null)
1346 callback.exception(new DatabaseException(e));
1348 state.stopWriteTransaction(clusterStream);
1349 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1353 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1359 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1360 scheduleRequest(request, callback, notify, null);
1364 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1368 assert (request != null);
1370 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1372 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1375 public void run(int thread) {
1377 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1381 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1384 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1388 VirtualGraph vg = getProvider(request.getProvider());
1389 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1391 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1393 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1396 public void execute(Object result) {
1397 if(callback != null) callback.accept(null);
1401 public void exception(Throwable t) {
1402 if(callback != null) callback.accept((DatabaseException)t);
1407 assert (null != writer);
1408 // writer.state.barrier.inc();
1412 request.perform(writer);
1414 } catch (Throwable e) {
1416 // writer.state.barrier.dec();
1417 // writer.waitAsync(null);
1419 releaseWriteOnly(writer);
1421 clusterTable.removeWriteOnlyClusters();
1423 if(!(e instanceof CancelTransactionException)) {
1424 if (callback != null)
1425 callback.accept(new DatabaseException(e));
1428 writeState.except(e);
1434 // This makes clusters available from server
1435 boolean empty = clusterStream.reallyFlush();
1436 // This was needed to make WO requests call metadata listeners.
1437 // NOTE: the calling event does not contain clientChanges information.
1438 if (!empty && clientChanges.isEmpty())
1439 clientChanges.setNotEmpty(true);
1440 releaseWriteOnly(writer);
1441 assert (null != writer);
1444 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1457 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1458 scheduleRequest(request, callback, notify, null);
1462 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1465 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1467 assert (request != null);
1469 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1471 requestManager.scheduleWrite(new SessionTask(request, thread) {
1474 public void run(int thread) {
1476 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1478 performWriteOnly(request, notify, callback);
1488 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1490 assert (request != null);
1491 assert (procedure != null);
1493 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1495 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1498 public void run(int thread) {
1500 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1502 ListenerBase listener = getListenerBase(procedure);
1504 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1508 if (listener != null) {
1511 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1514 public void exception(AsyncReadGraph graph, Throwable t) {
1515 procedure.exception(graph, t);
1516 if(throwable != null) {
1519 // ErrorLogger.defaultLogError("Unhandled exception", t);
1524 public void execute(AsyncReadGraph graph, T t) {
1525 if(result != null) result.set(t);
1526 procedure.execute(graph, t);
1530 } catch (Throwable t) {
1531 // This is handled by the AsyncProcedure
1532 //Logger.defaultLogError("Internal error", t);
1539 // newGraph.state.barrier.inc();
1541 T t = request.perform(newGraph);
1545 if(result != null) result.set(t);
1546 procedure.execute(newGraph, t);
1548 } catch (Throwable th) {
1550 if(throwable != null) {
1553 Logger.defaultLogError("Unhandled exception", th);
1558 } catch (Throwable t) {
1561 t.printStackTrace();
1563 if(throwable != null) {
1566 Logger.defaultLogError("Unhandled exception", t);
1571 procedure.exception(newGraph, t);
1573 } catch (Throwable t2) {
1575 if(throwable != null) {
1578 Logger.defaultLogError("Unhandled exception", t2);
1585 // newGraph.state.barrier.dec();
1586 // newGraph.waitAsync(request);
1592 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1602 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1604 assert (request != null);
1605 assert (procedure != null);
1607 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1609 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1612 public void run(int thread) {
1614 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1616 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1620 if (listener != null) {
1622 newGraph.processor.query(newGraph, request, null, procedure, listener);
1624 // newGraph.waitAsync(request);
1628 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1629 procedure, "request");
1633 // newGraph.state.barrier.inc();
1635 request.perform(newGraph, wrapper);
1637 // newGraph.waitAsync(request);
1639 } catch (Throwable t) {
1641 wrapper.exception(newGraph, t);
1642 // newGraph.waitAsync(request);
1651 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1661 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1663 assert (request != null);
1664 assert (procedure != null);
1666 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1668 int sync = notify != null ? thread : -1;
1670 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1673 public void run(int thread) {
1675 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1677 ListenerBase listener = getListenerBase(procedure);
1679 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1683 if (listener != null) {
1685 newGraph.processor.query(newGraph, request, null, procedure, listener);
1687 // newGraph.waitAsync(request);
1691 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1695 request.perform(newGraph, wrapper);
1697 } catch (Throwable t) {
1699 t.printStackTrace();
1707 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1717 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1719 assert (request != null);
1720 assert (procedure != null);
1722 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1724 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1727 public void run(int thread) {
1729 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1731 ListenerBase listener = getListenerBase(procedure);
1733 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1737 if (listener != null) {
1739 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1742 public void exception(Throwable t) {
1743 procedure.exception(t);
1744 if(throwable != null) {
1750 public void execute(T t) {
1751 if(result != null) result.set(t);
1752 procedure.execute(t);
1757 // newGraph.waitAsync(request);
1761 // newGraph.state.barrier.inc();
1763 request.register(newGraph, new Listener<T>() {
1766 public void exception(Throwable t) {
1767 if(throwable != null) throwable.set(t);
1768 procedure.exception(t);
1769 // newGraph.state.barrier.dec();
1773 public void execute(T t) {
1774 if(result != null) result.set(t);
1775 procedure.execute(t);
1776 // newGraph.state.barrier.dec();
1780 public boolean isDisposed() {
1786 // newGraph.waitAsync(request);
1792 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1804 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1806 scheduleRequest(request, procedure, null, null, null);
1811 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1813 Semaphore notify = new Semaphore(0);
1814 DataContainer<Throwable> container = new DataContainer<Throwable>();
1815 DataContainer<T> result = new DataContainer<T>();
1816 scheduleRequest(request, procedure, notify, container, result);
1817 acquire(notify, request);
1818 Throwable throwable = container.get();
1819 if(throwable != null) {
1820 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1821 else throw new DatabaseException("Unexpected exception", throwable);
1823 return result.get();
1827 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1829 Semaphore notify = new Semaphore(0);
1830 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1831 final DataContainer<T> resultContainer = new DataContainer<T>();
1832 scheduleRequest(request, new AsyncProcedure<T>() {
1834 public void exception(AsyncReadGraph graph, Throwable throwable) {
1835 exceptionContainer.set(throwable);
1836 procedure.exception(graph, throwable);
1839 public void execute(AsyncReadGraph graph, T result) {
1840 resultContainer.set(result);
1841 procedure.execute(graph, result);
1843 }, getListenerBase(procedure), notify);
1844 acquire(notify, request, procedure);
1845 Throwable throwable = exceptionContainer.get();
1846 if (throwable != null) {
1847 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1848 else throw new DatabaseException("Unexpected exception", throwable);
1850 return resultContainer.get();
1855 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1857 Semaphore notify = new Semaphore(0);
1858 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1859 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1861 public void exception(AsyncReadGraph graph, Throwable throwable) {
1862 exceptionContainer.set(throwable);
1863 procedure.exception(graph, throwable);
1866 public void execute(AsyncReadGraph graph, T result) {
1867 procedure.execute(graph, result);
1870 public void finished(AsyncReadGraph graph) {
1871 procedure.finished(graph);
1874 acquire(notify, request, procedure);
1875 Throwable throwable = exceptionContainer.get();
1876 if (throwable != null) {
1877 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1878 else throw new DatabaseException("Unexpected exception", throwable);
1880 // TODO: implement return value
1881 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1886 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1887 return syncRequest(request, new ProcedureAdapter<T>());
1890 // assert(request != null);
1892 // final DataContainer<T> result = new DataContainer<T>();
1893 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1895 // syncRequest(request, new Procedure<T>() {
1898 // public void execute(T t) {
1903 // public void exception(Throwable t) {
1904 // exception.set(t);
1909 // Throwable t = exception.get();
1911 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1912 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1915 // return result.get();
1920 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1921 return syncRequest(request, (Procedure<T>)procedure);
1926 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1927 // assertNotSession();
1928 // Semaphore notify = new Semaphore(0);
1929 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1930 // DataContainer<T> result = new DataContainer<T>();
1931 // scheduleRequest(request, procedure, notify, container, result);
1932 // acquire(notify, request);
1933 // Throwable throwable = container.get();
1934 // if(throwable != null) {
1935 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1936 // else throw new DatabaseException("Unexpected exception", throwable);
1938 // return result.get();
1943 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1945 Semaphore notify = new Semaphore(0);
1946 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1947 final DataContainer<T> result = new DataContainer<T>();
1948 scheduleRequest(request, procedure, notify, container, result);
1949 acquire(notify, request);
1950 Throwable throwable = container.get();
1951 if (throwable != null) {
1952 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1953 else throw new DatabaseException("Unexpected exception", throwable);
1955 return result.get();
1959 public void syncRequest(Write request) throws DatabaseException {
1962 Semaphore notify = new Semaphore(0);
1963 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1964 scheduleRequest(request, e -> exception.set(e), notify);
1965 acquire(notify, request);
1966 if(exception.get() != null) throw exception.get();
1970 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1972 Semaphore notify = new Semaphore(0);
1973 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1974 final DataContainer<T> result = new DataContainer<T>();
1975 scheduleRequest(request, new Procedure<T>() {
1978 public void exception(Throwable t) {
1983 public void execute(T t) {
1988 acquire(notify, request);
1989 if(exception.get() != null) {
1990 Throwable t = exception.get();
1991 if(t instanceof DatabaseException) throw (DatabaseException)t;
1992 else throw new DatabaseException(t);
1994 return result.get();
1998 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2000 Semaphore notify = new Semaphore(0);
2001 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2002 final DataContainer<T> result = new DataContainer<T>();
2003 scheduleRequest(request, new Procedure<T>() {
2006 public void exception(Throwable t) {
2011 public void execute(T t) {
2016 acquire(notify, request);
2017 if(exception.get() != null) {
2018 Throwable t = exception.get();
2019 if(t instanceof DatabaseException) throw (DatabaseException)t;
2020 else throw new DatabaseException(t);
2022 return result.get();
2026 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2028 Semaphore notify = new Semaphore(0);
2029 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2030 final DataContainer<T> result = new DataContainer<T>();
2031 scheduleRequest(request, new Procedure<T>() {
2034 public void exception(Throwable t) {
2039 public void execute(T t) {
2044 acquire(notify, request);
2045 if(exception.get() != null) {
2046 Throwable t = exception.get();
2047 if(t instanceof DatabaseException) throw (DatabaseException)t;
2048 else throw new DatabaseException(t);
2050 return result.get();
2054 public void syncRequest(DelayedWrite request) throws DatabaseException {
2056 Semaphore notify = new Semaphore(0);
2057 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2058 scheduleRequest(request, e -> exception.set(e), notify);
2059 acquire(notify, request);
2060 if(exception.get() != null) throw exception.get();
2064 public void syncRequest(WriteOnly request) throws DatabaseException {
2067 Semaphore notify = new Semaphore(0);
2068 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2069 scheduleRequest(request, e -> exception.set(e), notify);
2070 acquire(notify, request);
2071 if(exception.get() != null) throw exception.get();
2076 * GraphRequestProcessor interface
2080 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2082 scheduleRequest(request, procedure, null, null);
2087 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2089 scheduleRequest(request, procedure, null);
2094 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2096 scheduleRequest(request, procedure, null, null, null);
2101 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2103 scheduleRequest(request, callback, null);
2108 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2110 scheduleRequest(request, procedure, null);
2115 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2117 scheduleRequest(request, procedure, null);
2122 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2124 scheduleRequest(request, procedure, null);
2129 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2131 scheduleRequest(request, callback, null);
2136 public void asyncRequest(final Write r) {
2137 asyncRequest(r, null);
2141 public void asyncRequest(final DelayedWrite r) {
2142 asyncRequest(r, null);
2146 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2148 scheduleRequest(request, callback, null);
2153 public void asyncRequest(final WriteOnly request) {
2155 asyncRequest(request, null);
2160 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2161 r.request(this, procedure);
2165 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2166 r.request(this, procedure);
2170 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2171 r.request(this, procedure);
2175 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2176 r.request(this, procedure);
2180 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2181 r.request(this, procedure);
2185 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2186 r.request(this, procedure);
2190 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2191 return r.request(this);
2195 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2196 return r.request(this);
2200 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2201 r.request(this, procedure);
2205 public <T> void async(WriteInterface<T> r) {
2206 r.request(this, new ProcedureAdapter<T>());
2210 public void incAsync() {
2215 public void decAsync() {
2219 public long getCluster(ResourceImpl resource) {
2220 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2221 return cluster.getClusterId();
2224 public long getCluster(int id) {
2225 if (clusterTable == null)
2226 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2227 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2230 public ResourceImpl getResource(int id) {
2231 return new ResourceImpl(resourceSupport, id);
2234 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2235 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2236 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2237 int key = proxy.getClusterKey();
2238 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2239 return new ResourceImpl(resourceSupport, resourceKey);
2242 public ResourceImpl getResource2(int id) {
2244 return new ResourceImpl(resourceSupport, id);
2247 final public int getId(ResourceImpl impl) {
2251 public static final Charset UTF8 = Charset.forName("utf-8");
2256 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2257 for(TransientGraph g : support.providers) {
2258 if(g.isPending(subject)) return false;
2263 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2264 for(TransientGraph g : support.providers) {
2265 if(g.isPending(subject, predicate)) return false;
2270 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2272 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2274 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2277 public void accept(ReadGraphImpl graph) {
2278 if(ready.decrementAndGet() == 0) {
2279 runnable.accept(graph);
2285 for(TransientGraph g : support.providers) {
2286 if(g.isPending(subject)) {
2288 g.load(graph, subject, composite);
2289 } catch (DatabaseException e) {
2290 e.printStackTrace();
2293 composite.accept(graph);
2297 composite.accept(graph);
2301 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2303 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2305 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2308 public void accept(ReadGraphImpl graph) {
2309 if(ready.decrementAndGet() == 0) {
2310 runnable.accept(graph);
2316 for(TransientGraph g : support.providers) {
2317 if(g.isPending(subject, predicate)) {
2319 g.load(graph, subject, predicate, composite);
2320 } catch (DatabaseException e) {
2321 e.printStackTrace();
2324 composite.accept(graph);
2328 composite.accept(graph);
2332 // void dumpHeap() {
2335 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2336 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2337 // "d:/heap" + flushCounter + ".txt", true);
2338 // } catch (IOException e) {
2339 // e.printStackTrace();
2344 void fireReactionsToSynchronize(ChangeSet cs) {
2346 // Do not fire empty events
2350 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2351 // g.state.barrier.inc();
2355 if (!cs.isEmpty()) {
2356 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2357 for (ChangeListener l : changeListeners2) {
2360 } catch (Exception ex) {
2361 ex.printStackTrace();
2368 // g.state.barrier.dec();
2369 // g.waitAsync(null);
2375 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2378 // Do not fire empty events
2383 // graph.state.barrier.inc();
2387 if (!cs2.isEmpty()) {
2389 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2390 for (ChangeListener l : changeListeners2) {
2393 // System.out.println("changelistener " + l);
2394 } catch (Exception ex) {
2395 ex.printStackTrace();
2403 // graph.state.barrier.dec();
2404 // graph.waitAsync(null);
2407 } catch (Throwable t) {
2408 t.printStackTrace();
2412 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2416 // Do not fire empty events
2420 // Do not fire on virtual requests
2421 if(graph.getProvider() != null)
2424 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2428 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2429 for (ChangeListener l : metadataListeners) {
2432 } catch (Throwable ex) {
2433 ex.printStackTrace();
2441 } catch (Throwable t) {
2442 t.printStackTrace();
2451 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2454 public <T> T getService(Class<T> api) {
2455 T t = peekService(api);
2457 if (state.isClosed())
2458 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2459 throw new ServiceNotFoundException(this, api);
2467 protected abstract ServerInformation getCachedServerInformation();
2469 private Class<?> serviceKey1 = null;
2470 private Class<?> serviceKey2 = null;
2471 private Object service1 = null;
2472 private Object service2 = null;
2478 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2480 @SuppressWarnings("unchecked")
2482 public synchronized <T> T peekService(Class<T> api) {
2484 if(serviceKey1 == api) {
2486 } else if (serviceKey2 == api) {
2488 Object result = service2;
2489 service2 = service1;
2490 serviceKey2 = serviceKey1;
2496 if (Layer0.class == api)
2498 if (ServerInformation.class == api)
2499 return (T) getCachedServerInformation();
2500 else if (WriteGraphImpl.class == api)
2501 return (T) writeState.getGraph();
2502 else if (ClusterBuilder.class == api)
2503 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2504 else if (ClusterBuilderFactory.class == api)
2505 return (T)new ClusterBuilderFactoryImpl(this);
2507 service2 = service1;
2508 serviceKey2 = serviceKey1;
2510 service1 = serviceLocator.peekService(api);
2521 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2524 public boolean hasService(Class<?> api) {
2525 return serviceLocator.hasService(api);
2529 * @param api the api that must be implemented by the specified service
2530 * @param service the service implementation
2533 public <T> void registerService(Class<T> api, T service) {
2534 if(Layer0.class == api) {
2535 L0 = (Layer0)service;
2538 serviceLocator.registerService(api, service);
2539 if (TransactionPolicySupport.class == api) {
2540 transactionPolicy = (TransactionPolicySupport)service;
2541 state.resetTransactionPolicy();
2543 if (api == serviceKey1)
2545 else if (api == serviceKey2)
2553 void fireSessionVariableChange(String variable) {
2554 for (MonitorHandler h : monitorHandlers) {
2555 MonitorContext ctx = monitorContexts.getLeft(h);
2557 // SafeRunner functionality repeated here to avoid dependency.
2559 h.valuesChanged(ctx);
2560 } catch (Exception e) {
2561 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2562 } catch (LinkageError e) {
2563 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2569 class ResourceSerializerImpl implements ResourceSerializer {
2571 public long createRandomAccessId(int id) throws DatabaseException {
2574 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2575 long cluster = getCluster(id);
2577 return 0; // Better to return 0 then invalid id.
2578 long result = ClusterTraitsBase.createResourceId(cluster, index);
2583 public long getRandomAccessId(Resource resource) throws DatabaseException {
2585 ResourceImpl resourceImpl = (ResourceImpl) resource;
2586 return createRandomAccessId(resourceImpl.id);
2591 public int getTransientId(Resource resource) throws DatabaseException {
2593 ResourceImpl resourceImpl = (ResourceImpl) resource;
2594 return resourceImpl.id;
2599 public String createRandomAccessId(Resource resource)
2600 throws InvalidResourceReferenceException {
2602 if(resource == null) throw new IllegalArgumentException();
2604 ResourceImpl resourceImpl = (ResourceImpl) resource;
2606 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2610 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2611 } catch (DatabaseException e1) {
2612 throw new InvalidResourceReferenceException(e1);
2615 // Serialize as '<resource index>_<cluster id>'
2616 return "" + r + "_" + getCluster(resourceImpl);
2617 } catch (Throwable e) {
2618 e.printStackTrace();
2619 throw new InvalidResourceReferenceException(e);
2624 public int getTransientId(long serialized) throws DatabaseException {
2625 if (serialized <= 0)
2626 return (int)serialized;
2627 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2628 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2629 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2630 if (cluster == null)
2631 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2632 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2637 public Resource getResource(long randomAccessId) throws DatabaseException {
2638 return getResourceByKey(getTransientId(randomAccessId));
2642 public Resource getResource(int transientId) throws DatabaseException {
2643 return getResourceByKey(transientId);
2647 public Resource getResource(String randomAccessId)
2648 throws InvalidResourceReferenceException {
2650 int i = randomAccessId.indexOf('_');
2652 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2653 + randomAccessId + "'");
2654 int r = Integer.parseInt(randomAccessId.substring(0, i));
2655 if(r < 0) return getResourceByKey(r);
2656 long c = Long.parseLong(randomAccessId.substring(i + 1));
2657 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2658 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2659 if (cluster.hasResource(key, clusterTranslator))
2660 return getResourceByKey(key);
2661 } catch (InvalidResourceReferenceException e) {
2663 } catch (NumberFormatException e) {
2664 throw new InvalidResourceReferenceException(e);
2665 } catch (Throwable e) {
2666 e.printStackTrace();
2667 throw new InvalidResourceReferenceException(e);
2670 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2674 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2677 } catch (Throwable e) {
2678 e.printStackTrace();
2679 throw new InvalidResourceReferenceException(e);
2685 // Copied from old SessionImpl
2688 if (state.isClosed())
2689 throw new Error("Session closed.");
2694 public ResourceImpl getNewResource(long clusterId) {
2695 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2698 newId = cluster.createResource(clusterTranslator);
2699 } catch (DatabaseException e) {
2700 Logger.defaultLogError(e);
2703 return new ResourceImpl(resourceSupport, newId);
2706 public ResourceImpl getNewResource(Resource clusterSet)
2707 throws DatabaseException {
2708 long resourceId = clusterSet.getResourceId();
2709 Long clusterId = clusterSetsSupport.get(resourceId);
2710 if (null == clusterId)
2711 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2712 if (Constants.NewClusterId == clusterId) {
2713 clusterId = getService(ClusteringSupport.class).createCluster();
2714 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2715 createdClusters.add(clusterId);
2716 clusterSetsSupport.put(resourceId, clusterId);
2717 return getNewResource(clusterId);
2719 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2720 ResourceImpl result;
2721 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2722 clusterId = getService(ClusteringSupport.class).createCluster();
2723 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2724 createdClusters.add(clusterId);
2725 clusterSetsSupport.put(resourceId, clusterId);
2726 return getNewResource(clusterId);
2728 result = getNewResource(clusterId);
2729 int resultKey = querySupport.getId(result);
2730 long resultCluster = querySupport.getClusterId(resultKey);
2731 if (clusterId != resultCluster)
2732 clusterSetsSupport.put(resourceId, resultCluster);
2738 public void getNewClusterSet(Resource clusterSet)
2739 throws DatabaseException {
2741 System.out.println("new cluster set=" + clusterSet);
2742 long resourceId = clusterSet.getResourceId();
2743 if (clusterSetsSupport.containsKey(resourceId))
2744 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2745 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2747 public boolean containsClusterSet(Resource clusterSet)
2748 throws ServiceException {
2749 long resourceId = clusterSet.getResourceId();
2750 return clusterSetsSupport.containsKey(resourceId);
2752 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2753 Resource r = defaultClusterSet;
2754 defaultClusterSet = clusterSet;
2757 void printDiagnostics() {
2761 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2762 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2763 for(ClusterI cluster : clusterTable.getClusters()) {
2765 if (cluster.isLoaded() && !cluster.isEmpty()) {
2766 residentClusters.set(residentClusters.get() + 1);
2767 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2769 } catch (DatabaseException e) {
2770 Logger.defaultLogError(e);
2774 System.out.println("--------------------------------");
2775 System.out.println("Cluster information:");
2776 System.out.println("-amount of resident clusters=" + residentClusters.get());
2777 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2779 for(ClusterI cluster : clusterTable.getClusters()) {
2780 System.out.print("Cluster " + cluster.getClusterId() + " [");
2782 if (!cluster.isLoaded())
2783 System.out.println("not loaded]");
2784 else if (cluster.isEmpty())
2785 System.out.println("is empty]");
2787 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2788 System.out.print(",references=" + cluster.getReferenceCount());
2789 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2791 } catch (DatabaseException e) {
2792 Logger.defaultLogError(e);
2793 System.out.println("is corrupted]");
2797 queryProvider2.printDiagnostics();
2798 System.out.println("--------------------------------");
2803 public void fireStartReadTransaction() {
2806 System.out.println("StartReadTransaction");
2808 for (SessionEventListener listener : eventListeners) {
2810 listener.readTransactionStarted();
2811 } catch (Throwable t) {
2812 t.printStackTrace();
2818 public void fireFinishReadTransaction() {
2821 System.out.println("FinishReadTransaction");
2823 for (SessionEventListener listener : eventListeners) {
2825 listener.readTransactionFinished();
2826 } catch (Throwable t) {
2827 t.printStackTrace();
2833 public void fireStartWriteTransaction() {
2836 System.out.println("StartWriteTransaction");
2838 for (SessionEventListener listener : eventListeners) {
2840 listener.writeTransactionStarted();
2841 } catch (Throwable t) {
2842 t.printStackTrace();
2848 public void fireFinishWriteTransaction() {
2851 System.out.println("FinishWriteTransaction");
2853 for (SessionEventListener listener : eventListeners) {
2855 listener.writeTransactionFinished();
2856 } catch (Throwable t) {
2857 t.printStackTrace();
2861 Indexing.resetDependenciesIndexingDisabled();
2865 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2866 throw new Error("Not supported at the moment.");
2873 ClusterTable getClusterTable() {
2874 return clusterTable;
2877 public GraphSession getGraphSession() {
2878 return graphSession;
2881 public QueryProcessor getQueryProvider2() {
2882 return queryProvider2;
2885 ClientChangesImpl getClientChanges() {
2886 return clientChanges;
2889 boolean getWriteOnly() {
2893 static int counter = 0;
2895 public void onClusterLoaded(long clusterId) {
2897 clusterTable.updateSize();
2905 * Implementation of the interface RequestProcessor
2909 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2914 assert(request != null);
2916 final DataContainer<T> result = new DataContainer<T>();
2917 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2919 syncRequest(request, new AsyncProcedure<T>() {
2922 public void execute(AsyncReadGraph graph, T t) {
2927 public void exception(AsyncReadGraph graph, Throwable t) {
2933 Throwable t = exception.get();
2935 if(t instanceof DatabaseException) throw (DatabaseException)t;
2936 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2939 return result.get();
2944 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2945 // assertNotSession();
2946 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2950 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2952 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2956 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2958 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2962 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2964 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2968 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2970 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2974 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2978 assert(request != null);
2980 final DataContainer<T> result = new DataContainer<T>();
2981 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2983 syncRequest(request, new AsyncProcedure<T>() {
2986 public void execute(AsyncReadGraph graph, T t) {
2991 public void exception(AsyncReadGraph graph, Throwable t) {
2997 Throwable t = exception.get();
2999 if(t instanceof DatabaseException) throw (DatabaseException)t;
3000 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3003 return result.get();
3008 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3010 return syncRequest(request, (AsyncProcedure<T>)procedure);
3014 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3016 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3020 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3022 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3026 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3028 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3032 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3034 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3038 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3042 assert(request != null);
3044 final ArrayList<T> result = new ArrayList<T>();
3045 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3047 syncRequest(request, new AsyncMultiProcedure<T>() {
3050 public void execute(AsyncReadGraph graph, T t) {
3051 synchronized(result) {
3057 public void finished(AsyncReadGraph graph) {
3061 public void exception(AsyncReadGraph graph, Throwable t) {
3068 Throwable t = exception.get();
3070 if(t instanceof DatabaseException) throw (DatabaseException)t;
3071 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3079 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3081 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3085 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3087 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3091 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3093 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3097 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3099 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3103 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3105 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3109 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3113 assert(request != null);
3115 final ArrayList<T> result = new ArrayList<T>();
3116 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3118 syncRequest(request, new AsyncMultiProcedure<T>() {
3121 public void execute(AsyncReadGraph graph, T t) {
3122 synchronized(result) {
3128 public void finished(AsyncReadGraph graph) {
3132 public void exception(AsyncReadGraph graph, Throwable t) {
3139 Throwable t = exception.get();
3141 if(t instanceof DatabaseException) throw (DatabaseException)t;
3142 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3150 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3152 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3156 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3158 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3162 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3164 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3168 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3170 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3174 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3176 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3181 public <T> void asyncRequest(Read<T> request) {
3183 asyncRequest(request, new ProcedureAdapter<T>() {
3185 public void exception(Throwable t) {
3186 t.printStackTrace();
3193 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3194 asyncRequest(request, (AsyncProcedure<T>)procedure);
3198 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3199 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3203 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3204 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3208 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3209 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3213 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3214 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3218 final public <T> void asyncRequest(final AsyncRead<T> request) {
3220 assert(request != null);
3222 asyncRequest(request, new ProcedureAdapter<T>() {
3224 public void exception(Throwable t) {
3225 t.printStackTrace();
3232 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3233 scheduleRequest(request, procedure, procedure, null);
3237 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3238 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3242 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3243 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3247 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3248 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3252 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3253 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3257 public <T> void asyncRequest(MultiRead<T> request) {
3259 assert(request != null);
3261 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3263 public void exception(AsyncReadGraph graph, Throwable t) {
3264 t.printStackTrace();
3271 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3272 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3276 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3277 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3281 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3282 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3286 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3287 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3291 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3292 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3296 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3298 assert(request != null);
3300 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3302 public void exception(AsyncReadGraph graph, Throwable t) {
3303 t.printStackTrace();
3310 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3311 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3315 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3316 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3320 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3321 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3325 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3326 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3330 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3331 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3335 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3337 throw new Error("Not implemented!");
3341 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3342 throw new Error("Not implemented!");
3346 final public <T> void asyncRequest(final ExternalRead<T> request) {
3348 assert(request != null);
3350 asyncRequest(request, new ProcedureAdapter<T>() {
3352 public void exception(Throwable t) {
3353 t.printStackTrace();
3360 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3361 asyncRequest(request, (Procedure<T>)procedure);
3366 void check(Throwable t) throws DatabaseException {
3368 if(t instanceof DatabaseException) throw (DatabaseException)t;
3369 else throw new DatabaseException("Unexpected exception", t);
3373 void check(DataContainer<Throwable> container) throws DatabaseException {
3374 Throwable t = container.get();
3376 if(t instanceof DatabaseException) throw (DatabaseException)t;
3377 else throw new DatabaseException("Unexpected exception", t);
3386 boolean sameProvider(Write request) {
3387 if(writeState.getGraph().provider != null) {
3388 return writeState.getGraph().provider.equals(request.getProvider());
3390 return request.getProvider() == null;
3394 boolean plainWrite(WriteGraphImpl graph) {
3395 if(graph == null) return false;
3396 if(graph.writeSupport.writeOnly()) return false;
3401 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3402 private void assertNotSession() throws DatabaseException {
3403 Thread current = Thread.currentThread();
3404 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3407 void assertAlive() {
3408 if (!state.isAlive())
3409 throw new RuntimeDatabaseException("Session has been shut down.");
3412 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3413 return querySupport.getValueStream(graph, querySupport.getId(resource));
3416 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3417 return querySupport.getValue(graph, querySupport.getId(resource));
3420 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3421 acquire(semaphore, request, null);
3424 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3428 // Loop until the semaphore is acquired reporting requests that take a long time.
3431 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3435 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3437 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3438 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3440 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3441 + (waitTime) + " ms. (procedure=" + procedure + ")");
3448 } catch (InterruptedException e) {
3449 e.printStackTrace();
3450 // FIXME: Should perhaps do something else in this case ??
3457 // public boolean holdOnToTransactionAfterCancel() {
3458 // return transactionPolicy.holdOnToTransactionAfterCancel();
3462 // public boolean holdOnToTransactionAfterCommit() {
3463 // return transactionPolicy.holdOnToTransactionAfterCommit();
3467 // public boolean holdOnToTransactionAfterRead() {
3468 // return transactionPolicy.holdOnToTransactionAfterRead();
3472 // public void onRelinquish() {
3473 // transactionPolicy.onRelinquish();
3477 // public void onRelinquishDone() {
3478 // transactionPolicy.onRelinquishDone();
3482 // public void onRelinquishError() {
3483 // transactionPolicy.onRelinquishError();
3488 public Session getSession() {
3493 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3494 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3496 public void ceased(int thread) {
3498 requestManager.ceased(thread);
3502 public int getAmountOfQueryThreads() {
3503 // This must be a power of two
3505 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3508 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3510 return new ResourceImpl(resourceSupport, key);
3514 public void acquireWriteOnly() {
3518 public void releaseWriteOnly(ReadGraphImpl graph) {
3520 queryProvider2.releaseWrite(graph);
3523 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3525 long start = System.nanoTime();
3527 while(dirtyPrimitives) {
3528 dirtyPrimitives = false;
3529 getQueryProvider2().performDirtyUpdates(writer);
3530 getQueryProvider2().performScheduledUpdates(writer);
3533 fireMetadataListeners(writer, clientChanges);
3535 if(DebugPolicy.PERFORMANCE_DATA) {
3536 long end = System.nanoTime();
3537 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3542 void removeTemporaryData() {
3543 File platform = Platform.getLocation().toFile();
3544 File tempFiles = new File(platform, "tempFiles");
3545 File temp = new File(tempFiles, "db");
3549 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3551 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3554 } catch (IOException e) {
3555 Logger.defaultLogError(e);
3557 return FileVisitResult.CONTINUE;
3560 } catch (IOException e) {
3561 Logger.defaultLogError(e);
3565 public void handleCreatedClusters() {
3566 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3567 createdClusters.forEach(new TLongProcedure() {
3570 public boolean execute(long value) {
3571 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3572 cluster.setImmutable(true, clusterTranslator);
3577 createdClusters.clear();
3581 public Object getModificationCounter() {
3582 return queryProvider2.modificationCounter;
3586 public void markUndoPoint() {
3587 state.setCombine(false);