1 /*******************************************************************************
2 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.EventSupport;
147 import org.simantics.db.service.GraphChangeListenerSupport;
148 import org.simantics.db.service.InitSupport;
149 import org.simantics.db.service.LifecycleSupport;
150 import org.simantics.db.service.ManagementSupport;
151 import org.simantics.db.service.QueryControl;
152 import org.simantics.db.service.SerialisationSupport;
153 import org.simantics.db.service.ServerInformation;
154 import org.simantics.db.service.ServiceActivityMonitor;
155 import org.simantics.db.service.SessionEventSupport;
156 import org.simantics.db.service.SessionMonitorSupport;
157 import org.simantics.db.service.SessionUserSupport;
158 import org.simantics.db.service.StatementSupport;
159 import org.simantics.db.service.TransactionPolicySupport;
160 import org.simantics.db.service.TransactionSupport;
161 import org.simantics.db.service.TransferableGraphSupport;
162 import org.simantics.db.service.UndoRedoSupport;
163 import org.simantics.db.service.VirtualGraphSupport;
164 import org.simantics.db.service.XSupport;
165 import org.simantics.layer0.Layer0;
166 import org.simantics.utils.DataContainer;
167 import org.simantics.utils.Development;
168 import org.simantics.utils.threads.logger.ITask;
169 import org.simantics.utils.threads.logger.ThreadLogger;
170 import org.slf4j.LoggerFactory;
172 import gnu.trove.procedure.TLongProcedure;
173 import gnu.trove.set.hash.TLongHashSet;
176 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
178 private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
180 protected static final boolean DEBUG = false;
182 private static final boolean DIAGNOSTICS = false;
184 private TransactionPolicySupport transactionPolicy;
186 final private ClusterControl clusterControl;
188 final protected BuiltinSupportImpl builtinSupport;
189 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
190 final protected ClusterSetsSupport clusterSetsSupport;
191 final protected LifecycleSupportImpl lifecycleSupport;
193 protected QuerySupportImpl querySupport;
194 protected ResourceSupportImpl resourceSupport;
195 protected WriteSupport writeSupport;
196 public ClusterTranslator clusterTranslator;
198 boolean dirtyPrimitives = false;
200 public static final int SERVICE_MODE_CREATE = 2;
201 public static final int SERVICE_MODE_ALLOW = 1;
202 public int serviceMode = 0;
203 public boolean createdImmutableClusters = false;
204 public TLongHashSet createdClusters = new TLongHashSet();
209 * The service locator maintained by the workbench. These services are
210 * initialized during workbench during the <code>init</code> method.
212 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
214 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
216 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
218 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
220 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
222 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
224 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
226 final protected State state = new State();
228 protected GraphSession graphSession = null;
230 protected SessionManager sessionManagerImpl = null;
232 protected UserAuthenticationAgent authAgent = null;
234 protected UserAuthenticator authenticator = null;
236 protected Resource user = null;
238 protected ClusterStream clusterStream = null;
240 protected SessionRequestManager requestManager = null;
242 public ClusterTable clusterTable = null;
244 public QueryProcessor queryProvider2 = null;
246 ClientChangesImpl clientChanges = null;
248 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
250 // protected long newClusterId = Constants.NullClusterId;
252 protected int flushCounter = 0;
254 protected boolean writeOnly = false;
256 WriteState<?> writeState = null;
257 WriteStateBase<?> delayedWriteState = null;
258 protected Resource defaultClusterSet = null; // If not null then used for newResource().
260 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
262 // if (authAgent == null)
263 // throw new IllegalArgumentException("null authentication agent");
265 File t = StaticSessionProperties.virtualGraphStoragePath;
268 this.clusterTable = new ClusterTable(this, t);
269 this.builtinSupport = new BuiltinSupportImpl(this);
270 this.sessionManagerImpl = sessionManagerImpl;
272 // this.authAgent = authAgent;
274 serviceLocator.registerService(Session.class, this);
275 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
276 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
277 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
278 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
279 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
280 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
281 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
282 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
283 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
284 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
285 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
286 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
287 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
288 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
289 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
290 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
291 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
292 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
293 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
294 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
295 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
296 serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
297 ServiceActivityUpdaterForWriteTransactions.register(this);
299 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
300 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
301 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
302 this.lifecycleSupport = new LifecycleSupportImpl(this);
303 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
304 this.transactionPolicy = new TransactionPolicyRelease();
305 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
306 this.clusterControl = new ClusterControlImpl(this);
307 serviceLocator.registerService(ClusterControl.class, clusterControl);
308 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
309 this.clusterSetsSupport.setReadDirectory(t.toPath());
310 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
311 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
321 public Resource getRootLibrary() {
322 return queryProvider2.getRootLibraryResource();
325 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
326 if (!graphSession.dbSession.refreshEnabled())
329 getClusterTable().refresh(csid, this, clusterUID);
330 } catch (Throwable t) {
331 LOGGER.error("refresh({}, {}) failed", thread, csid, t);
335 static final class TaskHelper {
336 private final String name;
337 private Object result;
338 final Semaphore sema = new Semaphore(0);
339 private Throwable throwable = null;
340 final Consumer<DatabaseException> callback = e -> {
341 synchronized (TaskHelper.this) {
345 final Procedure<Object> proc = new Procedure<Object>() {
347 public void execute(Object result) {
348 callback.accept(null);
351 public void exception(Throwable t) {
352 if (t instanceof DatabaseException)
353 callback.accept((DatabaseException)t);
355 callback.accept(new DatabaseException("" + name + "operation failed.", t));
358 final WriteTraits writeTraits = new WriteTraits() {};
359 TaskHelper(String name) {
362 @SuppressWarnings("unchecked")
366 void setResult(Object result) {
367 this.result = result;
369 // Throwable throwableGet() {
372 synchronized void throwableSet(Throwable t) {
375 // void throwableSet(String t) {
376 // throwable = new InternalException("" + name + " operation failed. " + t);
378 synchronized void throwableCheck()
379 throws DatabaseException {
380 if (null != throwable)
381 if (throwable instanceof DatabaseException)
382 throw (DatabaseException)throwable;
384 throw new DatabaseException("Undo operation failed.", throwable);
386 void throw_(String message)
387 throws DatabaseException {
388 throw new DatabaseException("" + name + " operation failed. " + message);
394 private ListenerBase getListenerBase(Object procedure) {
395 if (procedure instanceof ListenerBase)
396 return (ListenerBase) procedure;
401 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
402 scheduleRequest(request, callback, notify, null);
406 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
409 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
411 assert (request != null);
413 if(Development.DEVELOPMENT) {
415 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
416 System.err.println("schedule write '" + request + "'");
417 } catch (Throwable t) {
418 Logger.defaultLogError(t);
422 requestManager.scheduleWrite(new SessionTask(null) {
425 public void run0(int thread) {
427 if(Development.DEVELOPMENT) {
429 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
430 System.err.println("perform write '" + request + "'");
431 } catch (Throwable t) {
432 Logger.defaultLogError(t);
436 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
438 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
443 Disposable.safeDispose(clientChanges);
444 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
446 VirtualGraph vg = getProvider(request.getProvider());
448 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
452 public void execute(Object result) {
453 if(callback != null) callback.accept(null);
457 public void exception(Throwable t) {
458 if(callback != null) callback.accept((DatabaseException)t);
463 assert (null != writer);
464 writer.asyncBarrier.inc();
467 request.perform(writer);
468 assert (null != writer);
469 } catch (Throwable t) {
470 if (!(t instanceof CancelTransactionException))
471 LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
472 writeState.except(t);
474 writer.asyncBarrier.dec();
475 // writer.waitAsync(request);
479 assert(!queryProvider2.cache.dirty);
481 } catch (Throwable e) {
483 // Log it first, just to be safe that the error is always logged.
484 if (!(e instanceof CancelTransactionException))
485 LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
487 // writeState.getGraph().state.barrier.dec();
488 // writeState.getGraph().waitAsync(request);
490 writeState.except(e);
493 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 // // All we can do here is to log those, can't really pass them anywhere.
495 // if (callback != null) {
496 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 // else callback.run(new DatabaseException(e));
499 // } catch (Throwable e2) {
500 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
502 // boolean empty = clusterStream.reallyFlush();
503 // int callerThread = -1;
504 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
507 // // Send all local changes to server so it can calculate correct reverse change set.
508 // // This will call clusterStream.accept().
509 // state.cancelCommit(context, clusterStream);
511 // if (!context.isOk()) // this is a blocking operation
512 // throw new InternalException("Cancel failed. This should never happen.");
513 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
515 // state.cancelCommit2(context, clusterStream);
516 // } catch (DatabaseException e3) {
517 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
519 // clusterStream.setOff(false);
520 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
525 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
531 if(Development.DEVELOPMENT) {
533 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534 System.err.println("finish write '" + request + "'");
535 } catch (Throwable t) {
536 Logger.defaultLogError(t);
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547 scheduleRequest(request, procedure, notify, null);
551 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
554 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
556 assert (request != null);
558 requestManager.scheduleWrite(new SessionTask(null) {
561 public void run0(int thread) {
563 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
565 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
568 Disposable.safeDispose(clientChanges);
569 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
571 VirtualGraph vg = getProvider(request.getProvider());
572 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
574 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575 writeState = writeStateT;
576 assert (null != writer);
579 writer.asyncBarrier.inc();
580 writeStateT.setResult(request.perform(writer));
581 assert (null != writer);
582 } catch (Throwable e) {
583 writeState.except(e);
585 writer.asyncBarrier.dec();
586 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
597 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
598 scheduleRequest(request, callback, notify, null);
602 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
605 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
607 final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
609 assert (request != null);
611 requestManager.scheduleWrite(new SessionTask(null) {
614 public void run0(int thread) {
615 fireSessionVariableChange(SessionVariables.QUEUED_READS);
617 Procedure<Object> stateProcedure = new Procedure<Object>() {
619 public void execute(Object result) {
620 if (callback != null)
621 callback.accept(null);
624 public void exception(Throwable t) {
625 if (callback != null) {
626 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
627 else callback.accept(new DatabaseException(t));
629 LOGGER.error("Unhandled exception", t);
633 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
634 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
635 DelayedWriteGraph dwg = null;
636 // newGraph.state.barrier.inc();
639 dwg = new DelayedWriteGraph(newGraph);
640 request.perform(dwg);
641 } catch (Throwable e) {
642 delayedWriteState.except(e);
647 // newGraph.state.barrier.dec();
648 // newGraph.waitAsync(request);
649 fireSessionVariableChange(SessionVariables.QUEUED_READS);
652 delayedWriteState = null;
654 ITask task2 = ThreadLogger.task("DelayedWriteCommit");
655 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
658 Disposable.safeDispose(clientChanges);
659 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
663 VirtualGraph vg = getProvider(request.getProvider());
664 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
666 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
668 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
670 assert (null != writer);
671 // writer.state.barrier.inc();
675 dwg.commit(writer, request);
677 if(defaultClusterSet != null) {
678 XSupport xs = getService(XSupport.class);
679 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
680 ClusteringSupport cs = getService(ClusteringSupport.class);
681 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
684 // This makes clusters available from server
685 clusterStream.reallyFlush();
687 releaseWriteOnly(writer);
688 handleUpdatesAndMetadata(writer);
690 } catch (ServiceException e) {
691 // writer.state.barrier.dec();
692 // writer.waitAsync(null);
694 // These shall be requested from server
695 clusterTable.removeWriteOnlyClusters();
696 // This makes clusters available from server
697 clusterStream.reallyFlush();
699 releaseWriteOnly(writer);
700 writeState.except(e);
702 // Debugging & Profiling
703 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
713 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
714 scheduleRequest(request, procedure, notify, null);
718 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
721 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
722 throw new Error("Not implemented");
725 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
726 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
727 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
728 createdClusters.add(cluster.clusterId);
733 class WriteOnlySupport implements WriteSupport {
735 ClusterStream stream;
736 ClusterImpl currentCluster;
738 public WriteOnlySupport() {
739 this.stream = clusterStream;
743 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
744 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
748 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
750 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
752 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
753 if(s != queryProvider2.getRootLibrary())
754 throw new ImmutableException("Trying to modify immutable resource key=" + s);
757 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
758 } catch (DatabaseException e) {
759 Logger.defaultLogError(e);
763 clientChanges.invalidate(s);
765 if (cluster.isWriteOnly())
767 queryProvider2.updateStatements(s, p);
772 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
773 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
777 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
779 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
781 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
782 } catch (DatabaseException e) {
783 Logger.defaultLogError(e);
786 clientChanges.invalidate(rid);
788 if (cluster.isWriteOnly())
790 queryProvider2.updateValue(rid);
795 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
798 claimValue(provider,resource, reader.readBytes(null, amount));
802 byte[] bytes = new byte[65536];
804 int rid = ((ResourceImpl)resource).id;
805 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
809 int block = Math.min(left, 65536);
810 reader.readBytes(bytes, block);
811 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
814 } catch (DatabaseException e) {
815 Logger.defaultLogError(e);
818 clientChanges.invalidate(rid);
820 if (cluster.isWriteOnly())
822 queryProvider2.updateValue(rid);
826 private void maintainCluster(ClusterImpl before, ClusterI after_) {
827 if(after_ != null && after_ != before) {
828 ClusterImpl after = (ClusterImpl)after_;
829 if(currentCluster == before) {
830 currentCluster = after;
832 clusterTable.replaceCluster(after);
836 public int createResourceKey(int foreignCounter) throws DatabaseException {
837 if(currentCluster == null) {
838 currentCluster = getNewResourceCluster();
840 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
841 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
842 newCluster.foreignLookup = new byte[foreignCounter];
843 currentCluster = newCluster;
845 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
847 return currentCluster.createResource(clusterTranslator);
851 public Resource createResource(VirtualGraph provider) throws DatabaseException {
852 if(currentCluster == null) {
853 if (null != defaultClusterSet) {
854 ResourceImpl result = getNewResource(defaultClusterSet);
855 currentCluster = clusterTable.getClusterByResourceKey(result.id);
858 currentCluster = getNewResourceCluster();
861 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
862 if (null != defaultClusterSet) {
863 ResourceImpl result = getNewResource(defaultClusterSet);
864 currentCluster = clusterTable.getClusterByResourceKey(result.id);
867 currentCluster = getNewResourceCluster();
870 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
874 public Resource createResource(VirtualGraph provider, long clusterId)
875 throws DatabaseException {
876 return getNewResource(clusterId);
880 public Resource createResource(VirtualGraph provider, Resource clusterSet)
881 throws DatabaseException {
882 return getNewResource(clusterSet);
886 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
887 throws DatabaseException {
888 getNewClusterSet(clusterSet);
892 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
893 throws ServiceException {
894 return containsClusterSet(clusterSet);
897 public void selectCluster(long cluster) {
898 currentCluster = clusterTable.getClusterByClusterId(cluster);
899 long setResourceId = clusterSetsSupport.getSet(cluster);
900 clusterSetsSupport.put(setResourceId, cluster);
904 public Resource setDefaultClusterSet(Resource clusterSet)
905 throws ServiceException {
906 Resource result = setDefaultClusterSet4NewResource(clusterSet);
907 if(clusterSet != null) {
908 long id = clusterSetsSupport.get(clusterSet.getResourceId());
909 currentCluster = clusterTable.getClusterByClusterId(id);
912 currentCluster = null;
918 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
920 provider = getProvider(provider);
921 if (null == provider) {
922 int key = ((ResourceImpl)resource).id;
926 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
927 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
928 // if(key != queryProvider2.getRootLibrary())
929 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
932 // cluster.removeValue(key, clusterTranslator);
933 // } catch (DatabaseException e) {
934 // Logger.defaultLogError(e);
938 clusterTable.writeOnlyInvalidate(cluster);
941 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
942 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
943 clusterTranslator.removeValue(cluster);
944 } catch (DatabaseException e) {
945 Logger.defaultLogError(e);
948 queryProvider2.invalidateResource(key);
949 clientChanges.invalidate(key);
952 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
953 queryProvider2.updateValue(querySupport.getId(resource));
954 clientChanges.claimValue(resource);
961 public void flush(boolean intermediate) {
962 throw new UnsupportedOperationException();
966 public void flushCluster() {
967 clusterTable.flushCluster(graphSession);
968 if(defaultClusterSet != null) {
969 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
971 currentCluster = null;
975 public void flushCluster(Resource r) {
976 throw new UnsupportedOperationException("flushCluster resource " + r);
984 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
987 int s = ((ResourceImpl)subject).id;
988 int p = ((ResourceImpl)predicate).id;
989 int o = ((ResourceImpl)object).id;
991 provider = getProvider(provider);
992 if (null == provider) {
994 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
995 clusterTable.writeOnlyInvalidate(cluster);
999 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1001 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1002 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1004 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1005 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1006 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1007 clusterTranslator.removeStatement(cluster);
1009 queryProvider2.invalidateResource(s);
1010 clientChanges.invalidate(s);
1012 } catch (DatabaseException e) {
1014 Logger.defaultLogError(e);
1022 ((VirtualGraphImpl)provider).deny(s, p, o);
1023 queryProvider2.invalidateResource(s);
1024 clientChanges.invalidate(s);
1033 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1034 throw new UnsupportedOperationException();
1038 public boolean writeOnly() {
1043 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1044 writeSupport.performWriteRequest(graph, request);
1048 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1049 throw new UnsupportedOperationException();
1053 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1054 throw new UnsupportedOperationException();
1058 public <T> void addMetadata(Metadata data) throws ServiceException {
1059 writeSupport.addMetadata(data);
1063 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1064 return writeSupport.getMetadata(clazz);
1068 public TreeMap<String, byte[]> getMetadata() {
1069 return writeSupport.getMetadata();
1073 public void commitDone(WriteTraits writeTraits, long csid) {
1074 writeSupport.commitDone(writeTraits, csid);
1078 public void clearUndoList(WriteTraits writeTraits) {
1079 writeSupport.clearUndoList(writeTraits);
1082 public int clearMetadata() {
1083 return writeSupport.clearMetadata();
1087 public void startUndo() {
1088 writeSupport.startUndo();
1092 class VirtualWriteOnlySupport implements WriteSupport {
1095 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1096 // Resource object) {
1097 // throw new UnsupportedOperationException();
1101 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1103 TransientGraph impl = (TransientGraph)provider;
1104 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1105 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1106 clientChanges.claim(subject, predicate, object);
1111 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1113 TransientGraph impl = (TransientGraph)provider;
1114 impl.claim(subject, predicate, object);
1115 getQueryProvider2().updateStatements(subject, predicate);
1116 clientChanges.claim(subject, predicate, object);
1121 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1122 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1126 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1127 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1128 getQueryProvider2().updateValue(resource);
1129 clientChanges.claimValue(resource);
1133 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1134 byte[] value = reader.readBytes(null, amount);
1135 claimValue(provider, resource, value);
1139 public Resource createResource(VirtualGraph provider) {
1140 TransientGraph impl = (TransientGraph)provider;
1141 return impl.getResource(impl.newResource(false));
1145 public Resource createResource(VirtualGraph provider, long clusterId) {
1146 throw new UnsupportedOperationException();
1150 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1151 throws DatabaseException {
1152 throw new UnsupportedOperationException();
1156 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1157 throws DatabaseException {
1158 throw new UnsupportedOperationException();
1162 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1163 throws ServiceException {
1164 throw new UnsupportedOperationException();
1168 public Resource setDefaultClusterSet(Resource clusterSet)
1169 throws ServiceException {
1174 public void denyValue(VirtualGraph provider, Resource resource) {
1175 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1176 getQueryProvider2().updateValue(querySupport.getId(resource));
1177 // NOTE: this only keeps track of value changes by-resource.
1178 clientChanges.claimValue(resource);
1182 public void flush(boolean intermediate) {
1183 throw new UnsupportedOperationException();
1187 public void flushCluster() {
1188 throw new UnsupportedOperationException();
1192 public void flushCluster(Resource r) {
1193 throw new UnsupportedOperationException("Resource " + r);
1201 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1202 TransientGraph impl = (TransientGraph) provider;
1203 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1204 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1205 clientChanges.deny(subject, predicate, object);
1210 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1211 throw new UnsupportedOperationException();
1215 public boolean writeOnly() {
1220 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1221 throw new UnsupportedOperationException();
1225 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1226 throw new UnsupportedOperationException();
1230 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1231 throw new UnsupportedOperationException();
1235 public <T> void addMetadata(Metadata data) throws ServiceException {
1236 throw new UnsupportedOperationException();
1240 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1241 throw new UnsupportedOperationException();
1245 public TreeMap<String, byte[]> getMetadata() {
1246 throw new UnsupportedOperationException();
1250 public void commitDone(WriteTraits writeTraits, long csid) {
1254 public void clearUndoList(WriteTraits writeTraits) {
1258 public int clearMetadata() {
1263 public void startUndo() {
1267 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1271 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1273 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1276 Disposable.safeDispose(clientChanges);
1277 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1281 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1283 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1285 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1286 writeState = writeStateT;
1288 assert (null != writer);
1289 // writer.state.barrier.inc();
1290 long start = System.nanoTime();
1291 T result = request.perform(writer);
1292 long duration = System.nanoTime() - start;
1294 System.err.println("################");
1295 System.err.println("WriteOnly duration " + 1e-9*duration);
1297 writeStateT.setResult(result);
1299 // This makes clusters available from server
1300 clusterStream.reallyFlush();
1302 // This will trigger query updates
1303 releaseWriteOnly(writer);
1304 assert (null != writer);
1306 handleUpdatesAndMetadata(writer);
1308 } catch (CancelTransactionException e) {
1310 releaseWriteOnly(writeState.getGraph());
1312 clusterTable.removeWriteOnlyClusters();
1313 state.stopWriteTransaction(clusterStream);
1315 } catch (Throwable e) {
1317 e.printStackTrace();
1319 releaseWriteOnly(writeState.getGraph());
1321 clusterTable.removeWriteOnlyClusters();
1323 if (callback != null)
1324 callback.exception(new DatabaseException(e));
1326 state.stopWriteTransaction(clusterStream);
1327 LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
1331 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1337 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1338 scheduleRequest(request, callback, notify, null);
1342 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1346 assert (request != null);
1348 requestManager.scheduleWrite(new SessionTask(null) {
1351 public void run0(int thread) {
1353 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1357 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1360 Disposable.safeDispose(clientChanges);
1361 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1365 VirtualGraph vg = getProvider(request.getProvider());
1366 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1368 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1370 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1373 public void execute(Object result) {
1374 if(callback != null) callback.accept(null);
1378 public void exception(Throwable t) {
1379 if(callback != null) callback.accept((DatabaseException)t);
1384 assert (null != writer);
1385 // writer.state.barrier.inc();
1389 request.perform(writer);
1391 } catch (Throwable e) {
1393 // writer.state.barrier.dec();
1394 // writer.waitAsync(null);
1396 releaseWriteOnly(writer);
1398 clusterTable.removeWriteOnlyClusters();
1400 if(!(e instanceof CancelTransactionException)) {
1401 if (callback != null)
1402 callback.accept(new DatabaseException(e));
1405 writeState.except(e);
1411 // This makes clusters available from server
1412 boolean empty = clusterStream.reallyFlush();
1413 // This was needed to make WO requests call metadata listeners.
1414 // NOTE: the calling event does not contain clientChanges information.
1415 if (!empty && clientChanges.isEmpty())
1416 clientChanges.setNotEmpty(true);
1417 releaseWriteOnly(writer);
1418 assert (null != writer);
1421 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1434 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1435 scheduleRequest(request, callback, notify, null);
1439 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1442 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1444 assert (request != null);
1446 requestManager.scheduleWrite(new SessionTask(null) {
1449 public void run0(int thread) {
1451 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1453 performWriteOnly(request, notify, callback);
1463 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1465 assert (request != null);
1466 assert (procedure != null);
1468 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1471 public void run0(int thread) {
1473 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1475 ITask task = ThreadLogger.task(request);
1477 ListenerBase listener = getListenerBase(procedure);
1479 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1481 // This is never synced but increase to prevent it from visiting 0
1482 newGraph.asyncBarrier.inc();
1486 if (listener != null) {
1490 AsyncProcedure ap = new AsyncProcedure<T>() {
1493 public void exception(AsyncReadGraph graph, Throwable t) {
1494 procedure.exception(graph, t);
1495 if(throwable != null) {
1498 // ErrorLogger.defaultLogError("Unhandled exception", t);
1503 public void execute(AsyncReadGraph graph, T t) {
1504 if(result != null) result.set(t);
1505 procedure.execute(graph, t);
1510 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1512 } catch (Throwable t) {
1513 // This is handled by the AsyncProcedure
1514 //Logger.defaultLogError("Internal error", t);
1521 T t = request.perform(newGraph);
1525 if(result != null) result.set(t);
1526 procedure.execute(newGraph, t);
1528 } catch (Throwable th) {
1530 if(throwable != null) {
1533 LOGGER.error("Unhandled exception", th);
1538 } catch (Throwable t) {
1541 t.printStackTrace();
1543 if(throwable != null) {
1546 LOGGER.error("Unhandled exception", t);
1551 procedure.exception(newGraph, t);
1553 } catch (Throwable t2) {
1555 if(throwable != null) {
1558 LOGGER.error("Unhandled exception", t2);
1571 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1581 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1583 assert (request != null);
1584 assert (procedure != null);
1586 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1588 requestManager.scheduleRead(new SessionRead(null, notify) {
1591 public void run0(int thread) {
1593 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1595 ITask task = ThreadLogger.task(request);
1597 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1598 newGraph.asyncBarrier.inc();
1602 if (listener != null) {
1605 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1606 } catch (DatabaseException e) {
1607 LOGGER.error("Unhandled query exception", e);
1612 BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
1614 public void execute(AsyncReadGraph graph_, T result) {
1616 super.execute(graph_, result);
1619 public void exception(AsyncReadGraph graph_, Throwable t) {
1621 super.exception(graph_, t);
1627 wrap.performSync(request);
1628 } catch (DatabaseException e) {
1629 LOGGER.error("Unhandled query exception", e);
1636 newGraph.asyncBarrier.dec();
1638 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1648 public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1650 assert (request != null);
1651 assert (procedure != null);
1653 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1655 int sync = notify != null ? thread : -1;
1657 requestManager.scheduleRead(new SessionRead(null, notify) {
1660 public void run0(int thread) {
1662 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1664 ListenerBase listener = getListenerBase(procedure);
1666 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1670 if (listener != null) {
1672 newGraph.processor.query(newGraph, request, null, procedure, listener);
1674 // newGraph.waitAsync(request);
1678 final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1682 request.perform(newGraph, wrapper);
1684 } catch (Throwable t) {
1686 t.printStackTrace();
1694 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1704 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1706 assert (request != null);
1707 assert (procedure != null);
1709 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1711 int sync = notify != null ? thread : -1;
1713 requestManager.scheduleRead(new SessionRead(null, notify) {
1716 public void run0(int thread) {
1718 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1720 ListenerBase listener = getListenerBase(procedure);
1722 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1726 if (listener != null) {
1728 newGraph.processor.query(newGraph, request, null, procedure, listener);
1730 // newGraph.waitAsync(request);
1734 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1738 request.perform(newGraph, wrapper);
1740 } catch (Throwable t) {
1742 t.printStackTrace();
1750 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1760 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1762 assert (request != null);
1763 assert (procedure != null);
1765 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1767 requestManager.scheduleRead(new SessionRead(throwable, notify) {
1770 public void run0(int thread) {
1772 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1774 ListenerBase listener = getListenerBase(procedure);
1776 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1780 if (listener != null) {
1783 QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1784 } catch (DatabaseException e) {
1785 Logger.defaultLogError(e);
1790 // newGraph.state.barrier.inc();
1792 request.register(newGraph, new Listener<T>() {
1795 public void exception(Throwable t) {
1796 if(throwable != null) throwable.set(t);
1797 procedure.exception(t);
1798 // newGraph.state.barrier.dec();
1802 public void execute(T t) {
1803 if(result != null) result.set(t);
1804 procedure.execute(t);
1805 // newGraph.state.barrier.dec();
1809 public boolean isDisposed() {
1815 // newGraph.waitAsync(request);
1821 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1833 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1835 scheduleRequest(request, procedure, null, null, null);
1840 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1842 Semaphore notify = new Semaphore(0);
1843 DataContainer<Throwable> container = new DataContainer<Throwable>();
1844 DataContainer<T> result = new DataContainer<T>();
1845 scheduleRequest(request, procedure, notify, container, result);
1846 acquire(notify, request);
1847 Throwable throwable = container.get();
1848 if(throwable != null) {
1849 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1850 else throw new DatabaseException("Unexpected exception", throwable);
1852 return result.get();
1856 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1858 Semaphore notify = new Semaphore(0);
1859 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1860 final DataContainer<T> resultContainer = new DataContainer<T>();
1861 scheduleRequest(request, new AsyncProcedure<T>() {
1863 public void exception(AsyncReadGraph graph, Throwable throwable) {
1864 exceptionContainer.set(throwable);
1865 procedure.exception(graph, throwable);
1868 public void execute(AsyncReadGraph graph, T result) {
1869 resultContainer.set(result);
1870 procedure.execute(graph, result);
1872 }, getListenerBase(procedure), notify);
1873 acquire(notify, request, procedure);
1874 Throwable throwable = exceptionContainer.get();
1875 if (throwable != null) {
1876 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877 else throw new DatabaseException("Unexpected exception", throwable);
1879 return resultContainer.get();
1884 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1886 Semaphore notify = new Semaphore(0);
1887 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1888 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1890 public void exception(AsyncReadGraph graph, Throwable throwable) {
1891 exceptionContainer.set(throwable);
1892 procedure.exception(graph, throwable);
1895 public void execute(AsyncReadGraph graph, T result) {
1896 procedure.execute(graph, result);
1899 public void finished(AsyncReadGraph graph) {
1900 procedure.finished(graph);
1903 acquire(notify, request, procedure);
1904 Throwable throwable = exceptionContainer.get();
1905 if (throwable != null) {
1906 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1907 else throw new DatabaseException("Unexpected exception", throwable);
1909 // TODO: implement return value
1910 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1915 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1916 return syncRequest(request, new ProcedureAdapter<T>());
1919 // assert(request != null);
1921 // final DataContainer<T> result = new DataContainer<T>();
1922 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1924 // syncRequest(request, new Procedure<T>() {
1927 // public void execute(T t) {
1932 // public void exception(Throwable t) {
1933 // exception.set(t);
1938 // Throwable t = exception.get();
1940 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1941 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1944 // return result.get();
1949 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1950 return syncRequest(request, (Procedure<T>)procedure);
1955 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1956 // assertNotSession();
1957 // Semaphore notify = new Semaphore(0);
1958 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1959 // DataContainer<T> result = new DataContainer<T>();
1960 // scheduleRequest(request, procedure, notify, container, result);
1961 // acquire(notify, request);
1962 // Throwable throwable = container.get();
1963 // if(throwable != null) {
1964 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1965 // else throw new DatabaseException("Unexpected exception", throwable);
1967 // return result.get();
1972 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1974 Semaphore notify = new Semaphore(0);
1975 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1976 final DataContainer<T> result = new DataContainer<T>();
1977 scheduleRequest(request, procedure, notify, container, result);
1978 acquire(notify, request);
1979 Throwable throwable = container.get();
1980 if (throwable != null) {
1981 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1982 else throw new DatabaseException("Unexpected exception", throwable);
1984 return result.get();
1988 public void syncRequest(Write request) throws DatabaseException {
1991 Semaphore notify = new Semaphore(0);
1992 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1993 scheduleRequest(request, e -> exception.set(e), notify);
1994 acquire(notify, request);
1995 if(exception.get() != null) throw exception.get();
1999 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
2001 Semaphore notify = new Semaphore(0);
2002 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2003 final DataContainer<T> result = new DataContainer<T>();
2004 scheduleRequest(request, new Procedure<T>() {
2007 public void exception(Throwable t) {
2012 public void execute(T t) {
2017 acquire(notify, request);
2018 if(exception.get() != null) {
2019 Throwable t = exception.get();
2020 if(t instanceof DatabaseException) throw (DatabaseException)t;
2021 else throw new DatabaseException(t);
2023 return result.get();
2027 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2029 Semaphore notify = new Semaphore(0);
2030 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2031 final DataContainer<T> result = new DataContainer<T>();
2032 scheduleRequest(request, new Procedure<T>() {
2035 public void exception(Throwable t) {
2040 public void execute(T t) {
2045 acquire(notify, request);
2046 if(exception.get() != null) {
2047 Throwable t = exception.get();
2048 if(t instanceof DatabaseException) throw (DatabaseException)t;
2049 else throw new DatabaseException(t);
2051 return result.get();
2055 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2057 Semaphore notify = new Semaphore(0);
2058 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2059 final DataContainer<T> result = new DataContainer<T>();
2060 scheduleRequest(request, new Procedure<T>() {
2063 public void exception(Throwable t) {
2068 public void execute(T t) {
2073 acquire(notify, request);
2074 if(exception.get() != null) {
2075 Throwable t = exception.get();
2076 if(t instanceof DatabaseException) throw (DatabaseException)t;
2077 else throw new DatabaseException(t);
2079 return result.get();
2083 public void syncRequest(DelayedWrite request) throws DatabaseException {
2085 Semaphore notify = new Semaphore(0);
2086 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2087 scheduleRequest(request, e -> exception.set(e), notify);
2088 acquire(notify, request);
2089 if(exception.get() != null) throw exception.get();
2093 public void syncRequest(WriteOnly request) throws DatabaseException {
2096 Semaphore notify = new Semaphore(0);
2097 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2098 scheduleRequest(request, e -> exception.set(e), notify);
2099 acquire(notify, request);
2100 if(exception.get() != null) throw exception.get();
2105 * GraphRequestProcessor interface
2109 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2111 scheduleRequest(request, procedure, null, null);
2116 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2118 scheduleRequest(request, procedure, null);
2123 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2125 scheduleRequest(request, procedure, null, null, null);
2130 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2132 scheduleRequest(request, callback, null);
2137 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2139 scheduleRequest(request, procedure, null);
2144 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2146 scheduleRequest(request, procedure, null);
2151 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2153 scheduleRequest(request, procedure, null);
2158 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2160 scheduleRequest(request, callback, null);
2165 public void asyncRequest(final Write r) {
2166 asyncRequest(r, null);
2170 public void asyncRequest(final DelayedWrite r) {
2171 asyncRequest(r, null);
2175 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2177 scheduleRequest(request, callback, null);
2182 public void asyncRequest(final WriteOnly request) {
2184 asyncRequest(request, null);
2189 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2190 r.request(this, procedure);
2194 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2195 r.request(this, procedure);
2199 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2200 r.request(this, procedure);
2204 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2205 r.request(this, procedure);
2209 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2210 r.request(this, procedure);
2214 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2215 r.request(this, procedure);
2219 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2220 return r.request(this);
2224 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2225 return r.request(this);
2229 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2230 r.request(this, procedure);
2234 public <T> void async(WriteInterface<T> r) {
2235 r.request(this, new ProcedureAdapter<T>());
2239 public void incAsync() {
2244 public void decAsync() {
2248 public long getCluster(ResourceImpl resource) {
2249 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2250 return cluster.getClusterId();
2253 public long getCluster(int id) {
2254 if (clusterTable == null)
2255 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2256 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2259 public ResourceImpl getResource(int id) {
2260 return new ResourceImpl(resourceSupport, id);
2263 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2264 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2265 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2266 int key = proxy.getClusterKey();
2267 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2268 return new ResourceImpl(resourceSupport, resourceKey);
2271 public ResourceImpl getResource2(int id) {
2273 return new ResourceImpl(resourceSupport, id);
2276 final public int getId(ResourceImpl impl) {
2280 public static final Charset UTF8 = Charset.forName("utf-8");
2285 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2286 for(TransientGraph g : support.providers) {
2287 if(g.isPending(subject)) return false;
2292 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2293 for(TransientGraph g : support.providers) {
2294 if(g.isPending(subject, predicate)) return false;
2299 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2301 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2303 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2306 public void accept(ReadGraphImpl graph) {
2307 if(ready.decrementAndGet() == 0) {
2308 runnable.accept(graph);
2314 for(TransientGraph g : support.providers) {
2315 if(g.isPending(subject)) {
2317 g.load(graph, subject, composite);
2318 } catch (DatabaseException e) {
2319 e.printStackTrace();
2322 composite.accept(graph);
2326 composite.accept(graph);
2330 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2332 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2334 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2337 public void accept(ReadGraphImpl graph) {
2338 if(ready.decrementAndGet() == 0) {
2339 runnable.accept(graph);
2345 for(TransientGraph g : support.providers) {
2346 if(g.isPending(subject, predicate)) {
2348 g.load(graph, subject, predicate, composite);
2349 } catch (DatabaseException e) {
2350 e.printStackTrace();
2353 composite.accept(graph);
2357 composite.accept(graph);
2361 // void dumpHeap() {
2364 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2365 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2366 // "d:/heap" + flushCounter + ".txt", true);
2367 // } catch (IOException e) {
2368 // e.printStackTrace();
2373 void fireReactionsToSynchronize(ChangeSet cs) {
2375 // Do not fire empty events
2379 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2380 // g.state.barrier.inc();
2384 if (!cs.isEmpty()) {
2385 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2386 for (ChangeListener l : changeListeners2) {
2389 } catch (Exception ex) {
2390 ex.printStackTrace();
2397 // g.state.barrier.dec();
2398 // g.waitAsync(null);
2404 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2407 // Do not fire empty events
2412 // graph.state.barrier.inc();
2416 if (!cs2.isEmpty()) {
2418 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2419 for (ChangeListener l : changeListeners2) {
2422 // System.out.println("changelistener " + l);
2423 } catch (Exception ex) {
2424 ex.printStackTrace();
2432 // graph.state.barrier.dec();
2433 // graph.waitAsync(null);
2436 } catch (Throwable t) {
2437 t.printStackTrace();
2441 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2445 // Do not fire empty events
2449 // Do not fire on virtual requests
2450 if(graph.getProvider() != null)
2453 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2454 reactionGraph.asyncBarrier.inc();
2458 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2459 for (ChangeListener l : metadataListeners) {
2462 } catch (Throwable ex) {
2463 LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2469 reactionGraph.asyncBarrier.dec();
2473 } catch (Throwable t) {
2474 LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2483 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2486 public <T> T getService(Class<T> api) {
2487 T t = peekService(api);
2489 if (state.isClosed())
2490 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2491 throw new ServiceNotFoundException(this, api);
2499 protected abstract ServerInformation getCachedServerInformation();
2501 private Class<?> serviceKey1 = null;
2502 private Class<?> serviceKey2 = null;
2503 private Object service1 = null;
2504 private Object service2 = null;
2510 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2512 @SuppressWarnings("unchecked")
2514 public <T> T peekService(Class<T> api) {
2515 if (Layer0.class == api)
2518 synchronized (this) {
2519 if (serviceKey1 == api) {
2520 return (T) service1;
2522 if (serviceKey2 == api) {
2524 Object result = service2;
2525 service2 = service1;
2526 serviceKey2 = serviceKey1;
2532 if (ServerInformation.class == api)
2533 return (T) getCachedServerInformation();
2534 else if (WriteGraphImpl.class == api)
2535 return (T) writeState.getGraph();
2536 else if (ClusterBuilder.class == api)
2537 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2538 else if (ClusterBuilderFactory.class == api)
2539 return (T)new ClusterBuilderFactoryImpl(this);
2541 service2 = service1;
2542 serviceKey2 = serviceKey1;
2544 service1 = serviceLocator.peekService(api);
2555 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2558 public boolean hasService(Class<?> api) {
2559 return serviceLocator.hasService(api);
2563 * @param api the api that must be implemented by the specified service
2564 * @param service the service implementation
2567 public <T> void registerService(Class<T> api, T service) {
2568 if(Layer0.class == api) {
2569 L0 = (Layer0)service;
2572 serviceLocator.registerService(api, service);
2573 if (TransactionPolicySupport.class == api) {
2574 transactionPolicy = (TransactionPolicySupport)service;
2575 state.resetTransactionPolicy();
2577 if (api == serviceKey1)
2579 else if (api == serviceKey2)
2587 void fireSessionVariableChange(String variable) {
2588 for (MonitorHandler h : monitorHandlers) {
2589 MonitorContext ctx = monitorContexts.getLeft(h);
2591 // SafeRunner functionality repeated here to avoid dependency.
2593 h.valuesChanged(ctx);
2594 } catch (Exception e) {
2595 LOGGER.error("monitor handler notification produced the following exception", e);
2596 } catch (LinkageError e) {
2597 LOGGER.error("monitor handler notification produced a linkage error", e);
2603 class ResourceSerializerImpl implements ResourceSerializer {
2605 public long createRandomAccessId(int id) throws DatabaseException {
2608 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2609 long cluster = getCluster(id);
2611 return 0; // Better to return 0 then invalid id.
2612 long result = ClusterTraitsBase.createResourceId(cluster, index);
2617 public long getRandomAccessId(Resource resource) throws DatabaseException {
2619 ResourceImpl resourceImpl = (ResourceImpl) resource;
2620 return createRandomAccessId(resourceImpl.id);
2625 public int getTransientId(Resource resource) throws DatabaseException {
2627 ResourceImpl resourceImpl = (ResourceImpl) resource;
2628 return resourceImpl.id;
2633 public String createRandomAccessId(Resource resource)
2634 throws InvalidResourceReferenceException {
2636 if(resource == null) throw new IllegalArgumentException();
2638 ResourceImpl resourceImpl = (ResourceImpl) resource;
2640 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2644 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2645 } catch (DatabaseException e1) {
2646 throw new InvalidResourceReferenceException(e1);
2649 // Serialize as '<resource index>_<cluster id>'
2650 return "" + r + "_" + getCluster(resourceImpl);
2651 } catch (Throwable e) {
2652 e.printStackTrace();
2653 throw new InvalidResourceReferenceException(e);
2658 public int getTransientId(long serialized) throws DatabaseException {
2659 if (serialized <= 0)
2660 return (int)serialized;
2661 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2662 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2663 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2664 if (cluster == null)
2665 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2666 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2671 public Resource getResource(long randomAccessId) throws DatabaseException {
2672 return getResourceByKey(getTransientId(randomAccessId));
2676 public Resource getResource(int transientId) throws DatabaseException {
2677 return getResourceByKey(transientId);
2681 public Resource getResource(String randomAccessId)
2682 throws InvalidResourceReferenceException {
2684 int i = randomAccessId.indexOf('_');
2686 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2687 + randomAccessId + "'");
2688 int r = Integer.parseInt(randomAccessId.substring(0, i));
2689 if(r < 0) return getResourceByKey(r);
2690 long c = Long.parseLong(randomAccessId.substring(i + 1));
2691 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2692 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2693 if (cluster.hasResource(key, clusterTranslator))
2694 return getResourceByKey(key);
2695 } catch (InvalidResourceReferenceException e) {
2697 } catch (NumberFormatException e) {
2698 throw new InvalidResourceReferenceException(e);
2699 } catch (Throwable e) {
2700 e.printStackTrace();
2701 throw new InvalidResourceReferenceException(e);
2704 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2708 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2711 } catch (Throwable e) {
2712 e.printStackTrace();
2713 throw new InvalidResourceReferenceException(e);
2719 // Copied from old SessionImpl
2722 if (state.isClosed())
2723 throw new Error("Session closed.");
2728 public ResourceImpl getNewResource(long clusterId) {
2729 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2732 newId = cluster.createResource(clusterTranslator);
2733 } catch (DatabaseException e) {
2734 Logger.defaultLogError(e);
2737 return new ResourceImpl(resourceSupport, newId);
2740 public ResourceImpl getNewResource(Resource clusterSet)
2741 throws DatabaseException {
2742 long resourceId = clusterSet.getResourceId();
2743 Long clusterId = clusterSetsSupport.get(resourceId);
2744 if (null == clusterId)
2745 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2746 if (Constants.NewClusterId == clusterId) {
2747 clusterId = getService(ClusteringSupport.class).createCluster();
2748 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749 createdClusters.add(clusterId);
2750 clusterSetsSupport.put(resourceId, clusterId);
2751 return getNewResource(clusterId);
2753 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2754 ResourceImpl result;
2755 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2756 clusterId = getService(ClusteringSupport.class).createCluster();
2757 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2758 createdClusters.add(clusterId);
2759 clusterSetsSupport.put(resourceId, clusterId);
2760 return getNewResource(clusterId);
2762 result = getNewResource(clusterId);
2763 int resultKey = querySupport.getId(result);
2764 long resultCluster = querySupport.getClusterId(resultKey);
2765 if (clusterId != resultCluster)
2766 clusterSetsSupport.put(resourceId, resultCluster);
2772 public void getNewClusterSet(Resource clusterSet)
2773 throws DatabaseException {
2775 System.out.println("new cluster set=" + clusterSet);
2776 long resourceId = clusterSet.getResourceId();
2777 if (clusterSetsSupport.containsKey(resourceId))
2778 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2779 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2781 public boolean containsClusterSet(Resource clusterSet)
2782 throws ServiceException {
2783 long resourceId = clusterSet.getResourceId();
2784 return clusterSetsSupport.containsKey(resourceId);
2786 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2787 Resource r = defaultClusterSet;
2788 defaultClusterSet = clusterSet;
2791 void printDiagnostics() {
2795 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2796 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2797 for(ClusterI cluster : clusterTable.getClusters()) {
2799 if (cluster.isLoaded() && !cluster.isEmpty()) {
2800 residentClusters.set(residentClusters.get() + 1);
2801 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2803 } catch (DatabaseException e) {
2804 Logger.defaultLogError(e);
2808 System.out.println("--------------------------------");
2809 System.out.println("Cluster information:");
2810 System.out.println("-amount of resident clusters=" + residentClusters.get());
2811 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2813 for(ClusterI cluster : clusterTable.getClusters()) {
2814 System.out.print("Cluster " + cluster.getClusterId() + " [");
2816 if (!cluster.isLoaded())
2817 System.out.println("not loaded]");
2818 else if (cluster.isEmpty())
2819 System.out.println("is empty]");
2821 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2822 System.out.print(",references=" + cluster.getReferenceCount());
2823 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2825 } catch (DatabaseException e) {
2826 Logger.defaultLogError(e);
2827 System.out.println("is corrupted]");
2831 queryProvider2.printDiagnostics();
2832 System.out.println("--------------------------------");
2837 public void fireStartReadTransaction() {
2840 System.out.println("StartReadTransaction");
2842 for (SessionEventListener listener : eventListeners) {
2844 listener.readTransactionStarted();
2845 } catch (Throwable t) {
2846 t.printStackTrace();
2852 public void fireFinishReadTransaction() {
2855 System.out.println("FinishReadTransaction");
2857 for (SessionEventListener listener : eventListeners) {
2859 listener.readTransactionFinished();
2860 } catch (Throwable t) {
2861 t.printStackTrace();
2867 public void fireStartWriteTransaction() {
2870 System.out.println("StartWriteTransaction");
2872 for (SessionEventListener listener : eventListeners) {
2874 listener.writeTransactionStarted();
2875 } catch (Throwable t) {
2876 t.printStackTrace();
2882 public void fireFinishWriteTransaction() {
2885 System.out.println("FinishWriteTransaction");
2887 for (SessionEventListener listener : eventListeners) {
2889 listener.writeTransactionFinished();
2890 } catch (Throwable t) {
2891 t.printStackTrace();
2895 Indexing.resetDependenciesIndexingDisabled();
2899 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2900 throw new Error("Not supported at the moment.");
2907 ClusterTable getClusterTable() {
2908 return clusterTable;
2911 public GraphSession getGraphSession() {
2912 return graphSession;
2915 public QueryProcessor getQueryProvider2() {
2916 return queryProvider2;
2919 ClientChangesImpl getClientChanges() {
2920 return clientChanges;
2923 boolean getWriteOnly() {
2927 static int counter = 0;
2929 public void onClusterLoaded(long clusterId) {
2931 clusterTable.updateSize();
2939 * Implementation of the interface RequestProcessor
2943 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2948 assert(request != null);
2950 final DataContainer<T> result = new DataContainer<T>();
2951 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2953 syncRequest(request, new AsyncProcedure<T>() {
2956 public void execute(AsyncReadGraph graph, T t) {
2961 public void exception(AsyncReadGraph graph, Throwable t) {
2967 Throwable t = exception.get();
2969 if(t instanceof DatabaseException) throw (DatabaseException)t;
2970 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2973 return result.get();
2978 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2979 // assertNotSession();
2980 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2984 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2986 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2990 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2992 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2996 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2998 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3002 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3004 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3008 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3012 assert(request != null);
3014 final DataContainer<T> result = new DataContainer<T>();
3015 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3017 syncRequest(request, new AsyncProcedure<T>() {
3020 public void execute(AsyncReadGraph graph, T t) {
3025 public void exception(AsyncReadGraph graph, Throwable t) {
3031 Throwable t = exception.get();
3033 if(t instanceof DatabaseException) throw (DatabaseException)t;
3034 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3037 return result.get();
3042 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3044 return syncRequest(request, (AsyncProcedure<T>)procedure);
3048 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3050 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3054 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3056 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3060 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3062 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3066 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3068 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3072 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3076 assert(request != null);
3078 final ArrayList<T> result = new ArrayList<T>();
3079 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3081 syncRequest(request, new SyncMultiProcedure<T>() {
3084 public void execute(ReadGraph graph, T t) {
3085 synchronized(result) {
3091 public void finished(ReadGraph graph) {
3095 public void exception(ReadGraph graph, Throwable t) {
3102 Throwable t = exception.get();
3104 if(t instanceof DatabaseException) throw (DatabaseException)t;
3105 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3113 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3115 throw new Error("Not implemented!");
3119 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3121 return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3125 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3127 return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3131 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3133 return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3137 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3141 assert(request != null);
3143 final ArrayList<T> result = new ArrayList<T>();
3144 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3146 syncRequest(request, new AsyncMultiProcedure<T>() {
3149 public void execute(AsyncReadGraph graph, T t) {
3150 synchronized(result) {
3156 public void finished(AsyncReadGraph graph) {
3160 public void exception(AsyncReadGraph graph, Throwable t) {
3167 Throwable t = exception.get();
3169 if (t instanceof DatabaseException)
3170 throw (DatabaseException) t;
3172 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3180 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3182 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3186 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3188 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3192 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3194 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3198 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3200 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3204 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3206 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3211 public <T> void asyncRequest(Read<T> request) {
3213 asyncRequest(request, new ProcedureAdapter<T>() {
3215 public void exception(Throwable t) {
3216 t.printStackTrace();
3223 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3224 asyncRequest(request, (AsyncProcedure<T>)procedure);
3228 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3229 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3233 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3234 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3238 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3239 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3243 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3244 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3248 final public <T> void asyncRequest(final AsyncRead<T> request) {
3250 assert(request != null);
3252 asyncRequest(request, new ProcedureAdapter<T>() {
3254 public void exception(Throwable t) {
3255 t.printStackTrace();
3262 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3263 scheduleRequest(request, procedure, procedure, null);
3267 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3268 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3272 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3273 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3277 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3278 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3282 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3283 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3287 public <T> void asyncRequest(MultiRead<T> request) {
3289 assert(request != null);
3291 asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3293 public void exception(ReadGraph graph, Throwable t) {
3294 t.printStackTrace();
3301 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302 asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3306 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307 asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3311 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312 scheduleRequest(request, procedure, null);
3316 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317 asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3321 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3323 assert(request != null);
3325 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3327 public void exception(AsyncReadGraph graph, Throwable t) {
3328 t.printStackTrace();
3335 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3340 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3345 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3350 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3355 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3360 final public <T> void asyncRequest(final ExternalRead<T> request) {
3362 assert(request != null);
3364 asyncRequest(request, new ProcedureAdapter<T>() {
3366 public void exception(Throwable t) {
3367 t.printStackTrace();
3374 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3375 asyncRequest(request, (Procedure<T>)procedure);
3378 boolean sameProvider(Write request) {
3379 if(writeState.getGraph().provider != null) {
3380 return writeState.getGraph().provider.equals(request.getProvider());
3382 return request.getProvider() == null;
3386 boolean plainWrite(WriteGraphImpl graph) {
3387 if(graph == null) return false;
3388 if(graph.writeSupport.writeOnly()) return false;
3393 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3395 private void assertNotSession() throws DatabaseException {
3396 Thread current = Thread.currentThread();
3397 if (sessionThreads.contains(current))
3398 throw new ServiceException("Caller is already inside a transaction.");
3401 void assertAlive() {
3402 if (!state.isAlive())
3403 throw new RuntimeDatabaseException("Session has been shut down.");
3406 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3407 return querySupport.getValueStream(graph, querySupport.getId(resource));
3410 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3411 return querySupport.getValue(graph, querySupport.getId(resource));
3414 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3415 acquire(semaphore, request, null);
3418 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3422 // Loop until the semaphore is acquired reporting requests that take a long time.
3425 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3429 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3431 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3432 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3434 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3435 + (waitTime) + " ms. (procedure=" + procedure + ")");
3442 } catch (InterruptedException e) {
3443 e.printStackTrace();
3444 // FIXME: Should perhaps do something else in this case ??
3451 // public boolean holdOnToTransactionAfterCancel() {
3452 // return transactionPolicy.holdOnToTransactionAfterCancel();
3456 // public boolean holdOnToTransactionAfterCommit() {
3457 // return transactionPolicy.holdOnToTransactionAfterCommit();
3461 // public boolean holdOnToTransactionAfterRead() {
3462 // return transactionPolicy.holdOnToTransactionAfterRead();
3466 // public void onRelinquish() {
3467 // transactionPolicy.onRelinquish();
3471 // public void onRelinquishDone() {
3472 // transactionPolicy.onRelinquishDone();
3476 // public void onRelinquishError() {
3477 // transactionPolicy.onRelinquishError();
3482 public Session getSession() {
3487 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3488 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3490 public void ceased(int thread) {
3492 requestManager.ceased(thread);
3496 public int getAmountOfQueryThreads() {
3497 // This must be a power of two
3499 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3502 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3504 return new ResourceImpl(resourceSupport, key);
3508 public void acquireWriteOnly() {
3512 public void releaseWriteOnly(ReadGraphImpl graph) {
3514 queryProvider2.releaseWrite(graph);
3517 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3519 long start = System.nanoTime();
3521 while(dirtyPrimitives) {
3522 dirtyPrimitives = false;
3523 getQueryProvider2().propagateChangesInQueryCache(writer);
3524 getQueryProvider2().listening.fireListeners(writer);
3527 fireMetadataListeners(writer, clientChanges);
3529 if(DebugPolicy.PERFORMANCE_DATA) {
3530 long end = System.nanoTime();
3531 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3536 void removeTemporaryData() {
3537 File platform = Platform.getLocation().toFile();
3538 File tempFiles = new File(platform, "tempFiles");
3539 File temp = new File(tempFiles, "db");
3543 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3545 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3548 } catch (IOException e) {
3549 Logger.defaultLogError(e);
3551 return FileVisitResult.CONTINUE;
3554 } catch (IOException e) {
3555 Logger.defaultLogError(e);
3559 public void handleCreatedClusters() {
3560 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3561 createdClusters.forEach(new TLongProcedure() {
3564 public boolean execute(long value) {
3565 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3566 cluster.setImmutable(true, clusterTranslator);
3571 createdClusters.clear();
3575 public Object getModificationCounter() {
3576 return queryProvider2.modificationCounter;
3580 public void markUndoPoint() {
3581 state.setCombine(false);
3584 @SuppressWarnings("unchecked")