1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
// Debug flags; both are compile-time constants and currently disabled.
176 protected static final boolean DEBUG = false;
178 private static final boolean DIAGNOSTICS = false;
// Registered as the TransactionPolicySupport service in the constructor.
180 private TransactionPolicySupport transactionPolicy;
// Registered as the ClusterControl service in the constructor.
182 final private ClusterControl clusterControl;
// Support objects created in the constructor of this class.
184 final protected BuiltinSupportImpl builtinSupport;
185 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186 final protected ClusterSetsSupport clusterSetsSupport;
187 final protected LifecycleSupportImpl lifecycleSupport;
// Not assigned in the constructor; NOTE(review): presumably set during session initialisation elsewhere - confirm.
189 protected QuerySupportImpl querySupport;
190 protected ResourceSupportImpl resourceSupport;
// Default write support handed to WriteGraphImpl.create() by the Write/WriteResult schedulers.
191 protected WriteSupport writeSupport;
// Passed to cluster operations (addRelation, setValue, createResource, ...) in WriteOnlySupport.
192 public ClusterTranslator clusterTranslator;
194 boolean dirtyPrimitives = false;
// serviceMode is tested as a bit mask against the two SERVICE_MODE_* constants:
//  - SERVICE_MODE_CREATE: getNewResourceCluster() records the new cluster id in createdClusters.
//  - SERVICE_MODE_ALLOW:  WriteOnlySupport.claim() skips the immutable-cluster check.
196 public static final int SERVICE_MODE_CREATE = 2;
197 public static final int SERVICE_MODE_ALLOW = 1;
198 public int serviceMode = 0;
199 public boolean createdImmutableClusters = false;
// Ids of clusters created while SERVICE_MODE_CREATE is set (see getNewResourceCluster()).
200 public TLongHashSet createdClusters = new TLongHashSet();
205 * The service locator maintained by the workbench. These services are
206 * initialized by the workbench during the <code>init</code> method.
208 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
210 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
// Listener registries; copy-on-write lists, so they can be iterated while listeners are added or removed.
212 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
214 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
216 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
// Plain HashSet; NOTE(review): no synchronisation is visible here - confirm how concurrent access is guarded.
218 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
220 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
222 final protected State state = new State();
// The following references start out as null; only clusterTable and sessionManagerImpl are assigned by the constructor.
224 protected GraphSession graphSession = null;
226 protected SessionManager sessionManagerImpl = null;
// The constructor's assignment of authAgent is commented out, so this stays null unless set elsewhere.
228 protected UserAuthenticationAgent authAgent = null;
230 protected UserAuthenticator authenticator = null;
232 protected Resource user = null;
// Flushed with reallyFlush() after a delayed write commit.
234 protected ClusterStream clusterStream = null;
// Receives the SessionTask instances created by the scheduleRequest(...) methods.
236 protected SessionRequestManager requestManager = null;
238 public ClusterTable clusterTable = null;
240 public QueryProcessor queryProvider2 = null;
// Disposed and replaced with a fresh ClientChangesImpl at the start of each scheduled write.
242 ClientChangesImpl clientChanges = null;
244 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
246 // protected long newClusterId = Constants.NullClusterId;
248 protected int flushCounter = 0;
// Passed to clusterTable.getNewResourceCluster(...) by getNewResourceCluster().
250 protected boolean writeOnly = false;
// State of the write currently being performed / of the perform phase of a delayed write.
252 WriteState<?> writeState = null;
253 WriteStateBase<?> delayedWriteState = null;
254 protected Resource defaultClusterSet = null; // If not null then used for newResource().
/**
 * Creates the session object: builds the cluster table and the support
 * objects and registers every service of this session with
 * {@link #serviceLocator}.
 *
 * @param sessionManagerImpl stored in the field of the same name
 * @param authAgent not stored by the visible code (the assignment below is
 *        commented out, as is the null check)
 */
256 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
258 // if (authAgent == null)
259 // throw new IllegalArgumentException("null authentication agent");
// Storage directory; shared below by the cluster table, the virtual graph support and the cluster sets support.
261 File t = StaticSessionProperties.virtualGraphStoragePath;
264 this.clusterTable = new ClusterTable(this, t);
265 this.builtinSupport = new BuiltinSupportImpl(this);
266 this.sessionManagerImpl = sessionManagerImpl;
268 // this.authAgent = authAgent;
// Service registrations. The session itself is published under Session.class.
270 serviceLocator.registerService(Session.class, this);
271 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292 ServiceActivityUpdaterForWriteTransactions.register(this);
// The same support instance is published under both the client-facing and the server-facing interface.
294 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297 this.lifecycleSupport = new LifecycleSupportImpl(this);
298 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299 this.transactionPolicy = new TransactionPolicyRelease();
300 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301 this.clusterControl = new ClusterControlImpl(this);
302 serviceLocator.registerService(ClusterControl.class, clusterControl);
303 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304 this.clusterSetsSupport.setReadDirectory(t.toPath());
305 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
/**
 * Returns the root library resource as reported by the query processor
 * ({@code queryProvider2.getRootLibraryResource()}).
 * NOTE(review): queryProvider2 is initialised to null in its declaration -
 * confirm it is always set before this is called.
 */
