1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.BlockingAsyncProcedure;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
64 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
65 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
66 import org.simantics.db.common.utils.Logger;
67 import org.simantics.db.event.ChangeEvent;
68 import org.simantics.db.event.ChangeListener;
69 import org.simantics.db.event.SessionEventListener;
70 import org.simantics.db.exception.CancelTransactionException;
71 import org.simantics.db.exception.ClusterSetExistException;
72 import org.simantics.db.exception.DatabaseException;
73 import org.simantics.db.exception.ImmutableException;
74 import org.simantics.db.exception.InvalidResourceReferenceException;
75 import org.simantics.db.exception.ResourceNotFoundException;
76 import org.simantics.db.exception.RuntimeDatabaseException;
77 import org.simantics.db.exception.ServiceException;
78 import org.simantics.db.exception.ServiceNotFoundException;
79 import org.simantics.db.impl.ClusterBase;
80 import org.simantics.db.impl.ClusterI;
81 import org.simantics.db.impl.ClusterTraitsBase;
82 import org.simantics.db.impl.ClusterTranslator;
83 import org.simantics.db.impl.ResourceImpl;
84 import org.simantics.db.impl.TransientGraph;
85 import org.simantics.db.impl.VirtualGraphImpl;
86 import org.simantics.db.impl.graph.DelayedWriteGraph;
87 import org.simantics.db.impl.graph.ReadGraphImpl;
88 import org.simantics.db.impl.graph.WriteGraphImpl;
89 import org.simantics.db.impl.graph.WriteSupport;
90 import org.simantics.db.impl.internal.RandomAccessValueSupport;
91 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
92 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
93 import org.simantics.db.impl.query.QueryCache;
94 import org.simantics.db.impl.query.QueryCacheBase;
95 import org.simantics.db.impl.query.QueryProcessor;
96 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
97 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
98 import org.simantics.db.impl.service.QueryDebug;
99 import org.simantics.db.impl.support.VirtualGraphServerSupport;
100 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
101 import org.simantics.db.procedure.AsyncListener;
102 import org.simantics.db.procedure.AsyncMultiListener;
103 import org.simantics.db.procedure.AsyncMultiProcedure;
104 import org.simantics.db.procedure.AsyncProcedure;
105 import org.simantics.db.procedure.Listener;
106 import org.simantics.db.procedure.ListenerBase;
107 import org.simantics.db.procedure.MultiListener;
108 import org.simantics.db.procedure.MultiProcedure;
109 import org.simantics.db.procedure.Procedure;
110 import org.simantics.db.procedure.SyncListener;
111 import org.simantics.db.procedure.SyncMultiListener;
112 import org.simantics.db.procedure.SyncMultiProcedure;
113 import org.simantics.db.procedure.SyncProcedure;
114 import org.simantics.db.procore.cluster.ClusterImpl;
115 import org.simantics.db.procore.cluster.ClusterTraits;
116 import org.simantics.db.procore.protocol.Constants;
117 import org.simantics.db.procore.protocol.DebugPolicy;
118 import org.simantics.db.request.AsyncMultiRead;
119 import org.simantics.db.request.AsyncRead;
120 import org.simantics.db.request.DelayedWrite;
121 import org.simantics.db.request.DelayedWriteResult;
122 import org.simantics.db.request.ExternalRead;
123 import org.simantics.db.request.MultiRead;
124 import org.simantics.db.request.Read;
125 import org.simantics.db.request.ReadInterface;
126 import org.simantics.db.request.Write;
127 import org.simantics.db.request.WriteInterface;
128 import org.simantics.db.request.WriteOnly;
129 import org.simantics.db.request.WriteOnlyResult;
130 import org.simantics.db.request.WriteResult;
131 import org.simantics.db.request.WriteTraits;
132 import org.simantics.db.service.ByteReader;
133 import org.simantics.db.service.ClusterBuilder;
134 import org.simantics.db.service.ClusterBuilderFactory;
135 import org.simantics.db.service.ClusterControl;
136 import org.simantics.db.service.ClusterSetsSupport;
137 import org.simantics.db.service.ClusterUID;
138 import org.simantics.db.service.ClusteringSupport;
139 import org.simantics.db.service.CollectionSupport;
140 import org.simantics.db.service.DebugSupport;
141 import org.simantics.db.service.DirectQuerySupport;
142 import org.simantics.db.service.GraphChangeListenerSupport;
143 import org.simantics.db.service.InitSupport;
144 import org.simantics.db.service.LifecycleSupport;
145 import org.simantics.db.service.ManagementSupport;
146 import org.simantics.db.service.QueryControl;
147 import org.simantics.db.service.SerialisationSupport;
148 import org.simantics.db.service.ServerInformation;
149 import org.simantics.db.service.ServiceActivityMonitor;
150 import org.simantics.db.service.SessionEventSupport;
151 import org.simantics.db.service.SessionMonitorSupport;
152 import org.simantics.db.service.SessionUserSupport;
153 import org.simantics.db.service.StatementSupport;
154 import org.simantics.db.service.TransactionPolicySupport;
155 import org.simantics.db.service.TransactionSupport;
156 import org.simantics.db.service.TransferableGraphSupport;
157 import org.simantics.db.service.UndoRedoSupport;
158 import org.simantics.db.service.VirtualGraphSupport;
159 import org.simantics.db.service.XSupport;
160 import org.simantics.layer0.Layer0;
161 import org.simantics.utils.DataContainer;
162 import org.simantics.utils.Development;
163 import org.simantics.utils.threads.logger.ITask;
164 import org.simantics.utils.threads.logger.ThreadLogger;
166 import gnu.trove.procedure.TLongProcedure;
167 import gnu.trove.set.hash.TLongHashSet;
170 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
172 protected static final boolean DEBUG = false;
174 private static final boolean DIAGNOSTICS = false;
176 private TransactionPolicySupport transactionPolicy;
178 final private ClusterControl clusterControl;
180 final protected BuiltinSupportImpl builtinSupport;
181 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
182 final protected ClusterSetsSupport clusterSetsSupport;
183 final protected LifecycleSupportImpl lifecycleSupport;
185 protected QuerySupportImpl querySupport;
186 protected ResourceSupportImpl resourceSupport;
187 protected WriteSupport writeSupport;
188 public ClusterTranslator clusterTranslator;
190 boolean dirtyPrimitives = false;
192 public static final int SERVICE_MODE_CREATE = 2;
193 public static final int SERVICE_MODE_ALLOW = 1;
194 public int serviceMode = 0;
195 public boolean createdImmutableClusters = false;
196 public TLongHashSet createdClusters = new TLongHashSet();
201 * The service locator maintained by the workbench. These services are
202 * initialized during workbench during the <code>init</code> method.
204 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
206 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
208 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
210 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
212 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
214 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
216 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
218 final protected State state = new State();
220 protected GraphSession graphSession = null;
222 protected SessionManager sessionManagerImpl = null;
224 protected UserAuthenticationAgent authAgent = null;
226 protected UserAuthenticator authenticator = null;
228 protected Resource user = null;
230 protected ClusterStream clusterStream = null;
232 protected SessionRequestManager requestManager = null;
234 public ClusterTable clusterTable = null;
236 public QueryProcessor queryProvider2 = null;
238 ClientChangesImpl clientChanges = null;
240 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
242 // protected long newClusterId = Constants.NullClusterId;
244 protected int flushCounter = 0;
246 protected boolean writeOnly = false;
248 WriteState<?> writeState = null;
249 WriteStateBase<?> delayedWriteState = null;
250 protected Resource defaultClusterSet = null; // If not null then used for newResource().
252 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
254 // if (authAgent == null)
255 // throw new IllegalArgumentException("null authentication agent");
257 File t = StaticSessionProperties.virtualGraphStoragePath;
260 this.clusterTable = new ClusterTable(this, t);
261 this.builtinSupport = new BuiltinSupportImpl(this);
262 this.sessionManagerImpl = sessionManagerImpl;
264 // this.authAgent = authAgent;
266 serviceLocator.registerService(Session.class, this);
267 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
268 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
269 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
270 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
271 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
272 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
273 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
274 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
275 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
276 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
277 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
278 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
279 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
280 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
281 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
282 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
283 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
284 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
285 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
286 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
287 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
288 ServiceActivityUpdaterForWriteTransactions.register(this);
290 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
291 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
292 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
293 this.lifecycleSupport = new LifecycleSupportImpl(this);
294 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
295 this.transactionPolicy = new TransactionPolicyRelease();
296 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
297 this.clusterControl = new ClusterControlImpl(this);
298 serviceLocator.registerService(ClusterControl.class, clusterControl);
299 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
300 this.clusterSetsSupport.setReadDirectory(t.toPath());
301 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
302 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
312 public Resource getRootLibrary() {
313 return queryProvider2.getRootLibraryResource();
316 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
317 if (!graphSession.dbSession.refreshEnabled())
320 getClusterTable().refresh(csid, this, clusterUID);
321 } catch (Throwable t) {
322 Logger.defaultLogError("Refesh failed.", t);
326 static final class TaskHelper {
327 private final String name;
328 private Object result;
329 final Semaphore sema = new Semaphore(0);
330 private Throwable throwable = null;
331 final Consumer<DatabaseException> callback = e -> {
332 synchronized (TaskHelper.this) {
336 final Procedure<Object> proc = new Procedure<Object>() {
338 public void execute(Object result) {
339 callback.accept(null);
342 public void exception(Throwable t) {
343 if (t instanceof DatabaseException)
344 callback.accept((DatabaseException)t);
346 callback.accept(new DatabaseException("" + name + "operation failed.", t));
349 final WriteTraits writeTraits = new WriteTraits() {};
350 TaskHelper(String name) {
353 @SuppressWarnings("unchecked")
357 void setResult(Object result) {
358 this.result = result;
360 // Throwable throwableGet() {
363 synchronized void throwableSet(Throwable t) {
366 // void throwableSet(String t) {
367 // throwable = new InternalException("" + name + " operation failed. " + t);
369 synchronized void throwableCheck()
370 throws DatabaseException {
371 if (null != throwable)
372 if (throwable instanceof DatabaseException)
373 throw (DatabaseException)throwable;
375 throw new DatabaseException("Undo operation failed.", throwable);
377 void throw_(String message)
378 throws DatabaseException {
379 throw new DatabaseException("" + name + " operation failed. " + message);
385 private ListenerBase getListenerBase(Object procedure) {
386 if (procedure instanceof ListenerBase)
387 return (ListenerBase) procedure;
392 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
393 scheduleRequest(request, callback, notify, null);
397 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
400 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
402 assert (request != null);
404 if(Development.DEVELOPMENT) {
406 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
407 System.err.println("schedule write '" + request + "'");
408 } catch (Throwable t) {
409 Logger.defaultLogError(t);
413 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
415 requestManager.scheduleWrite(new SessionTask(true) {
418 public void run(int thread) {
420 if(Development.DEVELOPMENT) {
422 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
423 System.err.println("perform write '" + request + "'");
424 } catch (Throwable t) {
425 Logger.defaultLogError(t);
429 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
431 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
436 Disposable.safeDispose(clientChanges);
437 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
439 VirtualGraph vg = getProvider(request.getProvider());
441 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
442 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
445 public void execute(Object result) {
446 if(callback != null) callback.accept(null);
450 public void exception(Throwable t) {
451 if(callback != null) callback.accept((DatabaseException)t);
456 assert (null != writer);
457 // writer.state.barrier.inc();
460 request.perform(writer);
461 assert (null != writer);
462 } catch (Throwable t) {
463 if (!(t instanceof CancelTransactionException))
464 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
465 writeState.except(t);
467 // writer.state.barrier.dec();
468 // writer.waitAsync(request);
472 assert(!queryProvider2.cache.dirty);
474 } catch (Throwable e) {
476 // Log it first, just to be safe that the error is always logged.
477 if (!(e instanceof CancelTransactionException))
478 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
480 // writeState.getGraph().state.barrier.dec();
481 // writeState.getGraph().waitAsync(request);
483 writeState.except(e);
486 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
487 // // All we can do here is to log those, can't really pass them anywhere.
488 // if (callback != null) {
489 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
490 // else callback.run(new DatabaseException(e));
492 // } catch (Throwable e2) {
493 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
495 // boolean empty = clusterStream.reallyFlush();
496 // int callerThread = -1;
497 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
498 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
500 // // Send all local changes to server so it can calculate correct reverse change set.
501 // // This will call clusterStream.accept().
502 // state.cancelCommit(context, clusterStream);
504 // if (!context.isOk()) // this is a blocking operation
505 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
506 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
508 // state.cancelCommit2(context, clusterStream);
509 // } catch (DatabaseException e3) {
510 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
512 // clusterStream.setOff(false);
513 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
518 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
524 if(Development.DEVELOPMENT) {
526 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
527 System.err.println("finish write '" + request + "'");
528 } catch (Throwable t) {
529 Logger.defaultLogError(t);
539 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
540 scheduleRequest(request, procedure, notify, null);
544 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
547 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
549 assert (request != null);
551 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
553 requestManager.scheduleWrite(new SessionTask(true) {
556 public void run(int thread) {
558 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
560 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
563 Disposable.safeDispose(clientChanges);
564 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
566 VirtualGraph vg = getProvider(request.getProvider());
567 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
570 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571 writeState = writeStateT;
573 assert (null != writer);
574 // writer.state.barrier.inc();
575 writeStateT.setResult(request.perform(writer));
576 assert (null != writer);
578 // writer.state.barrier.dec();
579 // writer.waitAsync(null);
581 } catch (Throwable e) {
583 // writer.state.barrier.dec();
584 // writer.waitAsync(null);
586 writeState.except(e);
588 // state.stopWriteTransaction(clusterStream);
590 // } catch (Throwable e) {
591 // // Log it first, just to be safe that the error is always logged.
592 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
595 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 // // All we can do here is to log those, can't really pass them anywhere.
597 // if (procedure != null) {
598 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 // else procedure.exception(new DatabaseException(e));
601 // } catch (Throwable e2) {
602 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
605 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
607 // state.stopWriteTransaction(clusterStream);
610 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
613 // if(notify != null) notify.release();
623 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624 scheduleRequest(request, callback, notify, null);
628 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
631 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
633 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
635 assert (request != null);
637 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
639 requestManager.scheduleWrite(new SessionTask(true) {
642 public void run(int thread) {
643 fireSessionVariableChange(SessionVariables.QUEUED_READS);
645 Procedure<Object> stateProcedure = new Procedure<Object>() {
647 public void execute(Object result) {
648 if (callback != null)
649 callback.accept(null);
652 public void exception(Throwable t) {
653 if (callback != null) {
654 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
655 else callback.accept(new DatabaseException(t));
657 Logger.defaultLogError("Unhandled exception", t);
661 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
662 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
663 DelayedWriteGraph dwg = null;
664 // newGraph.state.barrier.inc();
667 dwg = new DelayedWriteGraph(newGraph);
668 request.perform(dwg);
669 } catch (Throwable e) {
670 delayedWriteState.except(e);
675 // newGraph.state.barrier.dec();
676 // newGraph.waitAsync(request);
677 fireSessionVariableChange(SessionVariables.QUEUED_READS);
680 delayedWriteState = null;
682 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
683 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
686 Disposable.safeDispose(clientChanges);
687 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
691 VirtualGraph vg = getProvider(request.getProvider());
692 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
694 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
696 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
698 assert (null != writer);
699 // writer.state.barrier.inc();
703 dwg.commit(writer, request);
705 if(defaultClusterSet != null) {
706 XSupport xs = getService(XSupport.class);
707 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
708 ClusteringSupport cs = getService(ClusteringSupport.class);
709 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
712 // This makes clusters available from server
713 clusterStream.reallyFlush();
715 releaseWriteOnly(writer);
716 handleUpdatesAndMetadata(writer);
718 } catch (ServiceException e) {
719 // writer.state.barrier.dec();
720 // writer.waitAsync(null);
722 // These shall be requested from server
723 clusterTable.removeWriteOnlyClusters();
724 // This makes clusters available from server
725 clusterStream.reallyFlush();
727 releaseWriteOnly(writer);
728 writeState.except(e);
730 // Debugging & Profiling
731 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
741 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
742 scheduleRequest(request, procedure, notify, null);
746 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
749 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
750 throw new Error("Not implemented");
753 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
754 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
755 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
756 createdClusters.add(cluster.clusterId);
761 class WriteOnlySupport implements WriteSupport {
763 ClusterStream stream;
764 ClusterImpl currentCluster;
766 public WriteOnlySupport() {
767 this.stream = clusterStream;
771 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
772 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
776 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
778 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
780 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
781 if(s != queryProvider2.getRootLibrary())
782 throw new ImmutableException("Trying to modify immutable resource key=" + s);
785 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
786 } catch (DatabaseException e) {
787 Logger.defaultLogError(e);
791 clientChanges.invalidate(s);
793 if (cluster.isWriteOnly())
795 queryProvider2.updateStatements(s, p);
800 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
801 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
805 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
807 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
809 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
810 } catch (DatabaseException e) {
811 Logger.defaultLogError(e);
814 clientChanges.invalidate(rid);
816 if (cluster.isWriteOnly())
818 queryProvider2.updateValue(rid);
823 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
826 claimValue(provider,resource, reader.readBytes(null, amount));
830 byte[] bytes = new byte[65536];
832 int rid = ((ResourceImpl)resource).id;
833 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
837 int block = Math.min(left, 65536);
838 reader.readBytes(bytes, block);
839 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
842 } catch (DatabaseException e) {
843 Logger.defaultLogError(e);
846 clientChanges.invalidate(rid);
848 if (cluster.isWriteOnly())
850 queryProvider2.updateValue(rid);
854 private void maintainCluster(ClusterImpl before, ClusterI after_) {
855 if(after_ != null && after_ != before) {
856 ClusterImpl after = (ClusterImpl)after_;
857 if(currentCluster == before) {
858 currentCluster = after;
860 clusterTable.replaceCluster(after);
864 public int createResourceKey(int foreignCounter) throws DatabaseException {
865 if(currentCluster == null) {
866 currentCluster = getNewResourceCluster();
868 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
869 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
870 newCluster.foreignLookup = new byte[foreignCounter];
871 currentCluster = newCluster;
873 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
875 return currentCluster.createResource(clusterTranslator);
879 public Resource createResource(VirtualGraph provider) throws DatabaseException {
880 if(currentCluster == null) {
881 if (null != defaultClusterSet) {
882 ResourceImpl result = getNewResource(defaultClusterSet);
883 currentCluster = clusterTable.getClusterByResourceKey(result.id);
886 currentCluster = getNewResourceCluster();
889 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
890 if (null != defaultClusterSet) {
891 ResourceImpl result = getNewResource(defaultClusterSet);
892 currentCluster = clusterTable.getClusterByResourceKey(result.id);
895 currentCluster = getNewResourceCluster();
898 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
902 public Resource createResource(VirtualGraph provider, long clusterId)
903 throws DatabaseException {
904 return getNewResource(clusterId);
908 public Resource createResource(VirtualGraph provider, Resource clusterSet)
909 throws DatabaseException {
910 return getNewResource(clusterSet);
914 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
915 throws DatabaseException {
916 getNewClusterSet(clusterSet);
920 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
921 throws ServiceException {
922 return containsClusterSet(clusterSet);
925 public void selectCluster(long cluster) {
926 currentCluster = clusterTable.getClusterByClusterId(cluster);
927 long setResourceId = clusterSetsSupport.getSet(cluster);
928 clusterSetsSupport.put(setResourceId, cluster);
932 public Resource setDefaultClusterSet(Resource clusterSet)
933 throws ServiceException {
934 Resource result = setDefaultClusterSet4NewResource(clusterSet);
935 if(clusterSet != null) {
936 long id = clusterSetsSupport.get(clusterSet.getResourceId());
937 currentCluster = clusterTable.getClusterByClusterId(id);
940 currentCluster = null;
946 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
948 provider = getProvider(provider);
949 if (null == provider) {
950 int key = ((ResourceImpl)resource).id;
954 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
955 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
956 // if(key != queryProvider2.getRootLibrary())
957 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
960 // cluster.removeValue(key, clusterTranslator);
961 // } catch (DatabaseException e) {
962 // Logger.defaultLogError(e);
966 clusterTable.writeOnlyInvalidate(cluster);
969 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
970 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
971 clusterTranslator.removeValue(cluster);
972 } catch (DatabaseException e) {
973 Logger.defaultLogError(e);
976 queryProvider2.invalidateResource(key);
977 clientChanges.invalidate(key);
980 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
981 queryProvider2.updateValue(querySupport.getId(resource));
982 clientChanges.claimValue(resource);
989 public void flush(boolean intermediate) {
990 throw new UnsupportedOperationException();
994 public void flushCluster() {
995 clusterTable.flushCluster(graphSession);
996 if(defaultClusterSet != null) {
997 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
999 currentCluster = null;
1003 public void flushCluster(Resource r) {
1004 throw new UnsupportedOperationException("flushCluster resource " + r);
1012 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1015 int s = ((ResourceImpl)subject).id;
1016 int p = ((ResourceImpl)predicate).id;
1017 int o = ((ResourceImpl)object).id;
1019 provider = getProvider(provider);
1020 if (null == provider) {
1022 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1023 clusterTable.writeOnlyInvalidate(cluster);
1027 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1028 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1029 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1031 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1032 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1034 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1035 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1036 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037 clusterTranslator.removeStatement(cluster);
1039 queryProvider2.invalidateResource(s);
1040 clientChanges.invalidate(s);
1042 } catch (DatabaseException e) {
1044 Logger.defaultLogError(e);
1052 ((VirtualGraphImpl)provider).deny(s, p, o);
1053 queryProvider2.invalidateResource(s);
1054 clientChanges.invalidate(s);
1063 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1064 throw new UnsupportedOperationException();
1068 public boolean writeOnly() {
1073 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1074 writeSupport.performWriteRequest(graph, request);
1078 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1079 throw new UnsupportedOperationException();
1083 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1084 throw new UnsupportedOperationException();
1088 public <T> void addMetadata(Metadata data) throws ServiceException {
1089 writeSupport.addMetadata(data);
1093 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1094 return writeSupport.getMetadata(clazz);
1098 public TreeMap<String, byte[]> getMetadata() {
1099 return writeSupport.getMetadata();
1103 public void commitDone(WriteTraits writeTraits, long csid) {
1104 writeSupport.commitDone(writeTraits, csid);
1108 public void clearUndoList(WriteTraits writeTraits) {
1109 writeSupport.clearUndoList(writeTraits);
1112 public int clearMetadata() {
1113 return writeSupport.clearMetadata();
1117 public void startUndo() {
1118 writeSupport.startUndo();
1122 class VirtualWriteOnlySupport implements WriteSupport {
1125 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1126 // Resource object) {
1127 // throw new UnsupportedOperationException();
1131 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1133 TransientGraph impl = (TransientGraph)provider;
1134 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1135 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1136 clientChanges.claim(subject, predicate, object);
1141 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1143 TransientGraph impl = (TransientGraph)provider;
1144 impl.claim(subject, predicate, object);
1145 getQueryProvider2().updateStatements(subject, predicate);
1146 clientChanges.claim(subject, predicate, object);
1151 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1152 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1156 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1157 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1158 getQueryProvider2().updateValue(resource);
1159 clientChanges.claimValue(resource);
1163 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1164 byte[] value = reader.readBytes(null, amount);
1165 claimValue(provider, resource, value);
1169 public Resource createResource(VirtualGraph provider) {
1170 TransientGraph impl = (TransientGraph)provider;
1171 return impl.getResource(impl.newResource(false));
1175 public Resource createResource(VirtualGraph provider, long clusterId) {
1176 throw new UnsupportedOperationException();
1180 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1181 throws DatabaseException {
1182 throw new UnsupportedOperationException();
1186 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1187 throws DatabaseException {
1188 throw new UnsupportedOperationException();
1192 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1193 throws ServiceException {
1194 throw new UnsupportedOperationException();
1198 public Resource setDefaultClusterSet(Resource clusterSet)
1199 throws ServiceException {
1204 public void denyValue(VirtualGraph provider, Resource resource) {
1205 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1206 getQueryProvider2().updateValue(querySupport.getId(resource));
1207 // NOTE: this only keeps track of value changes by-resource.
1208 clientChanges.claimValue(resource);
1212 public void flush(boolean intermediate) {
1213 throw new UnsupportedOperationException();
1217 public void flushCluster() {
1218 throw new UnsupportedOperationException();
1222 public void flushCluster(Resource r) {
1223 throw new UnsupportedOperationException("Resource " + r);
1231 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1232 TransientGraph impl = (TransientGraph) provider;
1233 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1234 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1235 clientChanges.deny(subject, predicate, object);
1240 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1241 throw new UnsupportedOperationException();
1245 public boolean writeOnly() {
1250 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1251 throw new UnsupportedOperationException();
1255 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1256 throw new UnsupportedOperationException();
1260 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1261 throw new UnsupportedOperationException();
1265 public <T> void addMetadata(Metadata data) throws ServiceException {
1266 throw new UnsupportedOperationException();
1270 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1271 throw new UnsupportedOperationException();
1275 public TreeMap<String, byte[]> getMetadata() {
1276 throw new UnsupportedOperationException();
1280 public void commitDone(WriteTraits writeTraits, long csid) {
1284 public void clearUndoList(WriteTraits writeTraits) {
1288 public int clearMetadata() {
1293 public void startUndo() {
1297 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1301 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1303 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1306 Disposable.safeDispose(clientChanges);
1307 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1311 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1313 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1315 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1316 writeState = writeStateT;
1318 assert (null != writer);
1319 // writer.state.barrier.inc();
1320 long start = System.nanoTime();
1321 T result = request.perform(writer);
1322 long duration = System.nanoTime() - start;
1324 System.err.println("################");
1325 System.err.println("WriteOnly duration " + 1e-9*duration);
1327 writeStateT.setResult(result);
1329 // This makes clusters available from server
1330 clusterStream.reallyFlush();
1332 // This will trigger query updates
1333 releaseWriteOnly(writer);
1334 assert (null != writer);
1336 handleUpdatesAndMetadata(writer);
1338 } catch (CancelTransactionException e) {
1340 releaseWriteOnly(writeState.getGraph());
1342 clusterTable.removeWriteOnlyClusters();
1343 state.stopWriteTransaction(clusterStream);
1345 } catch (Throwable e) {
1347 e.printStackTrace();
1349 releaseWriteOnly(writeState.getGraph());
1351 clusterTable.removeWriteOnlyClusters();
1353 if (callback != null)
1354 callback.exception(new DatabaseException(e));
1356 state.stopWriteTransaction(clusterStream);
1357 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1361 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1367 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1368 scheduleRequest(request, callback, notify, null);
1372 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1376 assert (request != null);
1378 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1380 requestManager.scheduleWrite(new SessionTask(true) {
1383 public void run(int thread) {
1385 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1389 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392 Disposable.safeDispose(clientChanges);
1393 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1397 VirtualGraph vg = getProvider(request.getProvider());
1398 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1400 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1402 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405 public void execute(Object result) {
1406 if(callback != null) callback.accept(null);
1410 public void exception(Throwable t) {
1411 if(callback != null) callback.accept((DatabaseException)t);
1416 assert (null != writer);
1417 // writer.state.barrier.inc();
1421 request.perform(writer);
1423 } catch (Throwable e) {
1425 // writer.state.barrier.dec();
1426 // writer.waitAsync(null);
1428 releaseWriteOnly(writer);
1430 clusterTable.removeWriteOnlyClusters();
1432 if(!(e instanceof CancelTransactionException)) {
1433 if (callback != null)
1434 callback.accept(new DatabaseException(e));
1437 writeState.except(e);
1443 // This makes clusters available from server
1444 boolean empty = clusterStream.reallyFlush();
1445 // This was needed to make WO requests call metadata listeners.
1446 // NOTE: the calling event does not contain clientChanges information.
1447 if (!empty && clientChanges.isEmpty())
1448 clientChanges.setNotEmpty(true);
1449 releaseWriteOnly(writer);
1450 assert (null != writer);
1453 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1466 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1467 scheduleRequest(request, callback, notify, null);
1471 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1476 assert (request != null);
1478 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1480 requestManager.scheduleWrite(new SessionTask(true) {
1483 public void run(int thread) {
1485 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1487 performWriteOnly(request, notify, callback);
1497 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1499 assert (request != null);
1500 assert (procedure != null);
1502 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1504 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1507 public void run(int thread) {
1509 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1511 ListenerBase listener = getListenerBase(procedure);
1513 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1517 if (listener != null) {
1521 AsyncProcedure ap = new AsyncProcedure<T>() {
1524 public void exception(AsyncReadGraph graph, Throwable t) {
1525 procedure.exception(graph, t);
1526 if(throwable != null) {
1529 // ErrorLogger.defaultLogError("Unhandled exception", t);
1534 public void execute(AsyncReadGraph graph, T t) {
1535 if(result != null) result.set(t);
1536 procedure.execute(graph, t);
1541 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1543 } catch (Throwable t) {
1544 // This is handled by the AsyncProcedure
1545 //Logger.defaultLogError("Internal error", t);
1552 // newGraph.state.barrier.inc();
1554 T t = request.perform(newGraph);
1558 if(result != null) result.set(t);
1559 procedure.execute(newGraph, t);
1561 } catch (Throwable th) {
1563 if(throwable != null) {
1566 Logger.defaultLogError("Unhandled exception", th);
1571 } catch (Throwable t) {
1574 t.printStackTrace();
1576 if(throwable != null) {
1579 Logger.defaultLogError("Unhandled exception", t);
1584 procedure.exception(newGraph, t);
1586 } catch (Throwable t2) {
1588 if(throwable != null) {
1591 Logger.defaultLogError("Unhandled exception", t2);
1598 // newGraph.state.barrier.dec();
1599 // newGraph.waitAsync(request);
1605 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1615 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1617 assert (request != null);
1618 assert (procedure != null);
1620 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1622 requestManager.scheduleRead(new SessionRead(null, notify) {
1625 public void run(int thread) {
1627 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1629 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1633 if (listener != null) {
1636 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1637 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1638 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1639 } catch (DatabaseException e) {
1640 Logger.defaultLogError(e);
1645 // final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1646 // procedure, "request");
1648 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1652 request.perform(newGraph, wrap);
1655 } catch (Throwable t) {
1657 wrap.exception(newGraph, t);
1659 // wrapper.exception(newGraph, t);
1660 // newGraph.waitAsync(request);
1669 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1679 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1681 assert (request != null);
1682 assert (procedure != null);
1684 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1686 int sync = notify != null ? thread : -1;
1688 requestManager.scheduleRead(new SessionRead(null, notify) {
1691 public void run(int thread) {
1693 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1695 ListenerBase listener = getListenerBase(procedure);
1697 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1701 if (listener != null) {
1703 newGraph.processor.query(newGraph, request, null, procedure, listener);
1705 // newGraph.waitAsync(request);
1709 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1713 request.perform(newGraph, wrapper);
1715 } catch (Throwable t) {
1717 t.printStackTrace();
1725 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1735 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1737 assert (request != null);
1738 assert (procedure != null);
1740 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1742 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1745 public void run(int thread) {
1747 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1749 ListenerBase listener = getListenerBase(procedure);
1751 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1755 if (listener != null) {
1758 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1759 } catch (DatabaseException e) {
1760 Logger.defaultLogError(e);
1764 // newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1767 // public void exception(Throwable t) {
1768 // procedure.exception(t);
1769 // if(throwable != null) {
1770 // throwable.set(t);
1775 // public void execute(T t) {
1776 // if(result != null) result.set(t);
1777 // procedure.execute(t);
1782 // newGraph.waitAsync(request);
1786 // newGraph.state.barrier.inc();
1788 request.register(newGraph, new Listener<T>() {
1791 public void exception(Throwable t) {
1792 if(throwable != null) throwable.set(t);
1793 procedure.exception(t);
1794 // newGraph.state.barrier.dec();
1798 public void execute(T t) {
1799 if(result != null) result.set(t);
1800 procedure.execute(t);
1801 // newGraph.state.barrier.dec();
1805 public boolean isDisposed() {
1811 // newGraph.waitAsync(request);
1817 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1829 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1831 scheduleRequest(request, procedure, null, null, null);
1836 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1838 Semaphore notify = new Semaphore(0);
1839 DataContainer<Throwable> container = new DataContainer<Throwable>();
1840 DataContainer<T> result = new DataContainer<T>();
1841 scheduleRequest(request, procedure, notify, container, result);
1842 acquire(notify, request);
1843 Throwable throwable = container.get();
1844 if(throwable != null) {
1845 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1846 else throw new DatabaseException("Unexpected exception", throwable);
1848 return result.get();
1852 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1854 Semaphore notify = new Semaphore(0);
1855 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1856 final DataContainer<T> resultContainer = new DataContainer<T>();
1857 scheduleRequest(request, new AsyncProcedure<T>() {
1859 public void exception(AsyncReadGraph graph, Throwable throwable) {
1860 exceptionContainer.set(throwable);
1861 procedure.exception(graph, throwable);
1864 public void execute(AsyncReadGraph graph, T result) {
1865 resultContainer.set(result);
1866 procedure.execute(graph, result);
1868 }, getListenerBase(procedure), notify);
1869 acquire(notify, request, procedure);
1870 Throwable throwable = exceptionContainer.get();
1871 if (throwable != null) {
1872 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1873 else throw new DatabaseException("Unexpected exception", throwable);
1875 return resultContainer.get();
1880 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1882 Semaphore notify = new Semaphore(0);
1883 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1884 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1886 public void exception(AsyncReadGraph graph, Throwable throwable) {
1887 exceptionContainer.set(throwable);
1888 procedure.exception(graph, throwable);
1891 public void execute(AsyncReadGraph graph, T result) {
1892 procedure.execute(graph, result);
1895 public void finished(AsyncReadGraph graph) {
1896 procedure.finished(graph);
1899 acquire(notify, request, procedure);
1900 Throwable throwable = exceptionContainer.get();
1901 if (throwable != null) {
1902 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1903 else throw new DatabaseException("Unexpected exception", throwable);
1905 // TODO: implement return value
1906 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1911 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1912 return syncRequest(request, new ProcedureAdapter<T>());
1915 // assert(request != null);
1917 // final DataContainer<T> result = new DataContainer<T>();
1918 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1920 // syncRequest(request, new Procedure<T>() {
1923 // public void execute(T t) {
1928 // public void exception(Throwable t) {
1929 // exception.set(t);
1934 // Throwable t = exception.get();
1936 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1937 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1940 // return result.get();
1945 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1946 return syncRequest(request, (Procedure<T>)procedure);
1951 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1952 // assertNotSession();
1953 // Semaphore notify = new Semaphore(0);
1954 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1955 // DataContainer<T> result = new DataContainer<T>();
1956 // scheduleRequest(request, procedure, notify, container, result);
1957 // acquire(notify, request);
1958 // Throwable throwable = container.get();
1959 // if(throwable != null) {
1960 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961 // else throw new DatabaseException("Unexpected exception", throwable);
1963 // return result.get();
1968 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1970 Semaphore notify = new Semaphore(0);
1971 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1972 final DataContainer<T> result = new DataContainer<T>();
1973 scheduleRequest(request, procedure, notify, container, result);
1974 acquire(notify, request);
1975 Throwable throwable = container.get();
1976 if (throwable != null) {
1977 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1978 else throw new DatabaseException("Unexpected exception", throwable);
1980 return result.get();
1984 public void syncRequest(Write request) throws DatabaseException {
1987 Semaphore notify = new Semaphore(0);
1988 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1989 scheduleRequest(request, e -> exception.set(e), notify);
1990 acquire(notify, request);
1991 if(exception.get() != null) throw exception.get();
1995 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1997 Semaphore notify = new Semaphore(0);
1998 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1999 final DataContainer<T> result = new DataContainer<T>();
2000 scheduleRequest(request, new Procedure<T>() {
2003 public void exception(Throwable t) {
2008 public void execute(T t) {
2013 acquire(notify, request);
2014 if(exception.get() != null) {
2015 Throwable t = exception.get();
2016 if(t instanceof DatabaseException) throw (DatabaseException)t;
2017 else throw new DatabaseException(t);
2019 return result.get();
2023 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2025 Semaphore notify = new Semaphore(0);
2026 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2027 final DataContainer<T> result = new DataContainer<T>();
2028 scheduleRequest(request, new Procedure<T>() {
2031 public void exception(Throwable t) {
2036 public void execute(T t) {
2041 acquire(notify, request);
2042 if(exception.get() != null) {
2043 Throwable t = exception.get();
2044 if(t instanceof DatabaseException) throw (DatabaseException)t;
2045 else throw new DatabaseException(t);
2047 return result.get();
2051 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2053 Semaphore notify = new Semaphore(0);
2054 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2055 final DataContainer<T> result = new DataContainer<T>();
2056 scheduleRequest(request, new Procedure<T>() {
2059 public void exception(Throwable t) {
2064 public void execute(T t) {
2069 acquire(notify, request);
2070 if(exception.get() != null) {
2071 Throwable t = exception.get();
2072 if(t instanceof DatabaseException) throw (DatabaseException)t;
2073 else throw new DatabaseException(t);
2075 return result.get();
2079 public void syncRequest(DelayedWrite request) throws DatabaseException {
2081 Semaphore notify = new Semaphore(0);
2082 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083 scheduleRequest(request, e -> exception.set(e), notify);
2084 acquire(notify, request);
2085 if(exception.get() != null) throw exception.get();
2089 public void syncRequest(WriteOnly request) throws DatabaseException {
2092 Semaphore notify = new Semaphore(0);
2093 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2094 scheduleRequest(request, e -> exception.set(e), notify);
2095 acquire(notify, request);
2096 if(exception.get() != null) throw exception.get();
2101 * GraphRequestProcessor interface
2105 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2107 scheduleRequest(request, procedure, null, null);
2112 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2114 scheduleRequest(request, procedure, null);
2119 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2121 scheduleRequest(request, procedure, null, null, null);
2126 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2128 scheduleRequest(request, callback, null);
2133 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2135 scheduleRequest(request, procedure, null);
2140 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2142 scheduleRequest(request, procedure, null);
2147 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2149 scheduleRequest(request, procedure, null);
2154 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2156 scheduleRequest(request, callback, null);
2161 public void asyncRequest(final Write r) {
2162 asyncRequest(r, null);
2166 public void asyncRequest(final DelayedWrite r) {
2167 asyncRequest(r, null);
2171 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2173 scheduleRequest(request, callback, null);
2178 public void asyncRequest(final WriteOnly request) {
2180 asyncRequest(request, null);
2185 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2186 r.request(this, procedure);
2190 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2191 r.request(this, procedure);
2195 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2196 r.request(this, procedure);
2200 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2201 r.request(this, procedure);
2205 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2206 r.request(this, procedure);
2210 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2211 r.request(this, procedure);
2215 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2216 return r.request(this);
2220 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2221 return r.request(this);
2225 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2226 r.request(this, procedure);
2230 public <T> void async(WriteInterface<T> r) {
2231 r.request(this, new ProcedureAdapter<T>());
2235 public void incAsync() {
2240 public void decAsync() {
2244 public long getCluster(ResourceImpl resource) {
2245 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2246 return cluster.getClusterId();
2249 public long getCluster(int id) {
2250 if (clusterTable == null)
2251 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2252 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2255 public ResourceImpl getResource(int id) {
2256 return new ResourceImpl(resourceSupport, id);
2259 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2260 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2261 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2262 int key = proxy.getClusterKey();
2263 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2264 return new ResourceImpl(resourceSupport, resourceKey);
2267 public ResourceImpl getResource2(int id) {
2269 return new ResourceImpl(resourceSupport, id);
2272 final public int getId(ResourceImpl impl) {
2276 public static final Charset UTF8 = Charset.forName("utf-8");
2281 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2282 for(TransientGraph g : support.providers) {
2283 if(g.isPending(subject)) return false;
2288 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2289 for(TransientGraph g : support.providers) {
2290 if(g.isPending(subject, predicate)) return false;
2295 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2297 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2299 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2302 public void accept(ReadGraphImpl graph) {
2303 if(ready.decrementAndGet() == 0) {
2304 runnable.accept(graph);
2310 for(TransientGraph g : support.providers) {
2311 if(g.isPending(subject)) {
2313 g.load(graph, subject, composite);
2314 } catch (DatabaseException e) {
2315 e.printStackTrace();
2318 composite.accept(graph);
2322 composite.accept(graph);
2326 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2328 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2330 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2333 public void accept(ReadGraphImpl graph) {
2334 if(ready.decrementAndGet() == 0) {
2335 runnable.accept(graph);
2341 for(TransientGraph g : support.providers) {
2342 if(g.isPending(subject, predicate)) {
2344 g.load(graph, subject, predicate, composite);
2345 } catch (DatabaseException e) {
2346 e.printStackTrace();
2349 composite.accept(graph);
2353 composite.accept(graph);
2357 // void dumpHeap() {
2360 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2361 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2362 // "d:/heap" + flushCounter + ".txt", true);
2363 // } catch (IOException e) {
2364 // e.printStackTrace();
2369 void fireReactionsToSynchronize(ChangeSet cs) {
2371 // Do not fire empty events
2375 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2376 // g.state.barrier.inc();
2380 if (!cs.isEmpty()) {
2381 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2382 for (ChangeListener l : changeListeners2) {
2385 } catch (Exception ex) {
2386 ex.printStackTrace();
2393 // g.state.barrier.dec();
2394 // g.waitAsync(null);
2400 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2403 // Do not fire empty events
2408 // graph.state.barrier.inc();
2412 if (!cs2.isEmpty()) {
2414 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2415 for (ChangeListener l : changeListeners2) {
2418 // System.out.println("changelistener " + l);
2419 } catch (Exception ex) {
2420 ex.printStackTrace();
2428 // graph.state.barrier.dec();
2429 // graph.waitAsync(null);
2432 } catch (Throwable t) {
2433 t.printStackTrace();
2437 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2441 // Do not fire empty events
2445 // Do not fire on virtual requests
2446 if(graph.getProvider() != null)
2449 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2453 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2454 for (ChangeListener l : metadataListeners) {
2457 } catch (Throwable ex) {
2458 ex.printStackTrace();
2466 } catch (Throwable t) {
2467 t.printStackTrace();
2476 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2479 public <T> T getService(Class<T> api) {
2480 T t = peekService(api);
2482 if (state.isClosed())
2483 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2484 throw new ServiceNotFoundException(this, api);
2492 protected abstract ServerInformation getCachedServerInformation();
2494 private Class<?> serviceKey1 = null;
2495 private Class<?> serviceKey2 = null;
2496 private Object service1 = null;
2497 private Object service2 = null;
2503 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2505 @SuppressWarnings("unchecked")
2507 public synchronized <T> T peekService(Class<T> api) {
2509 if(serviceKey1 == api) {
2511 } else if (serviceKey2 == api) {
2513 Object result = service2;
2514 service2 = service1;
2515 serviceKey2 = serviceKey1;
2521 if (Layer0.class == api)
2523 if (ServerInformation.class == api)
2524 return (T) getCachedServerInformation();
2525 else if (WriteGraphImpl.class == api)
2526 return (T) writeState.getGraph();
2527 else if (ClusterBuilder.class == api)
2528 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2529 else if (ClusterBuilderFactory.class == api)
2530 return (T)new ClusterBuilderFactoryImpl(this);
2532 service2 = service1;
2533 serviceKey2 = serviceKey1;
2535 service1 = serviceLocator.peekService(api);
2546 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2549 public boolean hasService(Class<?> api) {
2550 return serviceLocator.hasService(api);
2554 * @param api the api that must be implemented by the specified service
2555 * @param service the service implementation
2558 public <T> void registerService(Class<T> api, T service) {
2559 if(Layer0.class == api) {
2560 L0 = (Layer0)service;
2563 serviceLocator.registerService(api, service);
2564 if (TransactionPolicySupport.class == api) {
2565 transactionPolicy = (TransactionPolicySupport)service;
2566 state.resetTransactionPolicy();
2568 if (api == serviceKey1)
2570 else if (api == serviceKey2)
2578 void fireSessionVariableChange(String variable) {
2579 for (MonitorHandler h : monitorHandlers) {
2580 MonitorContext ctx = monitorContexts.getLeft(h);
2582 // SafeRunner functionality repeated here to avoid dependency.
2584 h.valuesChanged(ctx);
2585 } catch (Exception e) {
2586 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2587 } catch (LinkageError e) {
2588 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2594 class ResourceSerializerImpl implements ResourceSerializer {
2596 public long createRandomAccessId(int id) throws DatabaseException {
2599 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2600 long cluster = getCluster(id);
2602 return 0; // Better to return 0 then invalid id.
2603 long result = ClusterTraitsBase.createResourceId(cluster, index);
2608 public long getRandomAccessId(Resource resource) throws DatabaseException {
2610 ResourceImpl resourceImpl = (ResourceImpl) resource;
2611 return createRandomAccessId(resourceImpl.id);
2616 public int getTransientId(Resource resource) throws DatabaseException {
2618 ResourceImpl resourceImpl = (ResourceImpl) resource;
2619 return resourceImpl.id;
2624 public String createRandomAccessId(Resource resource)
2625 throws InvalidResourceReferenceException {
2627 if(resource == null) throw new IllegalArgumentException();
2629 ResourceImpl resourceImpl = (ResourceImpl) resource;
2631 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2635 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2636 } catch (DatabaseException e1) {
2637 throw new InvalidResourceReferenceException(e1);
2640 // Serialize as '<resource index>_<cluster id>'
2641 return "" + r + "_" + getCluster(resourceImpl);
2642 } catch (Throwable e) {
2643 e.printStackTrace();
2644 throw new InvalidResourceReferenceException(e);
2649 public int getTransientId(long serialized) throws DatabaseException {
2650 if (serialized <= 0)
2651 return (int)serialized;
2652 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2653 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2654 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2655 if (cluster == null)
2656 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2657 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2662 public Resource getResource(long randomAccessId) throws DatabaseException {
2663 return getResourceByKey(getTransientId(randomAccessId));
2667 public Resource getResource(int transientId) throws DatabaseException {
2668 return getResourceByKey(transientId);
2672 public Resource getResource(String randomAccessId)
2673 throws InvalidResourceReferenceException {
2675 int i = randomAccessId.indexOf('_');
2677 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2678 + randomAccessId + "'");
2679 int r = Integer.parseInt(randomAccessId.substring(0, i));
2680 if(r < 0) return getResourceByKey(r);
2681 long c = Long.parseLong(randomAccessId.substring(i + 1));
2682 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2683 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2684 if (cluster.hasResource(key, clusterTranslator))
2685 return getResourceByKey(key);
2686 } catch (InvalidResourceReferenceException e) {
2688 } catch (NumberFormatException e) {
2689 throw new InvalidResourceReferenceException(e);
2690 } catch (Throwable e) {
2691 e.printStackTrace();
2692 throw new InvalidResourceReferenceException(e);
2695 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2699 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2702 } catch (Throwable e) {
2703 e.printStackTrace();
2704 throw new InvalidResourceReferenceException(e);
2710 // Copied from old SessionImpl
2713 if (state.isClosed())
2714 throw new Error("Session closed.");
2719 public ResourceImpl getNewResource(long clusterId) {
2720 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2723 newId = cluster.createResource(clusterTranslator);
2724 } catch (DatabaseException e) {
2725 Logger.defaultLogError(e);
2728 return new ResourceImpl(resourceSupport, newId);
2731 public ResourceImpl getNewResource(Resource clusterSet)
2732 throws DatabaseException {
2733 long resourceId = clusterSet.getResourceId();
2734 Long clusterId = clusterSetsSupport.get(resourceId);
2735 if (null == clusterId)
2736 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2737 if (Constants.NewClusterId == clusterId) {
2738 clusterId = getService(ClusteringSupport.class).createCluster();
2739 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740 createdClusters.add(clusterId);
2741 clusterSetsSupport.put(resourceId, clusterId);
2742 return getNewResource(clusterId);
2744 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2745 ResourceImpl result;
2746 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2747 clusterId = getService(ClusteringSupport.class).createCluster();
2748 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749 createdClusters.add(clusterId);
2750 clusterSetsSupport.put(resourceId, clusterId);
2751 return getNewResource(clusterId);
2753 result = getNewResource(clusterId);
2754 int resultKey = querySupport.getId(result);
2755 long resultCluster = querySupport.getClusterId(resultKey);
2756 if (clusterId != resultCluster)
2757 clusterSetsSupport.put(resourceId, resultCluster);
2763 public void getNewClusterSet(Resource clusterSet)
2764 throws DatabaseException {
2766 System.out.println("new cluster set=" + clusterSet);
2767 long resourceId = clusterSet.getResourceId();
2768 if (clusterSetsSupport.containsKey(resourceId))
2769 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2770 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2772 public boolean containsClusterSet(Resource clusterSet)
2773 throws ServiceException {
2774 long resourceId = clusterSet.getResourceId();
2775 return clusterSetsSupport.containsKey(resourceId);
2777 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2778 Resource r = defaultClusterSet;
2779 defaultClusterSet = clusterSet;
2782 void printDiagnostics() {
2786 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2787 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2788 for(ClusterI cluster : clusterTable.getClusters()) {
2790 if (cluster.isLoaded() && !cluster.isEmpty()) {
2791 residentClusters.set(residentClusters.get() + 1);
2792 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2794 } catch (DatabaseException e) {
2795 Logger.defaultLogError(e);
2799 System.out.println("--------------------------------");
2800 System.out.println("Cluster information:");
2801 System.out.println("-amount of resident clusters=" + residentClusters.get());
2802 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2804 for(ClusterI cluster : clusterTable.getClusters()) {
2805 System.out.print("Cluster " + cluster.getClusterId() + " [");
2807 if (!cluster.isLoaded())
2808 System.out.println("not loaded]");
2809 else if (cluster.isEmpty())
2810 System.out.println("is empty]");
2812 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2813 System.out.print(",references=" + cluster.getReferenceCount());
2814 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2816 } catch (DatabaseException e) {
2817 Logger.defaultLogError(e);
2818 System.out.println("is corrupted]");
2822 queryProvider2.printDiagnostics();
2823 System.out.println("--------------------------------");
2828 public void fireStartReadTransaction() {
2831 System.out.println("StartReadTransaction");
2833 for (SessionEventListener listener : eventListeners) {
2835 listener.readTransactionStarted();
2836 } catch (Throwable t) {
2837 t.printStackTrace();
2843 public void fireFinishReadTransaction() {
2846 System.out.println("FinishReadTransaction");
2848 for (SessionEventListener listener : eventListeners) {
2850 listener.readTransactionFinished();
2851 } catch (Throwable t) {
2852 t.printStackTrace();
2858 public void fireStartWriteTransaction() {
2861 System.out.println("StartWriteTransaction");
2863 for (SessionEventListener listener : eventListeners) {
2865 listener.writeTransactionStarted();
2866 } catch (Throwable t) {
2867 t.printStackTrace();
2873 public void fireFinishWriteTransaction() {
2876 System.out.println("FinishWriteTransaction");
2878 for (SessionEventListener listener : eventListeners) {
2880 listener.writeTransactionFinished();
2881 } catch (Throwable t) {
2882 t.printStackTrace();
2886 Indexing.resetDependenciesIndexingDisabled();
2890 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2891 throw new Error("Not supported at the moment.");
2898 ClusterTable getClusterTable() {
2899 return clusterTable;
2902 public GraphSession getGraphSession() {
2903 return graphSession;
2906 public QueryProcessor getQueryProvider2() {
2907 return queryProvider2;
2910 ClientChangesImpl getClientChanges() {
2911 return clientChanges;
2914 boolean getWriteOnly() {
2918 static int counter = 0;
2920 public void onClusterLoaded(long clusterId) {
2922 clusterTable.updateSize();
2930 * Implementation of the interface RequestProcessor
2934 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2939 assert(request != null);
2941 final DataContainer<T> result = new DataContainer<T>();
2942 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2944 syncRequest(request, new AsyncProcedure<T>() {
2947 public void execute(AsyncReadGraph graph, T t) {
2952 public void exception(AsyncReadGraph graph, Throwable t) {
2958 Throwable t = exception.get();
2960 if(t instanceof DatabaseException) throw (DatabaseException)t;
2961 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2964 return result.get();
2969 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2970 // assertNotSession();
2971 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2975 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2977 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2981 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2983 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2987 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2989 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2993 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2995 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2999 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3003 assert(request != null);
3005 final DataContainer<T> result = new DataContainer<T>();
3006 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3008 syncRequest(request, new AsyncProcedure<T>() {
3011 public void execute(AsyncReadGraph graph, T t) {
3016 public void exception(AsyncReadGraph graph, Throwable t) {
3022 Throwable t = exception.get();
3024 if(t instanceof DatabaseException) throw (DatabaseException)t;
3025 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3028 return result.get();
3033 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3035 return syncRequest(request, (AsyncProcedure<T>)procedure);
3039 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3041 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3045 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3047 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3051 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3053 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3057 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3059 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3063 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3067 assert(request != null);
3069 final ArrayList<T> result = new ArrayList<T>();
3070 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3072 syncRequest(request, new AsyncMultiProcedure<T>() {
3075 public void execute(AsyncReadGraph graph, T t) {
3076 synchronized(result) {
3082 public void finished(AsyncReadGraph graph) {
3086 public void exception(AsyncReadGraph graph, Throwable t) {
3093 Throwable t = exception.get();
3095 if(t instanceof DatabaseException) throw (DatabaseException)t;
3096 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3104 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3106 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3110 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3112 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3116 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3118 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3122 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3124 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3128 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3130 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3134 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3138 assert(request != null);
3140 final ArrayList<T> result = new ArrayList<T>();
3141 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3143 syncRequest(request, new AsyncMultiProcedure<T>() {
3146 public void execute(AsyncReadGraph graph, T t) {
3147 synchronized(result) {
3153 public void finished(AsyncReadGraph graph) {
3157 public void exception(AsyncReadGraph graph, Throwable t) {
3164 Throwable t = exception.get();
3166 if(t instanceof DatabaseException) throw (DatabaseException)t;
3167 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3175 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3177 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3181 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3183 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3187 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3189 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3193 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3195 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3199 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3201 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3206 public <T> void asyncRequest(Read<T> request) {
3208 asyncRequest(request, new ProcedureAdapter<T>() {
3210 public void exception(Throwable t) {
3211 t.printStackTrace();
3218 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3219 asyncRequest(request, (AsyncProcedure<T>)procedure);
3223 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3224 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3228 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3229 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3233 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3234 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3238 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3239 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3243 final public <T> void asyncRequest(final AsyncRead<T> request) {
3245 assert(request != null);
3247 asyncRequest(request, new ProcedureAdapter<T>() {
3249 public void exception(Throwable t) {
3250 t.printStackTrace();
3257 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3258 scheduleRequest(request, procedure, procedure, null);
3262 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3263 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3267 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3268 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3272 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3273 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3277 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3278 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3282 public <T> void asyncRequest(MultiRead<T> request) {
3284 assert(request != null);
3286 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3288 public void exception(AsyncReadGraph graph, Throwable t) {
3289 t.printStackTrace();
3296 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3297 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3301 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3306 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3311 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3316 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3321 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3323 assert(request != null);
3325 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3327 public void exception(AsyncReadGraph graph, Throwable t) {
3328 t.printStackTrace();
3335 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3340 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3345 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3350 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3355 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3360 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3362 throw new Error("Not implemented!");
3366 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3367 throw new Error("Not implemented!");
3371 final public <T> void asyncRequest(final ExternalRead<T> request) {
3373 assert(request != null);
3375 asyncRequest(request, new ProcedureAdapter<T>() {
3377 public void exception(Throwable t) {
3378 t.printStackTrace();
3385 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3386 asyncRequest(request, (Procedure<T>)procedure);
3391 void check(Throwable t) throws DatabaseException {
3393 if(t instanceof DatabaseException) throw (DatabaseException)t;
3394 else throw new DatabaseException("Unexpected exception", t);
3398 void check(DataContainer<Throwable> container) throws DatabaseException {
3399 Throwable t = container.get();
3401 if(t instanceof DatabaseException) throw (DatabaseException)t;
3402 else throw new DatabaseException("Unexpected exception", t);
3411 boolean sameProvider(Write request) {
3412 if(writeState.getGraph().provider != null) {
3413 return writeState.getGraph().provider.equals(request.getProvider());
3415 return request.getProvider() == null;
3419 boolean plainWrite(WriteGraphImpl graph) {
3420 if(graph == null) return false;
3421 if(graph.writeSupport.writeOnly()) return false;
3426 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3427 private void assertNotSession() throws DatabaseException {
3428 Thread current = Thread.currentThread();
3429 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3432 void assertAlive() {
3433 if (!state.isAlive())
3434 throw new RuntimeDatabaseException("Session has been shut down.");
3437 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3438 return querySupport.getValueStream(graph, querySupport.getId(resource));
3441 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3442 return querySupport.getValue(graph, querySupport.getId(resource));
3445 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3446 acquire(semaphore, request, null);
3449 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3453 // Loop until the semaphore is acquired reporting requests that take a long time.
3456 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3460 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3462 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3463 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3465 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3466 + (waitTime) + " ms. (procedure=" + procedure + ")");
3473 } catch (InterruptedException e) {
3474 e.printStackTrace();
3475 // FIXME: Should perhaps do something else in this case ??
3482 // public boolean holdOnToTransactionAfterCancel() {
3483 // return transactionPolicy.holdOnToTransactionAfterCancel();
3487 // public boolean holdOnToTransactionAfterCommit() {
3488 // return transactionPolicy.holdOnToTransactionAfterCommit();
3492 // public boolean holdOnToTransactionAfterRead() {
3493 // return transactionPolicy.holdOnToTransactionAfterRead();
3497 // public void onRelinquish() {
3498 // transactionPolicy.onRelinquish();
3502 // public void onRelinquishDone() {
3503 // transactionPolicy.onRelinquishDone();
3507 // public void onRelinquishError() {
3508 // transactionPolicy.onRelinquishError();
3513 public Session getSession() {
3518 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3519 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3521 public void ceased(int thread) {
3523 requestManager.ceased(thread);
3527 public int getAmountOfQueryThreads() {
3528 // This must be a power of two
3530 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3533 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3535 return new ResourceImpl(resourceSupport, key);
3539 public void acquireWriteOnly() {
3543 public void releaseWriteOnly(ReadGraphImpl graph) {
3545 queryProvider2.releaseWrite(graph);
3548 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3550 long start = System.nanoTime();
3552 while(dirtyPrimitives) {
3553 dirtyPrimitives = false;
3554 getQueryProvider2().performDirtyUpdates(writer);
3555 getQueryProvider2().performScheduledUpdates(writer);
3558 fireMetadataListeners(writer, clientChanges);
3560 if(DebugPolicy.PERFORMANCE_DATA) {
3561 long end = System.nanoTime();
3562 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3567 void removeTemporaryData() {
3568 File platform = Platform.getLocation().toFile();
3569 File tempFiles = new File(platform, "tempFiles");
3570 File temp = new File(tempFiles, "db");
3574 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3576 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3579 } catch (IOException e) {
3580 Logger.defaultLogError(e);
3582 return FileVisitResult.CONTINUE;
3585 } catch (IOException e) {
3586 Logger.defaultLogError(e);
3590 public void handleCreatedClusters() {
3591 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3592 createdClusters.forEach(new TLongProcedure() {
3595 public boolean execute(long value) {
3596 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3597 cluster.setImmutable(true, clusterTranslator);
3602 createdClusters.clear();
3606 public Object getModificationCounter() {
3607 return queryProvider2.modificationCounter;
3611 public void markUndoPoint() {
3612 state.setCombine(false);