1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryCacheBase;
94 import org.simantics.db.impl.query.QueryProcessor;
95 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
96 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
97 import org.simantics.db.impl.service.QueryDebug;
98 import org.simantics.db.impl.support.VirtualGraphServerSupport;
99 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
100 import org.simantics.db.procedure.AsyncListener;
101 import org.simantics.db.procedure.AsyncMultiListener;
102 import org.simantics.db.procedure.AsyncMultiProcedure;
103 import org.simantics.db.procedure.AsyncProcedure;
104 import org.simantics.db.procedure.Listener;
105 import org.simantics.db.procedure.ListenerBase;
106 import org.simantics.db.procedure.MultiListener;
107 import org.simantics.db.procedure.MultiProcedure;
108 import org.simantics.db.procedure.Procedure;
109 import org.simantics.db.procedure.SyncListener;
110 import org.simantics.db.procedure.SyncMultiListener;
111 import org.simantics.db.procedure.SyncMultiProcedure;
112 import org.simantics.db.procedure.SyncProcedure;
113 import org.simantics.db.procore.cluster.ClusterImpl;
114 import org.simantics.db.procore.cluster.ClusterTraits;
115 import org.simantics.db.procore.protocol.Constants;
116 import org.simantics.db.procore.protocol.DebugPolicy;
117 import org.simantics.db.request.AsyncMultiRead;
118 import org.simantics.db.request.AsyncRead;
119 import org.simantics.db.request.DelayedWrite;
120 import org.simantics.db.request.DelayedWriteResult;
121 import org.simantics.db.request.ExternalRead;
122 import org.simantics.db.request.MultiRead;
123 import org.simantics.db.request.Read;
124 import org.simantics.db.request.ReadInterface;
125 import org.simantics.db.request.Write;
126 import org.simantics.db.request.WriteInterface;
127 import org.simantics.db.request.WriteOnly;
128 import org.simantics.db.request.WriteOnlyResult;
129 import org.simantics.db.request.WriteResult;
130 import org.simantics.db.request.WriteTraits;
131 import org.simantics.db.service.ByteReader;
132 import org.simantics.db.service.ClusterBuilder;
133 import org.simantics.db.service.ClusterBuilderFactory;
134 import org.simantics.db.service.ClusterControl;
135 import org.simantics.db.service.ClusterSetsSupport;
136 import org.simantics.db.service.ClusterUID;
137 import org.simantics.db.service.ClusteringSupport;
138 import org.simantics.db.service.CollectionSupport;
139 import org.simantics.db.service.DebugSupport;
140 import org.simantics.db.service.DirectQuerySupport;
141 import org.simantics.db.service.GraphChangeListenerSupport;
142 import org.simantics.db.service.InitSupport;
143 import org.simantics.db.service.LifecycleSupport;
144 import org.simantics.db.service.ManagementSupport;
145 import org.simantics.db.service.QueryControl;
146 import org.simantics.db.service.SerialisationSupport;
147 import org.simantics.db.service.ServerInformation;
148 import org.simantics.db.service.ServiceActivityMonitor;
149 import org.simantics.db.service.SessionEventSupport;
150 import org.simantics.db.service.SessionMonitorSupport;
151 import org.simantics.db.service.SessionUserSupport;
152 import org.simantics.db.service.StatementSupport;
153 import org.simantics.db.service.TransactionPolicySupport;
154 import org.simantics.db.service.TransactionSupport;
155 import org.simantics.db.service.TransferableGraphSupport;
156 import org.simantics.db.service.UndoRedoSupport;
157 import org.simantics.db.service.VirtualGraphSupport;
158 import org.simantics.db.service.XSupport;
159 import org.simantics.layer0.Layer0;
160 import org.simantics.utils.DataContainer;
161 import org.simantics.utils.Development;
162 import org.simantics.utils.threads.logger.ITask;
163 import org.simantics.utils.threads.logger.ThreadLogger;
165 import gnu.trove.procedure.TLongProcedure;
166 import gnu.trove.set.hash.TLongHashSet;
169 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
171 protected static final boolean DEBUG = false;
173 private static final boolean DIAGNOSTICS = false;
175 private TransactionPolicySupport transactionPolicy;
177 final private ClusterControl clusterControl;
179 final protected BuiltinSupportImpl builtinSupport;
180 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
181 final protected ClusterSetsSupport clusterSetsSupport;
182 final protected LifecycleSupportImpl lifecycleSupport;
184 protected QuerySupportImpl querySupport;
185 protected ResourceSupportImpl resourceSupport;
186 protected WriteSupport writeSupport;
187 public ClusterTranslator clusterTranslator;
189 boolean dirtyPrimitives = false;
191 public static final int SERVICE_MODE_CREATE = 2;
192 public static final int SERVICE_MODE_ALLOW = 1;
193 public int serviceMode = 0;
194 public boolean createdImmutableClusters = false;
195 public TLongHashSet createdClusters = new TLongHashSet();
200 * The service locator maintained by the workbench. These services are
201 * initialized during workbench during the <code>init</code> method.
203 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
205 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
207 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
209 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
211 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
213 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
215 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
217 final protected State state = new State();
219 protected GraphSession graphSession = null;
221 protected SessionManager sessionManagerImpl = null;
223 protected UserAuthenticationAgent authAgent = null;
225 protected UserAuthenticator authenticator = null;
227 protected Resource user = null;
229 protected ClusterStream clusterStream = null;
231 protected SessionRequestManager requestManager = null;
233 public ClusterTable clusterTable = null;
235 public QueryProcessor queryProvider2 = null;
237 ClientChangesImpl clientChanges = null;
239 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
241 // protected long newClusterId = Constants.NullClusterId;
243 protected int flushCounter = 0;
245 protected boolean writeOnly = false;
247 WriteState<?> writeState = null;
248 WriteStateBase<?> delayedWriteState = null;
249 protected Resource defaultClusterSet = null; // If not null then used for newResource().
251 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
253 // if (authAgent == null)
254 // throw new IllegalArgumentException("null authentication agent");
256 File t = StaticSessionProperties.virtualGraphStoragePath;
259 this.clusterTable = new ClusterTable(this, t);
260 this.builtinSupport = new BuiltinSupportImpl(this);
261 this.sessionManagerImpl = sessionManagerImpl;
263 // this.authAgent = authAgent;
265 serviceLocator.registerService(Session.class, this);
266 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
267 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
268 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
269 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
270 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
271 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
272 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
273 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
274 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
275 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
276 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
277 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
278 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
279 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
280 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
281 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
282 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
283 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
284 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
285 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
286 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
287 ServiceActivityUpdaterForWriteTransactions.register(this);
289 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
290 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
291 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
292 this.lifecycleSupport = new LifecycleSupportImpl(this);
293 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
294 this.transactionPolicy = new TransactionPolicyRelease();
295 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
296 this.clusterControl = new ClusterControlImpl(this);
297 serviceLocator.registerService(ClusterControl.class, clusterControl);
298 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
299 this.clusterSetsSupport.setReadDirectory(t.toPath());
300 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
301 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
311 public Resource getRootLibrary() {
312 return queryProvider2.getRootLibraryResource();
315 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
316 if (!graphSession.dbSession.refreshEnabled())
319 getClusterTable().refresh(csid, this, clusterUID);
320 } catch (Throwable t) {
321 Logger.defaultLogError("Refesh failed.", t);
325 static final class TaskHelper {
326 private final String name;
327 private Object result;
328 final Semaphore sema = new Semaphore(0);
329 private Throwable throwable = null;
330 final Consumer<DatabaseException> callback = e -> {
331 synchronized (TaskHelper.this) {
335 final Procedure<Object> proc = new Procedure<Object>() {
337 public void execute(Object result) {
338 callback.accept(null);
341 public void exception(Throwable t) {
342 if (t instanceof DatabaseException)
343 callback.accept((DatabaseException)t);
345 callback.accept(new DatabaseException("" + name + "operation failed.", t));
348 final WriteTraits writeTraits = new WriteTraits() {};
349 TaskHelper(String name) {
352 @SuppressWarnings("unchecked")
356 void setResult(Object result) {
357 this.result = result;
359 // Throwable throwableGet() {
362 synchronized void throwableSet(Throwable t) {
365 // void throwableSet(String t) {
366 // throwable = new InternalException("" + name + " operation failed. " + t);
368 synchronized void throwableCheck()
369 throws DatabaseException {
370 if (null != throwable)
371 if (throwable instanceof DatabaseException)
372 throw (DatabaseException)throwable;
374 throw new DatabaseException("Undo operation failed.", throwable);
376 void throw_(String message)
377 throws DatabaseException {
378 throw new DatabaseException("" + name + " operation failed. " + message);
384 private ListenerBase getListenerBase(Object procedure) {
385 if (procedure instanceof ListenerBase)
386 return (ListenerBase) procedure;
391 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
392 scheduleRequest(request, callback, notify, null);
396 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
399 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
401 assert (request != null);
403 if(Development.DEVELOPMENT) {
405 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
406 System.err.println("schedule write '" + request + "'");
407 } catch (Throwable t) {
408 Logger.defaultLogError(t);
412 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
414 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
417 public void run(int thread) {
419 if(Development.DEVELOPMENT) {
421 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
422 System.err.println("perform write '" + request + "'");
423 } catch (Throwable t) {
424 Logger.defaultLogError(t);
428 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
430 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
435 Disposable.safeDispose(clientChanges);
436 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
438 VirtualGraph vg = getProvider(request.getProvider());
440 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
441 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
444 public void execute(Object result) {
445 if(callback != null) callback.accept(null);
449 public void exception(Throwable t) {
450 if(callback != null) callback.accept((DatabaseException)t);
455 assert (null != writer);
456 // writer.state.barrier.inc();
459 request.perform(writer);
460 assert (null != writer);
461 } catch (Throwable t) {
462 if (!(t instanceof CancelTransactionException))
463 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
464 writeState.except(t);
466 // writer.state.barrier.dec();
467 // writer.waitAsync(request);
471 assert(!queryProvider2.cache.dirty);
473 } catch (Throwable e) {
475 // Log it first, just to be safe that the error is always logged.
476 if (!(e instanceof CancelTransactionException))
477 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
479 // writeState.getGraph().state.barrier.dec();
480 // writeState.getGraph().waitAsync(request);
482 writeState.except(e);
485 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
486 // // All we can do here is to log those, can't really pass them anywhere.
487 // if (callback != null) {
488 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
489 // else callback.run(new DatabaseException(e));
491 // } catch (Throwable e2) {
492 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
494 // boolean empty = clusterStream.reallyFlush();
495 // int callerThread = -1;
496 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
497 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
499 // // Send all local changes to server so it can calculate correct reverse change set.
500 // // This will call clusterStream.accept().
501 // state.cancelCommit(context, clusterStream);
503 // if (!context.isOk()) // this is a blocking operation
504 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
505 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
507 // state.cancelCommit2(context, clusterStream);
508 // } catch (DatabaseException e3) {
509 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
511 // clusterStream.setOff(false);
512 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
517 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
523 if(Development.DEVELOPMENT) {
525 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
526 System.err.println("finish write '" + request + "'");
527 } catch (Throwable t) {
528 Logger.defaultLogError(t);
538 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
539 scheduleRequest(request, procedure, notify, null);
543 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
548 assert (request != null);
550 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
552 requestManager.scheduleWrite(new SessionTask(request, thread) {
555 public void run(int thread) {
557 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
559 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
562 Disposable.safeDispose(clientChanges);
563 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
565 VirtualGraph vg = getProvider(request.getProvider());
566 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
569 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
570 writeState = writeStateT;
572 assert (null != writer);
573 // writer.state.barrier.inc();
574 writeStateT.setResult(request.perform(writer));
575 assert (null != writer);
577 // writer.state.barrier.dec();
578 // writer.waitAsync(null);
580 } catch (Throwable e) {
582 // writer.state.barrier.dec();
583 // writer.waitAsync(null);
585 writeState.except(e);
587 // state.stopWriteTransaction(clusterStream);
589 // } catch (Throwable e) {
590 // // Log it first, just to be safe that the error is always logged.
591 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
594 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
595 // // All we can do here is to log those, can't really pass them anywhere.
596 // if (procedure != null) {
597 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
598 // else procedure.exception(new DatabaseException(e));
600 // } catch (Throwable e2) {
601 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
604 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
606 // state.stopWriteTransaction(clusterStream);
609 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
612 // if(notify != null) notify.release();
622 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
623 scheduleRequest(request, callback, notify, null);
627 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
630 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
632 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
634 assert (request != null);
636 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
638 requestManager.scheduleWrite(new SessionTask(request, thread) {
641 public void run(int thread) {
642 fireSessionVariableChange(SessionVariables.QUEUED_READS);
644 Procedure<Object> stateProcedure = new Procedure<Object>() {
646 public void execute(Object result) {
647 if (callback != null)
648 callback.accept(null);
651 public void exception(Throwable t) {
652 if (callback != null) {
653 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
654 else callback.accept(new DatabaseException(t));
656 Logger.defaultLogError("Unhandled exception", t);
660 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
661 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
662 DelayedWriteGraph dwg = null;
663 // newGraph.state.barrier.inc();
666 dwg = new DelayedWriteGraph(newGraph);
667 request.perform(dwg);
668 } catch (Throwable e) {
669 delayedWriteState.except(e);
674 // newGraph.state.barrier.dec();
675 // newGraph.waitAsync(request);
676 fireSessionVariableChange(SessionVariables.QUEUED_READS);
679 delayedWriteState = null;
681 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
682 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
685 Disposable.safeDispose(clientChanges);
686 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
690 VirtualGraph vg = getProvider(request.getProvider());
691 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
693 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
695 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
697 assert (null != writer);
698 // writer.state.barrier.inc();
702 dwg.commit(writer, request);
704 if(defaultClusterSet != null) {
705 XSupport xs = getService(XSupport.class);
706 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
707 ClusteringSupport cs = getService(ClusteringSupport.class);
708 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
711 // This makes clusters available from server
712 clusterStream.reallyFlush();
714 releaseWriteOnly(writer);
715 handleUpdatesAndMetadata(writer);
717 } catch (ServiceException e) {
718 // writer.state.barrier.dec();
719 // writer.waitAsync(null);
721 // These shall be requested from server
722 clusterTable.removeWriteOnlyClusters();
723 // This makes clusters available from server
724 clusterStream.reallyFlush();
726 releaseWriteOnly(writer);
727 writeState.except(e);
729 // Debugging & Profiling
730 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
740 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
741 scheduleRequest(request, procedure, notify, null);
745 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
748 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
749 throw new Error("Not implemented");
752 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
753 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
754 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
755 createdClusters.add(cluster.clusterId);
760 class WriteOnlySupport implements WriteSupport {
762 ClusterStream stream;
763 ClusterImpl currentCluster;
765 public WriteOnlySupport() {
766 this.stream = clusterStream;
770 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
771 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
775 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
777 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
779 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
780 if(s != queryProvider2.getRootLibrary())
781 throw new ImmutableException("Trying to modify immutable resource key=" + s);
784 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
785 } catch (DatabaseException e) {
786 Logger.defaultLogError(e);
790 clientChanges.invalidate(s);
792 if (cluster.isWriteOnly())
794 queryProvider2.updateStatements(s, p);
799 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
800 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
804 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
806 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
808 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
809 } catch (DatabaseException e) {
810 Logger.defaultLogError(e);
813 clientChanges.invalidate(rid);
815 if (cluster.isWriteOnly())
817 queryProvider2.updateValue(rid);
822 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
825 claimValue(provider,resource, reader.readBytes(null, amount));
829 byte[] bytes = new byte[65536];
831 int rid = ((ResourceImpl)resource).id;
832 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
836 int block = Math.min(left, 65536);
837 reader.readBytes(bytes, block);
838 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
841 } catch (DatabaseException e) {
842 Logger.defaultLogError(e);
845 clientChanges.invalidate(rid);
847 if (cluster.isWriteOnly())
849 queryProvider2.updateValue(rid);
853 private void maintainCluster(ClusterImpl before, ClusterI after_) {
854 if(after_ != null && after_ != before) {
855 ClusterImpl after = (ClusterImpl)after_;
856 if(currentCluster == before) {
857 currentCluster = after;
859 clusterTable.replaceCluster(after);
863 public int createResourceKey(int foreignCounter) throws DatabaseException {
864 if(currentCluster == null) {
865 currentCluster = getNewResourceCluster();
867 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
868 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
869 newCluster.foreignLookup = new byte[foreignCounter];
870 currentCluster = newCluster;
872 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
874 return currentCluster.createResource(clusterTranslator);
878 public Resource createResource(VirtualGraph provider) throws DatabaseException {
879 if(currentCluster == null) {
880 if (null != defaultClusterSet) {
881 ResourceImpl result = getNewResource(defaultClusterSet);
882 currentCluster = clusterTable.getClusterByResourceKey(result.id);
885 currentCluster = getNewResourceCluster();
888 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
889 if (null != defaultClusterSet) {
890 ResourceImpl result = getNewResource(defaultClusterSet);
891 currentCluster = clusterTable.getClusterByResourceKey(result.id);
894 currentCluster = getNewResourceCluster();
897 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
901 public Resource createResource(VirtualGraph provider, long clusterId)
902 throws DatabaseException {
903 return getNewResource(clusterId);
907 public Resource createResource(VirtualGraph provider, Resource clusterSet)
908 throws DatabaseException {
909 return getNewResource(clusterSet);
913 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
914 throws DatabaseException {
915 getNewClusterSet(clusterSet);
919 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
920 throws ServiceException {
921 return containsClusterSet(clusterSet);
924 public void selectCluster(long cluster) {
925 currentCluster = clusterTable.getClusterByClusterId(cluster);
926 long setResourceId = clusterSetsSupport.getSet(cluster);
927 clusterSetsSupport.put(setResourceId, cluster);
931 public Resource setDefaultClusterSet(Resource clusterSet)
932 throws ServiceException {
933 Resource result = setDefaultClusterSet4NewResource(clusterSet);
934 if(clusterSet != null) {
935 long id = clusterSetsSupport.get(clusterSet.getResourceId());
936 currentCluster = clusterTable.getClusterByClusterId(id);
939 currentCluster = null;
945 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
947 provider = getProvider(provider);
948 if (null == provider) {
949 int key = ((ResourceImpl)resource).id;
953 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
954 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
955 // if(key != queryProvider2.getRootLibrary())
956 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
959 // cluster.removeValue(key, clusterTranslator);
960 // } catch (DatabaseException e) {
961 // Logger.defaultLogError(e);
965 clusterTable.writeOnlyInvalidate(cluster);
968 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
969 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
970 clusterTranslator.removeValue(cluster);
971 } catch (DatabaseException e) {
972 Logger.defaultLogError(e);
975 queryProvider2.invalidateResource(key);
976 clientChanges.invalidate(key);
979 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
980 queryProvider2.updateValue(querySupport.getId(resource));
981 clientChanges.claimValue(resource);
988 public void flush(boolean intermediate) {
989 throw new UnsupportedOperationException();
993 public void flushCluster() {
994 clusterTable.flushCluster(graphSession);
995 if(defaultClusterSet != null) {
996 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
998 currentCluster = null;
1002 public void flushCluster(Resource r) {
1003 throw new UnsupportedOperationException("flushCluster resource " + r);
1011 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1014 int s = ((ResourceImpl)subject).id;
1015 int p = ((ResourceImpl)predicate).id;
1016 int o = ((ResourceImpl)object).id;
1018 provider = getProvider(provider);
1019 if (null == provider) {
1021 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1022 clusterTable.writeOnlyInvalidate(cluster);
1026 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1027 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1028 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1030 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1031 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1033 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1034 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1036 clusterTranslator.removeStatement(cluster);
1038 queryProvider2.invalidateResource(s);
1039 clientChanges.invalidate(s);
1041 } catch (DatabaseException e) {
1043 Logger.defaultLogError(e);
1051 ((VirtualGraphImpl)provider).deny(s, p, o);
1052 queryProvider2.invalidateResource(s);
1053 clientChanges.invalidate(s);
1062 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1063 throw new UnsupportedOperationException();
1067 public boolean writeOnly() {
1072 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1073 writeSupport.performWriteRequest(graph, request);
1077 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1078 throw new UnsupportedOperationException();
1082 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1083 throw new UnsupportedOperationException();
1087 public <T> void addMetadata(Metadata data) throws ServiceException {
1088 writeSupport.addMetadata(data);
1092 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1093 return writeSupport.getMetadata(clazz);
1097 public TreeMap<String, byte[]> getMetadata() {
1098 return writeSupport.getMetadata();
1102 public void commitDone(WriteTraits writeTraits, long csid) {
1103 writeSupport.commitDone(writeTraits, csid);
1107 public void clearUndoList(WriteTraits writeTraits) {
1108 writeSupport.clearUndoList(writeTraits);
1111 public int clearMetadata() {
1112 return writeSupport.clearMetadata();
1116 public void startUndo() {
1117 writeSupport.startUndo();
1121 class VirtualWriteOnlySupport implements WriteSupport {
1124 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1125 // Resource object) {
1126 // throw new UnsupportedOperationException();
1130 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1132 TransientGraph impl = (TransientGraph)provider;
1133 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1134 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1135 clientChanges.claim(subject, predicate, object);
1140 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1142 TransientGraph impl = (TransientGraph)provider;
1143 impl.claim(subject, predicate, object);
1144 getQueryProvider2().updateStatements(subject, predicate);
1145 clientChanges.claim(subject, predicate, object);
1150 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1151 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1155 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1156 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1157 getQueryProvider2().updateValue(resource);
1158 clientChanges.claimValue(resource);
1162 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1163 byte[] value = reader.readBytes(null, amount);
1164 claimValue(provider, resource, value);
1168 public Resource createResource(VirtualGraph provider) {
1169 TransientGraph impl = (TransientGraph)provider;
1170 return impl.getResource(impl.newResource(false));
1174 public Resource createResource(VirtualGraph provider, long clusterId) {
1175 throw new UnsupportedOperationException();
1179 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1180 throws DatabaseException {
1181 throw new UnsupportedOperationException();
1185 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1186 throws DatabaseException {
1187 throw new UnsupportedOperationException();
1191 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1192 throws ServiceException {
1193 throw new UnsupportedOperationException();
1197 public Resource setDefaultClusterSet(Resource clusterSet)
1198 throws ServiceException {
1203 public void denyValue(VirtualGraph provider, Resource resource) {
1204 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1205 getQueryProvider2().updateValue(querySupport.getId(resource));
1206 // NOTE: this only keeps track of value changes by-resource.
1207 clientChanges.claimValue(resource);
1211 public void flush(boolean intermediate) {
1212 throw new UnsupportedOperationException();
1216 public void flushCluster() {
1217 throw new UnsupportedOperationException();
1221 public void flushCluster(Resource r) {
1222 throw new UnsupportedOperationException("Resource " + r);
1230 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1231 TransientGraph impl = (TransientGraph) provider;
1232 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1233 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1234 clientChanges.deny(subject, predicate, object);
1239 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1240 throw new UnsupportedOperationException();
1244 public boolean writeOnly() {
1249 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1250 throw new UnsupportedOperationException();
1254 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1255 throw new UnsupportedOperationException();
1259 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1260 throw new UnsupportedOperationException();
1264 public <T> void addMetadata(Metadata data) throws ServiceException {
1265 throw new UnsupportedOperationException();
1269 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1270 throw new UnsupportedOperationException();
1274 public TreeMap<String, byte[]> getMetadata() {
1275 throw new UnsupportedOperationException();
1279 public void commitDone(WriteTraits writeTraits, long csid) {
1283 public void clearUndoList(WriteTraits writeTraits) {
1287 public int clearMetadata() {
1292 public void startUndo() {
1296 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1300 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1302 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1305 Disposable.safeDispose(clientChanges);
1306 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1310 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1312 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1314 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1315 writeState = writeStateT;
1317 assert (null != writer);
1318 // writer.state.barrier.inc();
1319 long start = System.nanoTime();
1320 T result = request.perform(writer);
1321 long duration = System.nanoTime() - start;
1323 System.err.println("################");
1324 System.err.println("WriteOnly duration " + 1e-9*duration);
1326 writeStateT.setResult(result);
1328 // This makes clusters available from server
1329 clusterStream.reallyFlush();
1331 // This will trigger query updates
1332 releaseWriteOnly(writer);
1333 assert (null != writer);
1335 handleUpdatesAndMetadata(writer);
1337 } catch (CancelTransactionException e) {
1339 releaseWriteOnly(writeState.getGraph());
1341 clusterTable.removeWriteOnlyClusters();
1342 state.stopWriteTransaction(clusterStream);
1344 } catch (Throwable e) {
1346 e.printStackTrace();
1348 releaseWriteOnly(writeState.getGraph());
1350 clusterTable.removeWriteOnlyClusters();
1352 if (callback != null)
1353 callback.exception(new DatabaseException(e));
1355 state.stopWriteTransaction(clusterStream);
1356 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1360 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1366 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1367 scheduleRequest(request, callback, notify, null);
1371 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1375 assert (request != null);
1377 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1379 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1382 public void run(int thread) {
1384 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1391 Disposable.safeDispose(clientChanges);
1392 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396 VirtualGraph vg = getProvider(request.getProvider());
1397 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1399 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1401 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1404 public void execute(Object result) {
1405 if(callback != null) callback.accept(null);
1409 public void exception(Throwable t) {
1410 if(callback != null) callback.accept((DatabaseException)t);
1415 assert (null != writer);
1416 // writer.state.barrier.inc();
1420 request.perform(writer);
1422 } catch (Throwable e) {
1424 // writer.state.barrier.dec();
1425 // writer.waitAsync(null);
1427 releaseWriteOnly(writer);
1429 clusterTable.removeWriteOnlyClusters();
1431 if(!(e instanceof CancelTransactionException)) {
1432 if (callback != null)
1433 callback.accept(new DatabaseException(e));
1436 writeState.except(e);
1442 // This makes clusters available from server
1443 boolean empty = clusterStream.reallyFlush();
1444 // This was needed to make WO requests call metadata listeners.
1445 // NOTE: the calling event does not contain clientChanges information.
1446 if (!empty && clientChanges.isEmpty())
1447 clientChanges.setNotEmpty(true);
1448 releaseWriteOnly(writer);
1449 assert (null != writer);
1452 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1465 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466 scheduleRequest(request, callback, notify, null);
1470 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1473 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1475 assert (request != null);
1477 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1479 requestManager.scheduleWrite(new SessionTask(request, thread) {
1482 public void run(int thread) {
1484 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1486 performWriteOnly(request, notify, callback);
1496 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1498 assert (request != null);
1499 assert (procedure != null);
1501 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1503 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1506 public void run(int thread) {
1508 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1510 ListenerBase listener = getListenerBase(procedure);
1512 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516 if (listener != null) {
1520 AsyncProcedure ap = new AsyncProcedure<T>() {
1523 public void exception(AsyncReadGraph graph, Throwable t) {
1524 procedure.exception(graph, t);
1525 if(throwable != null) {
1528 // ErrorLogger.defaultLogError("Unhandled exception", t);
1533 public void execute(AsyncReadGraph graph, T t) {
1534 if(result != null) result.set(t);
1535 procedure.execute(graph, t);
1540 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1542 } catch (Throwable t) {
1543 // This is handled by the AsyncProcedure
1544 //Logger.defaultLogError("Internal error", t);
1551 // newGraph.state.barrier.inc();
1553 T t = request.perform(newGraph);
1557 if(result != null) result.set(t);
1558 procedure.execute(newGraph, t);
1560 } catch (Throwable th) {
1562 if(throwable != null) {
1565 Logger.defaultLogError("Unhandled exception", th);
1570 } catch (Throwable t) {
1573 t.printStackTrace();
1575 if(throwable != null) {
1578 Logger.defaultLogError("Unhandled exception", t);
1583 procedure.exception(newGraph, t);
1585 } catch (Throwable t2) {
1587 if(throwable != null) {
1590 Logger.defaultLogError("Unhandled exception", t2);
1597 // newGraph.state.barrier.dec();
1598 // newGraph.waitAsync(request);
1604 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1614 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1616 assert (request != null);
1617 assert (procedure != null);
1619 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1621 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1624 public void run(int thread) {
1626 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1628 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1632 if (listener != null) {
1635 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1636 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1637 } catch (DatabaseException e) {
1638 Logger.defaultLogError(e);
1643 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1644 procedure, "request");
1648 // newGraph.state.barrier.inc();
1650 request.perform(newGraph, wrapper);
1652 // newGraph.waitAsync(request);
1654 } catch (Throwable t) {
1656 wrapper.exception(newGraph, t);
1657 // newGraph.waitAsync(request);
1666 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1676 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1678 assert (request != null);
1679 assert (procedure != null);
1681 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1683 int sync = notify != null ? thread : -1;
1685 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1688 public void run(int thread) {
1690 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1692 ListenerBase listener = getListenerBase(procedure);
1694 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1698 if (listener != null) {
1700 newGraph.processor.query(newGraph, request, null, procedure, listener);
1702 // newGraph.waitAsync(request);
1706 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1710 request.perform(newGraph, wrapper);
1712 } catch (Throwable t) {
1714 t.printStackTrace();
1722 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1732 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1734 assert (request != null);
1735 assert (procedure != null);
1737 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1739 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1742 public void run(int thread) {
1744 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1746 ListenerBase listener = getListenerBase(procedure);
1748 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1752 if (listener != null) {
1755 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1756 } catch (DatabaseException e) {
1757 Logger.defaultLogError(e);
1761 // newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1764 // public void exception(Throwable t) {
1765 // procedure.exception(t);
1766 // if(throwable != null) {
1767 // throwable.set(t);
1772 // public void execute(T t) {
1773 // if(result != null) result.set(t);
1774 // procedure.execute(t);
1779 // newGraph.waitAsync(request);
1783 // newGraph.state.barrier.inc();
1785 request.register(newGraph, new Listener<T>() {
1788 public void exception(Throwable t) {
1789 if(throwable != null) throwable.set(t);
1790 procedure.exception(t);
1791 // newGraph.state.barrier.dec();
1795 public void execute(T t) {
1796 if(result != null) result.set(t);
1797 procedure.execute(t);
1798 // newGraph.state.barrier.dec();
1802 public boolean isDisposed() {
1808 // newGraph.waitAsync(request);
1814 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1826 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1828 scheduleRequest(request, procedure, null, null, null);
1833 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1835 Semaphore notify = new Semaphore(0);
1836 DataContainer<Throwable> container = new DataContainer<Throwable>();
1837 DataContainer<T> result = new DataContainer<T>();
1838 scheduleRequest(request, procedure, notify, container, result);
1839 acquire(notify, request);
1840 Throwable throwable = container.get();
1841 if(throwable != null) {
1842 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1843 else throw new DatabaseException("Unexpected exception", throwable);
1845 return result.get();
1849 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1851 Semaphore notify = new Semaphore(0);
1852 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1853 final DataContainer<T> resultContainer = new DataContainer<T>();
1854 scheduleRequest(request, new AsyncProcedure<T>() {
1856 public void exception(AsyncReadGraph graph, Throwable throwable) {
1857 exceptionContainer.set(throwable);
1858 procedure.exception(graph, throwable);
1861 public void execute(AsyncReadGraph graph, T result) {
1862 resultContainer.set(result);
1863 procedure.execute(graph, result);
1865 }, getListenerBase(procedure), notify);
1866 acquire(notify, request, procedure);
1867 Throwable throwable = exceptionContainer.get();
1868 if (throwable != null) {
1869 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1870 else throw new DatabaseException("Unexpected exception", throwable);
1872 return resultContainer.get();
1877 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1879 Semaphore notify = new Semaphore(0);
1880 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1881 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1883 public void exception(AsyncReadGraph graph, Throwable throwable) {
1884 exceptionContainer.set(throwable);
1885 procedure.exception(graph, throwable);
1888 public void execute(AsyncReadGraph graph, T result) {
1889 procedure.execute(graph, result);
1892 public void finished(AsyncReadGraph graph) {
1893 procedure.finished(graph);
1896 acquire(notify, request, procedure);
1897 Throwable throwable = exceptionContainer.get();
1898 if (throwable != null) {
1899 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1900 else throw new DatabaseException("Unexpected exception", throwable);
1902 // TODO: implement return value
1903 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1908 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1909 return syncRequest(request, new ProcedureAdapter<T>());
1912 // assert(request != null);
1914 // final DataContainer<T> result = new DataContainer<T>();
1915 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1917 // syncRequest(request, new Procedure<T>() {
1920 // public void execute(T t) {
1925 // public void exception(Throwable t) {
1926 // exception.set(t);
1931 // Throwable t = exception.get();
1933 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1934 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1937 // return result.get();
1942 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1943 return syncRequest(request, (Procedure<T>)procedure);
1948 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1949 // assertNotSession();
1950 // Semaphore notify = new Semaphore(0);
1951 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1952 // DataContainer<T> result = new DataContainer<T>();
1953 // scheduleRequest(request, procedure, notify, container, result);
1954 // acquire(notify, request);
1955 // Throwable throwable = container.get();
1956 // if(throwable != null) {
1957 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1958 // else throw new DatabaseException("Unexpected exception", throwable);
1960 // return result.get();
1965 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1967 Semaphore notify = new Semaphore(0);
1968 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1969 final DataContainer<T> result = new DataContainer<T>();
1970 scheduleRequest(request, procedure, notify, container, result);
1971 acquire(notify, request);
1972 Throwable throwable = container.get();
1973 if (throwable != null) {
1974 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1975 else throw new DatabaseException("Unexpected exception", throwable);
1977 return result.get();
1981 public void syncRequest(Write request) throws DatabaseException {
1984 Semaphore notify = new Semaphore(0);
1985 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1986 scheduleRequest(request, e -> exception.set(e), notify);
1987 acquire(notify, request);
1988 if(exception.get() != null) throw exception.get();
1992 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1994 Semaphore notify = new Semaphore(0);
1995 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1996 final DataContainer<T> result = new DataContainer<T>();
1997 scheduleRequest(request, new Procedure<T>() {
2000 public void exception(Throwable t) {
2005 public void execute(T t) {
2010 acquire(notify, request);
2011 if(exception.get() != null) {
2012 Throwable t = exception.get();
2013 if(t instanceof DatabaseException) throw (DatabaseException)t;
2014 else throw new DatabaseException(t);
2016 return result.get();
2020 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2022 Semaphore notify = new Semaphore(0);
2023 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2024 final DataContainer<T> result = new DataContainer<T>();
2025 scheduleRequest(request, new Procedure<T>() {
2028 public void exception(Throwable t) {
2033 public void execute(T t) {
2038 acquire(notify, request);
2039 if(exception.get() != null) {
2040 Throwable t = exception.get();
2041 if(t instanceof DatabaseException) throw (DatabaseException)t;
2042 else throw new DatabaseException(t);
2044 return result.get();
2048 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2050 Semaphore notify = new Semaphore(0);
2051 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2052 final DataContainer<T> result = new DataContainer<T>();
2053 scheduleRequest(request, new Procedure<T>() {
2056 public void exception(Throwable t) {
2061 public void execute(T t) {
2066 acquire(notify, request);
2067 if(exception.get() != null) {
2068 Throwable t = exception.get();
2069 if(t instanceof DatabaseException) throw (DatabaseException)t;
2070 else throw new DatabaseException(t);
2072 return result.get();
2076 public void syncRequest(DelayedWrite request) throws DatabaseException {
2078 Semaphore notify = new Semaphore(0);
2079 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2080 scheduleRequest(request, e -> exception.set(e), notify);
2081 acquire(notify, request);
2082 if(exception.get() != null) throw exception.get();
2086 public void syncRequest(WriteOnly request) throws DatabaseException {
2089 Semaphore notify = new Semaphore(0);
2090 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2091 scheduleRequest(request, e -> exception.set(e), notify);
2092 acquire(notify, request);
2093 if(exception.get() != null) throw exception.get();
2098 * GraphRequestProcessor interface
2102 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2104 scheduleRequest(request, procedure, null, null);
2109 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2111 scheduleRequest(request, procedure, null);
2116 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2118 scheduleRequest(request, procedure, null, null, null);
2123 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2125 scheduleRequest(request, callback, null);
2130 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2132 scheduleRequest(request, procedure, null);
2137 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2139 scheduleRequest(request, procedure, null);
2144 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2146 scheduleRequest(request, procedure, null);
2151 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2153 scheduleRequest(request, callback, null);
2158 public void asyncRequest(final Write r) {
2159 asyncRequest(r, null);
2163 public void asyncRequest(final DelayedWrite r) {
2164 asyncRequest(r, null);
2168 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2170 scheduleRequest(request, callback, null);
2175 public void asyncRequest(final WriteOnly request) {
2177 asyncRequest(request, null);
2182 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2183 r.request(this, procedure);
2187 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2188 r.request(this, procedure);
2192 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2193 r.request(this, procedure);
2197 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2198 r.request(this, procedure);
2202 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2203 r.request(this, procedure);
2207 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2208 r.request(this, procedure);
2212 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2213 return r.request(this);
2217 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2218 return r.request(this);
2222 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2223 r.request(this, procedure);
2227 public <T> void async(WriteInterface<T> r) {
2228 r.request(this, new ProcedureAdapter<T>());
2232 public void incAsync() {
2237 public void decAsync() {
2241 public long getCluster(ResourceImpl resource) {
2242 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2243 return cluster.getClusterId();
2246 public long getCluster(int id) {
2247 if (clusterTable == null)
2248 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2249 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2252 public ResourceImpl getResource(int id) {
2253 return new ResourceImpl(resourceSupport, id);
2256 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2257 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2258 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2259 int key = proxy.getClusterKey();
2260 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2261 return new ResourceImpl(resourceSupport, resourceKey);
2264 public ResourceImpl getResource2(int id) {
2266 return new ResourceImpl(resourceSupport, id);
2269 final public int getId(ResourceImpl impl) {
2273 public static final Charset UTF8 = Charset.forName("utf-8");
2278 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2279 for(TransientGraph g : support.providers) {
2280 if(g.isPending(subject)) return false;
2285 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2286 for(TransientGraph g : support.providers) {
2287 if(g.isPending(subject, predicate)) return false;
2292 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2294 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2296 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2299 public void accept(ReadGraphImpl graph) {
2300 if(ready.decrementAndGet() == 0) {
2301 runnable.accept(graph);
2307 for(TransientGraph g : support.providers) {
2308 if(g.isPending(subject)) {
2310 g.load(graph, subject, composite);
2311 } catch (DatabaseException e) {
2312 e.printStackTrace();
2315 composite.accept(graph);
2319 composite.accept(graph);
2323 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2325 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2327 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2330 public void accept(ReadGraphImpl graph) {
2331 if(ready.decrementAndGet() == 0) {
2332 runnable.accept(graph);
2338 for(TransientGraph g : support.providers) {
2339 if(g.isPending(subject, predicate)) {
2341 g.load(graph, subject, predicate, composite);
2342 } catch (DatabaseException e) {
2343 e.printStackTrace();
2346 composite.accept(graph);
2350 composite.accept(graph);
2354 // void dumpHeap() {
2357 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2358 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2359 // "d:/heap" + flushCounter + ".txt", true);
2360 // } catch (IOException e) {
2361 // e.printStackTrace();
2366 void fireReactionsToSynchronize(ChangeSet cs) {
2368 // Do not fire empty events
2372 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2373 // g.state.barrier.inc();
2377 if (!cs.isEmpty()) {
2378 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2379 for (ChangeListener l : changeListeners2) {
2382 } catch (Exception ex) {
2383 ex.printStackTrace();
2390 // g.state.barrier.dec();
2391 // g.waitAsync(null);
2397 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2400 // Do not fire empty events
2405 // graph.state.barrier.inc();
2409 if (!cs2.isEmpty()) {
2411 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2412 for (ChangeListener l : changeListeners2) {
2415 // System.out.println("changelistener " + l);
2416 } catch (Exception ex) {
2417 ex.printStackTrace();
2425 // graph.state.barrier.dec();
2426 // graph.waitAsync(null);
2429 } catch (Throwable t) {
2430 t.printStackTrace();
2434 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2438 // Do not fire empty events
2442 // Do not fire on virtual requests
2443 if(graph.getProvider() != null)
2446 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2450 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2451 for (ChangeListener l : metadataListeners) {
2454 } catch (Throwable ex) {
2455 ex.printStackTrace();
2463 } catch (Throwable t) {
2464 t.printStackTrace();
2473 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2476 public <T> T getService(Class<T> api) {
2477 T t = peekService(api);
2479 if (state.isClosed())
2480 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2481 throw new ServiceNotFoundException(this, api);
2489 protected abstract ServerInformation getCachedServerInformation();
2491 private Class<?> serviceKey1 = null;
2492 private Class<?> serviceKey2 = null;
2493 private Object service1 = null;
2494 private Object service2 = null;
2500 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2502 @SuppressWarnings("unchecked")
2504 public synchronized <T> T peekService(Class<T> api) {
2506 if(serviceKey1 == api) {
2508 } else if (serviceKey2 == api) {
2510 Object result = service2;
2511 service2 = service1;
2512 serviceKey2 = serviceKey1;
2518 if (Layer0.class == api)
2520 if (ServerInformation.class == api)
2521 return (T) getCachedServerInformation();
2522 else if (WriteGraphImpl.class == api)
2523 return (T) writeState.getGraph();
2524 else if (ClusterBuilder.class == api)
2525 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2526 else if (ClusterBuilderFactory.class == api)
2527 return (T)new ClusterBuilderFactoryImpl(this);
2529 service2 = service1;
2530 serviceKey2 = serviceKey1;
2532 service1 = serviceLocator.peekService(api);
2543 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2546 public boolean hasService(Class<?> api) {
2547 return serviceLocator.hasService(api);
2551 * @param api the api that must be implemented by the specified service
2552 * @param service the service implementation
2555 public <T> void registerService(Class<T> api, T service) {
2556 if(Layer0.class == api) {
2557 L0 = (Layer0)service;
2560 serviceLocator.registerService(api, service);
2561 if (TransactionPolicySupport.class == api) {
2562 transactionPolicy = (TransactionPolicySupport)service;
2563 state.resetTransactionPolicy();
2565 if (api == serviceKey1)
2567 else if (api == serviceKey2)
2575 void fireSessionVariableChange(String variable) {
2576 for (MonitorHandler h : monitorHandlers) {
2577 MonitorContext ctx = monitorContexts.getLeft(h);
2579 // SafeRunner functionality repeated here to avoid dependency.
2581 h.valuesChanged(ctx);
2582 } catch (Exception e) {
2583 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2584 } catch (LinkageError e) {
2585 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2591 class ResourceSerializerImpl implements ResourceSerializer {
2593 public long createRandomAccessId(int id) throws DatabaseException {
2596 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2597 long cluster = getCluster(id);
2599 return 0; // Better to return 0 then invalid id.
2600 long result = ClusterTraitsBase.createResourceId(cluster, index);
2605 public long getRandomAccessId(Resource resource) throws DatabaseException {
2607 ResourceImpl resourceImpl = (ResourceImpl) resource;
2608 return createRandomAccessId(resourceImpl.id);
2613 public int getTransientId(Resource resource) throws DatabaseException {
2615 ResourceImpl resourceImpl = (ResourceImpl) resource;
2616 return resourceImpl.id;
2621 public String createRandomAccessId(Resource resource)
2622 throws InvalidResourceReferenceException {
2624 if(resource == null) throw new IllegalArgumentException();
2626 ResourceImpl resourceImpl = (ResourceImpl) resource;
2628 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2632 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2633 } catch (DatabaseException e1) {
2634 throw new InvalidResourceReferenceException(e1);
2637 // Serialize as '<resource index>_<cluster id>'
2638 return "" + r + "_" + getCluster(resourceImpl);
2639 } catch (Throwable e) {
2640 e.printStackTrace();
2641 throw new InvalidResourceReferenceException(e);
2646 public int getTransientId(long serialized) throws DatabaseException {
2647 if (serialized <= 0)
2648 return (int)serialized;
2649 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2650 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2651 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2652 if (cluster == null)
2653 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2654 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2659 public Resource getResource(long randomAccessId) throws DatabaseException {
2660 return getResourceByKey(getTransientId(randomAccessId));
2664 public Resource getResource(int transientId) throws DatabaseException {
2665 return getResourceByKey(transientId);
2669 public Resource getResource(String randomAccessId)
2670 throws InvalidResourceReferenceException {
2672 int i = randomAccessId.indexOf('_');
2674 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2675 + randomAccessId + "'");
2676 int r = Integer.parseInt(randomAccessId.substring(0, i));
2677 if(r < 0) return getResourceByKey(r);
2678 long c = Long.parseLong(randomAccessId.substring(i + 1));
2679 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2680 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2681 if (cluster.hasResource(key, clusterTranslator))
2682 return getResourceByKey(key);
2683 } catch (InvalidResourceReferenceException e) {
2685 } catch (NumberFormatException e) {
2686 throw new InvalidResourceReferenceException(e);
2687 } catch (Throwable e) {
2688 e.printStackTrace();
2689 throw new InvalidResourceReferenceException(e);
2692 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2696 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2699 } catch (Throwable e) {
2700 e.printStackTrace();
2701 throw new InvalidResourceReferenceException(e);
2707 // Copied from old SessionImpl
2710 if (state.isClosed())
2711 throw new Error("Session closed.");
2716 public ResourceImpl getNewResource(long clusterId) {
2717 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2720 newId = cluster.createResource(clusterTranslator);
2721 } catch (DatabaseException e) {
2722 Logger.defaultLogError(e);
2725 return new ResourceImpl(resourceSupport, newId);
2728 public ResourceImpl getNewResource(Resource clusterSet)
2729 throws DatabaseException {
2730 long resourceId = clusterSet.getResourceId();
2731 Long clusterId = clusterSetsSupport.get(resourceId);
2732 if (null == clusterId)
2733 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2734 if (Constants.NewClusterId == clusterId) {
2735 clusterId = getService(ClusteringSupport.class).createCluster();
2736 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2737 createdClusters.add(clusterId);
2738 clusterSetsSupport.put(resourceId, clusterId);
2739 return getNewResource(clusterId);
2741 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2742 ResourceImpl result;
2743 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2744 clusterId = getService(ClusteringSupport.class).createCluster();
2745 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2746 createdClusters.add(clusterId);
2747 clusterSetsSupport.put(resourceId, clusterId);
2748 return getNewResource(clusterId);
2750 result = getNewResource(clusterId);
2751 int resultKey = querySupport.getId(result);
2752 long resultCluster = querySupport.getClusterId(resultKey);
2753 if (clusterId != resultCluster)
2754 clusterSetsSupport.put(resourceId, resultCluster);
2760 public void getNewClusterSet(Resource clusterSet)
2761 throws DatabaseException {
2763 System.out.println("new cluster set=" + clusterSet);
2764 long resourceId = clusterSet.getResourceId();
2765 if (clusterSetsSupport.containsKey(resourceId))
2766 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2767 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2769 public boolean containsClusterSet(Resource clusterSet)
2770 throws ServiceException {
2771 long resourceId = clusterSet.getResourceId();
2772 return clusterSetsSupport.containsKey(resourceId);
2774 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2775 Resource r = defaultClusterSet;
2776 defaultClusterSet = clusterSet;
2779 void printDiagnostics() {
2783 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2784 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2785 for(ClusterI cluster : clusterTable.getClusters()) {
2787 if (cluster.isLoaded() && !cluster.isEmpty()) {
2788 residentClusters.set(residentClusters.get() + 1);
2789 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2791 } catch (DatabaseException e) {
2792 Logger.defaultLogError(e);
2796 System.out.println("--------------------------------");
2797 System.out.println("Cluster information:");
2798 System.out.println("-amount of resident clusters=" + residentClusters.get());
2799 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2801 for(ClusterI cluster : clusterTable.getClusters()) {
2802 System.out.print("Cluster " + cluster.getClusterId() + " [");
2804 if (!cluster.isLoaded())
2805 System.out.println("not loaded]");
2806 else if (cluster.isEmpty())
2807 System.out.println("is empty]");
2809 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2810 System.out.print(",references=" + cluster.getReferenceCount());
2811 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2813 } catch (DatabaseException e) {
2814 Logger.defaultLogError(e);
2815 System.out.println("is corrupted]");
2819 queryProvider2.printDiagnostics();
2820 System.out.println("--------------------------------");
2825 public void fireStartReadTransaction() {
2828 System.out.println("StartReadTransaction");
2830 for (SessionEventListener listener : eventListeners) {
2832 listener.readTransactionStarted();
2833 } catch (Throwable t) {
2834 t.printStackTrace();
2840 public void fireFinishReadTransaction() {
2843 System.out.println("FinishReadTransaction");
2845 for (SessionEventListener listener : eventListeners) {
2847 listener.readTransactionFinished();
2848 } catch (Throwable t) {
2849 t.printStackTrace();
2855 public void fireStartWriteTransaction() {
2858 System.out.println("StartWriteTransaction");
2860 for (SessionEventListener listener : eventListeners) {
2862 listener.writeTransactionStarted();
2863 } catch (Throwable t) {
2864 t.printStackTrace();
2870 public void fireFinishWriteTransaction() {
2873 System.out.println("FinishWriteTransaction");
2875 for (SessionEventListener listener : eventListeners) {
2877 listener.writeTransactionFinished();
2878 } catch (Throwable t) {
2879 t.printStackTrace();
2883 Indexing.resetDependenciesIndexingDisabled();
2887 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2888 throw new Error("Not supported at the moment.");
2895 ClusterTable getClusterTable() {
2896 return clusterTable;
2899 public GraphSession getGraphSession() {
2900 return graphSession;
2903 public QueryProcessor getQueryProvider2() {
2904 return queryProvider2;
2907 ClientChangesImpl getClientChanges() {
2908 return clientChanges;
2911 boolean getWriteOnly() {
2915 static int counter = 0;
2917 public void onClusterLoaded(long clusterId) {
2919 clusterTable.updateSize();
2927 * Implementation of the interface RequestProcessor
2931 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2936 assert(request != null);
2938 final DataContainer<T> result = new DataContainer<T>();
2939 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2941 syncRequest(request, new AsyncProcedure<T>() {
2944 public void execute(AsyncReadGraph graph, T t) {
2949 public void exception(AsyncReadGraph graph, Throwable t) {
2955 Throwable t = exception.get();
2957 if(t instanceof DatabaseException) throw (DatabaseException)t;
2958 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2961 return result.get();
2966 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2967 // assertNotSession();
2968 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2972 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2974 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2978 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2980 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2984 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2986 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2990 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2992 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2996 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3000 assert(request != null);
3002 final DataContainer<T> result = new DataContainer<T>();
3003 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3005 syncRequest(request, new AsyncProcedure<T>() {
3008 public void execute(AsyncReadGraph graph, T t) {
3013 public void exception(AsyncReadGraph graph, Throwable t) {
3019 Throwable t = exception.get();
3021 if(t instanceof DatabaseException) throw (DatabaseException)t;
3022 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3025 return result.get();
3030 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3032 return syncRequest(request, (AsyncProcedure<T>)procedure);
3036 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3038 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3042 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3044 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3048 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3050 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3054 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3056 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3060 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3064 assert(request != null);
3066 final ArrayList<T> result = new ArrayList<T>();
3067 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3069 syncRequest(request, new AsyncMultiProcedure<T>() {
3072 public void execute(AsyncReadGraph graph, T t) {
3073 synchronized(result) {
3079 public void finished(AsyncReadGraph graph) {
3083 public void exception(AsyncReadGraph graph, Throwable t) {
3090 Throwable t = exception.get();
3092 if(t instanceof DatabaseException) throw (DatabaseException)t;
3093 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3101 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3103 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3107 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3109 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3113 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3115 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3119 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3121 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3125 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3127 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3131 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3135 assert(request != null);
3137 final ArrayList<T> result = new ArrayList<T>();
3138 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3140 syncRequest(request, new AsyncMultiProcedure<T>() {
3143 public void execute(AsyncReadGraph graph, T t) {
3144 synchronized(result) {
3150 public void finished(AsyncReadGraph graph) {
3154 public void exception(AsyncReadGraph graph, Throwable t) {
3161 Throwable t = exception.get();
3163 if(t instanceof DatabaseException) throw (DatabaseException)t;
3164 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3172 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3174 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3178 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3180 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3184 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3186 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3190 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3192 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3196 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3198 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3203 public <T> void asyncRequest(Read<T> request) {
3205 asyncRequest(request, new ProcedureAdapter<T>() {
3207 public void exception(Throwable t) {
3208 t.printStackTrace();
3215 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3216 asyncRequest(request, (AsyncProcedure<T>)procedure);
3220 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3221 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3225 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3226 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3230 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3231 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3235 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3236 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3240 final public <T> void asyncRequest(final AsyncRead<T> request) {
3242 assert(request != null);
3244 asyncRequest(request, new ProcedureAdapter<T>() {
3246 public void exception(Throwable t) {
3247 t.printStackTrace();
3254 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3255 scheduleRequest(request, procedure, procedure, null);
3259 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3260 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3264 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3265 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3269 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3270 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3274 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3275 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3279 public <T> void asyncRequest(MultiRead<T> request) {
3281 assert(request != null);
3283 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3285 public void exception(AsyncReadGraph graph, Throwable t) {
3286 t.printStackTrace();
3293 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3294 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3298 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3299 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3303 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3304 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3308 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3309 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3313 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3314 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3318 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3320 assert(request != null);
3322 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3324 public void exception(AsyncReadGraph graph, Throwable t) {
3325 t.printStackTrace();
3332 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3333 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3337 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3338 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3342 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3343 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3347 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3348 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3352 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3353 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3357 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3359 throw new Error("Not implemented!");
3363 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3364 throw new Error("Not implemented!");
3368 final public <T> void asyncRequest(final ExternalRead<T> request) {
3370 assert(request != null);
3372 asyncRequest(request, new ProcedureAdapter<T>() {
3374 public void exception(Throwable t) {
3375 t.printStackTrace();
3382 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3383 asyncRequest(request, (Procedure<T>)procedure);
3388 void check(Throwable t) throws DatabaseException {
3390 if(t instanceof DatabaseException) throw (DatabaseException)t;
3391 else throw new DatabaseException("Unexpected exception", t);
3395 void check(DataContainer<Throwable> container) throws DatabaseException {
3396 Throwable t = container.get();
3398 if(t instanceof DatabaseException) throw (DatabaseException)t;
3399 else throw new DatabaseException("Unexpected exception", t);
3408 boolean sameProvider(Write request) {
3409 if(writeState.getGraph().provider != null) {
3410 return writeState.getGraph().provider.equals(request.getProvider());
3412 return request.getProvider() == null;
3416 boolean plainWrite(WriteGraphImpl graph) {
3417 if(graph == null) return false;
3418 if(graph.writeSupport.writeOnly()) return false;
3423 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3424 private void assertNotSession() throws DatabaseException {
3425 Thread current = Thread.currentThread();
3426 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3429 void assertAlive() {
3430 if (!state.isAlive())
3431 throw new RuntimeDatabaseException("Session has been shut down.");
3434 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3435 return querySupport.getValueStream(graph, querySupport.getId(resource));
3438 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3439 return querySupport.getValue(graph, querySupport.getId(resource));
3442 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3443 acquire(semaphore, request, null);
3446 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3450 // Loop until the semaphore is acquired reporting requests that take a long time.
3453 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3457 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3459 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3460 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3462 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3463 + (waitTime) + " ms. (procedure=" + procedure + ")");
3470 } catch (InterruptedException e) {
3471 e.printStackTrace();
3472 // FIXME: Should perhaps do something else in this case ??
3479 // public boolean holdOnToTransactionAfterCancel() {
3480 // return transactionPolicy.holdOnToTransactionAfterCancel();
3484 // public boolean holdOnToTransactionAfterCommit() {
3485 // return transactionPolicy.holdOnToTransactionAfterCommit();
3489 // public boolean holdOnToTransactionAfterRead() {
3490 // return transactionPolicy.holdOnToTransactionAfterRead();
3494 // public void onRelinquish() {
3495 // transactionPolicy.onRelinquish();
3499 // public void onRelinquishDone() {
3500 // transactionPolicy.onRelinquishDone();
3504 // public void onRelinquishError() {
3505 // transactionPolicy.onRelinquishError();
3510 public Session getSession() {
3515 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3516 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3518 public void ceased(int thread) {
3520 requestManager.ceased(thread);
3524 public int getAmountOfQueryThreads() {
3525 // This must be a power of two
3527 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3530 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3532 return new ResourceImpl(resourceSupport, key);
3536 public void acquireWriteOnly() {
3540 public void releaseWriteOnly(ReadGraphImpl graph) {
3542 queryProvider2.releaseWrite(graph);
3545 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3547 long start = System.nanoTime();
3549 while(dirtyPrimitives) {
3550 dirtyPrimitives = false;
3551 getQueryProvider2().performDirtyUpdates(writer);
3552 getQueryProvider2().performScheduledUpdates(writer);
3555 fireMetadataListeners(writer, clientChanges);
3557 if(DebugPolicy.PERFORMANCE_DATA) {
3558 long end = System.nanoTime();
3559 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3564 void removeTemporaryData() {
3565 File platform = Platform.getLocation().toFile();
3566 File tempFiles = new File(platform, "tempFiles");
3567 File temp = new File(tempFiles, "db");
3571 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3573 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3576 } catch (IOException e) {
3577 Logger.defaultLogError(e);
3579 return FileVisitResult.CONTINUE;
3582 } catch (IOException e) {
3583 Logger.defaultLogError(e);
3587 public void handleCreatedClusters() {
3588 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3589 createdClusters.forEach(new TLongProcedure() {
3592 public boolean execute(long value) {
3593 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3594 cluster.setImmutable(true, clusterTranslator);
3599 createdClusters.clear();
3603 public Object getModificationCounter() {
3604 return queryProvider2.modificationCounter;
3608 public void markUndoPoint() {
3609 state.setCombine(false);