1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryProcessor;
94 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
95 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
96 import org.simantics.db.impl.service.QueryDebug;
97 import org.simantics.db.impl.support.VirtualGraphServerSupport;
98 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
99 import org.simantics.db.procedure.AsyncListener;
100 import org.simantics.db.procedure.AsyncMultiListener;
101 import org.simantics.db.procedure.AsyncMultiProcedure;
102 import org.simantics.db.procedure.AsyncProcedure;
103 import org.simantics.db.procedure.Listener;
104 import org.simantics.db.procedure.ListenerBase;
105 import org.simantics.db.procedure.MultiListener;
106 import org.simantics.db.procedure.MultiProcedure;
107 import org.simantics.db.procedure.Procedure;
108 import org.simantics.db.procedure.SyncListener;
109 import org.simantics.db.procedure.SyncMultiListener;
110 import org.simantics.db.procedure.SyncMultiProcedure;
111 import org.simantics.db.procedure.SyncProcedure;
112 import org.simantics.db.procore.cluster.ClusterImpl;
113 import org.simantics.db.procore.cluster.ClusterTraits;
114 import org.simantics.db.procore.protocol.Constants;
115 import org.simantics.db.procore.protocol.DebugPolicy;
116 import org.simantics.db.request.AsyncMultiRead;
117 import org.simantics.db.request.AsyncRead;
118 import org.simantics.db.request.DelayedWrite;
119 import org.simantics.db.request.DelayedWriteResult;
120 import org.simantics.db.request.ExternalRead;
121 import org.simantics.db.request.MultiRead;
122 import org.simantics.db.request.Read;
123 import org.simantics.db.request.ReadInterface;
124 import org.simantics.db.request.Write;
125 import org.simantics.db.request.WriteInterface;
126 import org.simantics.db.request.WriteOnly;
127 import org.simantics.db.request.WriteOnlyResult;
128 import org.simantics.db.request.WriteResult;
129 import org.simantics.db.request.WriteTraits;
130 import org.simantics.db.service.ByteReader;
131 import org.simantics.db.service.ClusterBuilder;
132 import org.simantics.db.service.ClusterBuilderFactory;
133 import org.simantics.db.service.ClusterControl;
134 import org.simantics.db.service.ClusterSetsSupport;
135 import org.simantics.db.service.ClusterUID;
136 import org.simantics.db.service.ClusteringSupport;
137 import org.simantics.db.service.CollectionSupport;
138 import org.simantics.db.service.DebugSupport;
139 import org.simantics.db.service.DirectQuerySupport;
140 import org.simantics.db.service.GraphChangeListenerSupport;
141 import org.simantics.db.service.InitSupport;
142 import org.simantics.db.service.LifecycleSupport;
143 import org.simantics.db.service.ManagementSupport;
144 import org.simantics.db.service.QueryControl;
145 import org.simantics.db.service.SerialisationSupport;
146 import org.simantics.db.service.ServerInformation;
147 import org.simantics.db.service.ServiceActivityMonitor;
148 import org.simantics.db.service.SessionEventSupport;
149 import org.simantics.db.service.SessionMonitorSupport;
150 import org.simantics.db.service.SessionUserSupport;
151 import org.simantics.db.service.StatementSupport;
152 import org.simantics.db.service.TransactionPolicySupport;
153 import org.simantics.db.service.TransactionSupport;
154 import org.simantics.db.service.TransferableGraphSupport;
155 import org.simantics.db.service.UndoRedoSupport;
156 import org.simantics.db.service.VirtualGraphSupport;
157 import org.simantics.db.service.XSupport;
158 import org.simantics.layer0.Layer0;
159 import org.simantics.utils.DataContainer;
160 import org.simantics.utils.Development;
161 import org.simantics.utils.threads.logger.ITask;
162 import org.simantics.utils.threads.logger.ThreadLogger;
164 import gnu.trove.procedure.TLongProcedure;
165 import gnu.trove.set.hash.TLongHashSet;
// Abstract Session implementation that also acts as the scheduler for write
// requests (it implements WriteRequestScheduleSupport).
168 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
// Compile-time debug switches; both are disabled.
170 protected static final boolean DEBUG = false;
172 private static final boolean DIAGNOSTICS = false;
// Transaction policy; created and registered as a service in the constructor.
174 private TransactionPolicySupport transactionPolicy;
176 final private ClusterControl clusterControl;
// Support objects created in the constructor.
178 final protected BuiltinSupportImpl builtinSupport;
179 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
180 final protected ClusterSetsSupport clusterSetsSupport;
181 final protected LifecycleSupportImpl lifecycleSupport;
// Support objects that are not assigned in the constructor shown in this file section.
183 protected QuerySupportImpl querySupport;
184 protected ResourceSupportImpl resourceSupport;
185 protected WriteSupport writeSupport;
186 public ClusterTranslator clusterTranslator;
188 boolean dirtyPrimitives = false;
// Bit flags combined into serviceMode.
// SERVICE_MODE_CREATE: getNewResourceCluster() records the ids of new clusters in createdClusters.
// SERVICE_MODE_ALLOW: WriteOnlySupport.claim() skips its immutable-cluster check.
190 public static final int SERVICE_MODE_CREATE = 2;
191 public static final int SERVICE_MODE_ALLOW = 1;
192 public int serviceMode = 0;
193 public boolean createdImmutableClusters = false;
// Ids of clusters obtained through getNewResourceCluster() while SERVICE_MODE_CREATE is set.
194 public TLongHashSet createdClusters = new TLongHashSet();
199 * The service locator maintained by the workbench. These services are
200 * initialized by the workbench during the <code>init</code> method.
// Registry of session services; populated in the constructor.
202 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
204 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
// Listener lists; CopyOnWriteArrayList is used for all three.
206 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
208 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
210 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
212 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
// Two-way mapping between monitor contexts and their handlers.
214 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
216 final protected State state = new State();
// The following references start out as null; only clusterTable and
// sessionManagerImpl are assigned in the constructor shown in this file section.
218 protected GraphSession graphSession = null;
220 protected SessionManager sessionManagerImpl = null;
222 protected UserAuthenticationAgent authAgent = null;
224 protected UserAuthenticator authenticator = null;
226 protected Resource user = null;
228 protected ClusterStream clusterStream = null;
230 protected SessionRequestManager requestManager = null;
232 public ClusterTable clusterTable = null;
234 public QueryProcessor queryProvider2 = null;
// Replaced with a fresh ClientChangesImpl at the start of each scheduled write task.
236 ClientChangesImpl clientChanges = null;
238 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
240 // protected long newClusterId = Constants.NullClusterId;
242 protected int flushCounter = 0;
// Passed to ClusterTable.getNewResourceCluster() when a new cluster is requested.
244 protected boolean writeOnly = false;
// State of the write request currently being performed (set by the scheduled write tasks).
246 WriteState<?> writeState = null;
// State of the delayed write while its perform phase runs; reset to null before the commit phase.
247 WriteStateBase<?> delayedWriteState = null;
248 protected Resource defaultClusterSet = null; // If not null then used for newResource().
// Creates the session: builds the cluster table and the support objects and
// registers the session services with the service locator.
// NOTE(review): in the visible code authAgent is never stored or used (the
// assignment below is commented out) — confirm this is intentional.
250 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
252 // if (authAgent == null)
253 // throw new IllegalArgumentException("null authentication agent");
// Storage directory shared by the cluster table, the virtual graph support
// and the cluster sets support created below.
255 File t = StaticSessionProperties.virtualGraphStoragePath;
258 this.clusterTable = new ClusterTable(this, t);
259 this.builtinSupport = new BuiltinSupportImpl(this);
260 this.sessionManagerImpl = sessionManagerImpl;
262 // this.authAgent = authAgent;
// Services that need no field reference in this class are registered directly.
264 serviceLocator.registerService(Session.class, this);
265 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
266 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
267 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
268 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
269 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
270 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
271 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
272 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
273 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
274 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
275 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
276 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
277 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
278 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
279 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
280 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
281 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
282 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
283 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
284 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
285 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
286 ServiceActivityUpdaterForWriteTransactions.register(this);
// The same virtual graph support instance is registered under two service interfaces.
288 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
289 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
290 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
291 this.lifecycleSupport = new LifecycleSupportImpl(this);
292 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
293 this.transactionPolicy = new TransactionPolicyRelease();
294 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
295 this.clusterControl = new ClusterControlImpl(this);
296 serviceLocator.registerService(ClusterControl.class, clusterControl);
297 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
298 this.clusterSetsSupport.setReadDirectory(t.toPath());
299 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
300 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
310 public Resource getRootLibrary() {
311 return queryProvider2.getRootLibraryResource();
314 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
315 if (!graphSession.dbSession.refreshEnabled())
318 getClusterTable().refresh(csid, this, clusterUID);
319 } catch (Throwable t) {
320 Logger.defaultLogError("Refesh failed.", t);
// Helper bundling a result slot, a stored throwable, a semaphore and ready-made
// callback/procedure objects for a named operation.
324 static final class TaskHelper {
// Operation name; used as a prefix in the exception messages built below.
325 private final String name;
326 private Object result;
// Starts with zero permits.
327 final Semaphore sema = new Semaphore(0);
328 private Throwable throwable = null;
// Callback whose body runs under the TaskHelper monitor (body not fully visible here).
329 final Consumer<DatabaseException> callback = e -> {
330 synchronized (TaskHelper.this) {
// Procedure adapter: forwards success as callback.accept(null) and failures as a
// DatabaseException (non-DatabaseException throwables are wrapped).
334 final Procedure<Object> proc = new Procedure<Object>() {
336 public void execute(Object result) {
337 callback.accept(null);
340 public void exception(Throwable t) {
341 if (t instanceof DatabaseException)
342 callback.accept((DatabaseException)t);
// NOTE(review): no space between name and "operation failed." here, unlike the
// message built in throw_() — looks like a typo.
344 callback.accept(new DatabaseException("" + name + "operation failed.", t));
347 final WriteTraits writeTraits = new WriteTraits() {};
348 TaskHelper(String name) {
351 @SuppressWarnings("unchecked")
355 void setResult(Object result) {
356 this.result = result;
358 // Throwable throwableGet() {
361 synchronized void throwableSet(Throwable t) {
364 // void throwableSet(String t) {
365 // throwable = new InternalException("" + name + " operation failed. " + t);
// Rethrows the stored throwable, if any, as a DatabaseException.
367 synchronized void throwableCheck()
368 throws DatabaseException {
369 if (null != throwable)
370 if (throwable instanceof DatabaseException)
371 throw (DatabaseException)throwable;
// NOTE(review): the message is hard-coded to "Undo" instead of using name — confirm.
373 throw new DatabaseException("Undo operation failed.", throwable);
// Always throws a DatabaseException with the operation name and the given message.
375 void throw_(String message)
376 throws DatabaseException {
377 throw new DatabaseException("" + name + " operation failed. " + message);
383 private ListenerBase getListenerBase(Object procedure) {
384 if (procedure instanceof ListenerBase)
385 return (ListenerBase) procedure;
390 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
391 scheduleRequest(request, callback, notify, null);
395 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
// Schedules a Write request as a SessionTask on the request manager. The task
// creates a fresh ClientChangesImpl and WriteGraphImpl, performs the request and
// routes failures to writeState.except(). The combine parameter is not used in
// the visible code.
398 public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400 assert (request != null);
// Optional development-time logging of scheduled writes.
402 if(Development.DEVELOPMENT) {
404 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405 System.err.println("schedule write '" + request + "'");
406 } catch (Throwable t) {
407 Logger.defaultLogError(t);
// Thread index derived from the request's hash code.
411 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
// NOTE(review): this variant passes thread twice to SessionTask, while the
// WriteResult and DelayedWrite variants use the two-argument constructor — confirm.
413 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
416 public void run(int thread) {
418 if(Development.DEVELOPMENT) {
420 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421 System.err.println("perform write '" + request + "'");
422 } catch (Throwable t) {
423 Logger.defaultLogError(t);
427 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the write with a fresh client change set.
434 Disposable.safeDispose(clientChanges);
435 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
437 VirtualGraph vg = getProvider(request.getProvider());
439 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The write state forwards the outcome to the caller's callback, when one was given.
440 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
443 public void execute(Object result) {
444 if(callback != null) callback.accept(null);
448 public void exception(Throwable t) {
// NOTE(review): unchecked cast — a Throwable that is not a DatabaseException would
// raise ClassCastException here; the DelayedWrite variant wraps such throwables instead.
449 if(callback != null) callback.accept((DatabaseException)t);
454 assert (null != writer);
455 // writer.state.barrier.inc();
458 request.perform(writer);
459 assert (null != writer);
460 } catch (Throwable t) {
// Cancellation is not logged as an error; every failure is still passed to the write state.
461 if (!(t instanceof CancelTransactionException))
462 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
463 writeState.except(t);
465 // writer.state.barrier.dec();
466 // writer.waitAsync(request);
470 assert(!queryProvider2.cache.dirty);
472 } catch (Throwable e) {
474 // Log it first, just to be safe that the error is always logged.
475 if (!(e instanceof CancelTransactionException))
476 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
478 // writeState.getGraph().state.barrier.dec();
479 // writeState.getGraph().waitAsync(request);
481 writeState.except(e);
484 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
485 // // All we can do here is to log those, can't really pass them anywhere.
486 // if (callback != null) {
487 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
488 // else callback.run(new DatabaseException(e));
490 // } catch (Throwable e2) {
491 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
493 // boolean empty = clusterStream.reallyFlush();
494 // int callerThread = -1;
495 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
496 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
498 // // Send all local changes to server so it can calculate correct reverse change set.
499 // // This will call clusterStream.accept().
500 // state.cancelCommit(context, clusterStream);
502 // if (!context.isOk()) // this is a blocking operation
503 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
504 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
506 // state.cancelCommit2(context, clusterStream);
507 // } catch (DatabaseException e3) {
508 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
510 // clusterStream.setOff(false);
511 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// Signalled a second time once the write has been processed.
516 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
522 if(Development.DEVELOPMENT) {
524 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
525 System.err.println("finish write '" + request + "'");
526 } catch (Throwable t) {
527 Logger.defaultLogError(t);
537 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
538 scheduleRequest(request, procedure, notify, null);
542 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
// Schedules a result-producing write request as a SessionTask. The task creates a
// fresh ClientChangesImpl and WriteGraphImpl, performs the request and stores its
// return value in the write state; failures go to writeState.except(). The
// combine parameter is not used in the visible code.
545 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
547 assert (request != null);
// Thread index derived from the request's hash code.
549 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
551 requestManager.scheduleWrite(new SessionTask(request, thread) {
554 public void run(int thread) {
556 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
558 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start the write with a fresh client change set.
561 Disposable.safeDispose(clientChanges);
562 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
564 VirtualGraph vg = getProvider(request.getProvider());
565 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// Keep a typed local so the result can be set without a cast; the field is WriteState<?>.
568 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
569 writeState = writeStateT;
571 assert (null != writer);
572 // writer.state.barrier.inc();
573 writeStateT.setResult(request.perform(writer));
574 assert (null != writer);
576 // writer.state.barrier.dec();
577 // writer.waitAsync(null);
579 } catch (Throwable e) {
581 // writer.state.barrier.dec();
582 // writer.waitAsync(null);
// Unlike the Write variant, nothing is logged here before handing the failure on.
584 writeState.except(e);
586 // state.stopWriteTransaction(clusterStream);
588 // } catch (Throwable e) {
589 // // Log it first, just to be safe that the error is always logged.
590 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
593 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
594 // // All we can do here is to log those, can't really pass them anywhere.
595 // if (procedure != null) {
596 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
597 // else procedure.exception(new DatabaseException(e));
599 // } catch (Throwable e2) {
600 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
603 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
605 // state.stopWriteTransaction(clusterStream);
// Signalled a second time once the write has been processed.
608 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
611 // if(notify != null) notify.release();
621 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
622 scheduleRequest(request, callback, notify, null);
626 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
// Schedules a DelayedWrite as a SessionTask that runs in two phases: first the
// request is performed against a DelayedWriteGraph, then the collected changes are
// committed through a WriteGraphImpl. The combine parameter is not used in the
// visible code.
629 public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
631 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
633 assert (request != null);
// Thread index derived from the request's hash code.
635 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
637 requestManager.scheduleWrite(new SessionTask(request, thread) {
640 public void run(int thread) {
// The perform phase is signalled as a queued read; the commit phase below as a queued write.
641 fireSessionVariableChange(SessionVariables.QUEUED_READS);
// Shared by both phases: forwards the outcome to the caller's callback, wrapping
// throwables that are not DatabaseExceptions.
643 Procedure<Object> stateProcedure = new Procedure<Object>() {
645 public void execute(Object result) {
646 if (callback != null)
647 callback.accept(null);
650 public void exception(Throwable t) {
651 if (callback != null) {
652 if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653 else callback.accept(new DatabaseException(t));
655 Logger.defaultLogError("Unhandled exception", t);
// Phase 1: perform the request against a delayed write graph backed by a read graph.
659 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661 DelayedWriteGraph dwg = null;
662 // newGraph.state.barrier.inc();
665 dwg = new DelayedWriteGraph(newGraph);
666 request.perform(dwg);
667 } catch (Throwable e) {
668 delayedWriteState.except(e);
673 // newGraph.state.barrier.dec();
674 // newGraph.waitAsync(request);
675 fireSessionVariableChange(SessionVariables.QUEUED_READS);
678 delayedWriteState = null;
// Phase 2: commit the delayed changes.
680 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
684 Disposable.safeDispose(clientChanges);
685 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
689 VirtualGraph vg = getProvider(request.getProvider());
// This local shadows the writeSupport field; the commit uses write-only support,
// with the virtual variant chosen when a virtual graph provider is present.
690 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
692 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
694 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
696 assert (null != writer);
697 // writer.state.barrier.inc();
701 dwg.commit(writer, request);
// The default cluster set may still be a delayed resource; convert it and
// record its cluster in the cluster sets support.
703 if(defaultClusterSet != null) {
704 XSupport xs = getService(XSupport.class);
705 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706 ClusteringSupport cs = getService(ClusteringSupport.class);
707 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
710 // This makes clusters available from server
711 clusterStream.reallyFlush();
713 releaseWriteOnly(writer);
714 handleUpdatesAndMetadata(writer);
// Only ServiceException is caught here: the write-only clusters are dropped, the
// stream is flushed and the failure is passed to the write state.
716 } catch (ServiceException e) {
717 // writer.state.barrier.dec();
718 // writer.waitAsync(null);
720 // These shall be requested from server
721 clusterTable.removeWriteOnlyClusters();
722 // This makes clusters available from server
723 clusterStream.reallyFlush();
725 releaseWriteOnly(writer);
726 writeState.except(e);
728 // Debugging & Profiling
729 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
739 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740 scheduleRequest(request, procedure, notify, null);
744 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
747 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748 throw new Error("Not implemented");
751 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754 createdClusters.add(cluster.clusterId);
// WriteSupport implementation used for the commit phase of delayed writes when
// no virtual graph provider is involved (see the DelayedWrite scheduleRequest).
759 class WriteOnlySupport implements WriteSupport {
761 ClusterStream stream;
// Cluster that createResource()/createResourceKey() currently allocate from; null until first use.
762 ClusterImpl currentCluster;
// Captures the session's cluster stream at construction time.
764 public WriteOnlySupport() {
765 this.stream = clusterStream;
769 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
// Adds the statement (s, p, o), given as resource keys, to the cluster that holds s.
// The provider argument is not used in the visible code.
774 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
776 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
// Immutable clusters may only be modified when SERVICE_MODE_ALLOW is set;
// the root library resource is exempt from this check.
778 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779 if(s != queryProvider2.getRootLibrary())
780 throw new ImmutableException("Trying to modify immutable resource key=" + s);
// addRelation may return a different cluster instance; maintainCluster handles the swap.
783 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784 } catch (DatabaseException e) {
// The failure is logged; no rethrow is visible here.
785 Logger.defaultLogError(e);
789 clientChanges.invalidate(s);
// presumably returns early for write-only clusters so the statement query update is skipped — TODO confirm
791 if (cluster.isWriteOnly())
793 queryProvider2.updateStatements(s, p);
798 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
// Sets the first length bytes of value as the value of the resource with key rid.
// The provider argument is not used in the visible code.
803 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
805 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
// setValue may return a different cluster instance; maintainCluster handles the swap.
807 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808 } catch (DatabaseException e) {
// The failure is logged; no rethrow is visible here.
809 Logger.defaultLogError(e);
812 clientChanges.invalidate(rid);
// presumably returns early for write-only clusters so the value query update is skipped — TODO confirm
814 if (cluster.isWriteOnly())
816 queryProvider2.updateValue(rid);
// Sets the value of a resource from a ByteReader supplying amount bytes.
821 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
// Path that reads all bytes at once and delegates to the byte[] variant
// (the condition selecting this path is not visible here).
824 claimValue(provider,resource, reader.readBytes(null, amount));
// Path that transfers the value in blocks of at most 65536 bytes through a reusable buffer.
828 byte[] bytes = new byte[65536];
830 int rid = ((ResourceImpl)resource).id;
831 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
835 int block = Math.min(left, 65536);
836 reader.readBytes(bytes, block);
// amount-left is the offset of this block within the complete value.
837 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
840 } catch (DatabaseException e) {
// The failure is logged; no rethrow is visible here.
841 Logger.defaultLogError(e);
844 clientChanges.invalidate(rid);
// presumably returns early for write-only clusters so the value query update is skipped — TODO confirm
846 if (cluster.isWriteOnly())
848 queryProvider2.updateValue(rid);
// Handles a cluster operation that returned a different cluster object than the one
// it was invoked on: currentCluster is re-pointed if it referenced the old object,
// and the new object is handed to the cluster table.
// Nothing happens when after_ is null or the same object as before.
852 private void maintainCluster(ClusterImpl before, ClusterI after_) {
853 if(after_ != null && after_ != before) {
854 ClusterImpl after = (ClusterImpl)after_;
855 if(currentCluster == before) {
856 currentCluster = after;
858 clusterTable.replaceCluster(after);
// Creates a new resource in the current cluster and returns its key. A cluster is
// obtained on first use, and a new one is started once the current cluster holds
// exactly ClusterTable.CLUSTER_FILL_SIZE resources.
862 public int createResourceKey(int foreignCounter) throws DatabaseException {
863 if(currentCluster == null) {
864 currentCluster = getNewResourceCluster();
// NOTE(review): uses == here, whereas createResource(VirtualGraph) uses >= for the same check — confirm.
866 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
// The replacement cluster is cast to ClusterWriteOnly and given a foreign lookup
// table with foreignCounter entries.
867 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868 newCluster.foreignLookup = new byte[foreignCounter];
869 currentCluster = newCluster;
871 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
873 return currentCluster.createResource(clusterTranslator);
// Creates a new resource. The current cluster is (re)selected when none is set yet or
// when it holds at least ClusterTable.CLUSTER_FILL_SIZE resources: via the default
// cluster set when one is defined, otherwise via getNewResourceCluster().
// The provider argument is not used in the visible code.
877 public Resource createResource(VirtualGraph provider) throws DatabaseException {
878 if(currentCluster == null) {
879 if (null != defaultClusterSet) {
// A resource allocated in the default cluster set determines the current cluster.
880 ResourceImpl result = getNewResource(defaultClusterSet);
881 currentCluster = clusterTable.getClusterByResourceKey(result.id);
884 currentCluster = getNewResourceCluster();
887 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888 if (null != defaultClusterSet) {
889 ResourceImpl result = getNewResource(defaultClusterSet);
890 currentCluster = clusterTable.getClusterByResourceKey(result.id);
893 currentCluster = getNewResourceCluster();
896 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
900 public Resource createResource(VirtualGraph provider, long clusterId)
901 throws DatabaseException {
902 return getNewResource(clusterId);
906 public Resource createResource(VirtualGraph provider, Resource clusterSet)
907 throws DatabaseException {
908 return getNewResource(clusterSet);
912 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913 throws DatabaseException {
914 getNewClusterSet(clusterSet);
918 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919 throws ServiceException {
920 return containsClusterSet(clusterSet);
923 public void selectCluster(long cluster) {
924 currentCluster = clusterTable.getClusterByClusterId(cluster);
925 long setResourceId = clusterSetsSupport.getSet(cluster);
926 clusterSetsSupport.put(setResourceId, cluster);
930 public Resource setDefaultClusterSet(Resource clusterSet)
931 throws ServiceException {
932 Resource result = setDefaultClusterSet4NewResource(clusterSet);
933 if(clusterSet != null) {
934 long id = clusterSetsSupport.get(clusterSet.getResourceId());
935 currentCluster = clusterTable.getClusterByClusterId(id);
938 currentCluster = null;
944 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
946 provider = getProvider(provider);
947 if (null == provider) {
948 int key = ((ResourceImpl)resource).id;
952 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 // if(key != queryProvider2.getRootLibrary())
955 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
958 // cluster.removeValue(key, clusterTranslator);
959 // } catch (DatabaseException e) {
960 // Logger.defaultLogError(e);
964 clusterTable.writeOnlyInvalidate(cluster);
967 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969 clusterTranslator.removeValue(cluster);
970 } catch (DatabaseException e) {
971 Logger.defaultLogError(e);
974 queryProvider2.invalidateResource(key);
975 clientChanges.invalidate(key);
978 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979 queryProvider2.updateValue(querySupport.getId(resource));
980 clientChanges.claimValue(resource);
987 public void flush(boolean intermediate) {
988 throw new UnsupportedOperationException();
992 public void flushCluster() {
993 clusterTable.flushCluster(graphSession);
994 if(defaultClusterSet != null) {
995 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
997 currentCluster = null;
1001 public void flushCluster(Resource r) {
1002 throw new UnsupportedOperationException("flushCluster resource " + r);
1010 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1013 int s = ((ResourceImpl)subject).id;
1014 int p = ((ResourceImpl)predicate).id;
1015 int o = ((ResourceImpl)object).id;
1017 provider = getProvider(provider);
1018 if (null == provider) {
1020 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021 clusterTable.writeOnlyInvalidate(cluster);
1025 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1027 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1029 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1030 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1032 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1033 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1034 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035 clusterTranslator.removeStatement(cluster);
1037 queryProvider2.invalidateResource(s);
1038 clientChanges.invalidate(s);
1040 } catch (DatabaseException e) {
1042 Logger.defaultLogError(e);
1050 ((VirtualGraphImpl)provider).deny(s, p, o);
1051 queryProvider2.invalidateResource(s);
1052 clientChanges.invalidate(s);
1061 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1062 throw new UnsupportedOperationException();
1066 public boolean writeOnly() {
1071 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1072 writeSupport.performWriteRequest(graph, request);
1076 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1077 throw new UnsupportedOperationException();
1081 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1082 throw new UnsupportedOperationException();
1086 public <T> void addMetadata(Metadata data) throws ServiceException {
1087 writeSupport.addMetadata(data);
1091 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1092 return writeSupport.getMetadata(clazz);
1096 public TreeMap<String, byte[]> getMetadata() {
1097 return writeSupport.getMetadata();
1101 public void commitDone(WriteTraits writeTraits, long csid) {
1102 writeSupport.commitDone(writeTraits, csid);
1106 public void clearUndoList(WriteTraits writeTraits) {
1107 writeSupport.clearUndoList(writeTraits);
1110 public int clearMetadata() {
1111 return writeSupport.clearMetadata();
1115 public void startUndo() {
1116 writeSupport.startUndo();
1120 class VirtualWriteOnlySupport implements WriteSupport {
1123 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1124 // Resource object) {
1125 // throw new UnsupportedOperationException();
1129 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1131 TransientGraph impl = (TransientGraph)provider;
1132 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1133 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1134 clientChanges.claim(subject, predicate, object);
1139 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1141 TransientGraph impl = (TransientGraph)provider;
1142 impl.claim(subject, predicate, object);
1143 getQueryProvider2().updateStatements(subject, predicate);
1144 clientChanges.claim(subject, predicate, object);
1149 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1150 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1154 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1155 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1156 getQueryProvider2().updateValue(resource);
1157 clientChanges.claimValue(resource);
1161 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1162 byte[] value = reader.readBytes(null, amount);
1163 claimValue(provider, resource, value);
1167 public Resource createResource(VirtualGraph provider) {
1168 TransientGraph impl = (TransientGraph)provider;
1169 return impl.getResource(impl.newResource(false));
1173 public Resource createResource(VirtualGraph provider, long clusterId) {
1174 throw new UnsupportedOperationException();
1178 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1179 throws DatabaseException {
1180 throw new UnsupportedOperationException();
1184 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1185 throws DatabaseException {
1186 throw new UnsupportedOperationException();
1190 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1191 throws ServiceException {
1192 throw new UnsupportedOperationException();
1196 public Resource setDefaultClusterSet(Resource clusterSet)
1197 throws ServiceException {
1202 public void denyValue(VirtualGraph provider, Resource resource) {
1203 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1204 getQueryProvider2().updateValue(querySupport.getId(resource));
1205 // NOTE: this only keeps track of value changes by-resource.
1206 clientChanges.claimValue(resource);
1210 public void flush(boolean intermediate) {
1211 throw new UnsupportedOperationException();
1215 public void flushCluster() {
1216 throw new UnsupportedOperationException();
1220 public void flushCluster(Resource r) {
1221 throw new UnsupportedOperationException("Resource " + r);
1229 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1230 TransientGraph impl = (TransientGraph) provider;
1231 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1232 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1233 clientChanges.deny(subject, predicate, object);
1238 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1239 throw new UnsupportedOperationException();
1243 public boolean writeOnly() {
1248 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1249 throw new UnsupportedOperationException();
1253 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1254 throw new UnsupportedOperationException();
1258 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1259 throw new UnsupportedOperationException();
1263 public <T> void addMetadata(Metadata data) throws ServiceException {
1264 throw new UnsupportedOperationException();
1268 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1269 throw new UnsupportedOperationException();
1273 public TreeMap<String, byte[]> getMetadata() {
1274 throw new UnsupportedOperationException();
1278 public void commitDone(WriteTraits writeTraits, long csid) {
1282 public void clearUndoList(WriteTraits writeTraits) {
1286 public int clearMetadata() {
1291 public void startUndo() {
1295 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1299 //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1301 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1304 Disposable.safeDispose(clientChanges);
1305 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1309 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1311 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1313 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1314 writeState = writeStateT;
1316 assert (null != writer);
1317 // writer.state.barrier.inc();
1318 long start = System.nanoTime();
1319 T result = request.perform(writer);
1320 long duration = System.nanoTime() - start;
1322 System.err.println("################");
1323 System.err.println("WriteOnly duration " + 1e-9*duration);
1325 writeStateT.setResult(result);
1327 // This makes clusters available from server
1328 clusterStream.reallyFlush();
1330 // This will trigger query updates
1331 releaseWriteOnly(writer);
1332 assert (null != writer);
1334 handleUpdatesAndMetadata(writer);
1336 } catch (CancelTransactionException e) {
1338 releaseWriteOnly(writeState.getGraph());
1340 clusterTable.removeWriteOnlyClusters();
1341 state.stopWriteTransaction(clusterStream);
1343 } catch (Throwable e) {
1345 e.printStackTrace();
1347 releaseWriteOnly(writeState.getGraph());
1349 clusterTable.removeWriteOnlyClusters();
1351 if (callback != null)
1352 callback.exception(new DatabaseException(e));
1354 state.stopWriteTransaction(clusterStream);
1355 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1359 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1365 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1366 scheduleRequest(request, callback, notify, null);
/**
 * Queues a write-only request on {@code requestManager} as a {@code SessionTask}.
 * <p>
 * The task replaces {@code clientChanges} with a fresh {@code ClientChangesImpl}, builds a
 * {@code WriteGraphImpl} backed by a {@code VirtualWriteOnlySupport} when the request resolves to a
 * virtual graph (otherwise a {@code WriteOnlySupport}), and performs the request with it.
 *
 * @param request  request to perform; asserted non-null
 * @param callback notified through the procedure handed to the {@code WriteState}; may be {@code null}
 * @param notify   handed to the {@code WriteState} created for the request
 * @param combine  not referenced in the visible part of this method
 */
1370 public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1374 assert (request != null);
// The task's thread index is derived from the request's hash code.
1376 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1378 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1381 public void run(int thread) {
1383 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1387 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
// Start from an empty change set: dispose the previous one and install a new instance.
1390 Disposable.safeDispose(clientChanges);
1391 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
// A non-null resolved provider selects the virtual-graph write support.
1395 VirtualGraph vg = getProvider(request.getProvider());
1396 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1398 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1400 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1403 public void execute(Object result) {
1404 if(callback != null) callback.accept(null);
1408 public void exception(Throwable t) {
// NOTE(review): unchecked cast - a Throwable that is not a DatabaseException raises ClassCastException
// here. The catch block below handles any Throwable; confirm what writeState.except(e) forwards.
1409 if(callback != null) callback.accept((DatabaseException)t);
1414 assert (null != writer);
1415 // writer.state.barrier.inc();
1419 request.perform(writer);
1421 } catch (Throwable e) {
1423 // writer.state.barrier.dec();
1424 // writer.waitAsync(null);
// Failure path: release the writer and discard the write-only clusters.
1426 releaseWriteOnly(writer);
1428 clusterTable.removeWriteOnlyClusters();
// Cancellation is not reported to the callback as an error.
1430 if(!(e instanceof CancelTransactionException)) {
1431 if (callback != null)
1432 callback.accept(new DatabaseException(e));
1435 writeState.except(e);
1441 // This makes clusters available from server
1442 boolean empty = clusterStream.reallyFlush();
1443 // This was needed to make WO requests call metadata listeners.
1444 // NOTE: the calling event does not contain clientChanges information.
1445 if (!empty && clientChanges.isEmpty())
1446 clientChanges.setNotEmpty(true);
1447 releaseWriteOnly(writer);
1448 assert (null != writer);
1451 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1464 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1465 scheduleRequest(request, callback, notify, null);
1469 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1472 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1474 assert (request != null);
1476 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1478 requestManager.scheduleWrite(new SessionTask(request, thread) {
1481 public void run(int thread) {
1483 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1485 performWriteOnly(request, notify, callback);
1495 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1497 assert (request != null);
1498 assert (procedure != null);
1500 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1502 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1505 public void run(int thread) {
1507 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1509 ListenerBase listener = getListenerBase(procedure);
1511 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1515 if (listener != null) {
1519 AsyncProcedure ap = new AsyncProcedure<T>() {
1522 public void exception(AsyncReadGraph graph, Throwable t) {
1523 procedure.exception(graph, t);
1524 if(throwable != null) {
1527 // ErrorLogger.defaultLogError("Unhandled exception", t);
1532 public void execute(AsyncReadGraph graph, T t) {
1533 if(result != null) result.set(t);
1534 procedure.execute(graph, t);
1539 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1541 } catch (Throwable t) {
1542 // This is handled by the AsyncProcedure
1543 //Logger.defaultLogError("Internal error", t);
1550 // newGraph.state.barrier.inc();
1552 T t = request.perform(newGraph);
1556 if(result != null) result.set(t);
1557 procedure.execute(newGraph, t);
1559 } catch (Throwable th) {
1561 if(throwable != null) {
1564 Logger.defaultLogError("Unhandled exception", th);
1569 } catch (Throwable t) {
1572 t.printStackTrace();
1574 if(throwable != null) {
1577 Logger.defaultLogError("Unhandled exception", t);
1582 procedure.exception(newGraph, t);
1584 } catch (Throwable t2) {
1586 if(throwable != null) {
1589 Logger.defaultLogError("Unhandled exception", t2);
1596 // newGraph.state.barrier.dec();
1597 // newGraph.waitAsync(request);
1603 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1613 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1615 assert (request != null);
1616 assert (procedure != null);
1618 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1620 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1623 public void run(int thread) {
1625 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1627 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1631 if (listener != null) {
1634 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1635 //newGraph.processor.query(newGraph, request, null, procedure, listener);
1636 } catch (DatabaseException e) {
1637 Logger.defaultLogError(e);
1642 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1643 procedure, "request");
1647 // newGraph.state.barrier.inc();
1649 request.perform(newGraph, wrapper);
1651 // newGraph.waitAsync(request);
1653 } catch (Throwable t) {
1655 wrapper.exception(newGraph, t);
1656 // newGraph.waitAsync(request);
1665 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1675 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1677 assert (request != null);
1678 assert (procedure != null);
1680 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1682 int sync = notify != null ? thread : -1;
1684 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1687 public void run(int thread) {
1689 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1691 ListenerBase listener = getListenerBase(procedure);
1693 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1697 if (listener != null) {
1699 newGraph.processor.query(newGraph, request, null, procedure, listener);
1701 // newGraph.waitAsync(request);
1705 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1709 request.perform(newGraph, wrapper);
1711 } catch (Throwable t) {
1713 t.printStackTrace();
1721 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1731 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1733 assert (request != null);
1734 assert (procedure != null);
1736 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1738 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1741 public void run(int thread) {
1743 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1745 ListenerBase listener = getListenerBase(procedure);
1747 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1751 if (listener != null) {
1753 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1756 public void exception(Throwable t) {
1757 procedure.exception(t);
1758 if(throwable != null) {
1764 public void execute(T t) {
1765 if(result != null) result.set(t);
1766 procedure.execute(t);
1771 // newGraph.waitAsync(request);
1775 // newGraph.state.barrier.inc();
1777 request.register(newGraph, new Listener<T>() {
1780 public void exception(Throwable t) {
1781 if(throwable != null) throwable.set(t);
1782 procedure.exception(t);
1783 // newGraph.state.barrier.dec();
1787 public void execute(T t) {
1788 if(result != null) result.set(t);
1789 procedure.execute(t);
1790 // newGraph.state.barrier.dec();
1794 public boolean isDisposed() {
1800 // newGraph.waitAsync(request);
1806 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1818 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1820 scheduleRequest(request, procedure, null, null, null);
1825 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1827 Semaphore notify = new Semaphore(0);
1828 DataContainer<Throwable> container = new DataContainer<Throwable>();
1829 DataContainer<T> result = new DataContainer<T>();
1830 scheduleRequest(request, procedure, notify, container, result);
1831 acquire(notify, request);
1832 Throwable throwable = container.get();
1833 if(throwable != null) {
1834 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1835 else throw new DatabaseException("Unexpected exception", throwable);
1837 return result.get();
1841 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1843 Semaphore notify = new Semaphore(0);
1844 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1845 final DataContainer<T> resultContainer = new DataContainer<T>();
1846 scheduleRequest(request, new AsyncProcedure<T>() {
1848 public void exception(AsyncReadGraph graph, Throwable throwable) {
1849 exceptionContainer.set(throwable);
1850 procedure.exception(graph, throwable);
1853 public void execute(AsyncReadGraph graph, T result) {
1854 resultContainer.set(result);
1855 procedure.execute(graph, result);
1857 }, getListenerBase(procedure), notify);
1858 acquire(notify, request, procedure);
1859 Throwable throwable = exceptionContainer.get();
1860 if (throwable != null) {
1861 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1862 else throw new DatabaseException("Unexpected exception", throwable);
1864 return resultContainer.get();
1869 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1871 Semaphore notify = new Semaphore(0);
1872 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1873 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1875 public void exception(AsyncReadGraph graph, Throwable throwable) {
1876 exceptionContainer.set(throwable);
1877 procedure.exception(graph, throwable);
1880 public void execute(AsyncReadGraph graph, T result) {
1881 procedure.execute(graph, result);
1884 public void finished(AsyncReadGraph graph) {
1885 procedure.finished(graph);
1888 acquire(notify, request, procedure);
1889 Throwable throwable = exceptionContainer.get();
1890 if (throwable != null) {
1891 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1892 else throw new DatabaseException("Unexpected exception", throwable);
1894 // TODO: implement return value
1895 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1900 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1901 return syncRequest(request, new ProcedureAdapter<T>());
1904 // assert(request != null);
1906 // final DataContainer<T> result = new DataContainer<T>();
1907 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1909 // syncRequest(request, new Procedure<T>() {
1912 // public void execute(T t) {
1917 // public void exception(Throwable t) {
1918 // exception.set(t);
1923 // Throwable t = exception.get();
1925 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1926 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1929 // return result.get();
1934 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1935 return syncRequest(request, (Procedure<T>)procedure);
1940 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1941 // assertNotSession();
1942 // Semaphore notify = new Semaphore(0);
1943 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1944 // DataContainer<T> result = new DataContainer<T>();
1945 // scheduleRequest(request, procedure, notify, container, result);
1946 // acquire(notify, request);
1947 // Throwable throwable = container.get();
1948 // if(throwable != null) {
1949 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1950 // else throw new DatabaseException("Unexpected exception", throwable);
1952 // return result.get();
1957 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1959 Semaphore notify = new Semaphore(0);
1960 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1961 final DataContainer<T> result = new DataContainer<T>();
1962 scheduleRequest(request, procedure, notify, container, result);
1963 acquire(notify, request);
1964 Throwable throwable = container.get();
1965 if (throwable != null) {
1966 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1967 else throw new DatabaseException("Unexpected exception", throwable);
1969 return result.get();
1973 public void syncRequest(Write request) throws DatabaseException {
1976 Semaphore notify = new Semaphore(0);
1977 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1978 scheduleRequest(request, e -> exception.set(e), notify);
1979 acquire(notify, request);
1980 if(exception.get() != null) throw exception.get();
1984 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1986 Semaphore notify = new Semaphore(0);
1987 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1988 final DataContainer<T> result = new DataContainer<T>();
1989 scheduleRequest(request, new Procedure<T>() {
1992 public void exception(Throwable t) {
1997 public void execute(T t) {
2002 acquire(notify, request);
2003 if(exception.get() != null) {
2004 Throwable t = exception.get();
2005 if(t instanceof DatabaseException) throw (DatabaseException)t;
2006 else throw new DatabaseException(t);
2008 return result.get();
2012 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2014 Semaphore notify = new Semaphore(0);
2015 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2016 final DataContainer<T> result = new DataContainer<T>();
2017 scheduleRequest(request, new Procedure<T>() {
2020 public void exception(Throwable t) {
2025 public void execute(T t) {
2030 acquire(notify, request);
2031 if(exception.get() != null) {
2032 Throwable t = exception.get();
2033 if(t instanceof DatabaseException) throw (DatabaseException)t;
2034 else throw new DatabaseException(t);
2036 return result.get();
2040 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2042 Semaphore notify = new Semaphore(0);
2043 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2044 final DataContainer<T> result = new DataContainer<T>();
2045 scheduleRequest(request, new Procedure<T>() {
2048 public void exception(Throwable t) {
2053 public void execute(T t) {
2058 acquire(notify, request);
2059 if(exception.get() != null) {
2060 Throwable t = exception.get();
2061 if(t instanceof DatabaseException) throw (DatabaseException)t;
2062 else throw new DatabaseException(t);
2064 return result.get();
2068 public void syncRequest(DelayedWrite request) throws DatabaseException {
2070 Semaphore notify = new Semaphore(0);
2071 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2072 scheduleRequest(request, e -> exception.set(e), notify);
2073 acquire(notify, request);
2074 if(exception.get() != null) throw exception.get();
2078 public void syncRequest(WriteOnly request) throws DatabaseException {
2081 Semaphore notify = new Semaphore(0);
2082 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083 scheduleRequest(request, e -> exception.set(e), notify);
2084 acquire(notify, request);
2085 if(exception.get() != null) throw exception.get();
2090 * GraphRequestProcessor interface
2094 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2096 scheduleRequest(request, procedure, null, null);
2101 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2103 scheduleRequest(request, procedure, null);
2108 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2110 scheduleRequest(request, procedure, null, null, null);
2115 public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2117 scheduleRequest(request, callback, null);
2122 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2124 scheduleRequest(request, procedure, null);
2129 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2131 scheduleRequest(request, procedure, null);
2136 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2138 scheduleRequest(request, procedure, null);
2143 public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2145 scheduleRequest(request, callback, null);
2150 public void asyncRequest(final Write r) {
2151 asyncRequest(r, null);
2155 public void asyncRequest(final DelayedWrite r) {
2156 asyncRequest(r, null);
2160 public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2162 scheduleRequest(request, callback, null);
2167 public void asyncRequest(final WriteOnly request) {
2169 asyncRequest(request, null);
2174 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2175 r.request(this, procedure);
2179 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2180 r.request(this, procedure);
2184 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2185 r.request(this, procedure);
2189 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2190 r.request(this, procedure);
2194 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2195 r.request(this, procedure);
2199 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2200 r.request(this, procedure);
2204 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2205 return r.request(this);
2209 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2210 return r.request(this);
2214 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2215 r.request(this, procedure);
2219 public <T> void async(WriteInterface<T> r) {
2220 r.request(this, new ProcedureAdapter<T>());
2224 public void incAsync() {
2229 public void decAsync() {
// Returns the cluster id of the cluster that clusterTable resolves for the
// given resource's key (resource.id).
2233 public long getCluster(ResourceImpl resource) {
2234 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2235 return cluster.getClusterId();
// Returns the cluster id for a raw resource key using the NoThrow lookup.
// NOTE(review): the null check below only prints a diagnostic; the next line
// dereferences clusterTable regardless, so a null table still ends in a
// NullPointerException.
2238 public long getCluster(int id) {
2239 if (clusterTable == null)
2240 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2241 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
// Wraps a resource key in a new ResourceImpl bound to resourceSupport.
2244 public ResourceImpl getResource(int id) {
2245 return new ResourceImpl(resourceSupport, id);
// Builds a ResourceImpl from a resource index and a cluster id: the cluster is
// looked up by id, its cluster key is combined with the index into a resource
// key (NoThrow variant), and the key is wrapped in a ResourceImpl.
2248 public ResourceImpl getResource(int resourceIndex, long clusterId) {
// Only checked when assertions are enabled.
2249 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2250 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2251 int key = proxy.getClusterKey();
2252 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2253 return new ResourceImpl(resourceSupport, resourceKey);
// Same visible result as getResource(int).
// NOTE(review): one line of this body is not visible in the excerpt.
2256 public ResourceImpl getResource2(int id) {
2258 return new ResourceImpl(resourceSupport, id);
// NOTE(review): body not visible in this excerpt; presumably returns the key
// stored in the ResourceImpl - confirm.
2261 final public int getId(ResourceImpl impl) {
// Shared Charset instance obtained by name lookup of "utf-8".
2265 public static final Charset UTF8 = Charset.forName("utf-8");
// Checks the transient graphs in support.providers for a subject: returns
// false as soon as one of them reports the subject as pending.
// NOTE(review): the fall-through return is not visible here; presumably true.
2270 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2271 for(TransientGraph g : support.providers) {
2272 if(g.isPending(subject)) return false;
// Same check keyed by (subject, predicate).
2277 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2278 for(TransientGraph g : support.providers) {
2279 if(g.isPending(subject, predicate)) return false;
// Asks every transient graph in support.providers that reports the subject as
// pending to load it, and invokes runnable once all of them have reported back.
2284 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
// Countdown callback shared by all providers.
2286 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
// One count per provider plus one extra that is consumed by the accept call
// after the loop, so runnable cannot fire before the loop has finished.
2288 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2291 public void accept(ReadGraphImpl graph) {
// The call that brings the counter to zero runs the continuation.
2292 if(ready.decrementAndGet() == 0) {
2293 runnable.accept(graph);
2299 for(TransientGraph g : support.providers) {
2300 if(g.isPending(subject)) {
2302 g.load(graph, subject, composite);
2303 } catch (DatabaseException e) {
// NOTE(review): the failure is only printed; whether the counter is still
// decremented for this provider cannot be seen in this excerpt.
2304 e.printStackTrace();
// NOTE(review): presumably the not-pending branch (the else line is not visible).
2307 composite.accept(graph);
// NOTE(review): presumably after the loop - consumes the extra "+ 1" count.
2311 composite.accept(graph);
// (subject, predicate) variant: asks every transient graph in support.providers
// that reports the pair as pending to load it, and invokes runnable once all of
// them have reported back.
2315 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
// Countdown callback shared by all providers.
2317 Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
// One count per provider plus one extra that is consumed by the accept call
// after the loop, so runnable cannot fire before the loop has finished.
2319 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2322 public void accept(ReadGraphImpl graph) {
// The call that brings the counter to zero runs the continuation.
2323 if(ready.decrementAndGet() == 0) {
2324 runnable.accept(graph);
2330 for(TransientGraph g : support.providers) {
2331 if(g.isPending(subject, predicate)) {
2333 g.load(graph, subject, predicate, composite);
2334 } catch (DatabaseException e) {
// NOTE(review): the failure is only printed; whether the counter is still
// decremented for this provider cannot be seen in this excerpt.
2335 e.printStackTrace();
// NOTE(review): presumably the not-pending branch (the else line is not visible).
2338 composite.accept(graph);
// NOTE(review): presumably after the loop - consumes the extra "+ 1" count.
2342 composite.accept(graph);
2346 // void dumpHeap() {
2349 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2350 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2351 // "d:/heap" + flushCounter + ".txt", true);
2352 // } catch (IOException e) {
2353 // e.printStackTrace();
// Notifies the listeners in changeListeners2 about a change set, using a read
// graph created from the query provider. Nothing is fired for an empty set.
// NOTE(review): several lines (early return, try, the listener call) are not
// visible in this excerpt.
2358 void fireReactionsToSynchronize(ChangeSet cs) {
2360 // Do not fire empty events
2364 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2365 // g.state.barrier.inc();
2369 if (!cs.isEmpty()) {
// The event carries g both as session source and as graph; third argument is null.
2370 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2371 for (ChangeListener l : changeListeners2) {
// A failing listener is printed and does not stop the remaining listeners
// (assuming the try is inside the loop - TODO confirm).
2374 } catch (Exception ex) {
2375 ex.printStackTrace();
2382 // g.state.barrier.dec();
2383 // g.waitAsync(null);
// Notifies the listeners in changeListeners2 about a committed change set using
// the caller's graph. Nothing is fired for an empty set.
// NOTE(review): several lines (early return, try, the listener call) are not
// visible in this excerpt.
2389 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2392 // Do not fire empty events
2397 // graph.state.barrier.inc();
2401 if (!cs2.isEmpty()) {
2403 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2404 for (ChangeListener l : changeListeners2) {
2407 // System.out.println("changelistener " + l);
// Per-listener failures are printed.
2408 } catch (Exception ex) {
2409 ex.printStackTrace();
2417 // graph.state.barrier.dec();
2418 // graph.waitAsync(null);
// Outer safety net: any Throwable from the notification as a whole is printed
// rather than propagated.
2421 } catch (Throwable t) {
2422 t.printStackTrace();
// Notifies the listeners in metadataListeners about a change set. Listeners get
// a separate WriteGraphImpl (reactionGraph) created from the writer's processor
// and writeSupport; the original graph is passed as the event's third argument.
// NOTE(review): the early returns and try lines are not visible in this excerpt.
2426 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2430 // Do not fire empty events
2434 // Do not fire on virtual requests
// A non-null provider marks the request as virtual (per the comment above).
2435 if(graph.getProvider() != null)
2438 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2442 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2443 for (ChangeListener l : metadataListeners) {
// Per-listener failures (any Throwable) are printed.
2446 } catch (Throwable ex) {
2447 ex.printStackTrace();
// Outer safety net for the notification as a whole.
2455 } catch (Throwable t) {
2456 t.printStackTrace();
2465 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
// Looks up a service through peekService and fails with
// ServiceNotFoundException when none is available; a closed session gets the
// more specific "Session has been shut down" message.
// NOTE(review): the null check and the successful return are not visible in
// this excerpt.
2468 public <T> T getService(Class<T> api) {
2469 T t = peekService(api);
2471 if (state.isClosed())
2472 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2473 throw new ServiceNotFoundException(this, api);
// Supplied by subclasses; peekService returns this value for ServerInformation.class.
2481 protected abstract ServerInformation getCachedServerInformation();
// Two-slot cache of recent service lookups used by peekService:
// (serviceKey1, service1) and (serviceKey2, service2). All start out empty.
2483 private Class<?> serviceKey1 = null;
2484 private Class<?> serviceKey2 = null;
2485 private Object service1 = null;
2486 private Object service2 = null;
2492 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
// Service lookup with a small two-slot cache in front of serviceLocator.
// The method is synchronized, so the cache fields are updated under the
// session's monitor.
// NOTE(review): several lines (returns, slot-1 assignments) are not visible in
// this excerpt.
2494 @SuppressWarnings("unchecked")
2496 public synchronized <T> T peekService(Class<T> api) {
// Hit in slot 1.
2498 if(serviceKey1 == api) {
// Hit in slot 2: slot 1 is moved down into slot 2; presumably the hit is then
// promoted into slot 1 (those lines are not visible).
2500 } else if (serviceKey2 == api) {
2502 Object result = service2;
2503 service2 = service1;
2504 serviceKey2 = serviceKey1;
// Services answered directly by the session, bypassing serviceLocator.
2510 if (Layer0.class == api)
2512 if (ServerInformation.class == api)
2513 return (T) getCachedServerInformation();
2514 else if (WriteGraphImpl.class == api)
2515 return (T) writeState.getGraph();
// A new builder is created on every call, bound to the current write graph's
// writeSupport.
2516 else if (ClusterBuilder.class == api)
2517 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2518 else if (ClusterBuilderFactory.class == api)
2519 return (T)new ClusterBuilderFactoryImpl(this);
// Cache miss: shift slot 1 into slot 2 and store the locator's answer in slot 1.
2521 service2 = service1;
2522 serviceKey2 = serviceKey1;
2524 service1 = serviceLocator.peekService(api);
2535 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
// Delegates directly to serviceLocator; the session-local special cases handled
// in peekService are not consulted here.
2538 public boolean hasService(Class<?> api) {
2539 return serviceLocator.hasService(api);
2543 * @param api the api that must be implemented by the specified service
2544 * @param service the service implementation
// Registers a service implementation for an API class.
// NOTE(review): some lines (returns, cache updates) are not visible in this excerpt.
2547 public <T> void registerService(Class<T> api, T service) {
// Layer0 is kept in the L0 field.
2548 if(Layer0.class == api) {
2549 L0 = (Layer0)service;
2552 serviceLocator.registerService(api, service);
// A new transaction policy is stored and the session state is told to reset it.
2553 if (TransactionPolicySupport.class == api) {
2554 transactionPolicy = (TransactionPolicySupport)service;
2555 state.resetTransactionPolicy();
// The lookup cache is checked for the registered api; presumably the matching
// slot is refreshed (the assignments are not visible).
2557 if (api == serviceKey1)
2559 else if (api == serviceKey2)
// Calls valuesChanged on every registered monitor handler with the context
// mapped to it in monitorContexts. Exceptions and linkage errors from a handler
// are logged.
// NOTE(review): the variable parameter is not used in the visible lines.
2567 void fireSessionVariableChange(String variable) {
2568 for (MonitorHandler h : monitorHandlers) {
2569 MonitorContext ctx = monitorContexts.getLeft(h);
2571 // SafeRunner functionality repeated here to avoid dependency.
2573 h.valuesChanged(ctx);
2574 } catch (Exception e) {
2575 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2576 } catch (LinkageError e) {
2577 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
// ResourceSerializer implementation backed by the enclosing session. It converts
// between transient resource keys (int), random access ids (long) and the string
// form '<resource index>_<cluster id>'.
// NOTE(review): this listing omits some lines (try, returns, closing braces).
2583 class ResourceSerializerImpl implements ResourceSerializer {
// Builds a long resource id from a resource key: the resource index is taken
// from the key and combined with the cluster id reported by getCluster(id).
2585 public long createRandomAccessId(int id) throws DatabaseException {
2588 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2589 long cluster = getCluster(id);
// NOTE(review): the condition guarding this early return is not visible.
2591 return 0; // Better to return 0 than an invalid id.
2592 long result = ClusterTraitsBase.createResourceId(cluster, index);
// Random access id of a Resource; the argument is cast to ResourceImpl.
2597 public long getRandomAccessId(Resource resource) throws DatabaseException {
2599 ResourceImpl resourceImpl = (ResourceImpl) resource;
2600 return createRandomAccessId(resourceImpl.id);
// Transient id of a Resource is its ResourceImpl key.
2605 public int getTransientId(Resource resource) throws DatabaseException {
2607 ResourceImpl resourceImpl = (ResourceImpl) resource;
2608 return resourceImpl.id;
// String form of a resource reference. A null argument is rejected with
// IllegalArgumentException; any failure while building the id is rethrown as
// InvalidResourceReferenceException.
2613 public String createRandomAccessId(Resource resource)
2614 throws InvalidResourceReferenceException {
2616 if(resource == null) throw new IllegalArgumentException();
2618 ResourceImpl resourceImpl = (ResourceImpl) resource;
// Negative keys are serialized as '<key>_0' without a cluster lookup.
2620 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2624 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2625 } catch (DatabaseException e1) {
2626 throw new InvalidResourceReferenceException(e1);
2629 // Serialize as '<resource index>_<cluster id>'
2630 return "" + r + "_" + getCluster(resourceImpl);
2631 } catch (Throwable e) {
2632 e.printStackTrace();
2633 throw new InvalidResourceReferenceException(e);
// Converts a long resource id back to a transient key. Values <= 0 are
// returned as-is (narrowed to int); otherwise the cluster is resolved through
// clusterTranslator and a resource key is built from its cluster key and the index.
2638 public int getTransientId(long serialized) throws DatabaseException {
2639 if (serialized <= 0)
2640 return (int)serialized;
2641 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2642 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2643 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2644 if (cluster == null)
2645 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2646 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
// Resource for a long random access id.
2651 public Resource getResource(long randomAccessId) throws DatabaseException {
2652 return getResourceByKey(getTransientId(randomAccessId));
// Resource for a transient key.
2656 public Resource getResource(int transientId) throws DatabaseException {
2657 return getResourceByKey(transientId);
// Parses the '<resource index>_<cluster id>' string form. A negative first
// component is treated as a key and resolved directly. Otherwise the resource
// is returned only if the cluster reports that it has the key; the final throw
// covers the "does not exist" case.
2661 public Resource getResource(String randomAccessId)
2662 throws InvalidResourceReferenceException {
2664 int i = randomAccessId.indexOf('_');
// NOTE(review): the condition for this parse failure is not visible; presumably
// a missing '_' separator.
2666 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2667 + randomAccessId + "'");
2668 int r = Integer.parseInt(randomAccessId.substring(0, i));
2669 if(r < 0) return getResourceByKey(r);
2670 long c = Long.parseLong(randomAccessId.substring(i + 1));
2671 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2672 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2673 if (cluster.hasResource(key, clusterTranslator))
2674 return getResourceByKey(key);
// NOTE(review): the handler body is not visible; presumably rethrows as-is.
2675 } catch (InvalidResourceReferenceException e) {
2677 } catch (NumberFormatException e) {
2678 throw new InvalidResourceReferenceException(e);
2679 } catch (Throwable e) {
2680 e.printStackTrace();
2681 throw new InvalidResourceReferenceException(e);
2684 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
// NOTE(review): the guarded body and the return value are not visible in this
// excerpt; failures are printed and rethrown as InvalidResourceReferenceException.
2688 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2691 } catch (Throwable e) {
2692 e.printStackTrace();
2693 throw new InvalidResourceReferenceException(e);
2699 // Copied from old SessionImpl
2702 if (state.isClosed())
2703 throw new Error("Session closed.");
// Creates a new resource in the cluster with the given id. The cluster lookup
// uses the OrThrow variant. A DatabaseException from createResource is logged.
// NOTE(review): the declaration of newId and what happens after the logged
// failure are not visible in this excerpt.
2708 public ResourceImpl getNewResource(long clusterId) {
2709 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2712 newId = cluster.createResource(clusterTranslator);
2713 } catch (DatabaseException e) {
2714 Logger.defaultLogError(e);
2717 return new ResourceImpl(resourceSupport, newId);
// Creates a new resource inside a cluster set. clusterSetsSupport maps the
// cluster set's resource id to the cluster currently used for that set.
// Throws ClusterSetExistException when the set has no mapping.
2720 public ResourceImpl getNewResource(Resource clusterSet)
2721 throws DatabaseException {
2722 long resourceId = clusterSet.getResourceId();
2723 Long clusterId = clusterSetsSupport.get(resourceId);
2724 if (null == clusterId)
2725 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
// Set exists but has no cluster yet (placeholder id): create one, remember it
// in createdClusters when SERVICE_MODE_CREATE is set, and store the mapping.
2726 if (Constants.NewClusterId == clusterId) {
2727 clusterId = getService(ClusteringSupport.class).createCluster();
2728 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2729 createdClusters.add(clusterId);
2730 clusterSetsSupport.put(resourceId, clusterId);
2731 return getNewResource(clusterId);
2733 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2734 ResourceImpl result;
// Current cluster has reached CLUSTER_FILL_SIZE: switch the set to a newly
// created cluster, with the same bookkeeping as above.
2735 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2736 clusterId = getService(ClusteringSupport.class).createCluster();
2737 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2738 createdClusters.add(clusterId);
2739 clusterSetsSupport.put(resourceId, clusterId);
2740 return getNewResource(clusterId);
// Normal case: allocate in the current cluster. If the resource ended up in a
// different cluster than requested, the mapping is updated to that cluster.
2742 result = getNewResource(clusterId);
2743 int resultKey = querySupport.getId(result);
2744 long resultCluster = querySupport.getClusterId(resultKey);
2745 if (clusterId != resultCluster)
2746 clusterSetsSupport.put(resourceId, resultCluster);
// Registers a new cluster set: the set's resource id is mapped to the
// Constants.NewClusterId placeholder. Throws ClusterSetExistException if the
// set is already registered.
2752 public void getNewClusterSet(Resource clusterSet)
2753 throws DatabaseException {
// NOTE(review): a line before this print is not visible; it may be a debug guard.
2755 System.out.println("new cluster set=" + clusterSet);
2756 long resourceId = clusterSet.getResourceId();
2757 if (clusterSetsSupport.containsKey(resourceId))
2758 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2759 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
// Reports whether clusterSetsSupport has a mapping for the given cluster set's
// resource id.
2761 public boolean containsClusterSet(Resource clusterSet)
2762 throws ServiceException {
2763 long resourceId = clusterSet.getResourceId();
2764 return clusterSetsSupport.containsKey(resourceId);
// Replaces defaultClusterSet with the given cluster set. The previous value is
// saved in r first; presumably it is returned (the return line is not visible).
2766 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2767 Resource r = defaultClusterSet;
2768 defaultClusterSet = clusterSet;
// Prints cluster statistics to System.out: first totals over clusters that are
// loaded and non-empty, then one line per cluster, then the query provider's
// own diagnostics.
// NOTE(review): try/else lines are not visible in this excerpt.
2771 void printDiagnostics() {
// Accumulators for the summary section.
2775 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2776 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2777 for(ClusterI cluster : clusterTable.getClusters()) {
2779 if (cluster.isLoaded() && !cluster.isEmpty()) {
2780 residentClusters.set(residentClusters.get() + 1);
2781 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2783 } catch (DatabaseException e) {
2784 Logger.defaultLogError(e);
2788 System.out.println("--------------------------------");
2789 System.out.println("Cluster information:");
2790 System.out.println("-amount of resident clusters=" + residentClusters.get());
// getUsedSpace is divided by 1024 for the kB figure.
2791 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
// Per-cluster line: "Cluster <id> [" followed by a status or the counters.
2793 for(ClusterI cluster : clusterTable.getClusters()) {
2794 System.out.print("Cluster " + cluster.getClusterId() + " [");
2796 if (!cluster.isLoaded())
2797 System.out.println("not loaded]");
2798 else if (cluster.isEmpty())
2799 System.out.println("is empty]");
2801 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2802 System.out.print(",references=" + cluster.getReferenceCount());
2803 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
// A cluster that throws while being inspected is logged and reported as corrupted.
2805 } catch (DatabaseException e) {
2806 Logger.defaultLogError(e);
2807 System.out.println("is corrupted]");
2811 queryProvider2.printDiagnostics();
2812 System.out.println("--------------------------------");
// Transaction lifecycle notifications. Each method calls the matching callback
// on every SessionEventListener in eventListeners; a Throwable from a listener
// is printed.
// NOTE(review): the guard in front of each System.out.println is not visible in
// this excerpt (presumably a debug flag).
// Read transaction started.
2817 public void fireStartReadTransaction() {
2820 System.out.println("StartReadTransaction");
2822 for (SessionEventListener listener : eventListeners) {
2824 listener.readTransactionStarted();
2825 } catch (Throwable t) {
2826 t.printStackTrace();
// Read transaction finished.
2832 public void fireFinishReadTransaction() {
2835 System.out.println("FinishReadTransaction");
2837 for (SessionEventListener listener : eventListeners) {
2839 listener.readTransactionFinished();
2840 } catch (Throwable t) {
2841 t.printStackTrace();
// Write transaction started.
2847 public void fireStartWriteTransaction() {
2850 System.out.println("StartWriteTransaction");
2852 for (SessionEventListener listener : eventListeners) {
2854 listener.writeTransactionStarted();
2855 } catch (Throwable t) {
2856 t.printStackTrace();
// Write transaction finished. In addition to notifying listeners this also
// calls Indexing.resetDependenciesIndexingDisabled().
2862 public void fireFinishWriteTransaction() {
2865 System.out.println("FinishWriteTransaction");
2867 for (SessionEventListener listener : eventListeners) {
2869 listener.writeTransactionFinished();
2870 } catch (Throwable t) {
2871 t.printStackTrace();
2875 Indexing.resetDependenciesIndexingDisabled();
// Unsupported: always throws an Error; both arguments are ignored.
2879 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2880 throw new Error("Not supported at the moment.");
// Plain accessors for session internals; each returns the field of the same name.
2887 ClusterTable getClusterTable() {
2888 return clusterTable;
2891 public GraphSession getGraphSession() {
2892 return graphSession;
2895 public QueryProcessor getQueryProvider2() {
2896 return queryProvider2;
2899 ClientChangesImpl getClientChanges() {
2900 return clientChanges;
// NOTE(review): body not visible in this excerpt.
2903 boolean getWriteOnly() {
// Class-wide counter. NOTE(review): no use of it is visible in this excerpt.
2907 static int counter = 0;
// Called when a cluster has been loaded; the visible part asks the cluster
// table to update its size. The clusterId argument is not used in the visible lines.
2909 public void onClusterLoaded(long clusterId) {
2911 clusterTable.updateSize();
2919 * Implementation of the interface RequestProcessor
// Synchronous Read request. Runs the request through the (request, AsyncProcedure)
// overload with an internal procedure that captures the result or the failure
// in the two DataContainers, then rethrows a captured failure or returns the result.
// NOTE(review): the callback bodies and some guard lines are not visible in this excerpt.
2923 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2928 assert(request != null);
2930 final DataContainer<T> result = new DataContainer<T>();
2931 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2933 syncRequest(request, new AsyncProcedure<T>() {
2936 public void execute(AsyncReadGraph graph, T t) {
2941 public void exception(AsyncReadGraph graph, Throwable t) {
2947 Throwable t = exception.get();
// DatabaseExceptions are rethrown unchanged; anything else is wrapped.
2949 if(t instanceof DatabaseException) throw (DatabaseException)t;
2950 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2953 return result.get();
2958 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2959 // assertNotSession();
2960 // return syncRequest(request, (AsyncProcedure<T>)procedure);
// Synchronous Read overloads for the other procedure/listener flavours. Each
// wraps its argument in the matching *ToAsync* adapter and delegates to the
// AsyncProcedure/AsyncListener-based overload.
// NOTE(review): one line per method is not visible in this excerpt.
2964 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2966 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2970 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2972 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2976 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2978 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2982 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2984 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Synchronous AsyncRead request. Same capture-and-rethrow scheme as the Read
// variant: an internal AsyncProcedure stores the result or the failure, and the
// failure (if any) is rethrown after the request has been processed.
// NOTE(review): the callback bodies and some guard lines are not visible in this excerpt.
2988 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2992 assert(request != null);
2994 final DataContainer<T> result = new DataContainer<T>();
2995 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2997 syncRequest(request, new AsyncProcedure<T>() {
3000 public void execute(AsyncReadGraph graph, T t) {
3005 public void exception(AsyncReadGraph graph, Throwable t) {
3011 Throwable t = exception.get();
3013 if(t instanceof DatabaseException) throw (DatabaseException)t;
// NOTE(review): the message names syncRequest(Read) although this overload
// takes an AsyncRead.
3014 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3017 return result.get();
// Synchronous AsyncRead overloads for the other procedure/listener flavours.
// The AsyncListener is passed on with a cast to AsyncProcedure; the rest are
// wrapped in the matching *ToAsync* adapter before delegating.
// NOTE(review): one line per method is not visible in this excerpt.
3022 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3024 return syncRequest(request, (AsyncProcedure<T>)procedure);
3028 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3030 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3034 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3036 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3040 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3042 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3046 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3048 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Synchronous MultiRead request. An internal AsyncMultiProcedure collects the
// results into a list and captures a failure, which is rethrown afterwards.
// NOTE(review): the callback bodies and the final return are not visible in
// this excerpt; presumably the collected list is returned.
3052 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3056 assert(request != null);
3058 final ArrayList<T> result = new ArrayList<T>();
3059 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3061 syncRequest(request, new AsyncMultiProcedure<T>() {
3064 public void execute(AsyncReadGraph graph, T t) {
// The list is guarded by its own monitor while results arrive.
3065 synchronized(result) {
3071 public void finished(AsyncReadGraph graph) {
3075 public void exception(AsyncReadGraph graph, Throwable t) {
3082 Throwable t = exception.get();
3084 if(t instanceof DatabaseException) throw (DatabaseException)t;
// NOTE(review): the message names syncRequest(Read) although this overload
// takes a MultiRead.
3085 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Synchronous MultiRead overloads for the other procedure/listener flavours.
// The AsyncMultiListener is passed on with a cast to AsyncMultiProcedure; the
// rest are wrapped in the matching *ToAsyncMulti* adapter before delegating.
// NOTE(review): one line per method is not visible in this excerpt.
3093 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3095 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3099 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3101 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3105 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3107 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3111 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3113 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3117 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3119 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Synchronous AsyncMultiRead request. An internal AsyncMultiProcedure collects
// the results into a list and captures a failure, which is rethrown afterwards.
// NOTE(review): the callback bodies and the final return are not visible in
// this excerpt; presumably the collected list is returned.
3123 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3127 assert(request != null);
3129 final ArrayList<T> result = new ArrayList<T>();
3130 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3132 syncRequest(request, new AsyncMultiProcedure<T>() {
3135 public void execute(AsyncReadGraph graph, T t) {
// The list is guarded by its own monitor while results arrive.
3136 synchronized(result) {
3142 public void finished(AsyncReadGraph graph) {
3146 public void exception(AsyncReadGraph graph, Throwable t) {
3153 Throwable t = exception.get();
3155 if(t instanceof DatabaseException) throw (DatabaseException)t;
// NOTE(review): the message names syncRequest(Read) although this overload
// takes an AsyncMultiRead.
3156 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
// Synchronous AsyncMultiRead overloads for the other procedure/listener
// flavours. The AsyncMultiListener is passed on with a cast to
// AsyncMultiProcedure; the rest are wrapped in the matching *ToAsyncMulti*
// adapter before delegating.
// NOTE(review): one line per method is not visible in this excerpt.
3164 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3166 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3170 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3172 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3176 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3178 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3182 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3184 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3188 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3190 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Asynchronous Read request overloads.
// Without a procedure: a ProcedureAdapter is used whose only visible override
// prints the stack trace of a failure.
3195 public <T> void asyncRequest(Read<T> request) {
3197 asyncRequest(request, new ProcedureAdapter<T>() {
3199 public void exception(Throwable t) {
3200 t.printStackTrace();
// The remaining overloads normalize their procedure/listener to the async
// flavour (cast for AsyncListener, adapter otherwise) and delegate.
3207 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3208 asyncRequest(request, (AsyncProcedure<T>)procedure);
3212 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3213 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3217 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3218 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3222 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3223 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3227 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3228 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Asynchronous AsyncRead request overloads.
// Without a procedure: a ProcedureAdapter is used whose only visible override
// prints the stack trace of a failure.
3232 final public <T> void asyncRequest(final AsyncRead<T> request) {
3234 assert(request != null);
3236 asyncRequest(request, new ProcedureAdapter<T>() {
3238 public void exception(Throwable t) {
3239 t.printStackTrace();
// AsyncListener variant goes straight to scheduleRequest; the listener is
// passed twice (second and third argument) and the last argument is null.
3246 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3247 scheduleRequest(request, procedure, procedure, null);
// The remaining overloads wrap their argument in the matching adapter and delegate.
3251 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3252 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3256 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3257 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3261 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3262 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3266 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3267 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
// Asynchronous MultiRead request overloads.
// Without a procedure: an AsyncMultiProcedureAdapter is used whose only visible
// override prints the stack trace of a failure.
3271 public <T> void asyncRequest(MultiRead<T> request) {
3273 assert(request != null);
3275 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3277 public void exception(AsyncReadGraph graph, Throwable t) {
3278 t.printStackTrace();
// The remaining overloads normalize their procedure/listener to
// AsyncMultiProcedure (cast or adapter) and delegate.
3285 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3286 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3290 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3291 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3295 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3296 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3300 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3301 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3305 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3306 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// Asynchronous AsyncMultiRead request overloads.
// Without a procedure: an AsyncMultiProcedureAdapter is used whose only visible
// override prints the stack trace of a failure.
3310 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3312 assert(request != null);
3314 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3316 public void exception(AsyncReadGraph graph, Throwable t) {
3317 t.printStackTrace();
// The remaining overloads normalize their procedure/listener to
// AsyncMultiProcedure (cast or adapter) and delegate.
3324 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3325 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3329 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3330 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3334 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3335 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3339 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3340 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3344 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3345 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
// MultiRead with an AsyncMultiProcedure is not implemented: both the sync and
// the async form throw an Error.
// NOTE(review): the MultiRead adapter overloads in this file delegate to
// (MultiRead, AsyncMultiProcedure); if overload resolution lands on these two
// methods, those callers fail with this Error - confirm.
3349 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3351 throw new Error("Not implemented!");
3355 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3356 throw new Error("Not implemented!");
// Asynchronous ExternalRead request overloads.
// Without a procedure: a ProcedureAdapter is used whose only visible override
// prints the stack trace of a failure.
3360 final public <T> void asyncRequest(final ExternalRead<T> request) {
3362 assert(request != null);
3364 asyncRequest(request, new ProcedureAdapter<T>() {
3366 public void exception(Throwable t) {
3367 t.printStackTrace();
// Listener variant: the listener is passed on as a plain Procedure.
3374 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3375 asyncRequest(request, (Procedure<T>)procedure);
// Rethrows a captured failure: a DatabaseException is thrown as-is, anything
// else is wrapped in a DatabaseException("Unexpected exception").
// NOTE(review): the null guard is not visible in this excerpt; presumably a
// null throwable means "nothing to throw".
3380 void check(Throwable t) throws DatabaseException {
3382 if(t instanceof DatabaseException) throw (DatabaseException)t;
3383 else throw new DatabaseException("Unexpected exception", t);
// Same check for a failure stored in a DataContainer.
3387 void check(DataContainer<Throwable> container) throws DatabaseException {
3388 Throwable t = container.get();
3390 if(t instanceof DatabaseException) throw (DatabaseException)t;
3391 else throw new DatabaseException("Unexpected exception", t);
// Tells whether the request targets the same provider as the current write
// graph: equals() comparison when the graph has a provider, otherwise the
// request's provider must be null as well.
3400 boolean sameProvider(Write request) {
3401 if(writeState.getGraph().provider != null) {
3402 return writeState.getGraph().provider.equals(request.getProvider());
3404 return request.getProvider() == null;
// Returns false for a null graph and for a graph whose writeSupport is
// write-only.
// NOTE(review): the final return is not visible in this excerpt; presumably true.
3408 boolean plainWrite(WriteGraphImpl graph) {
3409 if(graph == null) return false;
3410 if(graph.writeSupport.writeOnly()) return false;
// Thread group named "Session Thread Group".
3415 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
// Guard for synchronous entry points: fails with DatabaseException when the
// calling thread is one of the session's own threads (sessionThreads).
3416 private void assertNotSession() throws DatabaseException {
3417 Thread current = Thread.currentThread();
3418 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
// Throws RuntimeDatabaseException (unchecked) when the session state reports
// that it is no longer alive.
3421 void assertAlive() {
3422 if (!state.isAlive())
3423 throw new RuntimeDatabaseException("Session has been shut down.");
// Value access for a resource; both methods translate the Resource to its id
// through querySupport and delegate to it.
// Stream form.
3426 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3427 return querySupport.getValueStream(graph, querySupport.getId(resource));
// Byte array form.
3430 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3431 return querySupport.getValue(graph, querySupport.getId(resource));
// Acquires the semaphore on behalf of a request; convenience form without a procedure.
3434 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3435 acquire(semaphore, request, null);
// Acquires the semaphore with a timed tryAcquire so that long waits can be
// reported. request and procedure are only used for the report text.
// NOTE(review): the loop header, the wait-time bookkeeping and the exits are
// not visible in this excerpt.
3438 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3442 // Loop until the semaphore is acquired reporting requests that take a long time.
// Each attempt waits at most one report period.
3445 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
// Timed out: report only when the debug policy asks for it.
3449 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3451 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3452 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3454 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3455 + (waitTime) + " ms. (procedure=" + procedure + ")");
// NOTE(review): the interrupt is only printed in the visible lines; the
// thread's interrupt status is not restored here.
3462 } catch (InterruptedException e) {
3463 e.printStackTrace();
3464 // FIXME: Should perhaps do something else in this case ??
3471 // public boolean holdOnToTransactionAfterCancel() {
3472 // return transactionPolicy.holdOnToTransactionAfterCancel();
3476 // public boolean holdOnToTransactionAfterCommit() {
3477 // return transactionPolicy.holdOnToTransactionAfterCommit();
3481 // public boolean holdOnToTransactionAfterRead() {
3482 // return transactionPolicy.holdOnToTransactionAfterRead();
3486 // public void onRelinquish() {
3487 // transactionPolicy.onRelinquish();
3491 // public void onRelinquishDone() {
3492 // transactionPolicy.onRelinquishDone();
3496 // public void onRelinquishError() {
3497 // transactionPolicy.onRelinquishError();
// NOTE(review): body not visible in this excerpt; presumably returns this session.
3502 public Session getSession() {
// Hooks supplied by the concrete session implementation.
// NOTE(review): what getProvider maps the given virtual graph to cannot be seen here.
3507 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
// No-argument resource creation; may fail with DatabaseException.
3508 protected abstract ResourceImpl getNewResource() throws DatabaseException;
// Forwards the "thread ceased" notification to requestManager.
// NOTE(review): one line of the body is not visible in this excerpt.
3510 public void ceased(int thread) {
3512 requestManager.ceased(thread);
// Number of query threads used by the session.
// NOTE(review): the active return statement is not visible in this excerpt;
// only the constraint and a disabled CPU-count-based alternative are shown.
3516 public int getAmountOfQueryThreads() {
3517 // This must be a power of two
3519 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
// Wraps a resource key in a new ResourceImpl bound to resourceSupport.
// NOTE(review): one line of the body is not visible; the visible code does not
// itself throw the declared ResourceNotFoundException.
3522 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3524 return new ResourceImpl(resourceSupport, key);
// NOTE(review): body not visible in this excerpt.
3528 public void acquireWriteOnly() {
// Releases write access on the query provider for the given graph.
// NOTE(review): one line of the body is not visible in this excerpt.
3532 public void releaseWriteOnly(ReadGraphImpl graph) {
3534 queryProvider2.releaseWrite(graph);
// Runs pending query updates for a writer and then fires the metadata listeners
// with the session's clientChanges.
3537 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
// Start time for the optional performance report below.
3539 long start = System.nanoTime();
// Repeat as long as an update pass marks primitives dirty again; the flag is
// cleared before each pass.
3541 while(dirtyPrimitives) {
3542 dirtyPrimitives = false;
3543 getQueryProvider2().performDirtyUpdates(writer);
3544 getQueryProvider2().performScheduledUpdates(writer);
3547 fireMetadataListeners(writer, clientChanges);
3549 if(DebugPolicy.PERFORMANCE_DATA) {
3550 long end = System.nanoTime();
// Elapsed time is printed in seconds (nanoseconds * 1e-9).
3551 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
// Walks the directory <platform location>/tempFiles/db and visits every file in it.
// NOTE(review): the per-file action is not visible in this excerpt; by the
// method name it presumably deletes the file. I/O failures are logged, and the
// walk continues after a per-file failure.
3556 void removeTemporaryData() {
3557 File platform = Platform.getLocation().toFile();
3558 File tempFiles = new File(platform, "tempFiles");
3559 File temp = new File(tempFiles, "db");
3563 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3565 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3568 } catch (IOException e) {
3569 Logger.defaultLogError(e);
3571 return FileVisitResult.CONTINUE;
// Failure of the walk itself.
3574 } catch (IOException e) {
3575 Logger.defaultLogError(e);
// When SERVICE_MODE_CREATE is set, marks every cluster recorded in
// createdClusters as immutable, then empties the list.
// NOTE(review): the return value of the TLongProcedure and the exact position
// of clear() relative to the if are not visible in this excerpt.
3579 public void handleCreatedClusters() {
3580 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3581 createdClusters.forEach(new TLongProcedure() {
3584 public boolean execute(long value) {
3585 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3586 cluster.setImmutable(true, clusterTranslator);
3591 createdClusters.clear();
// Exposes the query provider's modificationCounter as an opaque Object.
3595 public Object getModificationCounter() {
3596 return queryProvider2.modificationCounter;
3600 public void markUndoPoint() {
3601 state.setCombine(false);