1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.UndoTraits;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.datastructures.Callback;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
169 protected static final boolean DEBUG = false;
171 private static final boolean DIAGNOSTICS = false;
173 private TransactionPolicySupport transactionPolicy;
175 final private ClusterControl clusterControl;
177 final protected BuiltinSupportImpl builtinSupport;
178 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179 final protected ClusterSetsSupport clusterSetsSupport;
180 final protected LifecycleSupportImpl lifecycleSupport;
182 protected QuerySupportImpl querySupport;
183 protected ResourceSupportImpl resourceSupport;
184 protected WriteSupport writeSupport;
185 public ClusterTranslator clusterTranslator;
187 boolean dirtyPrimitives = false;
189 public static final int SERVICE_MODE_CREATE = 2;
190 public static final int SERVICE_MODE_ALLOW = 1;
191 public int serviceMode = 0;
192 public boolean createdImmutableClusters = false;
193 public TLongHashSet createdClusters = new TLongHashSet();
198 * The service locator maintained by the workbench. These services are
199 * initialized during workbench during the <code>init</code> method.
201 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
203 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
205 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
207 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
209 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
211 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
213 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
215 final protected State state = new State();
217 protected GraphSession graphSession = null;
219 protected SessionManager sessionManagerImpl = null;
221 protected UserAuthenticationAgent authAgent = null;
223 protected UserAuthenticator authenticator = null;
225 protected Resource user = null;
227 protected ClusterStream clusterStream = null;
229 protected SessionRequestManager requestManager = null;
231 public ClusterTable clusterTable = null;
233 public QueryProcessor queryProvider2 = null;
235 ClientChangesImpl clientChanges = null;
237 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
239 // protected long newClusterId = Constants.NullClusterId;
241 protected int flushCounter = 0;
243 protected boolean writeOnly = false;
245 WriteState<?> writeState = null;
246 WriteStateBase<?> delayedWriteState = null;
247 protected Resource defaultClusterSet = null; // If not null then used for newResource().
249 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
251 // if (authAgent == null)
252 // throw new IllegalArgumentException("null authentication agent");
254 File t = StaticSessionProperties.virtualGraphStoragePath;
257 this.clusterTable = new ClusterTable(this, t);
258 this.builtinSupport = new BuiltinSupportImpl(this);
259 this.sessionManagerImpl = sessionManagerImpl;
261 // this.authAgent = authAgent;
263 serviceLocator.registerService(Session.class, this);
264 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285 ServiceActivityUpdaterForWriteTransactions.register(this);
287 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290 this.lifecycleSupport = new LifecycleSupportImpl(this);
291 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292 this.transactionPolicy = new TransactionPolicyRelease();
293 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294 this.clusterControl = new ClusterControlImpl(this);
295 serviceLocator.registerService(ClusterControl.class, clusterControl);
296 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297 this.clusterSetsSupport.setReadDirectory(t.toPath());
298 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
309 public Resource getRootLibrary() {
310 return queryProvider2.getRootLibraryResource();
313 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314 if (!graphSession.dbSession.refreshEnabled())
317 getClusterTable().refresh(csid, this, clusterUID);
318 } catch (Throwable t) {
319 Logger.defaultLogError("Refesh failed.", t);
323 static final class TaskHelper {
324 private final String name;
325 private Object result;
326 final Semaphore sema = new Semaphore(0);
327 private Throwable throwable = null;
328 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
330 public void run(DatabaseException e) {
331 synchronized (TaskHelper.this) {
336 final Procedure<Object> proc = new Procedure<Object>() {
338 public void execute(Object result) {
342 public void exception(Throwable t) {
343 if (t instanceof DatabaseException)
344 callback.run((DatabaseException)t);
346 callback.run(new DatabaseException("" + name + "operation failed.", t));
349 final WriteTraits writeTraits = new WriteTraits() {
351 public UndoTraits getUndoTraits() {
355 public VirtualGraph getProvider() {
359 TaskHelper(String name) {
365 void setResult(Object result) {
366 this.result = result;
368 // Throwable throwableGet() {
371 synchronized void throwableSet(Throwable t) {
374 // void throwableSet(String t) {
375 // throwable = new InternalException("" + name + " operation failed. " + t);
377 synchronized void throwableCheck()
378 throws DatabaseException {
379 if (null != throwable)
380 if (throwable instanceof DatabaseException)
381 throw (DatabaseException)throwable;
383 throw new DatabaseException("Undo operation failed.", throwable);
385 void throw_(String message)
386 throws DatabaseException {
387 throw new DatabaseException("" + name + " operation failed. " + message);
393 private ListenerBase getListenerBase(Object procedure) {
394 if (procedure instanceof ListenerBase)
395 return (ListenerBase) procedure;
400 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
401 scheduleRequest(request, callback, notify, null);
405 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
408 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
410 assert (request != null);
412 if(Development.DEVELOPMENT) {
414 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
415 System.err.println("schedule write '" + request + "'");
416 } catch (Throwable t) {
417 Logger.defaultLogError(t);
421 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
423 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
426 public void run(int thread) {
428 if(Development.DEVELOPMENT) {
430 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
431 System.err.println("perform write '" + request + "'");
432 } catch (Throwable t) {
433 Logger.defaultLogError(t);
437 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
439 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
444 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
446 VirtualGraph vg = getProvider(request.getProvider());
448 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
452 public void execute(Object result) {
453 if(callback != null) callback.run(null);
457 public void exception(Throwable t) {
458 if(callback != null) callback.run((DatabaseException)t);
463 assert (null != writer);
464 // writer.state.barrier.inc();
467 request.perform(writer);
468 assert (null != writer);
469 } catch (Throwable t) {
470 if (!(t instanceof CancelTransactionException))
471 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472 writeState.except(t);
474 // writer.state.barrier.dec();
475 // writer.waitAsync(request);
479 assert(!queryProvider2.dirty);
481 } catch (Throwable e) {
483 // Log it first, just to be safe that the error is always logged.
484 if (!(e instanceof CancelTransactionException))
485 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
487 // writeState.getGraph().state.barrier.dec();
488 // writeState.getGraph().waitAsync(request);
490 writeState.except(e);
493 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 // // All we can do here is to log those, can't really pass them anywhere.
495 // if (callback != null) {
496 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 // else callback.run(new DatabaseException(e));
499 // } catch (Throwable e2) {
500 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
502 // boolean empty = clusterStream.reallyFlush();
503 // int callerThread = -1;
504 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
507 // // Send all local changes to server so it can calculate correct reverse change set.
508 // // This will call clusterStream.accept().
509 // state.cancelCommit(context, clusterStream);
511 // if (!context.isOk()) // this is a blocking operation
512 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
513 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
515 // state.cancelCommit2(context, clusterStream);
516 // } catch (DatabaseException e3) {
517 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
519 // clusterStream.setOff(false);
520 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
525 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
531 if(Development.DEVELOPMENT) {
533 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534 System.err.println("finish write '" + request + "'");
535 } catch (Throwable t) {
536 Logger.defaultLogError(t);
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547 scheduleRequest(request, procedure, notify, null);
551 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
554 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
556 assert (request != null);
558 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
560 requestManager.scheduleWrite(new SessionTask(request, thread) {
563 public void run(int thread) {
565 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
567 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
570 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
572 VirtualGraph vg = getProvider(request.getProvider());
573 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
576 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
577 writeState = writeStateT;
579 assert (null != writer);
580 // writer.state.barrier.inc();
581 writeStateT.setResult(request.perform(writer));
582 assert (null != writer);
584 // writer.state.barrier.dec();
585 // writer.waitAsync(null);
587 } catch (Throwable e) {
589 // writer.state.barrier.dec();
590 // writer.waitAsync(null);
592 writeState.except(e);
594 // state.stopWriteTransaction(clusterStream);
596 // } catch (Throwable e) {
597 // // Log it first, just to be safe that the error is always logged.
598 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
601 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
602 // // All we can do here is to log those, can't really pass them anywhere.
603 // if (procedure != null) {
604 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
605 // else procedure.exception(new DatabaseException(e));
607 // } catch (Throwable e2) {
608 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
611 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
613 // state.stopWriteTransaction(clusterStream);
616 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
619 // if(notify != null) notify.release();
629 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
630 scheduleRequest(request, callback, notify, null);
634 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
637 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
639 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
641 assert (request != null);
643 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
645 requestManager.scheduleWrite(new SessionTask(request, thread) {
648 public void run(int thread) {
649 fireSessionVariableChange(SessionVariables.QUEUED_READS);
651 Procedure<Object> stateProcedure = new Procedure<Object>() {
653 public void execute(Object result) {
654 if (callback != null)
658 public void exception(Throwable t) {
659 if (callback != null) {
660 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
661 else callback.run(new DatabaseException(t));
663 Logger.defaultLogError("Unhandled exception", t);
667 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
668 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
669 DelayedWriteGraph dwg = null;
670 // newGraph.state.barrier.inc();
673 dwg = new DelayedWriteGraph(newGraph);
674 request.perform(dwg);
675 } catch (Throwable e) {
676 delayedWriteState.except(e);
680 // newGraph.state.barrier.dec();
681 // newGraph.waitAsync(request);
682 fireSessionVariableChange(SessionVariables.QUEUED_READS);
685 delayedWriteState = null;
687 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
688 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
691 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
695 VirtualGraph vg = getProvider(request.getProvider());
696 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
698 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
700 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
702 assert (null != writer);
703 // writer.state.barrier.inc();
707 dwg.commit(writer, request);
709 if(defaultClusterSet != null) {
710 XSupport xs = getService(XSupport.class);
711 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
712 ClusteringSupport cs = getService(ClusteringSupport.class);
713 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
716 // This makes clusters available from server
717 clusterStream.reallyFlush();
719 releaseWriteOnly(writer);
720 handleUpdatesAndMetadata(writer);
722 } catch (ServiceException e) {
723 // writer.state.barrier.dec();
724 // writer.waitAsync(null);
726 // These shall be requested from server
727 clusterTable.removeWriteOnlyClusters();
728 // This makes clusters available from server
729 clusterStream.reallyFlush();
731 releaseWriteOnly(writer);
732 writeState.except(e);
734 // Debugging & Profiling
735 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
745 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
746 scheduleRequest(request, procedure, notify, null);
750 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
753 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
754 throw new Error("Not implemented");
757 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
758 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
759 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
760 createdClusters.add(cluster.clusterId);
765 class WriteOnlySupport implements WriteSupport {
767 ClusterStream stream;
768 ClusterImpl currentCluster;
770 public WriteOnlySupport() {
771 this.stream = clusterStream;
775 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
776 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
780 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
782 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
784 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
785 if(s != queryProvider2.getRootLibrary())
786 throw new ImmutableException("Trying to modify immutable resource key=" + s);
789 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
790 } catch (DatabaseException e) {
791 Logger.defaultLogError(e);
795 clientChanges.invalidate(s);
797 if (cluster.isWriteOnly())
799 queryProvider2.updateStatements(s, p);
804 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
805 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
809 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
811 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
813 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
814 } catch (DatabaseException e) {
815 Logger.defaultLogError(e);
818 clientChanges.invalidate(rid);
820 if (cluster.isWriteOnly())
822 queryProvider2.updateValue(rid);
827 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
830 claimValue(provider,resource, reader.readBytes(null, amount));
834 byte[] bytes = new byte[65536];
836 int rid = ((ResourceImpl)resource).id;
837 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
841 int block = Math.min(left, 65536);
842 reader.readBytes(bytes, block);
843 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
846 } catch (DatabaseException e) {
847 Logger.defaultLogError(e);
850 clientChanges.invalidate(rid);
852 if (cluster.isWriteOnly())
854 queryProvider2.updateValue(rid);
858 private void maintainCluster(ClusterImpl before, ClusterI after_) {
859 if(after_ != null && after_ != before) {
860 ClusterImpl after = (ClusterImpl)after_;
861 if(currentCluster == before) currentCluster = after;
862 clusterTable.replaceCluster(after);
866 public int createResourceKey(int foreignCounter) throws DatabaseException {
867 if(currentCluster == null)
868 currentCluster = getNewResourceCluster();
869 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
870 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
871 newCluster.foreignLookup = new byte[foreignCounter];
872 currentCluster = newCluster;
874 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
876 return currentCluster.createResource(clusterTranslator);
880 public Resource createResource(VirtualGraph provider) throws DatabaseException {
881 if(currentCluster == null) {
882 if (null != defaultClusterSet) {
883 ResourceImpl result = getNewResource(defaultClusterSet);
884 currentCluster = clusterTable.getClusterByResourceKey(result.id);
887 currentCluster = getNewResourceCluster();
890 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
891 if (null != defaultClusterSet) {
892 ResourceImpl result = getNewResource(defaultClusterSet);
893 currentCluster = clusterTable.getClusterByResourceKey(result.id);
896 currentCluster = getNewResourceCluster();
899 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
903 public Resource createResource(VirtualGraph provider, long clusterId)
904 throws DatabaseException {
905 return getNewResource(clusterId);
909 public Resource createResource(VirtualGraph provider, Resource clusterSet)
910 throws DatabaseException {
911 return getNewResource(clusterSet);
915 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
916 throws DatabaseException {
917 getNewClusterSet(clusterSet);
921 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
922 throws ServiceException {
923 return containsClusterSet(clusterSet);
926 public void selectCluster(long cluster) {
927 currentCluster = clusterTable.getClusterByClusterId(cluster);
928 long setResourceId = clusterSetsSupport.getSet(cluster);
929 clusterSetsSupport.put(setResourceId, cluster);
933 public Resource setDefaultClusterSet(Resource clusterSet)
934 throws ServiceException {
935 Resource result = setDefaultClusterSet4NewResource(clusterSet);
936 if(clusterSet != null) {
937 long id = clusterSetsSupport.get(clusterSet.getResourceId());
938 currentCluster = clusterTable.getClusterByClusterId(id);
941 currentCluster = null;
947 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
949 provider = getProvider(provider);
950 if (null == provider) {
951 int key = ((ResourceImpl)resource).id;
955 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
956 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
957 // if(key != queryProvider2.getRootLibrary())
958 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
961 // cluster.removeValue(key, clusterTranslator);
962 // } catch (DatabaseException e) {
963 // Logger.defaultLogError(e);
967 clusterTable.writeOnlyInvalidate(cluster);
970 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
971 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
972 clusterTranslator.removeValue(cluster);
973 } catch (DatabaseException e) {
974 Logger.defaultLogError(e);
977 queryProvider2.invalidateResource(key);
978 clientChanges.invalidate(key);
981 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
982 queryProvider2.updateValue(querySupport.getId(resource));
983 clientChanges.claimValue(resource);
990 public void flush(boolean intermediate) {
991 throw new UnsupportedOperationException();
995 public void flushCluster() {
996 clusterTable.flushCluster(graphSession);
997 if(defaultClusterSet != null) {
998 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1000 currentCluster = null;
1004 public void flushCluster(Resource r) {
1005 throw new UnsupportedOperationException("flushCluster resource " + r);
1013 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1016 int s = ((ResourceImpl)subject).id;
1017 int p = ((ResourceImpl)predicate).id;
1018 int o = ((ResourceImpl)object).id;
1020 provider = getProvider(provider);
1021 if (null == provider) {
1023 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1024 clusterTable.writeOnlyInvalidate(cluster);
1028 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1029 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1030 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1032 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1033 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1035 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1036 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038 clusterTranslator.removeStatement(cluster);
1040 queryProvider2.invalidateResource(s);
1041 clientChanges.invalidate(s);
1043 } catch (DatabaseException e) {
1045 Logger.defaultLogError(e);
1053 ((VirtualGraphImpl)provider).deny(s, p, o);
1054 queryProvider2.invalidateResource(s);
1055 clientChanges.invalidate(s);
1064 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1065 throw new UnsupportedOperationException();
1069 public boolean writeOnly() {
1074 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1075 writeSupport.performWriteRequest(graph, request);
1079 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1080 throw new UnsupportedOperationException();
1084 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1085 throw new UnsupportedOperationException();
1089 public <T> void addMetadata(Metadata data) throws ServiceException {
1090 writeSupport.addMetadata(data);
1094 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1095 return writeSupport.getMetadata(clazz);
1099 public TreeMap<String, byte[]> getMetadata() {
1100 return writeSupport.getMetadata();
1104 public void commitDone(WriteTraits writeTraits, long csid) {
1105 writeSupport.commitDone(writeTraits, csid);
1109 public void clearUndoList(WriteTraits writeTraits) {
1110 writeSupport.clearUndoList(writeTraits);
1113 public int clearMetadata() {
1114 return writeSupport.clearMetadata();
1118 public void startUndo() {
1119 writeSupport.startUndo();
1123 class VirtualWriteOnlySupport implements WriteSupport {
1126 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1127 // Resource object) {
1128 // throw new UnsupportedOperationException();
1132 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1134 TransientGraph impl = (TransientGraph)provider;
1135 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1136 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1137 clientChanges.claim(subject, predicate, object);
1142 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1144 TransientGraph impl = (TransientGraph)provider;
1145 impl.claim(subject, predicate, object);
1146 getQueryProvider2().updateStatements(subject, predicate);
1147 clientChanges.claim(subject, predicate, object);
1152 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1153 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1157 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1158 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1159 getQueryProvider2().updateValue(resource);
1160 clientChanges.claimValue(resource);
1164 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1165 byte[] value = reader.readBytes(null, amount);
1166 claimValue(provider, resource, value);
1170 public Resource createResource(VirtualGraph provider) {
1171 TransientGraph impl = (TransientGraph)provider;
1172 return impl.getResource(impl.newResource(false));
1176 public Resource createResource(VirtualGraph provider, long clusterId) {
1177 throw new UnsupportedOperationException();
1181 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1182 throws DatabaseException {
1183 throw new UnsupportedOperationException();
1187 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1188 throws DatabaseException {
1189 throw new UnsupportedOperationException();
1193 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1194 throws ServiceException {
1195 throw new UnsupportedOperationException();
1199 public Resource setDefaultClusterSet(Resource clusterSet)
1200 throws ServiceException {
1205 public void denyValue(VirtualGraph provider, Resource resource) {
1206 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1207 getQueryProvider2().updateValue(querySupport.getId(resource));
1208 // NOTE: this only keeps track of value changes by-resource.
1209 clientChanges.claimValue(resource);
1213 public void flush(boolean intermediate) {
1214 throw new UnsupportedOperationException();
1218 public void flushCluster() {
1219 throw new UnsupportedOperationException();
1223 public void flushCluster(Resource r) {
1224 throw new UnsupportedOperationException("Resource " + r);
1232 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1233 TransientGraph impl = (TransientGraph) provider;
1234 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1235 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1236 clientChanges.deny(subject, predicate, object);
1241 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1242 throw new UnsupportedOperationException();
1246 public boolean writeOnly() {
1251 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1252 throw new UnsupportedOperationException();
1256 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1257 throw new UnsupportedOperationException();
1261 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1262 throw new UnsupportedOperationException();
1266 public <T> void addMetadata(Metadata data) throws ServiceException {
1267 throw new UnsupportedOperationException();
1271 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1272 throw new UnsupportedOperationException();
1276 public TreeMap<String, byte[]> getMetadata() {
1277 throw new UnsupportedOperationException();
1281 public void commitDone(WriteTraits writeTraits, long csid) {
1285 public void clearUndoList(WriteTraits writeTraits) {
1289 public int clearMetadata() {
1294 public void startUndo() {
1298 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1302 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1304 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1307 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1311 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1313 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1315 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1316 writeState = writeStateT;
1318 assert (null != writer);
1319 // writer.state.barrier.inc();
1320 long start = System.nanoTime();
1321 T result = request.perform(writer);
1322 long duration = System.nanoTime() - start;
1324 System.err.println("################");
1325 System.err.println("WriteOnly duration " + 1e-9*duration);
1327 writeStateT.setResult(result);
1329 // This makes clusters available from server
1330 clusterStream.reallyFlush();
1332 // This will trigger query updates
1333 releaseWriteOnly(writer);
1334 assert (null != writer);
1336 handleUpdatesAndMetadata(writer);
1338 } catch (CancelTransactionException e) {
1340 releaseWriteOnly(writeState.getGraph());
1342 clusterTable.removeWriteOnlyClusters();
1343 state.stopWriteTransaction(clusterStream);
1345 } catch (Throwable e) {
1347 e.printStackTrace();
1349 releaseWriteOnly(writeState.getGraph());
1351 clusterTable.removeWriteOnlyClusters();
1353 if (callback != null)
1354 callback.exception(new DatabaseException(e));
1356 state.stopWriteTransaction(clusterStream);
1357 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1361 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1367 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1368 scheduleRequest(request, callback, notify, null);
1372 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1376 assert (request != null);
1378 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1380 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1383 public void run(int thread) {
1385 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1389 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1392 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1396 VirtualGraph vg = getProvider(request.getProvider());
1397 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1399 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1401 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1404 public void execute(Object result) {
1405 if(callback != null) callback.run(null);
1409 public void exception(Throwable t) {
1410 if(callback != null) callback.run((DatabaseException)t);
1415 assert (null != writer);
1416 // writer.state.barrier.inc();
1420 request.perform(writer);
1422 } catch (Throwable e) {
1424 // writer.state.barrier.dec();
1425 // writer.waitAsync(null);
1427 releaseWriteOnly(writer);
1429 clusterTable.removeWriteOnlyClusters();
1431 if(!(e instanceof CancelTransactionException)) {
1432 if (callback != null)
1433 callback.run(new DatabaseException(e));
1436 writeState.except(e);
1442 // This makes clusters available from server
1443 boolean empty = clusterStream.reallyFlush();
1444 // This was needed to make WO requests call metadata listeners.
1445 // NOTE: the calling event does not contain clientChanges information.
1446 if (!empty && clientChanges.isEmpty())
1447 clientChanges.setNotEmpty(true);
1448 releaseWriteOnly(writer);
1449 assert (null != writer);
1452 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1465 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1466 scheduleRequest(request, callback, notify, null);
1470 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1473 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1475 assert (request != null);
1477 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1479 requestManager.scheduleWrite(new SessionTask(request, thread) {
1482 public void run(int thread) {
1484 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1486 performWriteOnly(request, notify, callback);
1496 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1498 assert (request != null);
1499 assert (procedure != null);
1501 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1503 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1506 public void run(int thread) {
1508 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1510 ListenerBase listener = getListenerBase(procedure);
1512 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1516 if (listener != null) {
1519 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1522 public void exception(AsyncReadGraph graph, Throwable t) {
1523 procedure.exception(graph, t);
1524 if(throwable != null) {
1527 // ErrorLogger.defaultLogError("Unhandled exception", t);
1532 public void execute(AsyncReadGraph graph, T t) {
1533 if(result != null) result.set(t);
1534 procedure.execute(graph, t);
1538 } catch (Throwable t) {
1539 // This is handled by the AsyncProcedure
1540 //Logger.defaultLogError("Internal error", t);
1547 // newGraph.state.barrier.inc();
1549 T t = request.perform(newGraph);
1553 if(result != null) result.set(t);
1554 procedure.execute(newGraph, t);
1556 } catch (Throwable th) {
1558 if(throwable != null) {
1561 Logger.defaultLogError("Unhandled exception", th);
1566 } catch (Throwable t) {
1569 t.printStackTrace();
1571 if(throwable != null) {
1574 Logger.defaultLogError("Unhandled exception", t);
1579 procedure.exception(newGraph, t);
1581 } catch (Throwable t2) {
1583 if(throwable != null) {
1586 Logger.defaultLogError("Unhandled exception", t2);
1593 // newGraph.state.barrier.dec();
1594 // newGraph.waitAsync(request);
1600 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1610 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1612 assert (request != null);
1613 assert (procedure != null);
1615 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1617 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1620 public void run(int thread) {
1622 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1624 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1628 if (listener != null) {
1630 newGraph.processor.query(newGraph, request, null, procedure, listener);
1632 // newGraph.waitAsync(request);
1636 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1637 procedure, "request");
1641 // newGraph.state.barrier.inc();
1643 request.perform(newGraph, wrapper);
1645 // newGraph.waitAsync(request);
1647 } catch (Throwable t) {
1649 wrapper.exception(newGraph, t);
1650 // newGraph.waitAsync(request);
1659 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1669 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1671 assert (request != null);
1672 assert (procedure != null);
1674 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1676 int sync = notify != null ? thread : -1;
1678 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1681 public void run(int thread) {
1683 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1685 ListenerBase listener = getListenerBase(procedure);
1687 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1691 if (listener != null) {
1693 newGraph.processor.query(newGraph, request, null, procedure, listener);
1695 // newGraph.waitAsync(request);
1699 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1703 request.perform(newGraph, wrapper);
1705 } catch (Throwable t) {
1707 t.printStackTrace();
1715 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1725 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1727 assert (request != null);
1728 assert (procedure != null);
1730 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1732 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1735 public void run(int thread) {
1737 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1739 ListenerBase listener = getListenerBase(procedure);
1741 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1745 if (listener != null) {
1747 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1750 public void exception(Throwable t) {
1751 procedure.exception(t);
1752 if(throwable != null) {
1758 public void execute(T t) {
1759 if(result != null) result.set(t);
1760 procedure.execute(t);
1765 // newGraph.waitAsync(request);
1769 // newGraph.state.barrier.inc();
1771 request.register(newGraph, new Listener<T>() {
1774 public void exception(Throwable t) {
1775 if(throwable != null) throwable.set(t);
1776 procedure.exception(t);
1777 // newGraph.state.barrier.dec();
1781 public void execute(T t) {
1782 if(result != null) result.set(t);
1783 procedure.execute(t);
1784 // newGraph.state.barrier.dec();
1788 public boolean isDisposed() {
1794 // newGraph.waitAsync(request);
1800 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1812 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1814 scheduleRequest(request, procedure, null, null, null);
1819 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1821 Semaphore notify = new Semaphore(0);
1822 DataContainer<Throwable> container = new DataContainer<Throwable>();
1823 DataContainer<T> result = new DataContainer<T>();
1824 scheduleRequest(request, procedure, notify, container, result);
1825 acquire(notify, request);
1826 Throwable throwable = container.get();
1827 if(throwable != null) {
1828 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1829 else throw new DatabaseException("Unexpected exception", throwable);
1831 return result.get();
1835 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1837 Semaphore notify = new Semaphore(0);
1838 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1839 final DataContainer<T> resultContainer = new DataContainer<T>();
1840 scheduleRequest(request, new AsyncProcedure<T>() {
1842 public void exception(AsyncReadGraph graph, Throwable throwable) {
1843 exceptionContainer.set(throwable);
1844 procedure.exception(graph, throwable);
1847 public void execute(AsyncReadGraph graph, T result) {
1848 resultContainer.set(result);
1849 procedure.execute(graph, result);
1851 }, getListenerBase(procedure), notify);
1852 acquire(notify, request, procedure);
1853 Throwable throwable = exceptionContainer.get();
1854 if (throwable != null) {
1855 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1856 else throw new DatabaseException("Unexpected exception", throwable);
1858 return resultContainer.get();
1863 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1865 Semaphore notify = new Semaphore(0);
1866 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1867 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1869 public void exception(AsyncReadGraph graph, Throwable throwable) {
1870 exceptionContainer.set(throwable);
1871 procedure.exception(graph, throwable);
1874 public void execute(AsyncReadGraph graph, T result) {
1875 procedure.execute(graph, result);
1878 public void finished(AsyncReadGraph graph) {
1879 procedure.finished(graph);
1882 acquire(notify, request, procedure);
1883 Throwable throwable = exceptionContainer.get();
1884 if (throwable != null) {
1885 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1886 else throw new DatabaseException("Unexpected exception", throwable);
1888 // TODO: implement return value
1889 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1894 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1895 return syncRequest(request, new ProcedureAdapter<T>());
1898 // assert(request != null);
1900 // final DataContainer<T> result = new DataContainer<T>();
1901 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1903 // syncRequest(request, new Procedure<T>() {
1906 // public void execute(T t) {
1911 // public void exception(Throwable t) {
1912 // exception.set(t);
1917 // Throwable t = exception.get();
1919 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1920 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1923 // return result.get();
1928 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1929 return syncRequest(request, (Procedure<T>)procedure);
1934 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1935 // assertNotSession();
1936 // Semaphore notify = new Semaphore(0);
1937 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1938 // DataContainer<T> result = new DataContainer<T>();
1939 // scheduleRequest(request, procedure, notify, container, result);
1940 // acquire(notify, request);
1941 // Throwable throwable = container.get();
1942 // if(throwable != null) {
1943 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1944 // else throw new DatabaseException("Unexpected exception", throwable);
1946 // return result.get();
1951 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1953 Semaphore notify = new Semaphore(0);
1954 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1955 final DataContainer<T> result = new DataContainer<T>();
1956 scheduleRequest(request, procedure, notify, container, result);
1957 acquire(notify, request);
1958 Throwable throwable = container.get();
1959 if (throwable != null) {
1960 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961 else throw new DatabaseException("Unexpected exception", throwable);
1963 return result.get();
1967 public void syncRequest(Write request) throws DatabaseException {
1970 Semaphore notify = new Semaphore(0);
1971 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1972 scheduleRequest(request, new Callback<DatabaseException>() {
1975 public void run(DatabaseException e) {
1980 acquire(notify, request);
1981 if(exception.get() != null) throw exception.get();
1985 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1987 Semaphore notify = new Semaphore(0);
1988 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1989 final DataContainer<T> result = new DataContainer<T>();
1990 scheduleRequest(request, new Procedure<T>() {
1993 public void exception(Throwable t) {
1998 public void execute(T t) {
2003 acquire(notify, request);
2004 if(exception.get() != null) {
2005 Throwable t = exception.get();
2006 if(t instanceof DatabaseException) throw (DatabaseException)t;
2007 else throw new DatabaseException(t);
2009 return result.get();
2013 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2015 Semaphore notify = new Semaphore(0);
2016 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2017 final DataContainer<T> result = new DataContainer<T>();
2018 scheduleRequest(request, new Procedure<T>() {
2021 public void exception(Throwable t) {
2026 public void execute(T t) {
2031 acquire(notify, request);
2032 if(exception.get() != null) {
2033 Throwable t = exception.get();
2034 if(t instanceof DatabaseException) throw (DatabaseException)t;
2035 else throw new DatabaseException(t);
2037 return result.get();
2041 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2043 Semaphore notify = new Semaphore(0);
2044 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2045 final DataContainer<T> result = new DataContainer<T>();
2046 scheduleRequest(request, new Procedure<T>() {
2049 public void exception(Throwable t) {
2054 public void execute(T t) {
2059 acquire(notify, request);
2060 if(exception.get() != null) {
2061 Throwable t = exception.get();
2062 if(t instanceof DatabaseException) throw (DatabaseException)t;
2063 else throw new DatabaseException(t);
2065 return result.get();
2069 public void syncRequest(DelayedWrite request) throws DatabaseException {
2071 Semaphore notify = new Semaphore(0);
2072 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2073 scheduleRequest(request, new Callback<DatabaseException>() {
2075 public void run(DatabaseException e) {
2079 acquire(notify, request);
2080 if(exception.get() != null) throw exception.get();
2084 public void syncRequest(WriteOnly request) throws DatabaseException {
2087 Semaphore notify = new Semaphore(0);
2088 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2089 scheduleRequest(request, new Callback<DatabaseException>() {
2091 public void run(DatabaseException e) {
2095 acquire(notify, request);
2096 if(exception.get() != null) throw exception.get();
2101 * GraphRequestProcessor interface
2105 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2107 scheduleRequest(request, procedure, null, null);
2112 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2114 scheduleRequest(request, procedure, null);
2119 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2121 scheduleRequest(request, procedure, null, null, null);
2126 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2128 scheduleRequest(request, callback, null);
2133 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2135 scheduleRequest(request, procedure, null);
2140 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2142 scheduleRequest(request, procedure, null);
2147 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2149 scheduleRequest(request, procedure, null);
2154 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2156 scheduleRequest(request, callback, null);
2161 public void asyncRequest(final Write r) {
2162 asyncRequest(r, null);
2166 public void asyncRequest(final DelayedWrite r) {
2167 asyncRequest(r, null);
2171 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2173 scheduleRequest(request, callback, null);
2178 public void asyncRequest(final WriteOnly request) {
2180 asyncRequest(request, null);
2185 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2186 r.request(this, procedure);
2190 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2191 r.request(this, procedure);
2195 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2196 r.request(this, procedure);
2200 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2201 r.request(this, procedure);
2205 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2206 r.request(this, procedure);
2210 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2211 r.request(this, procedure);
2215 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2216 return r.request(this);
2220 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2221 return r.request(this);
2225 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2226 r.request(this, procedure);
2230 public <T> void async(WriteInterface<T> r) {
2231 r.request(this, new ProcedureAdapter<T>());
2235 public void incAsync() {
2240 public void decAsync() {
2244 public long getCluster(ResourceImpl resource) {
2245 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2246 return cluster.getClusterId();
2249 public long getCluster(int id) {
2250 if (clusterTable == null)
2251 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2252 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2255 public ResourceImpl getResource(int id) {
2256 return new ResourceImpl(resourceSupport, id);
2259 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2260 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2261 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2262 int key = proxy.getClusterKey();
2263 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2264 return new ResourceImpl(resourceSupport, resourceKey);
2267 public ResourceImpl getResource2(int id) {
2269 return new ResourceImpl(resourceSupport, id);
2272 final public int getId(ResourceImpl impl) {
2276 public static final Charset UTF8 = Charset.forName("utf-8");
2281 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2282 for(TransientGraph g : support.providers) {
2283 if(g.isPending(subject)) return false;
2288 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2289 for(TransientGraph g : support.providers) {
2290 if(g.isPending(subject, predicate)) return false;
2295 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2297 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2299 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2302 public void run(ReadGraphImpl graph) {
2303 if(ready.decrementAndGet() == 0) {
2304 runnable.run(graph);
2310 for(TransientGraph g : support.providers) {
2311 if(g.isPending(subject)) {
2313 g.load(graph, subject, composite);
2314 } catch (DatabaseException e) {
2315 e.printStackTrace();
2318 composite.run(graph);
2322 composite.run(graph);
2326 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2328 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2330 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2333 public void run(ReadGraphImpl graph) {
2334 if(ready.decrementAndGet() == 0) {
2335 runnable.run(graph);
2341 for(TransientGraph g : support.providers) {
2342 if(g.isPending(subject, predicate)) {
2344 g.load(graph, subject, predicate, composite);
2345 } catch (DatabaseException e) {
2346 e.printStackTrace();
2349 composite.run(graph);
2353 composite.run(graph);
2357 // void dumpHeap() {
2360 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2361 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2362 // "d:/heap" + flushCounter + ".txt", true);
2363 // } catch (IOException e) {
2364 // e.printStackTrace();
2369 void fireReactionsToSynchronize(ChangeSet cs) {
2371 // Do not fire empty events
2375 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2376 // g.state.barrier.inc();
2380 if (!cs.isEmpty()) {
2381 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2382 for (ChangeListener l : changeListeners2) {
2385 } catch (Exception ex) {
2386 ex.printStackTrace();
2393 // g.state.barrier.dec();
2394 // g.waitAsync(null);
2400 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2403 // Do not fire empty events
2408 // graph.state.barrier.inc();
2412 if (!cs2.isEmpty()) {
2414 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2415 for (ChangeListener l : changeListeners2) {
2418 // System.out.println("changelistener " + l);
2419 } catch (Exception ex) {
2420 ex.printStackTrace();
2428 // graph.state.barrier.dec();
2429 // graph.waitAsync(null);
2432 } catch (Throwable t) {
2433 t.printStackTrace();
2437 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2441 // Do not fire empty events
2445 // Do not fire on virtual requests
2446 if(graph.getProvider() != null)
2449 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2453 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2454 for (ChangeListener l : metadataListeners) {
2457 } catch (Throwable ex) {
2458 ex.printStackTrace();
2466 } catch (Throwable t) {
2467 t.printStackTrace();
2476 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2479 public <T> T getService(Class<T> api) {
2480 T t = peekService(api);
2482 if (state.isClosed())
2483 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2484 throw new ServiceNotFoundException(this, api);
2492 protected abstract ServerInformation getCachedServerInformation();
2494 private Class<?> serviceKey1 = null;
2495 private Class<?> serviceKey2 = null;
2496 private Object service1 = null;
2497 private Object service2 = null;
2503 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2505 @SuppressWarnings("unchecked")
2507 public synchronized <T> T peekService(Class<T> api) {
2509 if(serviceKey1 == api) {
2511 } else if (serviceKey2 == api) {
2513 Object result = service2;
2514 service2 = service1;
2515 serviceKey2 = serviceKey1;
2521 if (Layer0.class == api)
2523 if (ServerInformation.class == api)
2524 return (T) getCachedServerInformation();
2525 else if (WriteGraphImpl.class == api)
2526 return (T) writeState.getGraph();
2527 else if (ClusterBuilder.class == api)
2528 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2529 else if (ClusterBuilderFactory.class == api)
2530 return (T)new ClusterBuilderFactoryImpl(this);
2532 service2 = service1;
2533 serviceKey2 = serviceKey1;
2535 service1 = serviceLocator.peekService(api);
2546 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2549 public boolean hasService(Class<?> api) {
2550 return serviceLocator.hasService(api);
2554 * @param api the api that must be implemented by the specified service
2555 * @param service the service implementation
2558 public <T> void registerService(Class<T> api, T service) {
2559 if(Layer0.class == api) {
2560 L0 = (Layer0)service;
2563 serviceLocator.registerService(api, service);
2564 if (TransactionPolicySupport.class == api) {
2565 transactionPolicy = (TransactionPolicySupport)service;
2566 state.resetTransactionPolicy();
2568 if (api == serviceKey1)
2570 else if (api == serviceKey2)
2578 void fireSessionVariableChange(String variable) {
2579 for (MonitorHandler h : monitorHandlers) {
2580 MonitorContext ctx = monitorContexts.getLeft(h);
2582 // SafeRunner functionality repeated here to avoid dependency.
2584 h.valuesChanged(ctx);
2585 } catch (Exception e) {
2586 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2587 } catch (LinkageError e) {
2588 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2594 class ResourceSerializerImpl implements ResourceSerializer {
2596 public long createRandomAccessId(int id) throws DatabaseException {
2599 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2600 long cluster = getCluster(id);
2602 return 0; // Better to return 0 then invalid id.
2603 long result = ClusterTraitsBase.createResourceId(cluster, index);
2608 public long getRandomAccessId(Resource resource) throws DatabaseException {
2610 ResourceImpl resourceImpl = (ResourceImpl) resource;
2611 return createRandomAccessId(resourceImpl.id);
2616 public int getTransientId(Resource resource) throws DatabaseException {
2618 ResourceImpl resourceImpl = (ResourceImpl) resource;
2619 return resourceImpl.id;
2624 public String createRandomAccessId(Resource resource)
2625 throws InvalidResourceReferenceException {
2627 if(resource == null) throw new IllegalArgumentException();
2629 ResourceImpl resourceImpl = (ResourceImpl) resource;
2631 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2635 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2636 } catch (DatabaseException e1) {
2637 throw new InvalidResourceReferenceException(e1);
2640 // Serialize as '<resource index>_<cluster id>'
2641 return "" + r + "_" + getCluster(resourceImpl);
2642 } catch (Throwable e) {
2643 e.printStackTrace();
2644 throw new InvalidResourceReferenceException(e);
2649 public int getTransientId(long serialized) throws DatabaseException {
2650 if (serialized <= 0)
2651 return (int)serialized;
2652 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2653 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2654 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2655 if (cluster == null)
2656 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2657 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2662 public Resource getResource(long randomAccessId) throws DatabaseException {
2663 return getResourceByKey(getTransientId(randomAccessId));
2667 public Resource getResource(int transientId) throws DatabaseException {
2668 return getResourceByKey(transientId);
2672 public Resource getResource(String randomAccessId)
2673 throws InvalidResourceReferenceException {
2675 int i = randomAccessId.indexOf('_');
2677 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2678 + randomAccessId + "'");
2679 int r = Integer.parseInt(randomAccessId.substring(0, i));
2680 if(r < 0) return getResourceByKey(r);
2681 long c = Long.parseLong(randomAccessId.substring(i + 1));
2682 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2683 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2684 if (cluster.hasResource(key, clusterTranslator))
2685 return getResourceByKey(key);
2686 } catch (InvalidResourceReferenceException e) {
2688 } catch (NumberFormatException e) {
2689 throw new InvalidResourceReferenceException(e);
2690 } catch (Throwable e) {
2691 e.printStackTrace();
2692 throw new InvalidResourceReferenceException(e);
2695 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2699 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2702 } catch (Throwable e) {
2703 e.printStackTrace();
2704 throw new InvalidResourceReferenceException(e);
2710 // Copied from old SessionImpl
2713 if (state.isClosed())
2714 throw new Error("Session closed.");
2719 public ResourceImpl getNewResource(long clusterId) {
2720 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2723 newId = cluster.createResource(clusterTranslator);
2724 } catch (DatabaseException e) {
2725 Logger.defaultLogError(e);
2728 return new ResourceImpl(resourceSupport, newId);
2731 public ResourceImpl getNewResource(Resource clusterSet)
2732 throws DatabaseException {
2733 long resourceId = clusterSet.getResourceId();
2734 Long clusterId = clusterSetsSupport.get(resourceId);
2735 if (null == clusterId)
2736 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2737 if (Constants.NewClusterId == clusterId) {
2738 clusterId = getService(ClusteringSupport.class).createCluster();
2739 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740 createdClusters.add(clusterId);
2741 clusterSetsSupport.put(resourceId, clusterId);
2742 return getNewResource(clusterId);
2744 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2745 ResourceImpl result;
2746 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2747 clusterId = getService(ClusteringSupport.class).createCluster();
2748 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749 createdClusters.add(clusterId);
2750 clusterSetsSupport.put(resourceId, clusterId);
2751 return getNewResource(clusterId);
2753 result = getNewResource(clusterId);
2754 int resultKey = querySupport.getId(result);
2755 long resultCluster = querySupport.getClusterId(resultKey);
2756 if (clusterId != resultCluster)
2757 clusterSetsSupport.put(resourceId, resultCluster);
2763 public void getNewClusterSet(Resource clusterSet)
2764 throws DatabaseException {
2766 System.out.println("new cluster set=" + clusterSet);
2767 long resourceId = clusterSet.getResourceId();
2768 if (clusterSetsSupport.containsKey(resourceId))
2769 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2770 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2772 public boolean containsClusterSet(Resource clusterSet)
2773 throws ServiceException {
2774 long resourceId = clusterSet.getResourceId();
2775 return clusterSetsSupport.containsKey(resourceId);
2777 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2778 Resource r = defaultClusterSet;
2779 defaultClusterSet = clusterSet;
2782 void printDiagnostics() {
2786 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2787 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2788 for(ClusterI cluster : clusterTable.getClusters()) {
2790 if (cluster.isLoaded() && !cluster.isEmpty()) {
2791 residentClusters.set(residentClusters.get() + 1);
2792 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2794 } catch (DatabaseException e) {
2795 Logger.defaultLogError(e);
2799 System.out.println("--------------------------------");
2800 System.out.println("Cluster information:");
2801 System.out.println("-amount of resident clusters=" + residentClusters.get());
2802 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2804 for(ClusterI cluster : clusterTable.getClusters()) {
2805 System.out.print("Cluster " + cluster.getClusterId() + " [");
2807 if (!cluster.isLoaded())
2808 System.out.println("not loaded]");
2809 else if (cluster.isEmpty())
2810 System.out.println("is empty]");
2812 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2813 System.out.print(",references=" + cluster.getReferenceCount());
2814 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2816 } catch (DatabaseException e) {
2817 Logger.defaultLogError(e);
2818 System.out.println("is corrupted]");
2822 queryProvider2.printDiagnostics();
2823 System.out.println("--------------------------------");
2828 public void fireStartReadTransaction() {
2831 System.out.println("StartReadTransaction");
2833 for (SessionEventListener listener : eventListeners) {
2835 listener.readTransactionStarted();
2836 } catch (Throwable t) {
2837 t.printStackTrace();
2843 public void fireFinishReadTransaction() {
2846 System.out.println("FinishReadTransaction");
2848 for (SessionEventListener listener : eventListeners) {
2850 listener.readTransactionFinished();
2851 } catch (Throwable t) {
2852 t.printStackTrace();
2858 public void fireStartWriteTransaction() {
2861 System.out.println("StartWriteTransaction");
2863 for (SessionEventListener listener : eventListeners) {
2865 listener.writeTransactionStarted();
2866 } catch (Throwable t) {
2867 t.printStackTrace();
2873 public void fireFinishWriteTransaction() {
2876 System.out.println("FinishWriteTransaction");
2878 for (SessionEventListener listener : eventListeners) {
2880 listener.writeTransactionFinished();
2881 } catch (Throwable t) {
2882 t.printStackTrace();
2886 Indexing.resetDependenciesIndexingDisabled();
2890 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2891 throw new Error("Not supported at the moment.");
2898 ClusterTable getClusterTable() {
2899 return clusterTable;
2902 public GraphSession getGraphSession() {
2903 return graphSession;
2906 public QueryProcessor getQueryProvider2() {
2907 return queryProvider2;
2910 ClientChangesImpl getClientChanges() {
2911 return clientChanges;
2914 boolean getWriteOnly() {
2918 static int counter = 0;
2920 public void onClusterLoaded(long clusterId) {
2922 clusterTable.updateSize();
2930 * Implementation of the interface RequestProcessor
2934 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2939 assert(request != null);
2941 final DataContainer<T> result = new DataContainer<T>();
2942 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2944 syncRequest(request, new AsyncProcedure<T>() {
2947 public void execute(AsyncReadGraph graph, T t) {
2952 public void exception(AsyncReadGraph graph, Throwable t) {
2958 Throwable t = exception.get();
2960 if(t instanceof DatabaseException) throw (DatabaseException)t;
2961 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2964 return result.get();
2969 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2970 // assertNotSession();
2971 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2975 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2977 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2981 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2983 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2987 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2989 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2993 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2995 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2999 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3003 assert(request != null);
3005 final DataContainer<T> result = new DataContainer<T>();
3006 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3008 syncRequest(request, new AsyncProcedure<T>() {
3011 public void execute(AsyncReadGraph graph, T t) {
3016 public void exception(AsyncReadGraph graph, Throwable t) {
3022 Throwable t = exception.get();
3024 if(t instanceof DatabaseException) throw (DatabaseException)t;
3025 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3028 return result.get();
3033 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3035 return syncRequest(request, (AsyncProcedure<T>)procedure);
3039 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3041 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3045 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3047 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3051 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3053 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3057 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3059 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3063 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3067 assert(request != null);
3069 final ArrayList<T> result = new ArrayList<T>();
3070 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3072 syncRequest(request, new AsyncMultiProcedure<T>() {
3075 public void execute(AsyncReadGraph graph, T t) {
3076 synchronized(result) {
3082 public void finished(AsyncReadGraph graph) {
3086 public void exception(AsyncReadGraph graph, Throwable t) {
3093 Throwable t = exception.get();
3095 if(t instanceof DatabaseException) throw (DatabaseException)t;
3096 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3104 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3106 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3110 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3112 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3116 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3118 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3122 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3124 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3128 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3130 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3134 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3138 assert(request != null);
3140 final ArrayList<T> result = new ArrayList<T>();
3141 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3143 syncRequest(request, new AsyncMultiProcedure<T>() {
3146 public void execute(AsyncReadGraph graph, T t) {
3147 synchronized(result) {
3153 public void finished(AsyncReadGraph graph) {
3157 public void exception(AsyncReadGraph graph, Throwable t) {
3164 Throwable t = exception.get();
3166 if(t instanceof DatabaseException) throw (DatabaseException)t;
3167 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3175 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3177 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3181 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3183 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3187 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3189 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3193 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3195 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3199 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3201 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3206 public <T> void asyncRequest(Read<T> request) {
3208 asyncRequest(request, new ProcedureAdapter<T>() {
3210 public void exception(Throwable t) {
3211 t.printStackTrace();
3218 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3219 asyncRequest(request, (AsyncProcedure<T>)procedure);
3223 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3224 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3228 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3229 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3233 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3234 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3238 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3239 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3243 final public <T> void asyncRequest(final AsyncRead<T> request) {
3245 assert(request != null);
3247 asyncRequest(request, new ProcedureAdapter<T>() {
3249 public void exception(Throwable t) {
3250 t.printStackTrace();
3257 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3258 scheduleRequest(request, procedure, procedure, null);
3262 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3263 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3267 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3268 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3272 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3273 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3277 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3278 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3282 public <T> void asyncRequest(MultiRead<T> request) {
3284 assert(request != null);
3286 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3288 public void exception(AsyncReadGraph graph, Throwable t) {
3289 t.printStackTrace();
3296 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3297 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3301 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3306 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3311 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3316 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3321 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3323 assert(request != null);
3325 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3327 public void exception(AsyncReadGraph graph, Throwable t) {
3328 t.printStackTrace();
3335 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3340 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3345 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3350 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3355 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3360 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3362 throw new Error("Not implemented!");
3366 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3367 throw new Error("Not implemented!");
3371 final public <T> void asyncRequest(final ExternalRead<T> request) {
3373 assert(request != null);
3375 asyncRequest(request, new ProcedureAdapter<T>() {
3377 public void exception(Throwable t) {
3378 t.printStackTrace();
3385 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3386 asyncRequest(request, (Procedure<T>)procedure);
3391 void check(Throwable t) throws DatabaseException {
3393 if(t instanceof DatabaseException) throw (DatabaseException)t;
3394 else throw new DatabaseException("Unexpected exception", t);
3398 void check(DataContainer<Throwable> container) throws DatabaseException {
3399 Throwable t = container.get();
3401 if(t instanceof DatabaseException) throw (DatabaseException)t;
3402 else throw new DatabaseException("Unexpected exception", t);
3411 boolean sameProvider(Write request) {
3412 if(writeState.getGraph().provider != null) {
3413 return writeState.getGraph().provider.equals(request.getProvider());
3415 return request.getProvider() == null;
3419 boolean plainWrite(WriteGraphImpl graph) {
3420 if(graph == null) return false;
3421 if(graph.writeSupport.writeOnly()) return false;
3426 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3427 private void assertNotSession() throws DatabaseException {
3428 Thread current = Thread.currentThread();
3429 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3432 void assertAlive() {
3433 if (!state.isAlive())
3434 throw new RuntimeDatabaseException("Session has been shut down.");
3437 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3438 return querySupport.getValueStream(graph, querySupport.getId(resource));
3441 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3442 return querySupport.getValue(graph, querySupport.getId(resource));
3445 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3446 acquire(semaphore, request, null);
3449 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3453 // Loop until the semaphore is acquired reporting requests that take a long time.
3456 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3460 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3462 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3463 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3465 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3466 + (waitTime) + " ms. (procedure=" + procedure + ")");
3473 } catch (InterruptedException e) {
3474 e.printStackTrace();
3475 // FIXME: Should perhaps do something else in this case ??
3482 // public boolean holdOnToTransactionAfterCancel() {
3483 // return transactionPolicy.holdOnToTransactionAfterCancel();
3487 // public boolean holdOnToTransactionAfterCommit() {
3488 // return transactionPolicy.holdOnToTransactionAfterCommit();
3492 // public boolean holdOnToTransactionAfterRead() {
3493 // return transactionPolicy.holdOnToTransactionAfterRead();
3497 // public void onRelinquish() {
3498 // transactionPolicy.onRelinquish();
3502 // public void onRelinquishDone() {
3503 // transactionPolicy.onRelinquishDone();
3507 // public void onRelinquishError() {
3508 // transactionPolicy.onRelinquishError();
3513 public Session getSession() {
3518 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3519 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3521 public void ceased(int thread) {
3523 requestManager.ceased(thread);
3527 public int getAmountOfQueryThreads() {
3528 // This must be a power of two
3530 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3533 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3535 return new ResourceImpl(resourceSupport, key);
3539 public void acquireWriteOnly() {
3543 public void releaseWriteOnly(ReadGraphImpl graph) {
3545 queryProvider2.releaseWrite(graph);
3548 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3550 long start = System.nanoTime();
3552 while(dirtyPrimitives) {
3553 dirtyPrimitives = false;
3554 getQueryProvider2().performDirtyUpdates(writer);
3555 getQueryProvider2().performScheduledUpdates(writer);
3558 fireMetadataListeners(writer, clientChanges);
3560 if(DebugPolicy.PERFORMANCE_DATA) {
3561 long end = System.nanoTime();
3562 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3567 void removeTemporaryData() {
3568 File platform = Platform.getLocation().toFile();
3569 File tempFiles = new File(platform, "tempFiles");
3570 File temp = new File(tempFiles, "db");
3574 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3576 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3579 } catch (IOException e) {
3580 Logger.defaultLogError(e);
3582 return FileVisitResult.CONTINUE;
3585 } catch (IOException e) {
3586 Logger.defaultLogError(e);
3590 public void handleCreatedClusters() {
3591 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3592 createdClusters.forEach(new TLongProcedure() {
3595 public boolean execute(long value) {
3596 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3597 cluster.setImmutable(true, clusterTranslator);
3602 createdClusters.clear();
3606 public Object getModificationCounter() {
3607 return queryProvider2.modificationCounter;
3611 public void markUndoPoint() {
3612 state.setCombine(false);