1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.EventSupport;
147 import org.simantics.db.service.GraphChangeListenerSupport;
148 import org.simantics.db.service.InitSupport;
149 import org.simantics.db.service.LifecycleSupport;
150 import org.simantics.db.service.ManagementSupport;
151 import org.simantics.db.service.QueryControl;
152 import org.simantics.db.service.SerialisationSupport;
153 import org.simantics.db.service.ServerInformation;
154 import org.simantics.db.service.ServiceActivityMonitor;
155 import org.simantics.db.service.SessionEventSupport;
156 import org.simantics.db.service.SessionMonitorSupport;
157 import org.simantics.db.service.SessionUserSupport;
158 import org.simantics.db.service.StatementSupport;
159 import org.simantics.db.service.TransactionPolicySupport;
160 import org.simantics.db.service.TransactionSupport;
161 import org.simantics.db.service.TransferableGraphSupport;
162 import org.simantics.db.service.UndoRedoSupport;
163 import org.simantics.db.service.VirtualGraphSupport;
164 import org.simantics.db.service.XSupport;
165 import org.simantics.layer0.Layer0;
166 import org.simantics.utils.DataContainer;
167 import org.simantics.utils.Development;
168 import org.simantics.utils.threads.logger.ITask;
169 import org.simantics.utils.threads.logger.ThreadLogger;
170 import org.slf4j.LoggerFactory;
172 import gnu.trove.procedure.TLongProcedure;
173 import gnu.trove.set.hash.TLongHashSet;
176 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
178 private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
180 protected static final boolean DEBUG = false;
182 private static final boolean DIAGNOSTICS = false;
184 private TransactionPolicySupport transactionPolicy;
186 final private ClusterControl clusterControl;
188 final protected BuiltinSupportImpl builtinSupport;
189 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
190 final protected ClusterSetsSupport clusterSetsSupport;
191 final protected LifecycleSupportImpl lifecycleSupport;
193 protected QuerySupportImpl querySupport;
194 protected ResourceSupportImpl resourceSupport;
195 protected WriteSupport writeSupport;
196 public ClusterTranslator clusterTranslator;
198 boolean dirtyPrimitives = false;
200 public static final int SERVICE_MODE_CREATE = 2;
201 public static final int SERVICE_MODE_ALLOW = 1;
202 public int serviceMode = 0;
203 public boolean createdImmutableClusters = false;
204 public TLongHashSet createdClusters = new TLongHashSet();
209 * The service locator maintained by the workbench. These services are
210 * initialized during workbench during the <code>init</code> method.
212 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
214 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
216 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
218 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
220 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
222 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
224 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
226 final protected State state = new State();
228 protected GraphSession graphSession = null;
230 protected SessionManager sessionManagerImpl = null;
232 protected UserAuthenticationAgent authAgent = null;
234 protected UserAuthenticator authenticator = null;
236 protected Resource user = null;
238 protected ClusterStream clusterStream = null;
240 protected SessionRequestManager requestManager = null;
242 public ClusterTable clusterTable = null;
244 public QueryProcessor queryProvider2 = null;
246 ClientChangesImpl clientChanges = null;
248 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
250 // protected long newClusterId = Constants.NullClusterId;
252 protected int flushCounter = 0;
254 protected boolean writeOnly = false;
256 WriteState<?> writeState = null;
257 WriteStateBase<?> delayedWriteState = null;
258 protected Resource defaultClusterSet = null; // If not null then used for newResource().
260 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
262 // if (authAgent == null)
263 // throw new IllegalArgumentException("null authentication agent");
265 File t = StaticSessionProperties.virtualGraphStoragePath;
268 this.clusterTable = new ClusterTable(this, t);
269 this.builtinSupport = new BuiltinSupportImpl(this);
270 this.sessionManagerImpl = sessionManagerImpl;
272 // this.authAgent = authAgent;
274 serviceLocator.registerService(Session.class, this);
275 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
276 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
277 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
278 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
279 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
280 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
281 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
282 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
283 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
284 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
285 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
286 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
287 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
288 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
289 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
290 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
291 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
292 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
293 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
294 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
295 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
296 serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
297 ServiceActivityUpdaterForWriteTransactions.register(this);
299 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
300 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
301 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
302 this.lifecycleSupport = new LifecycleSupportImpl(this);
303 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
304 this.transactionPolicy = new TransactionPolicyRelease();
305 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
306 this.clusterControl = new ClusterControlImpl(this);
307 serviceLocator.registerService(ClusterControl.class, clusterControl);
308 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
309 this.clusterSetsSupport.setReadDirectory(t.toPath());
310 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
311 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
321 public Resource getRootLibrary() {
322 return queryProvider2.getRootLibraryResource();
325 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
326 if (!graphSession.dbSession.refreshEnabled())
329 getClusterTable().refresh(csid, this, clusterUID);
330 } catch (Throwable t) {
331 Logger.defaultLogError("Refesh failed.", t);
335 static final class TaskHelper {
336 private final String name;
337 private Object result;
338 final Semaphore sema = new Semaphore(0);
339 private Throwable throwable = null;
340 final Consumer<DatabaseException> callback = e -> {
341 synchronized (TaskHelper.this) {
345 final Procedure<Object> proc = new Procedure<Object>() {
347 public void execute(Object result) {
348 callback.accept(null);
351 public void exception(Throwable t) {
352 if (t instanceof DatabaseException)
353 callback.accept((DatabaseException)t);
355 callback.accept(new DatabaseException("" + name + "operation failed.", t));
358 final WriteTraits writeTraits = new WriteTraits() {};
359 TaskHelper(String name) {
362 @SuppressWarnings("unchecked")
366 void setResult(Object result) {
367 this.result = result;
369 // Throwable throwableGet() {
372 synchronized void throwableSet(Throwable t) {
375 // void throwableSet(String t) {
376 // throwable = new InternalException("" + name + " operation failed. " + t);
378 synchronized void throwableCheck()
379 throws DatabaseException {
380 if (null != throwable)
381 if (throwable instanceof DatabaseException)
382 throw (DatabaseException)throwable;
384 throw new DatabaseException("Undo operation failed.", throwable);
386 void throw_(String message)
387 throws DatabaseException {
388 throw new DatabaseException("" + name + " operation failed. " + message);
394 private ListenerBase getListenerBase(Object procedure) {
395 if (procedure instanceof ListenerBase)
396 return (ListenerBase) procedure;
401 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
402 scheduleRequest(request, callback, notify, null);
406 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
409 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
411 assert (request != null);
413 if(Development.DEVELOPMENT) {
415 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
416 System.err.println("schedule write '" + request + "'");
417 } catch (Throwable t) {
418 Logger.defaultLogError(t);
422 requestManager.scheduleWrite(new SessionTask(null) {
425 public void run0(int thread) {
427 if(Development.DEVELOPMENT) {
429 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
430 System.err.println("perform write '" + request + "'");
431 } catch (Throwable t) {
432 Logger.defaultLogError(t);
436 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
438 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
443 Disposable.safeDispose(clientChanges);
444 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
446 VirtualGraph vg = getProvider(request.getProvider());
448 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
452 public void execute(Object result) {
453 if(callback != null) callback.accept(null);
457 public void exception(Throwable t) {
458 if(callback != null) callback.accept((DatabaseException)t);
463 assert (null != writer);
464 // writer.state.barrier.inc();
467 request.perform(writer);
468 assert (null != writer);
469 } catch (Throwable t) {
470 if (!(t instanceof CancelTransactionException))
471 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472 writeState.except(t);
474 // writer.state.barrier.dec();
475 // writer.waitAsync(request);
479 assert(!queryProvider2.cache.dirty);
481 } catch (Throwable e) {
483 // Log it first, just to be safe that the error is always logged.
484 if (!(e instanceof CancelTransactionException))
485 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
487 // writeState.getGraph().state.barrier.dec();
488 // writeState.getGraph().waitAsync(request);
490 writeState.except(e);
493 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 // // All we can do here is to log those, can't really pass them anywhere.
495 // if (callback != null) {
496 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 // else callback.run(new DatabaseException(e));
499 // } catch (Throwable e2) {
500 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
502 // boolean empty = clusterStream.reallyFlush();
503 // int callerThread = -1;
504 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
507 // // Send all local changes to server so it can calculate correct reverse change set.
508 // // This will call clusterStream.accept().
509 // state.cancelCommit(context, clusterStream);
511 // if (!context.isOk()) // this is a blocking operation
512 // throw new InternalException("Cancel failed. This should never happen.");
513 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
515 // state.cancelCommit2(context, clusterStream);
516 // } catch (DatabaseException e3) {
517 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
519 // clusterStream.setOff(false);
520 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
525 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
531 if(Development.DEVELOPMENT) {
533 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534 System.err.println("finish write '" + request + "'");
535 } catch (Throwable t) {
536 Logger.defaultLogError(t);
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547 scheduleRequest(request, procedure, notify, null);
551 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
554 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
556 assert (request != null);
558 requestManager.scheduleWrite(new SessionTask(null) {
561 public void run0(int thread) {
563 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
565 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
568 Disposable.safeDispose(clientChanges);
569 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
571 VirtualGraph vg = getProvider(request.getProvider());
572 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
575 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
576 writeState = writeStateT;
578 assert (null != writer);
579 // writer.state.barrier.inc();
580 writeStateT.setResult(request.perform(writer));
581 assert (null != writer);
583 // writer.state.barrier.dec();
584 // writer.waitAsync(null);
586 } catch (Throwable e) {
588 // writer.state.barrier.dec();
589 // writer.waitAsync(null);
591 writeState.except(e);
593 // state.stopWriteTransaction(clusterStream);
595 // } catch (Throwable e) {
596 // // Log it first, just to be safe that the error is always logged.
597 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
600 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
601 // // All we can do here is to log those, can't really pass them anywhere.
602 // if (procedure != null) {
603 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
604 // else procedure.exception(new DatabaseException(e));
606 // } catch (Throwable e2) {
607 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
610 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
612 // state.stopWriteTransaction(clusterStream);
615 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
618 // if(notify != null) notify.release();
628 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
629 scheduleRequest(request, callback, notify, null);
633 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
636 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
638 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
640 assert (request != null);
642 requestManager.scheduleWrite(new SessionTask(null) {
645 public void run0(int thread) {
646 fireSessionVariableChange(SessionVariables.QUEUED_READS);
648 Procedure<Object> stateProcedure = new Procedure<Object>() {
650 public void execute(Object result) {
651 if (callback != null)
652 callback.accept(null);
655 public void exception(Throwable t) {
656 if (callback != null) {
657 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
658 else callback.accept(new DatabaseException(t));
660 Logger.defaultLogError("Unhandled exception", t);
664 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
665 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
666 DelayedWriteGraph dwg = null;
667 // newGraph.state.barrier.inc();
670 dwg = new DelayedWriteGraph(newGraph);
671 request.perform(dwg);
672 } catch (Throwable e) {
673 delayedWriteState.except(e);
678 // newGraph.state.barrier.dec();
679 // newGraph.waitAsync(request);
680 fireSessionVariableChange(SessionVariables.QUEUED_READS);
683 delayedWriteState = null;
685 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
686 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
689 Disposable.safeDispose(clientChanges);
690 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
694 VirtualGraph vg = getProvider(request.getProvider());
695 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
697 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
699 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
701 assert (null != writer);
702 // writer.state.barrier.inc();
706 dwg.commit(writer, request);
708 if(defaultClusterSet != null) {
709 XSupport xs = getService(XSupport.class);
710 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
711 ClusteringSupport cs = getService(ClusteringSupport.class);
712 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
715 // This makes clusters available from server
716 clusterStream.reallyFlush();
718 releaseWriteOnly(writer);
719 handleUpdatesAndMetadata(writer);
721 } catch (ServiceException e) {
722 // writer.state.barrier.dec();
723 // writer.waitAsync(null);
725 // These shall be requested from server
726 clusterTable.removeWriteOnlyClusters();
727 // This makes clusters available from server
728 clusterStream.reallyFlush();
730 releaseWriteOnly(writer);
731 writeState.except(e);
733 // Debugging & Profiling
734 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
744 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
745 scheduleRequest(request, procedure, notify, null);
749 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
752 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
753 throw new Error("Not implemented");
756 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
757 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
758 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
759 createdClusters.add(cluster.clusterId);
764 class WriteOnlySupport implements WriteSupport {
766 ClusterStream stream;
767 ClusterImpl currentCluster;
769 public WriteOnlySupport() {
770 this.stream = clusterStream;
774 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
775 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
779 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
781 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
783 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
784 if(s != queryProvider2.getRootLibrary())
785 throw new ImmutableException("Trying to modify immutable resource key=" + s);
788 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
789 } catch (DatabaseException e) {
790 Logger.defaultLogError(e);
794 clientChanges.invalidate(s);
796 if (cluster.isWriteOnly())
798 queryProvider2.updateStatements(s, p);
803 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
804 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
808 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
810 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
812 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
813 } catch (DatabaseException e) {
814 Logger.defaultLogError(e);
817 clientChanges.invalidate(rid);
819 if (cluster.isWriteOnly())
821 queryProvider2.updateValue(rid);
826 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
829 claimValue(provider,resource, reader.readBytes(null, amount));
833 byte[] bytes = new byte[65536];
835 int rid = ((ResourceImpl)resource).id;
836 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
840 int block = Math.min(left, 65536);
841 reader.readBytes(bytes, block);
842 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
845 } catch (DatabaseException e) {
846 Logger.defaultLogError(e);
849 clientChanges.invalidate(rid);
851 if (cluster.isWriteOnly())
853 queryProvider2.updateValue(rid);
857 private void maintainCluster(ClusterImpl before, ClusterI after_) {
858 if(after_ != null && after_ != before) {
859 ClusterImpl after = (ClusterImpl)after_;
860 if(currentCluster == before) {
861 currentCluster = after;
863 clusterTable.replaceCluster(after);
867 public int createResourceKey(int foreignCounter) throws DatabaseException {
868 if(currentCluster == null) {
869 currentCluster = getNewResourceCluster();
871 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
872 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
873 newCluster.foreignLookup = new byte[foreignCounter];
874 currentCluster = newCluster;
876 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
878 return currentCluster.createResource(clusterTranslator);
882 public Resource createResource(VirtualGraph provider) throws DatabaseException {
883 if(currentCluster == null) {
884 if (null != defaultClusterSet) {
885 ResourceImpl result = getNewResource(defaultClusterSet);
886 currentCluster = clusterTable.getClusterByResourceKey(result.id);
889 currentCluster = getNewResourceCluster();
892 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
893 if (null != defaultClusterSet) {
894 ResourceImpl result = getNewResource(defaultClusterSet);
895 currentCluster = clusterTable.getClusterByResourceKey(result.id);
898 currentCluster = getNewResourceCluster();
901 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
905 public Resource createResource(VirtualGraph provider, long clusterId)
906 throws DatabaseException {
907 return getNewResource(clusterId);
911 public Resource createResource(VirtualGraph provider, Resource clusterSet)
912 throws DatabaseException {
913 return getNewResource(clusterSet);
917 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
918 throws DatabaseException {
919 getNewClusterSet(clusterSet);
923 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
924 throws ServiceException {
925 return containsClusterSet(clusterSet);
928 public void selectCluster(long cluster) {
929 currentCluster = clusterTable.getClusterByClusterId(cluster);
930 long setResourceId = clusterSetsSupport.getSet(cluster);
931 clusterSetsSupport.put(setResourceId, cluster);
935 public Resource setDefaultClusterSet(Resource clusterSet)
936 throws ServiceException {
937 Resource result = setDefaultClusterSet4NewResource(clusterSet);
938 if(clusterSet != null) {
939 long id = clusterSetsSupport.get(clusterSet.getResourceId());
940 currentCluster = clusterTable.getClusterByClusterId(id);
943 currentCluster = null;
949 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
951 provider = getProvider(provider);
952 if (null == provider) {
953 int key = ((ResourceImpl)resource).id;
957 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
958 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
959 // if(key != queryProvider2.getRootLibrary())
960 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
963 // cluster.removeValue(key, clusterTranslator);
964 // } catch (DatabaseException e) {
965 // Logger.defaultLogError(e);
969 clusterTable.writeOnlyInvalidate(cluster);
972 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
973 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
974 clusterTranslator.removeValue(cluster);
975 } catch (DatabaseException e) {
976 Logger.defaultLogError(e);
979 queryProvider2.invalidateResource(key);
980 clientChanges.invalidate(key);
983 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
984 queryProvider2.updateValue(querySupport.getId(resource));
985 clientChanges.claimValue(resource);
992 public void flush(boolean intermediate) {
993 throw new UnsupportedOperationException();
997 public void flushCluster() {
998 clusterTable.flushCluster(graphSession);
999 if(defaultClusterSet != null) {
1000 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1002 currentCluster = null;
1006 public void flushCluster(Resource r) {
1007 throw new UnsupportedOperationException("flushCluster resource " + r);
1015 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1018 int s = ((ResourceImpl)subject).id;
1019 int p = ((ResourceImpl)predicate).id;
1020 int o = ((ResourceImpl)object).id;
1022 provider = getProvider(provider);
1023 if (null == provider) {
1025 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1026 clusterTable.writeOnlyInvalidate(cluster);
1030 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1032 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1033 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1036 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038 clusterTranslator.removeStatement(cluster);
1040 queryProvider2.invalidateResource(s);
1041 clientChanges.invalidate(s);
1043 } catch (DatabaseException e) {
1045 Logger.defaultLogError(e);
1053 ((VirtualGraphImpl)provider).deny(s, p, o);
1054 queryProvider2.invalidateResource(s);
1055 clientChanges.invalidate(s);
1064 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1065 throw new UnsupportedOperationException();
1069 public boolean writeOnly() {
1074 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1075 writeSupport.performWriteRequest(graph, request);
1079 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1080 throw new UnsupportedOperationException();
1084 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1085 throw new UnsupportedOperationException();
1089 public <T> void addMetadata(Metadata data) throws ServiceException {
1090 writeSupport.addMetadata(data);
1094 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1095 return writeSupport.getMetadata(clazz);
1099 public TreeMap<String, byte[]> getMetadata() {
1100 return writeSupport.getMetadata();
1104 public void commitDone(WriteTraits writeTraits, long csid) {
1105 writeSupport.commitDone(writeTraits, csid);
1109 public void clearUndoList(WriteTraits writeTraits) {
1110 writeSupport.clearUndoList(writeTraits);
1113 public int clearMetadata() {
1114 return writeSupport.clearMetadata();
1118 public void startUndo() {
1119 writeSupport.startUndo();
1123 class VirtualWriteOnlySupport implements WriteSupport {
1126 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1127 // Resource object) {
1128 // throw new UnsupportedOperationException();
1132 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134 TransientGraph impl = (TransientGraph)provider;
1135 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1136 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1137 clientChanges.claim(subject, predicate, object);
1142 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144 TransientGraph impl = (TransientGraph)provider;
1145 impl.claim(subject, predicate, object);
1146 getQueryProvider2().updateStatements(subject, predicate);
1147 clientChanges.claim(subject, predicate, object);
1152 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1153 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1157 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1158 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1159 getQueryProvider2().updateValue(resource);
1160 clientChanges.claimValue(resource);
1164 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1165 byte[] value = reader.readBytes(null, amount);
1166 claimValue(provider, resource, value);
1170 public Resource createResource(VirtualGraph provider) {
1171 TransientGraph impl = (TransientGraph)provider;
1172 return impl.getResource(impl.newResource(false));
1176 public Resource createResource(VirtualGraph provider, long clusterId) {
1177 throw new UnsupportedOperationException();
1181 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1182 throws DatabaseException {
1183 throw new UnsupportedOperationException();
1187 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1188 throws DatabaseException {
1189 throw new UnsupportedOperationException();
1193 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1194 throws ServiceException {
1195 throw new UnsupportedOperationException();
1199 public Resource setDefaultClusterSet(Resource clusterSet)
1200 throws ServiceException {
1205 public void denyValue(VirtualGraph provider, Resource resource) {
1206 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1207 getQueryProvider2().updateValue(querySupport.getId(resource));
1208 // NOTE: this only keeps track of value changes by-resource.
1209 clientChanges.claimValue(resource);
1213 public void flush(boolean intermediate) {
1214 throw new UnsupportedOperationException();
1218 public void flushCluster() {
1219 throw new UnsupportedOperationException();
1223 public void flushCluster(Resource r) {
1224 throw new UnsupportedOperationException("Resource " + r);
1232 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1233 TransientGraph impl = (TransientGraph) provider;
1234 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1235 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1236 clientChanges.deny(subject, predicate, object);
1241 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1242 throw new UnsupportedOperationException();
1246 public boolean writeOnly() {
1251 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1252 throw new UnsupportedOperationException();
1256 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1257 throw new UnsupportedOperationException();
1261 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1262 throw new UnsupportedOperationException();
1266 public <T> void addMetadata(Metadata data) throws ServiceException {
1267 throw new UnsupportedOperationException();
1271 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1272 throw new UnsupportedOperationException();
1276 public TreeMap<String, byte[]> getMetadata() {
1277 throw new UnsupportedOperationException();
1281 public void commitDone(WriteTraits writeTraits, long csid) {
1285 public void clearUndoList(WriteTraits writeTraits) {
1289 public int clearMetadata() {
1294 public void startUndo() {
1298 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1302 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1307 Disposable.safeDispose(clientChanges);
1308 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1312 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1317 writeState = writeStateT;
1319 assert (null != writer);
1320 // writer.state.barrier.inc();
1321 long start = System.nanoTime();
1322 T result = request.perform(writer);
1323 long duration = System.nanoTime() - start;
1325 System.err.println("################");
1326 System.err.println("WriteOnly duration " + 1e-9*duration);
1328 writeStateT.setResult(result);
1330 // This makes clusters available from server
1331 clusterStream.reallyFlush();
1333 // This will trigger query updates
1334 releaseWriteOnly(writer);
1335 assert (null != writer);
1337 handleUpdatesAndMetadata(writer);
1339 } catch (CancelTransactionException e) {
1341 releaseWriteOnly(writeState.getGraph());
1343 clusterTable.removeWriteOnlyClusters();
1344 state.stopWriteTransaction(clusterStream);
1346 } catch (Throwable e) {
1348 e.printStackTrace();
1350 releaseWriteOnly(writeState.getGraph());
1352 clusterTable.removeWriteOnlyClusters();
1354 if (callback != null)
1355 callback.exception(new DatabaseException(e));
1357 state.stopWriteTransaction(clusterStream);
1358 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1362 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1368 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1369 scheduleRequest(request, callback, notify, null);
1373 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1377 assert (request != null);
1379 requestManager.scheduleWrite(new SessionTask(null) {
1382 public void run0(int thread) {
1384 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1388 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1391 Disposable.safeDispose(clientChanges);
1392 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396 VirtualGraph vg = getProvider(request.getProvider());
1397 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1399 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1401 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1404 public void execute(Object result) {
1405 if(callback != null) callback.accept(null);
1409 public void exception(Throwable t) {
1410 if(callback != null) callback.accept((DatabaseException)t);
1415 assert (null != writer);
1416 // writer.state.barrier.inc();
1420 request.perform(writer);
1422 } catch (Throwable e) {
1424 // writer.state.barrier.dec();
1425 // writer.waitAsync(null);
1427 releaseWriteOnly(writer);
1429 clusterTable.removeWriteOnlyClusters();
1431 if(!(e instanceof CancelTransactionException)) {
1432 if (callback != null)
1433 callback.accept(new DatabaseException(e));
1436 writeState.except(e);
1442 // This makes clusters available from server
1443 boolean empty = clusterStream.reallyFlush();
1444 // This was needed to make WO requests call metadata listeners.
1445 // NOTE: the calling event does not contain clientChanges information.
1446 if (!empty && clientChanges.isEmpty())
1447 clientChanges.setNotEmpty(true);
1448 releaseWriteOnly(writer);
1449 assert (null != writer);
1452 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1465 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466 scheduleRequest(request, callback, notify, null);
1470 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1473 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1475 assert (request != null);
1477 requestManager.scheduleWrite(new SessionTask(null) {
1480 public void run0(int thread) {
1482 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1484 performWriteOnly(request, notify, callback);
1494 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1496 assert (request != null);
1497 assert (procedure != null);
1499 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1501 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1504 public void run0(int thread) {
1506 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1508 ListenerBase listener = getListenerBase(procedure);
1510 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1512 // This is never synced but increase to prevent it from visiting 0
1513 newGraph.asyncBarrier.inc();
1517 if (listener != null) {
1521 AsyncProcedure ap = new AsyncProcedure<T>() {
1524 public void exception(AsyncReadGraph graph, Throwable t) {
1525 procedure.exception(graph, t);
1526 if(throwable != null) {
1529 // ErrorLogger.defaultLogError("Unhandled exception", t);
1534 public void execute(AsyncReadGraph graph, T t) {
1535 if(result != null) result.set(t);
1536 procedure.execute(graph, t);
1541 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1543 } catch (Throwable t) {
1544 // This is handled by the AsyncProcedure
1545 //Logger.defaultLogError("Internal error", t);
1552 // newGraph.state.barrier.inc();
1554 T t = request.perform(newGraph);
1558 if(result != null) result.set(t);
1559 procedure.execute(newGraph, t);
1561 } catch (Throwable th) {
1563 if(throwable != null) {
1566 Logger.defaultLogError("Unhandled exception", th);
1571 } catch (Throwable t) {
1574 t.printStackTrace();
1576 if(throwable != null) {
1579 Logger.defaultLogError("Unhandled exception", t);
1584 procedure.exception(newGraph, t);
1586 } catch (Throwable t2) {
1588 if(throwable != null) {
1591 Logger.defaultLogError("Unhandled exception", t2);
1598 // newGraph.state.barrier.dec();
1599 // newGraph.waitAsync(request);
1605 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1615 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1617 assert (request != null);
1618 assert (procedure != null);
1620 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1622 requestManager.scheduleRead(new SessionRead(null, notify) {
1625 public void run0(int thread) {
1627 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1629 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1633 if (listener != null) {
1636 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1637 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1638 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1639 } catch (DatabaseException e) {
1640 Logger.defaultLogError(e);
1645 // final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1646 // procedure, "request");
1648 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1652 request.perform(newGraph, wrap);
1655 } catch (DatabaseException e) {
1657 Logger.defaultLogError(e);
1665 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1675 public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1677 assert (request != null);
1678 assert (procedure != null);
1680 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1682 int sync = notify != null ? thread : -1;
1684 requestManager.scheduleRead(new SessionRead(null, notify) {
1687 public void run0(int thread) {
1689 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1691 ListenerBase listener = getListenerBase(procedure);
1693 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1697 if (listener != null) {
1699 newGraph.processor.query(newGraph, request, null, procedure, listener);
1701 // newGraph.waitAsync(request);
1705 final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1709 request.perform(newGraph, wrapper);
1711 } catch (Throwable t) {
1713 t.printStackTrace();
1721 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1731 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1733 assert (request != null);
1734 assert (procedure != null);
1736 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1738 int sync = notify != null ? thread : -1;
1740 requestManager.scheduleRead(new SessionRead(null, notify) {
1743 public void run0(int thread) {
1745 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1747 ListenerBase listener = getListenerBase(procedure);
1749 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1753 if (listener != null) {
1755 newGraph.processor.query(newGraph, request, null, procedure, listener);
1757 // newGraph.waitAsync(request);
1761 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1765 request.perform(newGraph, wrapper);
1767 } catch (Throwable t) {
1769 t.printStackTrace();
1777 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1787 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1789 assert (request != null);
1790 assert (procedure != null);
1792 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1794 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1797 public void run0(int thread) {
1799 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1801 ListenerBase listener = getListenerBase(procedure);
1803 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1807 if (listener != null) {
1810 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1811 } catch (DatabaseException e) {
1812 Logger.defaultLogError(e);
1817 // newGraph.state.barrier.inc();
1819 request.register(newGraph, new Listener<T>() {
1822 public void exception(Throwable t) {
1823 if(throwable != null) throwable.set(t);
1824 procedure.exception(t);
1825 // newGraph.state.barrier.dec();
1829 public void execute(T t) {
1830 if(result != null) result.set(t);
1831 procedure.execute(t);
1832 // newGraph.state.barrier.dec();
1836 public boolean isDisposed() {
1842 // newGraph.waitAsync(request);
1848 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1860 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1862 scheduleRequest(request, procedure, null, null, null);
1867 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1869 Semaphore notify = new Semaphore(0);
1870 DataContainer<Throwable> container = new DataContainer<Throwable>();
1871 DataContainer<T> result = new DataContainer<T>();
1872 scheduleRequest(request, procedure, notify, container, result);
1873 acquire(notify, request);
1874 Throwable throwable = container.get();
1875 if(throwable != null) {
1876 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877 else throw new DatabaseException("Unexpected exception", throwable);
1879 return result.get();
1883 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1885 Semaphore notify = new Semaphore(0);
1886 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1887 final DataContainer<T> resultContainer = new DataContainer<T>();
1888 scheduleRequest(request, new AsyncProcedure<T>() {
1890 public void exception(AsyncReadGraph graph, Throwable throwable) {
1891 exceptionContainer.set(throwable);
1892 procedure.exception(graph, throwable);
1895 public void execute(AsyncReadGraph graph, T result) {
1896 resultContainer.set(result);
1897 procedure.execute(graph, result);
1899 }, getListenerBase(procedure), notify);
1900 acquire(notify, request, procedure);
1901 Throwable throwable = exceptionContainer.get();
1902 if (throwable != null) {
1903 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1904 else throw new DatabaseException("Unexpected exception", throwable);
1906 return resultContainer.get();
1911 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1913 Semaphore notify = new Semaphore(0);
1914 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1915 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1917 public void exception(AsyncReadGraph graph, Throwable throwable) {
1918 exceptionContainer.set(throwable);
1919 procedure.exception(graph, throwable);
1922 public void execute(AsyncReadGraph graph, T result) {
1923 procedure.execute(graph, result);
1926 public void finished(AsyncReadGraph graph) {
1927 procedure.finished(graph);
1930 acquire(notify, request, procedure);
1931 Throwable throwable = exceptionContainer.get();
1932 if (throwable != null) {
1933 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1934 else throw new DatabaseException("Unexpected exception", throwable);
1936 // TODO: implement return value
1937 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1942 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1943 return syncRequest(request, new ProcedureAdapter<T>());
1946 // assert(request != null);
1948 // final DataContainer<T> result = new DataContainer<T>();
1949 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1951 // syncRequest(request, new Procedure<T>() {
1954 // public void execute(T t) {
1959 // public void exception(Throwable t) {
1960 // exception.set(t);
1965 // Throwable t = exception.get();
1967 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1968 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1971 // return result.get();
1976 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1977 return syncRequest(request, (Procedure<T>)procedure);
1982 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1983 // assertNotSession();
1984 // Semaphore notify = new Semaphore(0);
1985 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1986 // DataContainer<T> result = new DataContainer<T>();
1987 // scheduleRequest(request, procedure, notify, container, result);
1988 // acquire(notify, request);
1989 // Throwable throwable = container.get();
1990 // if(throwable != null) {
1991 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1992 // else throw new DatabaseException("Unexpected exception", throwable);
1994 // return result.get();
1999 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
2001 Semaphore notify = new Semaphore(0);
2002 final DataContainer<Throwable> container = new DataContainer<Throwable>();
2003 final DataContainer<T> result = new DataContainer<T>();
2004 scheduleRequest(request, procedure, notify, container, result);
2005 acquire(notify, request);
2006 Throwable throwable = container.get();
2007 if (throwable != null) {
2008 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2009 else throw new DatabaseException("Unexpected exception", throwable);
2011 return result.get();
2015 public void syncRequest(Write request) throws DatabaseException {
2018 Semaphore notify = new Semaphore(0);
2019 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2020 scheduleRequest(request, e -> exception.set(e), notify);
2021 acquire(notify, request);
2022 if(exception.get() != null) throw exception.get();
2026 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
2028 Semaphore notify = new Semaphore(0);
2029 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2030 final DataContainer<T> result = new DataContainer<T>();
2031 scheduleRequest(request, new Procedure<T>() {
2034 public void exception(Throwable t) {
2039 public void execute(T t) {
2044 acquire(notify, request);
2045 if(exception.get() != null) {
2046 Throwable t = exception.get();
2047 if(t instanceof DatabaseException) throw (DatabaseException)t;
2048 else throw new DatabaseException(t);
2050 return result.get();
2054 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2056 Semaphore notify = new Semaphore(0);
2057 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2058 final DataContainer<T> result = new DataContainer<T>();
2059 scheduleRequest(request, new Procedure<T>() {
2062 public void exception(Throwable t) {
2067 public void execute(T t) {
2072 acquire(notify, request);
2073 if(exception.get() != null) {
2074 Throwable t = exception.get();
2075 if(t instanceof DatabaseException) throw (DatabaseException)t;
2076 else throw new DatabaseException(t);
2078 return result.get();
2082 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2084 Semaphore notify = new Semaphore(0);
2085 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2086 final DataContainer<T> result = new DataContainer<T>();
2087 scheduleRequest(request, new Procedure<T>() {
2090 public void exception(Throwable t) {
2095 public void execute(T t) {
2100 acquire(notify, request);
2101 if(exception.get() != null) {
2102 Throwable t = exception.get();
2103 if(t instanceof DatabaseException) throw (DatabaseException)t;
2104 else throw new DatabaseException(t);
2106 return result.get();
2110 public void syncRequest(DelayedWrite request) throws DatabaseException {
2112 Semaphore notify = new Semaphore(0);
2113 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2114 scheduleRequest(request, e -> exception.set(e), notify);
2115 acquire(notify, request);
2116 if(exception.get() != null) throw exception.get();
2120 public void syncRequest(WriteOnly request) throws DatabaseException {
2123 Semaphore notify = new Semaphore(0);
2124 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2125 scheduleRequest(request, e -> exception.set(e), notify);
2126 acquire(notify, request);
2127 if(exception.get() != null) throw exception.get();
2132 * GraphRequestProcessor interface
2136 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2138 scheduleRequest(request, procedure, null, null);
2143 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2145 scheduleRequest(request, procedure, null);
2150 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2152 scheduleRequest(request, procedure, null, null, null);
2157 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2159 scheduleRequest(request, callback, null);
2164 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2166 scheduleRequest(request, procedure, null);
2171 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2173 scheduleRequest(request, procedure, null);
2178 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2180 scheduleRequest(request, procedure, null);
2185 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2187 scheduleRequest(request, callback, null);
2192 public void asyncRequest(final Write r) {
2193 asyncRequest(r, null);
2197 public void asyncRequest(final DelayedWrite r) {
2198 asyncRequest(r, null);
2202 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2204 scheduleRequest(request, callback, null);
2209 public void asyncRequest(final WriteOnly request) {
2211 asyncRequest(request, null);
2216 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2217 r.request(this, procedure);
2221 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2222 r.request(this, procedure);
2226 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2227 r.request(this, procedure);
2231 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2232 r.request(this, procedure);
2236 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2237 r.request(this, procedure);
2241 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2242 r.request(this, procedure);
2246 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2247 return r.request(this);
2251 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2252 return r.request(this);
2256 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2257 r.request(this, procedure);
2261 public <T> void async(WriteInterface<T> r) {
2262 r.request(this, new ProcedureAdapter<T>());
2266 public void incAsync() {
2271 public void decAsync() {
2275 public long getCluster(ResourceImpl resource) {
2276 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2277 return cluster.getClusterId();
2280 public long getCluster(int id) {
2281 if (clusterTable == null)
2282 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2283 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2286 public ResourceImpl getResource(int id) {
2287 return new ResourceImpl(resourceSupport, id);
2290 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2291 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2292 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2293 int key = proxy.getClusterKey();
2294 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2295 return new ResourceImpl(resourceSupport, resourceKey);
2298 public ResourceImpl getResource2(int id) {
2300 return new ResourceImpl(resourceSupport, id);
2303 final public int getId(ResourceImpl impl) {
2307 public static final Charset UTF8 = Charset.forName("utf-8");
2312 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2313 for(TransientGraph g : support.providers) {
2314 if(g.isPending(subject)) return false;
2319 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2320 for(TransientGraph g : support.providers) {
2321 if(g.isPending(subject, predicate)) return false;
2326 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2328 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2330 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2333 public void accept(ReadGraphImpl graph) {
2334 if(ready.decrementAndGet() == 0) {
2335 runnable.accept(graph);
2341 for(TransientGraph g : support.providers) {
2342 if(g.isPending(subject)) {
2344 g.load(graph, subject, composite);
2345 } catch (DatabaseException e) {
2346 e.printStackTrace();
2349 composite.accept(graph);
2353 composite.accept(graph);
2357 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2359 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2361 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2364 public void accept(ReadGraphImpl graph) {
2365 if(ready.decrementAndGet() == 0) {
2366 runnable.accept(graph);
2372 for(TransientGraph g : support.providers) {
2373 if(g.isPending(subject, predicate)) {
2375 g.load(graph, subject, predicate, composite);
2376 } catch (DatabaseException e) {
2377 e.printStackTrace();
2380 composite.accept(graph);
2384 composite.accept(graph);
2388 // void dumpHeap() {
2391 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2392 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2393 // "d:/heap" + flushCounter + ".txt", true);
2394 // } catch (IOException e) {
2395 // e.printStackTrace();
2400 void fireReactionsToSynchronize(ChangeSet cs) {
2402 // Do not fire empty events
2406 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2407 // g.state.barrier.inc();
2411 if (!cs.isEmpty()) {
2412 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2413 for (ChangeListener l : changeListeners2) {
2416 } catch (Exception ex) {
2417 ex.printStackTrace();
2424 // g.state.barrier.dec();
2425 // g.waitAsync(null);
2431 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2434 // Do not fire empty events
2439 // graph.state.barrier.inc();
2443 if (!cs2.isEmpty()) {
2445 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2446 for (ChangeListener l : changeListeners2) {
2449 // System.out.println("changelistener " + l);
2450 } catch (Exception ex) {
2451 ex.printStackTrace();
2459 // graph.state.barrier.dec();
2460 // graph.waitAsync(null);
2463 } catch (Throwable t) {
2464 t.printStackTrace();
2468 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2472 // Do not fire empty events
2476 // Do not fire on virtual requests
2477 if(graph.getProvider() != null)
2480 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2484 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2485 for (ChangeListener l : metadataListeners) {
2488 } catch (Throwable ex) {
2489 LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2497 } catch (Throwable t) {
2498 LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2507 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2510 public <T> T getService(Class<T> api) {
2511 T t = peekService(api);
2513 if (state.isClosed())
2514 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2515 throw new ServiceNotFoundException(this, api);
2523 protected abstract ServerInformation getCachedServerInformation();
2525 private Class<?> serviceKey1 = null;
2526 private Class<?> serviceKey2 = null;
2527 private Object service1 = null;
2528 private Object service2 = null;
2534 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2536 @SuppressWarnings("unchecked")
2538 public synchronized <T> T peekService(Class<T> api) {
2540 if(serviceKey1 == api) {
2542 } else if (serviceKey2 == api) {
2544 Object result = service2;
2545 service2 = service1;
2546 serviceKey2 = serviceKey1;
2552 if (Layer0.class == api)
2554 if (ServerInformation.class == api)
2555 return (T) getCachedServerInformation();
2556 else if (WriteGraphImpl.class == api)
2557 return (T) writeState.getGraph();
2558 else if (ClusterBuilder.class == api)
2559 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2560 else if (ClusterBuilderFactory.class == api)
2561 return (T)new ClusterBuilderFactoryImpl(this);
2563 service2 = service1;
2564 serviceKey2 = serviceKey1;
2566 service1 = serviceLocator.peekService(api);
2577 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2580 public boolean hasService(Class<?> api) {
2581 return serviceLocator.hasService(api);
2585 * @param api the api that must be implemented by the specified service
2586 * @param service the service implementation
2589 public <T> void registerService(Class<T> api, T service) {
2590 if(Layer0.class == api) {
2591 L0 = (Layer0)service;
2594 serviceLocator.registerService(api, service);
2595 if (TransactionPolicySupport.class == api) {
2596 transactionPolicy = (TransactionPolicySupport)service;
2597 state.resetTransactionPolicy();
2599 if (api == serviceKey1)
2601 else if (api == serviceKey2)
2609 void fireSessionVariableChange(String variable) {
2610 for (MonitorHandler h : monitorHandlers) {
2611 MonitorContext ctx = monitorContexts.getLeft(h);
2613 // SafeRunner functionality repeated here to avoid dependency.
2615 h.valuesChanged(ctx);
2616 } catch (Exception e) {
2617 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2618 } catch (LinkageError e) {
2619 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2625 class ResourceSerializerImpl implements ResourceSerializer {
2627 public long createRandomAccessId(int id) throws DatabaseException {
2630 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2631 long cluster = getCluster(id);
2633 return 0; // Better to return 0 then invalid id.
2634 long result = ClusterTraitsBase.createResourceId(cluster, index);
2639 public long getRandomAccessId(Resource resource) throws DatabaseException {
2641 ResourceImpl resourceImpl = (ResourceImpl) resource;
2642 return createRandomAccessId(resourceImpl.id);
2647 public int getTransientId(Resource resource) throws DatabaseException {
2649 ResourceImpl resourceImpl = (ResourceImpl) resource;
2650 return resourceImpl.id;
2655 public String createRandomAccessId(Resource resource)
2656 throws InvalidResourceReferenceException {
2658 if(resource == null) throw new IllegalArgumentException();
2660 ResourceImpl resourceImpl = (ResourceImpl) resource;
2662 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2666 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2667 } catch (DatabaseException e1) {
2668 throw new InvalidResourceReferenceException(e1);
2671 // Serialize as '<resource index>_<cluster id>'
2672 return "" + r + "_" + getCluster(resourceImpl);
2673 } catch (Throwable e) {
2674 e.printStackTrace();
2675 throw new InvalidResourceReferenceException(e);
2680 public int getTransientId(long serialized) throws DatabaseException {
2681 if (serialized <= 0)
2682 return (int)serialized;
2683 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2684 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2685 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2686 if (cluster == null)
2687 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2688 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2693 public Resource getResource(long randomAccessId) throws DatabaseException {
2694 return getResourceByKey(getTransientId(randomAccessId));
2698 public Resource getResource(int transientId) throws DatabaseException {
2699 return getResourceByKey(transientId);
2703 public Resource getResource(String randomAccessId)
2704 throws InvalidResourceReferenceException {
2706 int i = randomAccessId.indexOf('_');
2708 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2709 + randomAccessId + "'");
2710 int r = Integer.parseInt(randomAccessId.substring(0, i));
2711 if(r < 0) return getResourceByKey(r);
2712 long c = Long.parseLong(randomAccessId.substring(i + 1));
2713 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2714 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2715 if (cluster.hasResource(key, clusterTranslator))
2716 return getResourceByKey(key);
2717 } catch (InvalidResourceReferenceException e) {
2719 } catch (NumberFormatException e) {
2720 throw new InvalidResourceReferenceException(e);
2721 } catch (Throwable e) {
2722 e.printStackTrace();
2723 throw new InvalidResourceReferenceException(e);
2726 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2730 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2733 } catch (Throwable e) {
2734 e.printStackTrace();
2735 throw new InvalidResourceReferenceException(e);
2741 // Copied from old SessionImpl
2744 if (state.isClosed())
2745 throw new Error("Session closed.");
2750 public ResourceImpl getNewResource(long clusterId) {
2751 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2754 newId = cluster.createResource(clusterTranslator);
2755 } catch (DatabaseException e) {
2756 Logger.defaultLogError(e);
2759 return new ResourceImpl(resourceSupport, newId);
2762 public ResourceImpl getNewResource(Resource clusterSet)
2763 throws DatabaseException {
2764 long resourceId = clusterSet.getResourceId();
2765 Long clusterId = clusterSetsSupport.get(resourceId);
2766 if (null == clusterId)
2767 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2768 if (Constants.NewClusterId == clusterId) {
2769 clusterId = getService(ClusteringSupport.class).createCluster();
2770 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2771 createdClusters.add(clusterId);
2772 clusterSetsSupport.put(resourceId, clusterId);
2773 return getNewResource(clusterId);
2775 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2776 ResourceImpl result;
2777 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2778 clusterId = getService(ClusteringSupport.class).createCluster();
2779 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2780 createdClusters.add(clusterId);
2781 clusterSetsSupport.put(resourceId, clusterId);
2782 return getNewResource(clusterId);
2784 result = getNewResource(clusterId);
2785 int resultKey = querySupport.getId(result);
2786 long resultCluster = querySupport.getClusterId(resultKey);
2787 if (clusterId != resultCluster)
2788 clusterSetsSupport.put(resourceId, resultCluster);
2794 public void getNewClusterSet(Resource clusterSet)
2795 throws DatabaseException {
2797 System.out.println("new cluster set=" + clusterSet);
2798 long resourceId = clusterSet.getResourceId();
2799 if (clusterSetsSupport.containsKey(resourceId))
2800 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2801 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2803 public boolean containsClusterSet(Resource clusterSet)
2804 throws ServiceException {
2805 long resourceId = clusterSet.getResourceId();
2806 return clusterSetsSupport.containsKey(resourceId);
2808 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2809 Resource r = defaultClusterSet;
2810 defaultClusterSet = clusterSet;
2813 void printDiagnostics() {
2817 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2818 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2819 for(ClusterI cluster : clusterTable.getClusters()) {
2821 if (cluster.isLoaded() && !cluster.isEmpty()) {
2822 residentClusters.set(residentClusters.get() + 1);
2823 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2825 } catch (DatabaseException e) {
2826 Logger.defaultLogError(e);
2830 System.out.println("--------------------------------");
2831 System.out.println("Cluster information:");
2832 System.out.println("-amount of resident clusters=" + residentClusters.get());
2833 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2835 for(ClusterI cluster : clusterTable.getClusters()) {
2836 System.out.print("Cluster " + cluster.getClusterId() + " [");
2838 if (!cluster.isLoaded())
2839 System.out.println("not loaded]");
2840 else if (cluster.isEmpty())
2841 System.out.println("is empty]");
2843 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2844 System.out.print(",references=" + cluster.getReferenceCount());
2845 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2847 } catch (DatabaseException e) {
2848 Logger.defaultLogError(e);
2849 System.out.println("is corrupted]");
2853 queryProvider2.printDiagnostics();
2854 System.out.println("--------------------------------");
2859 public void fireStartReadTransaction() {
2862 System.out.println("StartReadTransaction");
2864 for (SessionEventListener listener : eventListeners) {
2866 listener.readTransactionStarted();
2867 } catch (Throwable t) {
2868 t.printStackTrace();
2874 public void fireFinishReadTransaction() {
2877 System.out.println("FinishReadTransaction");
2879 for (SessionEventListener listener : eventListeners) {
2881 listener.readTransactionFinished();
2882 } catch (Throwable t) {
2883 t.printStackTrace();
2889 public void fireStartWriteTransaction() {
2892 System.out.println("StartWriteTransaction");
2894 for (SessionEventListener listener : eventListeners) {
2896 listener.writeTransactionStarted();
2897 } catch (Throwable t) {
2898 t.printStackTrace();
2904 public void fireFinishWriteTransaction() {
2907 System.out.println("FinishWriteTransaction");
2909 for (SessionEventListener listener : eventListeners) {
2911 listener.writeTransactionFinished();
2912 } catch (Throwable t) {
2913 t.printStackTrace();
2917 Indexing.resetDependenciesIndexingDisabled();
2921 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2922 throw new Error("Not supported at the moment.");
2929 ClusterTable getClusterTable() {
2930 return clusterTable;
2933 public GraphSession getGraphSession() {
2934 return graphSession;
2937 public QueryProcessor getQueryProvider2() {
2938 return queryProvider2;
2941 ClientChangesImpl getClientChanges() {
2942 return clientChanges;
2945 boolean getWriteOnly() {
2949 static int counter = 0;
2951 public void onClusterLoaded(long clusterId) {
2953 clusterTable.updateSize();
2961 * Implementation of the interface RequestProcessor
2965 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2970 assert(request != null);
2972 final DataContainer<T> result = new DataContainer<T>();
2973 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2975 syncRequest(request, new AsyncProcedure<T>() {
2978 public void execute(AsyncReadGraph graph, T t) {
2983 public void exception(AsyncReadGraph graph, Throwable t) {
2989 Throwable t = exception.get();
2991 if(t instanceof DatabaseException) throw (DatabaseException)t;
2992 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2995 return result.get();
3000 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
3001 // assertNotSession();
3002 // return syncRequest(request, (AsyncProcedure<T>)procedure);
3006 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3008 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3012 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3014 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3018 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3020 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3024 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3026 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3030 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3034 assert(request != null);
3036 final DataContainer<T> result = new DataContainer<T>();
3037 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3039 syncRequest(request, new AsyncProcedure<T>() {
3042 public void execute(AsyncReadGraph graph, T t) {
3047 public void exception(AsyncReadGraph graph, Throwable t) {
3053 Throwable t = exception.get();
3055 if(t instanceof DatabaseException) throw (DatabaseException)t;
3056 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3059 return result.get();
3064 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3066 return syncRequest(request, (AsyncProcedure<T>)procedure);
3070 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3072 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3076 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3078 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3082 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3084 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3088 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3090 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3094 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3098 assert(request != null);
3100 final ArrayList<T> result = new ArrayList<T>();
3101 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3103 syncRequest(request, new SyncMultiProcedure<T>() {
3106 public void execute(ReadGraph graph, T t) {
3107 synchronized(result) {
3113 public void finished(ReadGraph graph) {
3117 public void exception(ReadGraph graph, Throwable t) {
3124 Throwable t = exception.get();
3126 if(t instanceof DatabaseException) throw (DatabaseException)t;
3127 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3135 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3137 throw new Error("Not implemented!");
3141 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3143 return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3147 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3149 return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3153 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3155 return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3159 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3163 assert(request != null);
3165 final ArrayList<T> result = new ArrayList<T>();
3166 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3168 syncRequest(request, new AsyncMultiProcedure<T>() {
3171 public void execute(AsyncReadGraph graph, T t) {
3172 synchronized(result) {
3178 public void finished(AsyncReadGraph graph) {
3182 public void exception(AsyncReadGraph graph, Throwable t) {
3189 Throwable t = exception.get();
3191 if (t instanceof DatabaseException)
3192 throw (DatabaseException) t;
3194 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3202 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3204 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3208 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3210 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3214 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3216 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3220 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3222 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3226 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3228 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3233 public <T> void asyncRequest(Read<T> request) {
3235 asyncRequest(request, new ProcedureAdapter<T>() {
3237 public void exception(Throwable t) {
3238 t.printStackTrace();
3245 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3246 asyncRequest(request, (AsyncProcedure<T>)procedure);
3250 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3251 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3255 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3256 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3260 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3261 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3265 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3266 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3270 final public <T> void asyncRequest(final AsyncRead<T> request) {
3272 assert(request != null);
3274 asyncRequest(request, new ProcedureAdapter<T>() {
3276 public void exception(Throwable t) {
3277 t.printStackTrace();
3284 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3285 scheduleRequest(request, procedure, procedure, null);
3289 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3290 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3294 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3295 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3299 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3300 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3304 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3305 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3309 public <T> void asyncRequest(MultiRead<T> request) {
3311 assert(request != null);
3313 asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3315 public void exception(ReadGraph graph, Throwable t) {
3316 t.printStackTrace();
3323 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3324 asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3328 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3329 asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3333 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3334 scheduleRequest(request, procedure, null);
3338 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3339 asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3343 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3345 assert(request != null);
3347 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3349 public void exception(AsyncReadGraph graph, Throwable t) {
3350 t.printStackTrace();
3357 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3358 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3362 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3363 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3367 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3368 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3372 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3373 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3377 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3378 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3382 final public <T> void asyncRequest(final ExternalRead<T> request) {
3384 assert(request != null);
3386 asyncRequest(request, new ProcedureAdapter<T>() {
3388 public void exception(Throwable t) {
3389 t.printStackTrace();
3396 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3397 asyncRequest(request, (Procedure<T>)procedure);
3400 boolean sameProvider(Write request) {
3401 if(writeState.getGraph().provider != null) {
3402 return writeState.getGraph().provider.equals(request.getProvider());
3404 return request.getProvider() == null;
3408 boolean plainWrite(WriteGraphImpl graph) {
3409 if(graph == null) return false;
3410 if(graph.writeSupport.writeOnly()) return false;
3415 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3417 private void assertNotSession() throws DatabaseException {
3418 Thread current = Thread.currentThread();
3419 if (sessionThreads.contains(current))
3420 throw new ServiceException("Caller is already inside a transaction.");
3423 void assertAlive() {
3424 if (!state.isAlive())
3425 throw new RuntimeDatabaseException("Session has been shut down.");
3428 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3429 return querySupport.getValueStream(graph, querySupport.getId(resource));
3432 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3433 return querySupport.getValue(graph, querySupport.getId(resource));
3436 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3437 acquire(semaphore, request, null);
3440 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3444 // Loop until the semaphore is acquired reporting requests that take a long time.
3447 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3451 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3453 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3454 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3456 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3457 + (waitTime) + " ms. (procedure=" + procedure + ")");
3464 } catch (InterruptedException e) {
3465 e.printStackTrace();
3466 // FIXME: Should perhaps do something else in this case ??
3473 // public boolean holdOnToTransactionAfterCancel() {
3474 // return transactionPolicy.holdOnToTransactionAfterCancel();
3478 // public boolean holdOnToTransactionAfterCommit() {
3479 // return transactionPolicy.holdOnToTransactionAfterCommit();
3483 // public boolean holdOnToTransactionAfterRead() {
3484 // return transactionPolicy.holdOnToTransactionAfterRead();
3488 // public void onRelinquish() {
3489 // transactionPolicy.onRelinquish();
3493 // public void onRelinquishDone() {
3494 // transactionPolicy.onRelinquishDone();
3498 // public void onRelinquishError() {
3499 // transactionPolicy.onRelinquishError();
3504 public Session getSession() {
3509 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3510 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3512 public void ceased(int thread) {
3514 requestManager.ceased(thread);
3518 public int getAmountOfQueryThreads() {
3519 // This must be a power of two
3521 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3524 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3526 return new ResourceImpl(resourceSupport, key);
3530 public void acquireWriteOnly() {
3534 public void releaseWriteOnly(ReadGraphImpl graph) {
3536 queryProvider2.releaseWrite(graph);
3539 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3541 long start = System.nanoTime();
3543 while(dirtyPrimitives) {
3544 dirtyPrimitives = false;
3545 getQueryProvider2().performDirtyUpdates(writer);
3546 getQueryProvider2().performScheduledUpdates(writer);
3549 fireMetadataListeners(writer, clientChanges);
3551 if(DebugPolicy.PERFORMANCE_DATA) {
3552 long end = System.nanoTime();
3553 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3558 void removeTemporaryData() {
3559 File platform = Platform.getLocation().toFile();
3560 File tempFiles = new File(platform, "tempFiles");
3561 File temp = new File(tempFiles, "db");
3565 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3567 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3570 } catch (IOException e) {
3571 Logger.defaultLogError(e);
3573 return FileVisitResult.CONTINUE;
3576 } catch (IOException e) {
3577 Logger.defaultLogError(e);
3581 public void handleCreatedClusters() {
3582 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3583 createdClusters.forEach(new TLongProcedure() {
3586 public boolean execute(long value) {
3587 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3588 cluster.setImmutable(true, clusterTranslator);
3593 createdClusters.clear();
3597 public Object getModificationCounter() {
3598 return queryProvider2.modificationCounter;
3602 public void markUndoPoint() {
3603 state.setCombine(false);