316 public Resource getRootLibrary() {
317 return queryProvider2.getRootLibraryResource();
/**
 * Refreshes the cluster table for the given clusters and change set id.
 * The visible code first consults {@code graphSession.dbSession.refreshEnabled()},
 * then delegates to {@code getClusterTable().refresh(csid, this, clusterUID)};
 * any Throwable raised by that call is logged rather than propagated.
 *
 * @param thread not referenced in the visible lines
 * @param clusterUID clusters to refresh, forwarded unchanged
 * @param csid change set id, forwarded unchanged
 * @throws DatabaseException declared by the signature
 */
320 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321 if (!graphSession.dbSession.refreshEnabled())
324 getClusterTable().refresh(csid, this, clusterUID);
325 } catch (Throwable t) {
// NOTE(review): the log message below misspells "Refresh"; it is runtime text and therefore left untouched here.
326 Logger.defaultLogError("Refesh failed.", t);
330 static final class TaskHelper {
331 private final String name;
332 private Object result;
333 final Semaphore sema = new Semaphore(0);
334 private Throwable throwable = null;
335 final Consumer<DatabaseException> callback = e -> {
336 synchronized (TaskHelper.this) {
340 final Procedure<Object> proc = new Procedure<Object>() {
342 public void execute(Object result) {
343 callback.accept(null);
346 public void exception(Throwable t) {
347 if (t instanceof DatabaseException)
348 callback.accept((DatabaseException)t);
350 callback.accept(new DatabaseException("" + name + "operation failed.", t));
353 final WriteTraits writeTraits = new WriteTraits() {};
354 TaskHelper(String name) {
357 @SuppressWarnings("unchecked")
361 void setResult(Object result) {
362 this.result = result;
364 // Throwable throwableGet() {
367 synchronized void throwableSet(Throwable t) {
370 // void throwableSet(String t) {
371 // throwable = new InternalException("" + name + " operation failed. " + t);
373 synchronized void throwableCheck()
374 throws DatabaseException {
375 if (null != throwable)
376 if (throwable instanceof DatabaseException)
377 throw (DatabaseException)throwable;
379 throw new DatabaseException("Undo operation failed.", throwable);
381 void throw_(String message)
382 throws DatabaseException {
383 throw new DatabaseException("" + name + " operation failed. " + message);
/**
 * Returns {@code procedure} itself, cast to {@link ListenerBase}, when it is
 * an instance of that type.
 * NOTE(review): the result for any other argument is not visible in these
 * lines - confirm what callers receive in that case.
 */
389 private ListenerBase getListenerBase(Object procedure) {
390 if (procedure instanceof ListenerBase)
391 return (ListenerBase) procedure;
/**
 * Convenience overload: schedules the write by delegating to the
 * four-argument variant with {@code combine == null}.
 */
396 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397 scheduleRequest(request, callback, notify, null);
401 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Schedules a {@link Write} request on the request manager.
 * <p>
 * The queued task disposes the previous client change set and creates a new
 * one, creates a WriteGraphImpl for the request's provider, installs a new
 * WriteState in {@code writeState} and performs the request. Failures are
 * logged (except CancelTransactionException) and handed to
 * {@code writeState.except(...)}. QUEUED_WRITES variable changes are fired
 * around the work.
 *
 * @param request the write to perform; must not be null (asserted)
 * @param callback notified through the procedure given to the WriteState; may be null
 * @param notify handed to the WriteState
 * @param combine forwarded to {@code requestManager.scheduleWrite}
 */
404 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
406 assert (request != null);
// Optional development-time logging; failures while reading the property are only logged.
408 if(Development.DEVELOPMENT) {
410 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411 System.err.println("schedule write '" + request + "'");
412 } catch (Throwable t) {
413 Logger.defaultLogError(t);
// NOTE(review): this local is not referenced in the visible lines; run(int thread) below declares its own parameter.
417 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
419 requestManager.scheduleWrite(new SessionTask(true) {
422 public void run(int thread) {
424 if(Development.DEVELOPMENT) {
426 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
427 System.err.println("perform write '" + request + "'");
428 } catch (Throwable t) {
429 Logger.defaultLogError(t);
433 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
435 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Each write starts with a fresh client change set.
440 Disposable.safeDispose(clientChanges);
441 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
443 VirtualGraph vg = getProvider(request.getProvider());
445 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
446 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
449 public void execute(Object result) {
450 if(callback != null) callback.accept(null);
454 public void exception(Throwable t) {
// NOTE(review): t is cast unconditionally. writeState.except(...) below is fed arbitrary Throwables, so a
// non-DatabaseException would raise ClassCastException here. The DelayedWrite overload in this class checks
// instanceof and wraps other throwables in a new DatabaseException instead.
455 if(callback != null) callback.accept((DatabaseException)t);
460 assert (null != writer);
461 // writer.state.barrier.inc();
464 request.perform(writer);
465 assert (null != writer);
466 } catch (Throwable t) {
// Cancellation is an expected outcome and therefore not logged as an error.
467 if (!(t instanceof CancelTransactionException))
468 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
469 writeState.except(t);
471 // writer.state.barrier.dec();
472 // writer.waitAsync(request);
476 assert(!queryProvider2.cache.dirty);
478 } catch (Throwable e) {
480 // Log it first, just to be safe that the error is always logged.
481 if (!(e instanceof CancelTransactionException))
482 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
484 // writeState.getGraph().state.barrier.dec();
485 // writeState.getGraph().waitAsync(request);
// NOTE(review): this uses the writeState field; if the failure happened before the assignment above,
// the field would still hold the previous write's state (or null) - confirm where the try block starts.
487 writeState.except(e);
490 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
491 // // All we can do here is to log those, can't really pass them anywhere.
492 // if (callback != null) {
493 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
494 // else callback.run(new DatabaseException(e));
496 // } catch (Throwable e2) {
497 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
499 // boolean empty = clusterStream.reallyFlush();
500 // int callerThread = -1;
501 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
502 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
504 // // Send all local changes to server so it can calculate correct reverse change set.
505 // // This will call clusterStream.accept().
506 // state.cancelCommit(context, clusterStream);
508 // if (!context.isOk()) // this is a blocking operation
509 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
510 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
512 // state.cancelCommit2(context, clusterStream);
513 // } catch (DatabaseException e3) {
514 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
516 // clusterStream.setOff(false);
517 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
522 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
528 if(Development.DEVELOPMENT) {
530 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
531 System.err.println("finish write '" + request + "'");
532 } catch (Throwable t) {
533 Logger.defaultLogError(t);
/**
 * Convenience overload: schedules the result-producing write by delegating
 * to the four-argument variant with {@code combine == null}.
 */
543 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
544 scheduleRequest(request, procedure, notify, null);
548 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Schedules a {@link WriteResult} request on the request manager.
 * <p>
 * The queued task replaces the client change set, creates a WriteGraphImpl
 * for the request's provider, installs a new WriteState (constructed with
 * {@code procedure}) in {@code writeState} and stores the value returned by
 * {@code request.perform(writer)} in that state. Any Throwable is handed to
 * {@code writeState.except(e)}. QUEUED_WRITES variable changes are fired
 * around the work.
 *
 * @param request the write to perform; must not be null (asserted)
 * @param procedure handed to the WriteState
 * @param notify handed to the WriteState
 * @param combine not referenced in the visible lines; NOTE(review): presumably forwarded to scheduleWrite as in the Write overload - confirm
 */
551 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
553 assert (request != null);
// NOTE(review): this local is not referenced in the visible lines; run(int thread) below declares its own parameter.
555 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
557 requestManager.scheduleWrite(new SessionTask(true) {
560 public void run(int thread) {
562 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
564 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Each write starts with a fresh client change set.
567 Disposable.safeDispose(clientChanges);
568 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
570 VirtualGraph vg = getProvider(request.getProvider());
571 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The typed local keeps setResult(...) type-safe; the field itself is declared as WriteState<?>.
574 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575 writeState = writeStateT;
577 assert (null != writer);
578 // writer.state.barrier.inc();
579 writeStateT.setResult(request.perform(writer));
580 assert (null != writer);
582 // writer.state.barrier.dec();
583 // writer.waitAsync(null);
585 } catch (Throwable e) {
587 // writer.state.barrier.dec();
588 // writer.waitAsync(null);
// NOTE(review): unlike the Write overload, no logging is visible on this path; it also relies on the
// writeState field having been assigned before the failure - confirm where the try block starts.
590 writeState.except(e);
592 // state.stopWriteTransaction(clusterStream);
594 // } catch (Throwable e) {
595 // // Log it first, just to be safe that the error is always logged.
596 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
599 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
600 // // All we can do here is to log those, can't really pass them anywhere.
601 // if (procedure != null) {
602 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
603 // else procedure.exception(new DatabaseException(e));
605 // } catch (Throwable e2) {
606 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
609 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
611 // state.stopWriteTransaction(clusterStream);
614 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
617 // if(notify != null) notify.release();
/**
 * Convenience overload: schedules the delayed write by delegating to the
 * four-argument variant with {@code combine == null}.
 */
627 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
628 scheduleRequest(request, callback, notify, null);
632 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Schedules a {@link DelayedWrite} request on the request manager. The
 * queued task works in two phases:
 * <ol>
 * <li>perform: the request is performed against a DelayedWriteGraph built on
 *     a new ReadGraphImpl, with {@code delayedWriteState} tracking the phase;
 *     QUEUED_READS variable changes are fired around it;</li>
 * <li>commit: the delayed graph is committed through a WriteGraphImpl that
 *     uses a write-only support (virtual or regular, depending on whether the
 *     request has a provider), the cluster stream is flushed and
 *     {@code releaseWriteOnly} / {@code handleUpdatesAndMetadata} are called;
 *     QUEUED_WRITES variable changes are fired around it.</li>
 * </ol>
 * The same {@code stateProcedure} is given to the state objects of both phases.
 *
 * @param request the delayed write; must not be null (asserted)
 * @param callback receives null on success or a DatabaseException on failure; may be null
 * @param notify handed to both state objects
 * @param combine not referenced in the visible lines; NOTE(review): presumably forwarded to scheduleWrite - confirm
 */
635 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
// NOTE(review): the matching finish of this task is not visible in these lines - confirm it is always reached.
637 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
639 assert (request != null);
// NOTE(review): this local is not referenced in the visible lines; run(int thread) below declares its own parameter.
641 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
643 requestManager.scheduleWrite(new SessionTask(true) {
646 public void run(int thread) {
647 fireSessionVariableChange(SessionVariables.QUEUED_READS);
649 Procedure<Object> stateProcedure = new Procedure<Object>() {
651 public void execute(Object result) {
652 if (callback != null)
653 callback.accept(null);
656 public void exception(Throwable t) {
657 if (callback != null) {
// Non-DatabaseException throwables are wrapped so the callback always gets a DatabaseException.
658 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
659 else callback.accept(new DatabaseException(t));
// NOTE(review): presumably the no-callback branch, so that the failure is at least logged - confirm.
661 Logger.defaultLogError("Unhandled exception", t);
// Phase 1: collect the writes into a delayed graph.
665 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
666 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
667 DelayedWriteGraph dwg = null;
668 // newGraph.state.barrier.inc();
671 dwg = new DelayedWriteGraph(newGraph);
672 request.perform(dwg);
673 } catch (Throwable e) {
674 delayedWriteState.except(e);
679 // newGraph.state.barrier.dec();
680 // newGraph.waitAsync(request);
681 fireSessionVariableChange(SessionVariables.QUEUED_READS);
684 delayedWriteState = null;
// Phase 2: commit the collected writes.
686 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
687 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
690 Disposable.safeDispose(clientChanges);
691 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
695 VirtualGraph vg = getProvider(request.getProvider());
// This local deliberately shadows the writeSupport field: the commit uses a write-only support.
696 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
698 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
700 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
702 assert (null != writer);
703 // writer.state.barrier.inc();
707 dwg.commit(writer, request);
// The default cluster set may still be a delayed resource; convert it and record its cluster.
709 if(defaultClusterSet != null) {
710 XSupport xs = getService(XSupport.class);
711 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712 ClusteringSupport cs = getService(ClusteringSupport.class);
713 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
716 // This makes clusters available from server
717 clusterStream.reallyFlush();
719 releaseWriteOnly(writer);
720 handleUpdatesAndMetadata(writer);
// Only ServiceException is handled on the commit path in the visible lines.
722 } catch (ServiceException e) {
723 // writer.state.barrier.dec();
724 // writer.waitAsync(null);
726 // These shall be requested from server
727 clusterTable.removeWriteOnlyClusters();
728 // This makes clusters available from server
729 clusterStream.reallyFlush();
731 releaseWriteOnly(writer);
732 writeState.except(e);
734 // Debugging & Profiling
735 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/**
 * Convenience overload: delegates to the four-argument variant with
 * {@code combine == null}. That variant currently always throws
 * {@code Error("Not implemented")}, so this call cannot complete normally.
 */
745 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746 scheduleRequest(request, procedure, notify, null);
750 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
/**
 * Not implemented: scheduling of {@link DelayedWriteResult} requests always
 * fails.
 * NOTE(review): a raw {@link Error} is thrown, which bypasses
 * {@code catch (Exception)} handlers; UnsupportedOperationException would be
 * the conventional choice, but switching changes what callers can catch.
 *
 * @throws Error always, with the message "Not implemented"
 */
753 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754 throw new Error("Not implemented");
/**
 * Obtains a cluster for new resources from the cluster table, passing the
 * current {@code writeOnly} flag. When the SERVICE_MODE_CREATE bit is set in
 * {@code serviceMode}, the id of the obtained cluster is recorded in
 * {@code createdClusters}.
 *
 * @throws DatabaseException declared by the signature
 */
757 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760 createdClusters.add(cluster.clusterId);
765 class WriteOnlySupport implements WriteSupport {
// Set from the enclosing session's clusterStream in the constructor.
767 ClusterStream stream;
// Cluster that receives newly created resources; null until one is selected or allocated.
768 ClusterImpl currentCluster;
/** Captures the enclosing session's current {@code clusterStream}. */
770 public WriteOnlySupport() {
771 this.stream = clusterStream;
/**
 * Resource-based variant of claim: extracts the integer ids of the three
 * resources and delegates to the int-based overload.
 * All three arguments are cast to ResourceImpl without a type check.
 */
775 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
/**
 * Adds the statement (s, p, o) to the cluster that owns the subject key.
 *
 * @param provider not referenced in the visible lines
 * @param s subject resource key
 * @param p predicate resource key
 * @param o object resource key
 * @throws ImmutableException when the subject's cluster is immutable, the
 *         SERVICE_MODE_ALLOW bit is not set and s is not the root library
 * @throws ServiceException declared by the signature
 */
780 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
782 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
// Immutable clusters may only be modified in service mode; the root library is exempt.
784 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785 if(s != queryProvider2.getRootLibrary())
786 throw new ImmutableException("Trying to modify immutable resource key=" + s);
// addRelation may hand back a different cluster object; maintainCluster adopts it.
789 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790 } catch (DatabaseException e) {
// NOTE(review): the failure is logged; whether it is also propagated is not visible in these lines - confirm.
791 Logger.defaultLogError(e);
795 clientChanges.invalidate(s);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update below - confirm.
797 if (cluster.isWriteOnly())
799 queryProvider2.updateStatements(s, p);
/**
 * Resource-based variant: delegates to the int-based overload with the
 * resource's id and the full length of {@code value}.
 * {@code value} must be non-null (its length is read) and {@code resource}
 * is cast to ResourceImpl without a type check.
 */
804 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
/**
 * Sets the value of the resource in the cluster that owns the key.
 *
 * @param provider not referenced in the visible lines
 * @param rid resource key
 * @param value value bytes
 * @param length number of bytes, forwarded to {@code cluster.setValue}
 */
809 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
811 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// setValue may hand back a different cluster object; maintainCluster adopts it.
813 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814 } catch (DatabaseException e) {
// The signature declares no exception, so a DatabaseException is logged here.
815 Logger.defaultLogError(e);
818 clientChanges.invalidate(rid);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update below - confirm.
820 if (cluster.isWriteOnly())
822 queryProvider2.updateValue(rid);
/**
 * Sets the value of a resource from a {@link ByteReader}.
 * Two paths are visible: one reads all {@code amount} bytes at once and
 * delegates to the byte[] overload, the other streams the value into the
 * cluster in blocks of at most 65536 bytes via {@code modiValueEx}.
 * NOTE(review): the condition selecting between the paths and the loop
 * around the block copy are not visible in these lines - confirm.
 *
 * @param provider forwarded on the single-read path
 * @param resource target resource; cast to ResourceImpl on the block path
 * @param reader source of the value bytes
 * @param amount total number of bytes to read
 * @throws DatabaseException declared by the signature
 */
827 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
830 claimValue(provider,resource, reader.readBytes(null, amount));
// Transfer buffer for the block-wise path; its size matches the block limit below.
834 byte[] bytes = new byte[65536];
836 int rid = ((ResourceImpl)resource).id;
837 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
841 int block = Math.min(left, 65536);
842 reader.readBytes(bytes, block);
// amount-left is the offset of this block within the value.
843 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
846 } catch (DatabaseException e) {
847 Logger.defaultLogError(e);
850 clientChanges.invalidate(rid);
// NOTE(review): the statement guarded by this check is not visible; presumably write-only clusters skip the query update below - confirm.
852 if (cluster.isWriteOnly())
854 queryProvider2.updateValue(rid);
/**
 * Adopts the cluster object returned by a modifying cluster operation.
 * When the operation returned a non-null object different from the one it
 * was invoked on, {@code currentCluster} is moved to the new object if it
 * pointed at the old one, and the new object is installed in the cluster
 * table.
 *
 * @param before cluster the operation was invoked on
 * @param after_ cluster returned by the operation; cast to ClusterImpl when adopted
 */
858 private void maintainCluster(ClusterImpl before, ClusterI after_) {
859 if(after_ != null && after_ != before) {
860 ClusterImpl after = (ClusterImpl)after_;
861 if(currentCluster == before) {
862 currentCluster = after;
864 clusterTable.replaceCluster(after);
/**
 * Creates a new resource in the current cluster and returns its key,
 * allocating a cluster first when there is none, and switching to a new
 * write-only cluster when the current one holds exactly CLUSTER_FILL_SIZE
 * resources.
 *
 * @param foreignCounter size of the {@code foreignLookup} array given to a newly allocated cluster
 * @return key of the created resource
 * @throws DatabaseException declared by the signature
 */
868 public int createResourceKey(int foreignCounter) throws DatabaseException {
869 if(currentCluster == null) {
870 currentCluster = getNewResourceCluster();
// NOTE(review): this tests == while createResource(VirtualGraph) tests >= against the same limit - confirm the difference is intended.
872 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
// The cast assumes getNewResourceCluster() returns a ClusterWriteOnly on this path.
873 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
874 newCluster.foreignLookup = new byte[foreignCounter];
875 currentCluster = newCluster;
// NOTE(review): no guard for this diagnostic print is visible in these lines - confirm it is debug-only.
877 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
879 return currentCluster.createResource(clusterTranslator);
/**
 * Creates a new resource in the current cluster, choosing a cluster first
 * when there is none or when the current one has reached CLUSTER_FILL_SIZE.
 * With a default cluster set, the cluster is found by allocating a resource
 * in that set through {@code getNewResource(defaultClusterSet)} and looking
 * up its cluster; in the visible lines that intermediate resource is used
 * only for this lookup. The other visible path takes the cluster from
 * {@code getNewResourceCluster()}.
 *
 * @param provider not referenced in the visible lines
 * @return a new ResourceImpl created in {@code currentCluster}
 * @throws DatabaseException declared by the signature
 */
883 public Resource createResource(VirtualGraph provider) throws DatabaseException {
884 if(currentCluster == null) {
885 if (null != defaultClusterSet) {
886 ResourceImpl result = getNewResource(defaultClusterSet);
887 currentCluster = clusterTable.getClusterByResourceKey(result.id);
890 currentCluster = getNewResourceCluster();
// Same selection again once the current cluster is full.
893 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
894 if (null != defaultClusterSet) {
895 ResourceImpl result = getNewResource(defaultClusterSet);
896 currentCluster = clusterTable.getClusterByResourceKey(result.id);
899 currentCluster = getNewResourceCluster();
902 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
906 public Resource createResource(VirtualGraph provider, long clusterId)
907 throws DatabaseException {
908 return getNewResource(clusterId);
912 public Resource createResource(VirtualGraph provider, Resource clusterSet)
913 throws DatabaseException {
914 return getNewResource(clusterSet);
/** Delegates to {@code getNewClusterSet(clusterSet)}; {@code provider} is not consulted here. */
918 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
919 throws DatabaseException {
920 getNewClusterSet(clusterSet);
924 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
925 throws ServiceException {
926 return containsClusterSet(clusterSet);
929 public void selectCluster(long cluster) {
930 currentCluster = clusterTable.getClusterByClusterId(cluster);
931 long setResourceId = clusterSetsSupport.getSet(cluster);
932 clusterSetsSupport.put(setResourceId, cluster);
/**
 * Sets the default cluster set via setDefaultClusterSet4NewResource and re-points
 * {@code currentCluster}: to the cluster that clusterSetsSupport maps the set's resource id
 * to when {@code clusterSet} is non-null, and to {@code null} on the other visible path.
 * NOTE(review): the {@code else} and the return statement are elided in this excerpt;
 * presumably {@code result} is returned -- confirm.
 */
936 public Resource setDefaultClusterSet(Resource clusterSet)
937 throws ServiceException {
938 Resource result = setDefaultClusterSet4NewResource(clusterSet);
939 if(clusterSet != null) {
940 long id = clusterSetsSupport.get(clusterSet.getResourceId());
941 currentCluster = clusterTable.getClusterByClusterId(id);
944 currentCluster = null;
/**
 * Removes the value of {@code resource}.
 * The provider is first resolved through getProvider. When that yields {@code null}, the
 * removal is recorded through clusterTranslator (DELETE_OPERATION index followed by
 * removeValue) and the resource is invalidated in the query provider and in clientChanges.
 * The remaining visible lines cast the provider to VirtualGraphImpl and deny the value there.
 * NOTE(review): the {@code else} separating the two paths is elided in this excerpt.
 */
950 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
952 provider = getProvider(provider);
953 if (null == provider) {
954 int key = ((ResourceImpl)resource).id;
958 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
959 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
960 // if(key != queryProvider2.getRootLibrary())
961 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
964 // cluster.removeValue(key, clusterTranslator);
965 // } catch (DatabaseException e) {
966 // Logger.defaultLogError(e);
970 clusterTable.writeOnlyInvalidate(cluster);
973 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
974 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
975 clusterTranslator.removeValue(cluster);
976 } catch (DatabaseException e) {
// Failures while recording the removal are logged here.
977 Logger.defaultLogError(e);
980 queryProvider2.invalidateResource(key);
981 clientChanges.invalidate(key);
984 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
985 queryProvider2.updateValue(querySupport.getId(resource));
// The denial is reported to clientChanges through claimValue(resource).
986 clientChanges.claimValue(resource);
/** Not supported here: always throws UnsupportedOperationException. */
993 public void flush(boolean intermediate) {
994 throw new UnsupportedOperationException();
/**
 * Flushes through {@code clusterTable.flushCluster(graphSession)} and clears
 * {@code currentCluster}. When a default cluster set is configured, its entry in
 * clusterSetsSupport is set to Constants.NewClusterId.
 * NOTE(review): the brace closing the {@code if} is elided in this excerpt; presumably the
 * reset of currentCluster is unconditional -- confirm.
 */
998 public void flushCluster() {
999 clusterTable.flushCluster(graphSession);
1000 if(defaultClusterSet != null) {
1001 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1003 currentCluster = null;
/** Not supported here: always throws UnsupportedOperationException naming {@code r}. */
1007 public void flushCluster(Resource r) {
1008 throw new UnsupportedOperationException("flushCluster resource " + r);
/**
 * Removes the statement (subject, predicate, object).
 * The provider is resolved through getProvider. When that yields {@code null}, the removal
 * is recorded through clusterTranslator: a REMOVE_OPERATION index for the subject, two
 * NULL_OPERATION indices for predicate and object, then removeStatement. The remaining
 * visible lines deny the statement on the provider cast to VirtualGraphImpl. Both paths
 * invalidate the subject in the query provider and in clientChanges.
 * NOTE(review): the rest of the signature, the {@code else}, and every return statement
 * are elided in this excerpt, so the returned value cannot be documented from here.
 */
1016 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1019 int s = ((ResourceImpl)subject).id;
1020 int p = ((ResourceImpl)predicate).id;
1021 int o = ((ResourceImpl)object).id;
1023 provider = getProvider(provider);
1024 if (null == provider) {
1026 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1027 clusterTable.writeOnlyInvalidate(cluster);
1031 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1033 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1036 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039 clusterTranslator.removeStatement(cluster);
1041 queryProvider2.invalidateResource(s);
1042 clientChanges.invalidate(s);
1044 } catch (DatabaseException e) {
// Failures while recording the removal are logged here.
1046 Logger.defaultLogError(e);
1054 ((VirtualGraphImpl)provider).deny(s, p, o);
1055 queryProvider2.invalidateResource(s);
1056 clientChanges.invalidate(s);
/** Not supported here: always throws UnsupportedOperationException. */
1065 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066 throw new UnsupportedOperationException();
/**
 * Write-only flag of this write support.
 * NOTE(review): the body is elided in this excerpt, so the returned constant cannot be
 * stated from here.
 */
1070 public boolean writeOnly() {
/** Forwards the nested Write request to the enclosing {@code writeSupport}. */
1075 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076 writeSupport.performWriteRequest(graph, request);
/** Not supported here: always throws UnsupportedOperationException. */
1080 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081 throw new UnsupportedOperationException();
/** Not supported here: always throws UnsupportedOperationException. */
1085 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086 throw new UnsupportedOperationException();
/** Forwards {@code data} to {@code writeSupport.addMetadata}. */
1090 public <T> void addMetadata(Metadata data) throws ServiceException {
1091 writeSupport.addMetadata(data);
1095 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096 return writeSupport.getMetadata(clazz);
1100 public TreeMap<String, byte[]> getMetadata() {
1101 return writeSupport.getMetadata();
/** Forwards the commit notification to {@code writeSupport.commitDone}. */
1105 public void commitDone(WriteTraits writeTraits, long csid) {
1106 writeSupport.commitDone(writeTraits, csid);
/** Forwards to {@code writeSupport.clearUndoList}. */
1110 public void clearUndoList(WriteTraits writeTraits) {
1111 writeSupport.clearUndoList(writeTraits);
1114 public int clearMetadata() {
1115 return writeSupport.clearMetadata();
/** Forwards to {@code writeSupport.startUndo()}. */
1119 public void startUndo() {
1120 writeSupport.startUndo();
/**
 * WriteSupport for write-only requests that target a virtual graph.
 * Claims, denials and value writes are applied to the supplied provider (cast to
 * TransientGraph or VirtualGraphImpl), after which the query provider and
 * {@code clientChanges} are updated. Cluster, flush, metadata and nested write-request
 * operations throw UnsupportedOperationException.
 * NOTE(review): several bodies (setDefaultClusterSet, writeOnly, commitDone, clearUndoList,
 * clearMetadata, startUndo) and all short lines such as returns are elided in this excerpt.
 */
1124 class VirtualWriteOnlySupport implements WriteSupport {
1127 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 // Resource object) {
1129 // throw new UnsupportedOperationException();
// Adds the statement to the transient graph, refreshes statement queries for
// (subject, predicate) and records the claim in clientChanges.
1133 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1135 TransientGraph impl = (TransientGraph)provider;
1136 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138 clientChanges.claim(subject, predicate, object);
// Same as above for arguments that are already integer ids.
1143 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1145 TransientGraph impl = (TransientGraph)provider;
1146 impl.claim(subject, predicate, object);
1147 getQueryProvider2().updateStatements(subject, predicate);
1148 clientChanges.claim(subject, predicate, object);
// Forwards to the int-keyed overload using the full array length.
1153 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Stores the value in the virtual graph, refreshes the value query and records the change.
1158 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160 getQueryProvider2().updateValue(resource);
1161 clientChanges.claimValue(resource);
// Reads {@code amount} bytes from the reader in one call and forwards to the byte[] overload.
1165 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166 byte[] value = reader.readBytes(null, amount);
1167 claimValue(provider, resource, value);
// Allocates a new resource in the transient graph (newResource(false)) and returns it.
1171 public Resource createResource(VirtualGraph provider) {
1172 TransientGraph impl = (TransientGraph)provider;
1173 return impl.getResource(impl.newResource(false));
// Cluster-based creation is not supported for virtual graphs.
1177 public Resource createResource(VirtualGraph provider, long clusterId) {
1178 throw new UnsupportedOperationException();
1182 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183 throws DatabaseException {
1184 throw new UnsupportedOperationException();
1188 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189 throws DatabaseException {
1190 throw new UnsupportedOperationException();
1194 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195 throws ServiceException {
1196 throw new UnsupportedOperationException();
// NOTE(review): body elided in this excerpt.
1200 public Resource setDefaultClusterSet(Resource clusterSet)
1201 throws ServiceException {
// Removes the value from the virtual graph and refreshes the value query.
1206 public void denyValue(VirtualGraph provider, Resource resource) {
1207 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208 getQueryProvider2().updateValue(querySupport.getId(resource));
1209 // NOTE: this only keeps track of value changes by-resource.
1210 clientChanges.claimValue(resource);
1214 public void flush(boolean intermediate) {
1215 throw new UnsupportedOperationException();
1219 public void flushCluster() {
1220 throw new UnsupportedOperationException();
1224 public void flushCluster(Resource r) {
1225 throw new UnsupportedOperationException("Resource " + r);
// Denies the statement in the transient graph, refreshes statement queries and records
// the denial. NOTE(review): the return statement is elided in this excerpt.
1233 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234 TransientGraph impl = (TransientGraph) provider;
1235 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237 clientChanges.deny(subject, predicate, object);
1242 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243 throw new UnsupportedOperationException();
// NOTE(review): body elided in this excerpt.
1247 public boolean writeOnly() {
// Nested write requests are not supported by this write support.
1252 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253 throw new UnsupportedOperationException();
1257 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258 throw new UnsupportedOperationException();
1262 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263 throw new UnsupportedOperationException();
// Metadata access is not supported by this write support.
1267 public <T> void addMetadata(Metadata data) throws ServiceException {
1268 throw new UnsupportedOperationException();
1272 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273 throw new UnsupportedOperationException();
1277 public TreeMap<String, byte[]> getMetadata() {
1278 throw new UnsupportedOperationException();
// NOTE(review): the bodies of the four methods below are elided in this excerpt.
1282 public void commitDone(WriteTraits writeTraits, long csid) {
1286 public void clearUndoList(WriteTraits writeTraits) {
1290 public int clearMetadata() {
1295 public void startUndo() {
/**
 * Runs a WriteOnlyResult request: replaces {@code clientChanges}, builds a WriteGraphImpl
 * over a VirtualWriteOnlySupport (request has a provider) or a WriteOnlySupport (it has
 * none), performs the request, stores the result in the write state, flushes the cluster
 * stream, releases the writer and handles updates and metadata.
 * On CancelTransactionException or any other Throwable the write-only graph is released,
 * write-only clusters are removed and the write transaction is stopped; for other
 * Throwables the callback additionally receives a DatabaseException wrapping the cause.
 * NOTE(review): try/finally structure is elided in this excerpt. The System.err timing
 * output and e.printStackTrace() look like debug leftovers next to the Logger call.
 */
1299 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1303 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1305 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the request with a fresh change set; the previous one is disposed.
1308 Disposable.safeDispose(clientChanges);
1309 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1313 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1315 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1317 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1318 writeState = writeStateT;
1320 assert (null != writer);
1321 // writer.state.barrier.inc();
// Wall-clock duration of request.perform, reported on stderr below.
1322 long start = System.nanoTime();
1323 T result = request.perform(writer);
1324 long duration = System.nanoTime() - start;
1326 System.err.println("################");
1327 System.err.println("WriteOnly duration " + 1e-9*duration);
1329 writeStateT.setResult(result);
1331 // This makes clusters available from server
1332 clusterStream.reallyFlush();
1334 // This will trigger query updates
1335 releaseWriteOnly(writer);
1336 assert (null != writer);
1338 handleUpdatesAndMetadata(writer);
1340 } catch (CancelTransactionException e) {
1342 releaseWriteOnly(writeState.getGraph());
1344 clusterTable.removeWriteOnlyClusters();
1345 state.stopWriteTransaction(clusterStream);
1347 } catch (Throwable e) {
1349 e.printStackTrace();
1351 releaseWriteOnly(writeState.getGraph());
1353 clusterTable.removeWriteOnlyClusters();
1355 if (callback != null)
1356 callback.exception(new DatabaseException(e));
1358 state.stopWriteTransaction(clusterStream);
1359 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1363 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/** Convenience overload: schedules the WriteOnly request with {@code combine} set to null. */
1369 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1370 scheduleRequest(request, callback, notify, null);
/**
 * Queues a WriteOnly request on the request manager as a SessionTask.
 * The task replaces {@code clientChanges}, builds a writer over a VirtualWriteOnlySupport
 * (resolved provider non-null) or WriteOnlySupport, installs a WriteState whose procedure
 * relays completion to {@code callback}, performs the request, flushes the cluster stream
 * and releases the writer. A Throwable from perform releases the writer, removes
 * write-only clusters, reports to {@code callback} unless it is a
 * CancelTransactionException, and is passed to {@code writeState.except}.
 * {@code combine} is not referenced on any visible line.
 * NOTE(review): try/finally structure and early returns are elided in this excerpt.
 */
1374 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1378 assert (request != null);
// Not referenced on any visible line; run(int thread) declares its own parameter.
1380 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1382 requestManager.scheduleWrite(new SessionTask(true) {
1385 public void run(int thread) {
1387 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1391 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1394 Disposable.safeDispose(clientChanges);
1395 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1399 VirtualGraph vg = getProvider(request.getProvider());
1400 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1402 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1404 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1407 public void execute(Object result) {
1408 if(callback != null) callback.accept(null);
1412 public void exception(Throwable t) {
// NOTE(review): unchecked cast -- a Throwable that is not a DatabaseException would raise
// ClassCastException here; confirm callers only pass DatabaseException.
1413 if(callback != null) callback.accept((DatabaseException)t);
1418 assert (null != writer);
1419 // writer.state.barrier.inc();
1423 request.perform(writer);
1425 } catch (Throwable e) {
1427 // writer.state.barrier.dec();
1428 // writer.waitAsync(null);
1430 releaseWriteOnly(writer);
1432 clusterTable.removeWriteOnlyClusters();
1434 if(!(e instanceof CancelTransactionException)) {
1435 if (callback != null)
1436 callback.accept(new DatabaseException(e));
1439 writeState.except(e);
1445 // This makes clusters available from server
1446 boolean empty = clusterStream.reallyFlush();
1447 // This was needed to make WO requests call metadata listeners.
1448 // NOTE: the calling event does not contain clientChanges information.
1449 if (!empty && clientChanges.isEmpty())
1450 clientChanges.setNotEmpty(true);
1451 releaseWriteOnly(writer);
1452 assert (null != writer);
1455 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
/** Convenience overload: schedules the WriteOnlyResult request with {@code combine} set to null. */
1468 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1469 scheduleRequest(request, callback, notify, null);
1473 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Queues the WriteOnlyResult request as a SessionTask whose run method delegates to
// performWriteOnly(request, notify, callback). {@code combine} is not referenced on any
// visible line. NOTE(review): lines between begin(...) and the delegation are elided.
1476 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1478 assert (request != null);
// Not referenced on any visible line; run(int thread) declares its own parameter.
1480 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1482 requestManager.scheduleWrite(new SessionTask(true) {
1485 public void run(int thread) {
1487 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1489 performWriteOnly(request, notify, callback);
/**
 * Queues a Read request on the request manager as a SessionRead.
 * When the procedure yields a listener (getListenerBase), the request runs through
 * QueryCache.runnerReadEntry with a wrapping AsyncProcedure that stores the result in
 * {@code result} (when non-null) and relays to {@code procedure}. Otherwise the request is
 * performed directly on a fresh ReadGraphImpl and the outcome relayed to {@code procedure}.
 * {@code throwable} and {@code result} may be null; every visible use is null-checked.
 * NOTE(review): the statements inside the {@code if(throwable != null)} blocks and the
 * try/else/finally structure are elided in this excerpt; t.printStackTrace() duplicates
 * the Logger reporting used on the neighbouring paths.
 */
1499 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1501 assert (request != null);
1502 assert (procedure != null);
1504 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1506 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1509 public void run(int thread) {
1511 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1513 ListenerBase listener = getListenerBase(procedure);
1515 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1519 if (listener != null) {
// Wrapper that captures the outcome before relaying it to the caller's procedure.
1523 AsyncProcedure ap = new AsyncProcedure<T>() {
1526 public void exception(AsyncReadGraph graph, Throwable t) {
1527 procedure.exception(graph, t);
1528 if(throwable != null) {
1531 // ErrorLogger.defaultLogError("Unhandled exception", t);
1536 public void execute(AsyncReadGraph graph, T t) {
1537 if(result != null) result.set(t);
1538 procedure.execute(graph, t);
1543 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1545 } catch (Throwable t) {
1546 // This is handled by the AsyncProcedure
1547 //Logger.defaultLogError("Internal error", t);
1554 // newGraph.state.barrier.inc();
1556 T t = request.perform(newGraph);
1560 if(result != null) result.set(t);
1561 procedure.execute(newGraph, t);
1563 } catch (Throwable th) {
1565 if(throwable != null) {
1568 Logger.defaultLogError("Unhandled exception", th);
1573 } catch (Throwable t) {
1576 t.printStackTrace();
1578 if(throwable != null) {
1581 Logger.defaultLogError("Unhandled exception", t);
1586 procedure.exception(newGraph, t);
1588 } catch (Throwable t2) {
1590 if(throwable != null) {
1593 Logger.defaultLogError("Unhandled exception", t2);
1600 // newGraph.state.barrier.dec();
1601 // newGraph.waitAsync(request);
1607 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an AsyncRead request on the request manager as a SessionRead.
 * With a listener the request goes through QueryCacheBase.resultAsyncReadEntry; without
 * one it is performed directly with a BlockingAsyncProcedure wrapping {@code procedure}.
 *
 * FIXME(review): this method contains unresolved merge-conflict markers
 * ({@code <<<<<<<} / {@code >>>>>>>}) around the catch clauses below, so the file cannot
 * compile as it stands. The two sides differ in the catch type and handling
 * (DatabaseException + log vs. Throwable + wrap.exception). The {@code =======}
 * separators are elided in this excerpt, so the intended resolution must be taken from
 * version-control history.
 */
1617 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1619 assert (request != null);
1620 assert (procedure != null);
// Not referenced on any visible line; run(int thread) declares its own parameter.
1622 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1624 requestManager.scheduleRead(new SessionRead(null, notify) {
1627 public void run(int thread) {
1629 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1631 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1635 if (listener != null) {
1638 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1639 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1640 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1641 } catch (DatabaseException e) {
1642 Logger.defaultLogError(e);
1647 // final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1648 // procedure, "request");
1650 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
1654 request.perform(newGraph, wrap);
1656 <<<<<<< Upstream, based on branch 'private/antti_threads' of ssh://villberg@gerrit.simantics.org:29418/simantics/platform.git
1659 } catch (DatabaseException e) {
1661 Logger.defaultLogError(e);
1663 // wrap.exception(newGraph, t);
1664 // wrapper.exception(newGraph, t);
1665 // newGraph.waitAsync(request);
1666 >>>>>>> 82fa68e Generate parts of db client query code
1668 <<<<<<< Upstream, based on branch 'private/antti_threads' of ssh://villberg@gerrit.simantics.org:29418/simantics/platform.git
1669 } catch (Throwable t) {
1671 wrap.exception(newGraph, t);
1673 // wrapper.exception(newGraph, t);
1674 // newGraph.waitAsync(request);
1678 >>>>>>> 82fa68e Generate parts of db client query code
1685 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues a MultiRead request on the request manager as a SessionRead.
 * With a listener the request is run through {@code newGraph.processor.query}; otherwise
 * it is performed directly with a ResultCallWrappedSyncQueryProcedure around
 * {@code procedure}.
 * NOTE(review): try/else/finally structure is elided in this excerpt; the visible catch
 * only prints the stack trace, so a failure does not visibly reach {@code procedure}.
 */
1695 public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1697 assert (request != null);
1698 assert (procedure != null);
1700 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// Not referenced on any visible line.
1702 int sync = notify != null ? thread : -1;
1704 requestManager.scheduleRead(new SessionRead(null, notify) {
1707 public void run(int thread) {
1709 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1711 ListenerBase listener = getListenerBase(procedure);
1713 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1717 if (listener != null) {
1719 newGraph.processor.query(newGraph, request, null, procedure, listener);
1721 // newGraph.waitAsync(request);
1725 final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1729 request.perform(newGraph, wrapper);
1731 } catch (Throwable t) {
1733 t.printStackTrace();
1741 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an AsyncMultiRead request on the request manager as a SessionRead.
 * With a listener the request is run through {@code newGraph.processor.query}; otherwise
 * it is performed directly with a ResultCallWrappedQueryProcedure4 around
 * {@code procedure}.
 * NOTE(review): try/else/finally structure is elided in this excerpt; the visible catch
 * only prints the stack trace, so a failure does not visibly reach {@code procedure}.
 */
1751 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1753 assert (request != null);
1754 assert (procedure != null);
1756 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// Not referenced on any visible line.
1758 int sync = notify != null ? thread : -1;
1760 requestManager.scheduleRead(new SessionRead(null, notify) {
1763 public void run(int thread) {
1765 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1767 ListenerBase listener = getListenerBase(procedure);
1769 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1773 if (listener != null) {
1775 newGraph.processor.query(newGraph, request, null, procedure, listener);
1777 // newGraph.waitAsync(request);
1781 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1785 request.perform(newGraph, wrapper);
1787 } catch (Throwable t) {
1789 t.printStackTrace();
1797 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/**
 * Queues an ExternalRead request on the request manager as a SessionRead.
 * With a listener the request goes through QueryCacheBase.resultExternalReadEntry;
 * otherwise the request is registered with an inline Listener that stores the outcome in
 * {@code throwable} / {@code result} (when non-null) and relays it to {@code procedure}.
 * NOTE(review): try/else/finally structure and the body of isDisposed() are elided in
 * this excerpt.
 */
1807 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1809 assert (request != null);
1810 assert (procedure != null);
// Not referenced on any visible line; run(int thread) declares its own parameter.
1812 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1814 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1817 public void run(int thread) {
1819 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1821 ListenerBase listener = getListenerBase(procedure);
1823 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1827 if (listener != null) {
1830 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1831 } catch (DatabaseException e) {
1832 Logger.defaultLogError(e);
1836 // newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1839 // public void exception(Throwable t) {
1840 // procedure.exception(t);
1841 // if(throwable != null) {
1842 // throwable.set(t);
1847 // public void execute(T t) {
1848 // if(result != null) result.set(t);
1849 // procedure.execute(t);
1854 // newGraph.waitAsync(request);
1858 // newGraph.state.barrier.inc();
1860 request.register(newGraph, new Listener<T>() {
1863 public void exception(Throwable t) {
1864 if(throwable != null) throwable.set(t);
1865 procedure.exception(t);
1866 // newGraph.state.barrier.dec();
1870 public void execute(T t) {
1871 if(result != null) result.set(t);
1872 procedure.execute(t);
1873 // newGraph.state.barrier.dec();
1877 public boolean isDisposed() {
1883 // newGraph.waitAsync(request);
1889 fireSessionVariableChange(SessionVariables.QUEUED_READS);
/** Schedules the Read request without a semaphore or outcome containers (all three null). */
1901 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1903 scheduleRequest(request, procedure, null, null, null);
/**
 * Schedules the Read request with a zero-permit semaphore and outcome containers, calls
 * {@code acquire(notify, request)} (presumably blocking until the task signals completion
 * -- confirm), then rethrows a captured failure or returns the captured result.
 *
 * @throws DatabaseException the captured DatabaseException, or a new one wrapping any
 *         other captured Throwable
 */
1908 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1910 Semaphore notify = new Semaphore(0);
1911 DataContainer<Throwable> container = new DataContainer<Throwable>();
1912 DataContainer<T> result = new DataContainer<T>();
1913 scheduleRequest(request, procedure, notify, container, result);
1914 acquire(notify, request);
1915 Throwable throwable = container.get();
1916 if(throwable != null) {
1917 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1918 else throw new DatabaseException("Unexpected exception", throwable);
1920 return result.get();
/**
 * Schedules the AsyncRead request with a wrapping procedure that records the outcome in
 * local containers before relaying it to {@code procedure}, calls
 * {@code acquire(notify, request, procedure)} (presumably blocking until completion --
 * confirm), then rethrows a captured failure or returns the captured result.
 *
 * @throws DatabaseException the captured DatabaseException, or a new one wrapping any
 *         other captured Throwable
 */
1924 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1926 Semaphore notify = new Semaphore(0);
1927 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1928 final DataContainer<T> resultContainer = new DataContainer<T>();
1929 scheduleRequest(request, new AsyncProcedure<T>() {
1931 public void exception(AsyncReadGraph graph, Throwable throwable) {
1932 exceptionContainer.set(throwable);
1933 procedure.exception(graph, throwable);
1936 public void execute(AsyncReadGraph graph, T result) {
1937 resultContainer.set(result);
1938 procedure.execute(graph, result);
// The listener is derived from the caller's procedure, not from the wrapper.
1940 }, getListenerBase(procedure), notify);
1941 acquire(notify, request, procedure);
1942 Throwable throwable = exceptionContainer.get();
1943 if (throwable != null) {
1944 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1945 else throw new DatabaseException("Unexpected exception", throwable);
1947 return resultContainer.get();
/**
 * Schedules the AsyncMultiRead request with a wrapping procedure that records a failure
 * and relays every callback to {@code procedure}, calls
 * {@code acquire(notify, request, procedure)}, then rethrows a captured failure.
 * NOTE(review): no result collection is built on any visible line and the method prints a
 * TODO to stderr on each successful call; the return statement is elided in this excerpt,
 * so callers should not rely on the returned Collection until this is implemented.
 *
 * @throws DatabaseException the captured DatabaseException, or a new one wrapping any
 *         other captured Throwable
 */
1952 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1954 Semaphore notify = new Semaphore(0);
1955 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1956 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1958 public void exception(AsyncReadGraph graph, Throwable throwable) {
1959 exceptionContainer.set(throwable);
1960 procedure.exception(graph, throwable);
1963 public void execute(AsyncReadGraph graph, T result) {
1964 procedure.execute(graph, result);
1967 public void finished(AsyncReadGraph graph) {
1968 procedure.finished(graph);
1971 acquire(notify, request, procedure);
1972 Throwable throwable = exceptionContainer.get();
1973 if (throwable != null) {
1974 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1975 else throw new DatabaseException("Unexpected exception", throwable);
1977 // TODO: implement return value
1978 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1983 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1984 return syncRequest(request, new ProcedureAdapter<T>());
1987 // assert(request != null);
1989 // final DataContainer<T> result = new DataContainer<T>();
1990 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1992 // syncRequest(request, new Procedure<T>() {
1995 // public void execute(T t) {
2000 // public void exception(Throwable t) {
2001 // exception.set(t);
2006 // Throwable t = exception.get();
2008 // if(t instanceof DatabaseException) throw (DatabaseException)t;
2009 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
2012 // return result.get();
2017 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
2018 return syncRequest(request, (Procedure<T>)procedure);
2023 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
2024 // assertNotSession();
2025 // Semaphore notify = new Semaphore(0);
2026 // DataContainer<Throwable> container = new DataContainer<Throwable>();
2027 // DataContainer<T> result = new DataContainer<T>();
2028 // scheduleRequest(request, procedure, notify, container, result);
2029 // acquire(notify, request);
2030 // Throwable throwable = container.get();
2031 // if(throwable != null) {
2032 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2033 // else throw new DatabaseException("Unexpected exception", throwable);
2035 // return result.get();
/**
 * Schedules the ExternalRead request with a zero-permit semaphore and outcome containers,
 * calls {@code acquire(notify, request)} (presumably blocking until completion --
 * confirm), then rethrows a captured failure or returns the captured result.
 *
 * @throws DatabaseException the captured DatabaseException, or a new one wrapping any
 *         other captured Throwable
 */
2040 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2042 Semaphore notify = new Semaphore(0);
2043 final DataContainer<Throwable> container = new DataContainer<Throwable>();
2044 final DataContainer<T> result = new DataContainer<T>();
2045 scheduleRequest(request, procedure, notify, container, result);
2046 acquire(notify, request);
2047 Throwable throwable = container.get();
2048 if (throwable != null) {
2049 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2050 else throw new DatabaseException("Unexpected exception", throwable);
2052 return result.get();
/**
 * Schedules the Write request with a callback that stores any reported DatabaseException,
 * calls {@code acquire(notify, request)}, then rethrows the stored exception if present.
 * NOTE(review): two lines after the signature are elided in this excerpt.
 */
2056 public void syncRequest(Write request) throws DatabaseException {
2059 Semaphore notify = new Semaphore(0);
2060 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2061 scheduleRequest(request, e -> exception.set(e), notify);
2062 acquire(notify, request);
2063 if(exception.get() != null) throw exception.get();
/**
 * Schedules the WriteResult request with an inline Procedure, calls
 * {@code acquire(notify, request)}, then rethrows a captured failure (wrapping anything
 * that is not a DatabaseException) or returns the captured result.
 * NOTE(review): the bodies of the inline callbacks are elided in this excerpt; presumably
 * they store into {@code exception} and {@code result} -- confirm.
 */
2067 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
2069 Semaphore notify = new Semaphore(0);
2070 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2071 final DataContainer<T> result = new DataContainer<T>();
2072 scheduleRequest(request, new Procedure<T>() {
2075 public void exception(Throwable t) {
2080 public void execute(T t) {
2085 acquire(notify, request);
2086 if(exception.get() != null) {
2087 Throwable t = exception.get();
2088 if(t instanceof DatabaseException) throw (DatabaseException)t;
2089 else throw new DatabaseException(t);
2091 return result.get();
/**
 * Schedules the WriteOnlyResult request with an inline Procedure, calls
 * {@code acquire(notify, request)}, then rethrows a captured failure (wrapping anything
 * that is not a DatabaseException) or returns the captured result.
 * NOTE(review): the bodies of the inline callbacks are elided in this excerpt; presumably
 * they store into {@code exception} and {@code result} -- confirm.
 */
2095 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2097 Semaphore notify = new Semaphore(0);
2098 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2099 final DataContainer<T> result = new DataContainer<T>();
2100 scheduleRequest(request, new Procedure<T>() {
2103 public void exception(Throwable t) {
2108 public void execute(T t) {
2113 acquire(notify, request);
2114 if(exception.get() != null) {
2115 Throwable t = exception.get();
2116 if(t instanceof DatabaseException) throw (DatabaseException)t;
2117 else throw new DatabaseException(t);
2119 return result.get();
/**
 * Schedules the DelayedWriteResult request with an inline Procedure, calls
 * {@code acquire(notify, request)}, then rethrows a captured failure (wrapping anything
 * that is not a DatabaseException) or returns the captured result.
 * NOTE(review): the bodies of the inline callbacks are elided in this excerpt; presumably
 * they store into {@code exception} and {@code result} -- confirm.
 */
2123 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2125 Semaphore notify = new Semaphore(0);
2126 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2127 final DataContainer<T> result = new DataContainer<T>();
2128 scheduleRequest(request, new Procedure<T>() {
2131 public void exception(Throwable t) {
2136 public void execute(T t) {
2141 acquire(notify, request);
2142 if(exception.get() != null) {
2143 Throwable t = exception.get();
2144 if(t instanceof DatabaseException) throw (DatabaseException)t;
2145 else throw new DatabaseException(t);
2147 return result.get();
/**
 * Schedules the DelayedWrite request with a callback that stores any reported
 * DatabaseException, calls {@code acquire(notify, request)}, then rethrows the stored
 * exception if present.
 */
2151 public void syncRequest(DelayedWrite request) throws DatabaseException {
2153 Semaphore notify = new Semaphore(0);
2154 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2155 scheduleRequest(request, e -> exception.set(e), notify);
2156 acquire(notify, request);
2157 if(exception.get() != null) throw exception.get();
/**
 * Schedules the WriteOnly request with a callback that stores any reported
 * DatabaseException, calls {@code acquire(notify, request)}, then rethrows the stored
 * exception if present.
 * NOTE(review): two lines after the signature are elided in this excerpt.
 */
2161 public void syncRequest(WriteOnly request) throws DatabaseException {
2164 Semaphore notify = new Semaphore(0);
2165 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2166 scheduleRequest(request, e -> exception.set(e), notify);
2167 acquire(notify, request);
2168 if(exception.get() != null) throw exception.get();
2173 * GraphRequestProcessor interface
/** Schedules the AsyncRead request with no listener and no semaphore (both null). */
2177 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2179 scheduleRequest(request, procedure, null, null);
/** Schedules the AsyncMultiRead request with no semaphore (null). */
2184 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2186 scheduleRequest(request, procedure, null);
/** Schedules the ExternalRead request without a semaphore or outcome containers (all three null). */
2191 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2193 scheduleRequest(request, procedure, null, null, null);
/** Schedules the Write request with {@code callback}, passing null as the third argument. */
2198 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2200 scheduleRequest(request, callback, null);
/** Schedules the WriteResult request with {@code procedure}, passing null as the third argument. */
2205 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2207 scheduleRequest(request, procedure, null);
/** Schedules the WriteOnlyResult request with {@code procedure} and no semaphore (null). */
2212 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2214 scheduleRequest(request, procedure, null);
2219 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2221 scheduleRequest(request, procedure, null);
2226 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2228 scheduleRequest(request, callback, null);
2233 public void asyncRequest(final Write r) {
2234 asyncRequest(r, null);
2238 public void asyncRequest(final DelayedWrite r) {
2239 asyncRequest(r, null);
2243 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2245 scheduleRequest(request, callback, null);
2250 public void asyncRequest(final WriteOnly request) {
2252 asyncRequest(request, null);
2257 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2258 r.request(this, procedure);
2262 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2263 r.request(this, procedure);
2267 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2268 r.request(this, procedure);
2272 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2273 r.request(this, procedure);
2277 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2278 r.request(this, procedure);
2282 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2283 r.request(this, procedure);
2287 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2288 return r.request(this);
2292 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2293 return r.request(this);
2297 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2298 r.request(this, procedure);
2302 public <T> void async(WriteInterface<T> r) {
2303 r.request(this, new ProcedureAdapter<T>());
2307 public void incAsync() {
2312 public void decAsync() {
2316 public long getCluster(ResourceImpl resource) {
2317 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2318 return cluster.getClusterId();
2321 public long getCluster(int id) {
2322 if (clusterTable == null)
2323 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2324 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2327 public ResourceImpl getResource(int id) {
2328 return new ResourceImpl(resourceSupport, id);
2331 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2332 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2333 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2334 int key = proxy.getClusterKey();
2335 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2336 return new ResourceImpl(resourceSupport, resourceKey);
2339 public ResourceImpl getResource2(int id) {
2341 return new ResourceImpl(resourceSupport, id);
2344 final public int getId(ResourceImpl impl) {
2348 public static final Charset UTF8 = Charset.forName("utf-8");
2353 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2354 for(TransientGraph g : support.providers) {
2355 if(g.isPending(subject)) return false;
2360 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2361 for(TransientGraph g : support.providers) {
2362 if(g.isPending(subject, predicate)) return false;
2367 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2369 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2371 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2374 public void accept(ReadGraphImpl graph) {
2375 if(ready.decrementAndGet() == 0) {
2376 runnable.accept(graph);
2382 for(TransientGraph g : support.providers) {
2383 if(g.isPending(subject)) {
2385 g.load(graph, subject, composite);
2386 } catch (DatabaseException e) {
2387 e.printStackTrace();
2390 composite.accept(graph);
2394 composite.accept(graph);
2398 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2400 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2402 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2405 public void accept(ReadGraphImpl graph) {
2406 if(ready.decrementAndGet() == 0) {
2407 runnable.accept(graph);
2413 for(TransientGraph g : support.providers) {
2414 if(g.isPending(subject, predicate)) {
2416 g.load(graph, subject, predicate, composite);
2417 } catch (DatabaseException e) {
2418 e.printStackTrace();
2421 composite.accept(graph);
2425 composite.accept(graph);
2429 // void dumpHeap() {
2432 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2433 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2434 // "d:/heap" + flushCounter + ".txt", true);
2435 // } catch (IOException e) {
2436 // e.printStackTrace();
2441 void fireReactionsToSynchronize(ChangeSet cs) {
2443 // Do not fire empty events
2447 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2448 // g.state.barrier.inc();
2452 if (!cs.isEmpty()) {
2453 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2454 for (ChangeListener l : changeListeners2) {
2457 } catch (Exception ex) {
2458 ex.printStackTrace();
2465 // g.state.barrier.dec();
2466 // g.waitAsync(null);
2472 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2475 // Do not fire empty events
2480 // graph.state.barrier.inc();
2484 if (!cs2.isEmpty()) {
2486 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2487 for (ChangeListener l : changeListeners2) {
2490 // System.out.println("changelistener " + l);
2491 } catch (Exception ex) {
2492 ex.printStackTrace();
2500 // graph.state.barrier.dec();
2501 // graph.waitAsync(null);
2504 } catch (Throwable t) {
2505 t.printStackTrace();
2509 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2513 // Do not fire empty events
2517 // Do not fire on virtual requests
2518 if(graph.getProvider() != null)
2521 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2525 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2526 for (ChangeListener l : metadataListeners) {
2529 } catch (Throwable ex) {
2530 ex.printStackTrace();
2538 } catch (Throwable t) {
2539 t.printStackTrace();
2548 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2551 public <T> T getService(Class<T> api) {
2552 T t = peekService(api);
2554 if (state.isClosed())
2555 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2556 throw new ServiceNotFoundException(this, api);
2564 protected abstract ServerInformation getCachedServerInformation();
2566 private Class<?> serviceKey1 = null;
2567 private Class<?> serviceKey2 = null;
2568 private Object service1 = null;
2569 private Object service2 = null;
2575 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2577 @SuppressWarnings("unchecked")
2579 public synchronized <T> T peekService(Class<T> api) {
2581 if(serviceKey1 == api) {
2583 } else if (serviceKey2 == api) {
2585 Object result = service2;
2586 service2 = service1;
2587 serviceKey2 = serviceKey1;
2593 if (Layer0.class == api)
2595 if (ServerInformation.class == api)
2596 return (T) getCachedServerInformation();
2597 else if (WriteGraphImpl.class == api)
2598 return (T) writeState.getGraph();
2599 else if (ClusterBuilder.class == api)
2600 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2601 else if (ClusterBuilderFactory.class == api)
2602 return (T)new ClusterBuilderFactoryImpl(this);
2604 service2 = service1;
2605 serviceKey2 = serviceKey1;
2607 service1 = serviceLocator.peekService(api);
2618 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2621 public boolean hasService(Class<?> api) {
2622 return serviceLocator.hasService(api);
2626 * @param api the api that must be implemented by the specified service
2627 * @param service the service implementation
2630 public <T> void registerService(Class<T> api, T service) {
2631 if(Layer0.class == api) {
2632 L0 = (Layer0)service;
2635 serviceLocator.registerService(api, service);
2636 if (TransactionPolicySupport.class == api) {
2637 transactionPolicy = (TransactionPolicySupport)service;
2638 state.resetTransactionPolicy();
2640 if (api == serviceKey1)
2642 else if (api == serviceKey2)
2650 void fireSessionVariableChange(String variable) {
2651 for (MonitorHandler h : monitorHandlers) {
2652 MonitorContext ctx = monitorContexts.getLeft(h);
2654 // SafeRunner functionality repeated here to avoid dependency.
2656 h.valuesChanged(ctx);
2657 } catch (Exception e) {
2658 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2659 } catch (LinkageError e) {
2660 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2666 class ResourceSerializerImpl implements ResourceSerializer {
2668 public long createRandomAccessId(int id) throws DatabaseException {
2671 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2672 long cluster = getCluster(id);
2674 return 0; // Better to return 0 then invalid id.
2675 long result = ClusterTraitsBase.createResourceId(cluster, index);
2680 public long getRandomAccessId(Resource resource) throws DatabaseException {
2682 ResourceImpl resourceImpl = (ResourceImpl) resource;
2683 return createRandomAccessId(resourceImpl.id);
2688 public int getTransientId(Resource resource) throws DatabaseException {
2690 ResourceImpl resourceImpl = (ResourceImpl) resource;
2691 return resourceImpl.id;
2696 public String createRandomAccessId(Resource resource)
2697 throws InvalidResourceReferenceException {
2699 if(resource == null) throw new IllegalArgumentException();
2701 ResourceImpl resourceImpl = (ResourceImpl) resource;
2703 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2707 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2708 } catch (DatabaseException e1) {
2709 throw new InvalidResourceReferenceException(e1);
2712 // Serialize as '<resource index>_<cluster id>'
2713 return "" + r + "_" + getCluster(resourceImpl);
2714 } catch (Throwable e) {
2715 e.printStackTrace();
2716 throw new InvalidResourceReferenceException(e);
2721 public int getTransientId(long serialized) throws DatabaseException {
2722 if (serialized <= 0)
2723 return (int)serialized;
2724 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2725 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2726 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2727 if (cluster == null)
2728 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2729 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2734 public Resource getResource(long randomAccessId) throws DatabaseException {
2735 return getResourceByKey(getTransientId(randomAccessId));
2739 public Resource getResource(int transientId) throws DatabaseException {
2740 return getResourceByKey(transientId);
2744 public Resource getResource(String randomAccessId)
2745 throws InvalidResourceReferenceException {
2747 int i = randomAccessId.indexOf('_');
2749 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2750 + randomAccessId + "'");
2751 int r = Integer.parseInt(randomAccessId.substring(0, i));
2752 if(r < 0) return getResourceByKey(r);
2753 long c = Long.parseLong(randomAccessId.substring(i + 1));
2754 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2755 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2756 if (cluster.hasResource(key, clusterTranslator))
2757 return getResourceByKey(key);
2758 } catch (InvalidResourceReferenceException e) {
2760 } catch (NumberFormatException e) {
2761 throw new InvalidResourceReferenceException(e);
2762 } catch (Throwable e) {
2763 e.printStackTrace();
2764 throw new InvalidResourceReferenceException(e);
2767 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2771 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2774 } catch (Throwable e) {
2775 e.printStackTrace();
2776 throw new InvalidResourceReferenceException(e);
2782 // Copied from old SessionImpl
2785 if (state.isClosed())
2786 throw new Error("Session closed.");
2791 public ResourceImpl getNewResource(long clusterId) {
2792 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2795 newId = cluster.createResource(clusterTranslator);
2796 } catch (DatabaseException e) {
2797 Logger.defaultLogError(e);
2800 return new ResourceImpl(resourceSupport, newId);
2803 public ResourceImpl getNewResource(Resource clusterSet)
2804 throws DatabaseException {
2805 long resourceId = clusterSet.getResourceId();
2806 Long clusterId = clusterSetsSupport.get(resourceId);
2807 if (null == clusterId)
2808 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2809 if (Constants.NewClusterId == clusterId) {
2810 clusterId = getService(ClusteringSupport.class).createCluster();
2811 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2812 createdClusters.add(clusterId);
2813 clusterSetsSupport.put(resourceId, clusterId);
2814 return getNewResource(clusterId);
2816 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2817 ResourceImpl result;
2818 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2819 clusterId = getService(ClusteringSupport.class).createCluster();
2820 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2821 createdClusters.add(clusterId);
2822 clusterSetsSupport.put(resourceId, clusterId);
2823 return getNewResource(clusterId);
2825 result = getNewResource(clusterId);
2826 int resultKey = querySupport.getId(result);
2827 long resultCluster = querySupport.getClusterId(resultKey);
2828 if (clusterId != resultCluster)
2829 clusterSetsSupport.put(resourceId, resultCluster);
2835 public void getNewClusterSet(Resource clusterSet)
2836 throws DatabaseException {
2838 System.out.println("new cluster set=" + clusterSet);
2839 long resourceId = clusterSet.getResourceId();
2840 if (clusterSetsSupport.containsKey(resourceId))
2841 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2842 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2844 public boolean containsClusterSet(Resource clusterSet)
2845 throws ServiceException {
2846 long resourceId = clusterSet.getResourceId();
2847 return clusterSetsSupport.containsKey(resourceId);
2849 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2850 Resource r = defaultClusterSet;
2851 defaultClusterSet = clusterSet;
2854 void printDiagnostics() {
2858 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2859 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2860 for(ClusterI cluster : clusterTable.getClusters()) {
2862 if (cluster.isLoaded() && !cluster.isEmpty()) {
2863 residentClusters.set(residentClusters.get() + 1);
2864 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2866 } catch (DatabaseException e) {
2867 Logger.defaultLogError(e);
2871 System.out.println("--------------------------------");
2872 System.out.println("Cluster information:");
2873 System.out.println("-amount of resident clusters=" + residentClusters.get());
2874 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2876 for(ClusterI cluster : clusterTable.getClusters()) {
2877 System.out.print("Cluster " + cluster.getClusterId() + " [");
2879 if (!cluster.isLoaded())
2880 System.out.println("not loaded]");
2881 else if (cluster.isEmpty())
2882 System.out.println("is empty]");
2884 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2885 System.out.print(",references=" + cluster.getReferenceCount());
2886 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2888 } catch (DatabaseException e) {
2889 Logger.defaultLogError(e);
2890 System.out.println("is corrupted]");
2894 queryProvider2.printDiagnostics();
2895 System.out.println("--------------------------------");
2900 public void fireStartReadTransaction() {
2903 System.out.println("StartReadTransaction");
2905 for (SessionEventListener listener : eventListeners) {
2907 listener.readTransactionStarted();
2908 } catch (Throwable t) {
2909 t.printStackTrace();
2915 public void fireFinishReadTransaction() {
2918 System.out.println("FinishReadTransaction");
2920 for (SessionEventListener listener : eventListeners) {
2922 listener.readTransactionFinished();
2923 } catch (Throwable t) {
2924 t.printStackTrace();
2930 public void fireStartWriteTransaction() {
2933 System.out.println("StartWriteTransaction");
2935 for (SessionEventListener listener : eventListeners) {
2937 listener.writeTransactionStarted();
2938 } catch (Throwable t) {
2939 t.printStackTrace();
2945 public void fireFinishWriteTransaction() {
2948 System.out.println("FinishWriteTransaction");
2950 for (SessionEventListener listener : eventListeners) {
2952 listener.writeTransactionFinished();
2953 } catch (Throwable t) {
2954 t.printStackTrace();
2958 Indexing.resetDependenciesIndexingDisabled();
2962 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2963 throw new Error("Not supported at the moment.");
2970 ClusterTable getClusterTable() {
2971 return clusterTable;
2974 public GraphSession getGraphSession() {
2975 return graphSession;
2978 public QueryProcessor getQueryProvider2() {
2979 return queryProvider2;
2982 ClientChangesImpl getClientChanges() {
2983 return clientChanges;
2986 boolean getWriteOnly() {
2990 static int counter = 0;
2992 public void onClusterLoaded(long clusterId) {
2994 clusterTable.updateSize();
3002 * Implementation of the interface RequestProcessor
3006 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
3011 assert(request != null);
3013 final DataContainer<T> result = new DataContainer<T>();
3014 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3016 syncRequest(request, new AsyncProcedure<T>() {
3019 public void execute(AsyncReadGraph graph, T t) {
3024 public void exception(AsyncReadGraph graph, Throwable t) {
3030 Throwable t = exception.get();
3032 if(t instanceof DatabaseException) throw (DatabaseException)t;
3033 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3036 return result.get();
3041 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3042 // assertNotSession();
3043 // return syncRequest(request, (AsyncProcedure<T>)procedure);
3047 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3049 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3053 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3055 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3059 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3061 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3065 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3067 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3071 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3075 assert(request != null);
3077 final DataContainer<T> result = new DataContainer<T>();
3078 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3080 syncRequest(request, new AsyncProcedure<T>() {
3083 public void execute(AsyncReadGraph graph, T t) {
3088 public void exception(AsyncReadGraph graph, Throwable t) {
3094 Throwable t = exception.get();
3096 if(t instanceof DatabaseException) throw (DatabaseException)t;
3097 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3100 return result.get();
3105 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3107 return syncRequest(request, (AsyncProcedure<T>)procedure);
3111 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3113 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3117 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3119 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3123 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3125 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3129 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3131 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3135 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3139 assert(request != null);
3141 final ArrayList<T> result = new ArrayList<T>();
3142 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3144 syncRequest(request, new SyncMultiProcedure<T>() {
3147 public void execute(ReadGraph graph, T t) {
3148 synchronized(result) {
3154 public void finished(ReadGraph graph) {
3158 public void exception(ReadGraph graph, Throwable t) {
3165 Throwable t = exception.get();
3167 if(t instanceof DatabaseException) throw (DatabaseException)t;
3168 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3176 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3178 throw new Error("Not implemented!");
3182 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3184 return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3188 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3190 return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3194 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3196 return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3200 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3204 assert(request != null);
3206 final ArrayList<T> result = new ArrayList<T>();
3207 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3209 syncRequest(request, new AsyncMultiProcedure<T>() {
3212 public void execute(AsyncReadGraph graph, T t) {
3213 synchronized(result) {
3219 public void finished(AsyncReadGraph graph) {
3223 public void exception(AsyncReadGraph graph, Throwable t) {
3230 Throwable t = exception.get();
3232 if(t instanceof DatabaseException) throw (DatabaseException)t;
3233 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3241 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3243 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3247 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3249 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3253 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3255 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3259 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3261 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3265 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3267 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3272 public <T> void asyncRequest(Read<T> request) {
3274 asyncRequest(request, new ProcedureAdapter<T>() {
3276 public void exception(Throwable t) {
3277 t.printStackTrace();
3284 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3285 asyncRequest(request, (AsyncProcedure<T>)procedure);
3289 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3290 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3294 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3295 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3299 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3300 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3304 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3305 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3309 final public <T> void asyncRequest(final AsyncRead<T> request) {
3311 assert(request != null);
3313 asyncRequest(request, new ProcedureAdapter<T>() {
3315 public void exception(Throwable t) {
3316 t.printStackTrace();
3323 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3324 scheduleRequest(request, procedure, procedure, null);
3328 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3329 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3333 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3334 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3338 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3339 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3343 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3344 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3348 public <T> void asyncRequest(MultiRead<T> request) {
3350 assert(request != null);
3352 asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3354 public void exception(ReadGraph graph, Throwable t) {
3355 t.printStackTrace();
3362 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3363 asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3367 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3368 asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3372 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3373 scheduleRequest(request, procedure, null);
3377 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3378 asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3382 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3384 assert(request != null);
3386 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3388 public void exception(AsyncReadGraph graph, Throwable t) {
3389 t.printStackTrace();
3396 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3397 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3401 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3402 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3406 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3407 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3411 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3412 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3416 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3417 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3421 final public <T> void asyncRequest(final ExternalRead<T> request) {
3423 assert(request != null);
3425 asyncRequest(request, new ProcedureAdapter<T>() {
3427 public void exception(Throwable t) {
3428 t.printStackTrace();
3435 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3436 asyncRequest(request, (Procedure<T>)procedure);
3441 void check(Throwable t) throws DatabaseException {
3443 if(t instanceof DatabaseException) throw (DatabaseException)t;
3444 else throw new DatabaseException("Unexpected exception", t);
3448 void check(DataContainer<Throwable> container) throws DatabaseException {
3449 Throwable t = container.get();
3451 if(t instanceof DatabaseException) throw (DatabaseException)t;
3452 else throw new DatabaseException("Unexpected exception", t);
3461 boolean sameProvider(Write request) {
3462 if(writeState.getGraph().provider != null) {
3463 return writeState.getGraph().provider.equals(request.getProvider());
3465 return request.getProvider() == null;
3469 boolean plainWrite(WriteGraphImpl graph) {
3470 if(graph == null) return false;
3471 if(graph.writeSupport.writeOnly()) return false;
3476 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3477 private void assertNotSession() throws DatabaseException {
3478 Thread current = Thread.currentThread();
3479 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3482 void assertAlive() {
3483 if (!state.isAlive())
3484 throw new RuntimeDatabaseException("Session has been shut down.");
3487 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3488 return querySupport.getValueStream(graph, querySupport.getId(resource));
3491 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3492 return querySupport.getValue(graph, querySupport.getId(resource));
3495 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3496 acquire(semaphore, request, null);
3499 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3503 // Loop until the semaphore is acquired reporting requests that take a long time.
3506 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3510 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3512 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3513 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3515 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3516 + (waitTime) + " ms. (procedure=" + procedure + ")");
3523 } catch (InterruptedException e) {
3524 e.printStackTrace();
3525 // FIXME: Should perhaps do something else in this case ??
3532 // public boolean holdOnToTransactionAfterCancel() {
3533 // return transactionPolicy.holdOnToTransactionAfterCancel();
3537 // public boolean holdOnToTransactionAfterCommit() {
3538 // return transactionPolicy.holdOnToTransactionAfterCommit();
3542 // public boolean holdOnToTransactionAfterRead() {
3543 // return transactionPolicy.holdOnToTransactionAfterRead();
3547 // public void onRelinquish() {
3548 // transactionPolicy.onRelinquish();
3552 // public void onRelinquishDone() {
3553 // transactionPolicy.onRelinquishDone();
3557 // public void onRelinquishError() {
3558 // transactionPolicy.onRelinquishError();
3563 public Session getSession() {
3568 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3569 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3571 public void ceased(int thread) {
3573 requestManager.ceased(thread);
3577 public int getAmountOfQueryThreads() {
3578 // This must be a power of two
3580 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3583 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3585 return new ResourceImpl(resourceSupport, key);
3589 public void acquireWriteOnly() {
3593 public void releaseWriteOnly(ReadGraphImpl graph) {
3595 queryProvider2.releaseWrite(graph);
3598 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3600 long start = System.nanoTime();
3602 while(dirtyPrimitives) {
3603 dirtyPrimitives = false;
3604 getQueryProvider2().performDirtyUpdates(writer);
3605 getQueryProvider2().performScheduledUpdates(writer);
3608 fireMetadataListeners(writer, clientChanges);
3610 if(DebugPolicy.PERFORMANCE_DATA) {
3611 long end = System.nanoTime();
3612 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3617 void removeTemporaryData() {
3618 File platform = Platform.getLocation().toFile();
3619 File tempFiles = new File(platform, "tempFiles");
3620 File temp = new File(tempFiles, "db");
3624 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3626 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3629 } catch (IOException e) {
3630 Logger.defaultLogError(e);
3632 return FileVisitResult.CONTINUE;
3635 } catch (IOException e) {
3636 Logger.defaultLogError(e);
3640 public void handleCreatedClusters() {
3641 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3642 createdClusters.forEach(new TLongProcedure() {
3645 public boolean execute(long value) {
3646 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3647 cluster.setImmutable(true, clusterTranslator);
3652 createdClusters.clear();
3656 public Object getModificationCounter() {
3657 return queryProvider2.modificationCounter;
3661 public void markUndoPoint() {
3662 state.setCombine(false);