1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.UndoTraits;
122 import org.simantics.db.request.Write;
123 import org.simantics.db.request.WriteInterface;
124 import org.simantics.db.request.WriteOnly;
125 import org.simantics.db.request.WriteOnlyResult;
126 import org.simantics.db.request.WriteResult;
127 import org.simantics.db.request.WriteTraits;
128 import org.simantics.db.service.ByteReader;
129 import org.simantics.db.service.ClusterBuilder;
130 import org.simantics.db.service.ClusterBuilderFactory;
131 import org.simantics.db.service.ClusterControl;
132 import org.simantics.db.service.ClusterSetsSupport;
133 import org.simantics.db.service.ClusterUID;
134 import org.simantics.db.service.ClusteringSupport;
135 import org.simantics.db.service.CollectionSupport;
136 import org.simantics.db.service.DebugSupport;
137 import org.simantics.db.service.DirectQuerySupport;
138 import org.simantics.db.service.GraphChangeListenerSupport;
139 import org.simantics.db.service.InitSupport;
140 import org.simantics.db.service.LifecycleSupport;
141 import org.simantics.db.service.ManagementSupport;
142 import org.simantics.db.service.QueryControl;
143 import org.simantics.db.service.SerialisationSupport;
144 import org.simantics.db.service.ServerInformation;
145 import org.simantics.db.service.ServiceActivityMonitor;
146 import org.simantics.db.service.SessionEventSupport;
147 import org.simantics.db.service.SessionMonitorSupport;
148 import org.simantics.db.service.SessionUserSupport;
149 import org.simantics.db.service.StatementSupport;
150 import org.simantics.db.service.TransactionPolicySupport;
151 import org.simantics.db.service.TransactionSupport;
152 import org.simantics.db.service.TransferableGraphSupport;
153 import org.simantics.db.service.UndoRedoSupport;
154 import org.simantics.db.service.VirtualGraphSupport;
155 import org.simantics.db.service.XSupport;
156 import org.simantics.layer0.Layer0;
157 import org.simantics.utils.DataContainer;
158 import org.simantics.utils.Development;
159 import org.simantics.utils.datastructures.Callback;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
169 protected static final boolean DEBUG = false;
171 private static final boolean DIAGNOSTICS = false;
173 private TransactionPolicySupport transactionPolicy;
175 final private ClusterControl clusterControl;
177 final protected BuiltinSupportImpl builtinSupport;
178 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179 final protected ClusterSetsSupport clusterSetsSupport;
180 final protected LifecycleSupportImpl lifecycleSupport;
182 protected QuerySupportImpl querySupport;
183 protected ResourceSupportImpl resourceSupport;
184 protected WriteSupport writeSupport;
185 public ClusterTranslator clusterTranslator;
187 boolean dirtyPrimitives = false;
189 public static final int SERVICE_MODE_CREATE = 2;
190 public static final int SERVICE_MODE_ALLOW = 1;
191 public int serviceMode = 0;
192 public boolean createdImmutableClusters = false;
193 public TLongHashSet createdClusters = new TLongHashSet();
198 * The service locator maintained by the workbench. These services are
199 * initialized during workbench during the <code>init</code> method.
201 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
203 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
205 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
207 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
209 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
211 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
213 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
215 final protected State state = new State();
217 protected GraphSession graphSession = null;
219 protected SessionManager sessionManagerImpl = null;
221 protected UserAuthenticationAgent authAgent = null;
223 protected UserAuthenticator authenticator = null;
225 protected Resource user = null;
227 protected ClusterStream clusterStream = null;
229 protected SessionRequestManager requestManager = null;
231 public ClusterTable clusterTable = null;
233 public QueryProcessor queryProvider2 = null;
235 ClientChangesImpl clientChanges = null;
237 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
239 // protected long newClusterId = Constants.NullClusterId;
241 protected int flushCounter = 0;
243 protected boolean writeOnly = false;
245 WriteState<?> writeState = null;
246 WriteStateBase<?> delayedWriteState = null;
247 protected Resource defaultClusterSet = null; // If not null then used for newResource().
249 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
251 // if (authAgent == null)
252 // throw new IllegalArgumentException("null authentication agent");
254 File t = StaticSessionProperties.virtualGraphStoragePath;
257 this.clusterTable = new ClusterTable(this, t);
258 this.builtinSupport = new BuiltinSupportImpl(this);
259 this.sessionManagerImpl = sessionManagerImpl;
261 // this.authAgent = authAgent;
263 serviceLocator.registerService(Session.class, this);
264 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285 ServiceActivityUpdaterForWriteTransactions.register(this);
287 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290 this.lifecycleSupport = new LifecycleSupportImpl(this);
291 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292 this.transactionPolicy = new TransactionPolicyRelease();
293 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294 this.clusterControl = new ClusterControlImpl(this);
295 serviceLocator.registerService(ClusterControl.class, clusterControl);
296 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297 this.clusterSetsSupport.setReadDirectory(t.toPath());
298 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
309 public Resource getRootLibrary() {
310 return queryProvider2.getRootLibraryResource();
313 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314 if (!graphSession.dbSession.refreshEnabled())
317 getClusterTable().refresh(csid, this, clusterUID);
318 } catch (Throwable t) {
319 Logger.defaultLogError("Refesh failed.", t);
323 static final class TaskHelper {
324 private final String name;
325 private Object result;
326 final Semaphore sema = new Semaphore(0);
327 private Throwable throwable = null;
328 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
330 public void run(DatabaseException e) {
331 synchronized (TaskHelper.this) {
336 final Procedure<Object> proc = new Procedure<Object>() {
338 public void execute(Object result) {
342 public void exception(Throwable t) {
343 if (t instanceof DatabaseException)
344 callback.run((DatabaseException)t);
346 callback.run(new DatabaseException("" + name + "operation failed.", t));
349 final WriteTraits writeTraits = new WriteTraits() {
351 public UndoTraits getUndoTraits() {
355 public VirtualGraph getProvider() {
359 TaskHelper(String name) {
365 void setResult(Object result) {
366 this.result = result;
368 // Throwable throwableGet() {
371 synchronized void throwableSet(Throwable t) {
374 // void throwableSet(String t) {
375 // throwable = new InternalException("" + name + " operation failed. " + t);
377 synchronized void throwableCheck()
378 throws DatabaseException {
379 if (null != throwable)
380 if (throwable instanceof DatabaseException)
381 throw (DatabaseException)throwable;
383 throw new DatabaseException("Undo operation failed.", throwable);
385 void throw_(String message)
386 throws DatabaseException {
387 throw new DatabaseException("" + name + " operation failed. " + message);
393 private ListenerBase getListenerBase(Object procedure) {
394 if (procedure instanceof ListenerBase)
395 return (ListenerBase) procedure;
400 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
401 scheduleRequest(request, callback, notify, null);
405 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
408 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
410 assert (request != null);
412 if(Development.DEVELOPMENT) {
414 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
415 System.err.println("schedule write '" + request + "'");
416 } catch (Throwable t) {
417 Logger.defaultLogError(t);
421 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
423 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
426 public void run(int thread) {
428 if(Development.DEVELOPMENT) {
430 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
431 System.err.println("perform write '" + request + "'");
432 } catch (Throwable t) {
433 Logger.defaultLogError(t);
437 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
439 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
444 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
446 VirtualGraph vg = getProvider(request.getProvider());
448 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
452 public void execute(Object result) {
453 if(callback != null) callback.run(null);
457 public void exception(Throwable t) {
458 if(callback != null) callback.run((DatabaseException)t);
463 assert (null != writer);
464 // writer.state.barrier.inc();
467 request.perform(writer);
468 assert (null != writer);
469 } catch (Throwable t) {
470 if (!(t instanceof CancelTransactionException))
471 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472 writeState.except(t);
474 // writer.state.barrier.dec();
475 // writer.waitAsync(request);
479 assert(!queryProvider2.dirty);
481 } catch (Throwable e) {
483 // Log it first, just to be safe that the error is always logged.
484 if (!(e instanceof CancelTransactionException))
485 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
487 // writeState.getGraph().state.barrier.dec();
488 // writeState.getGraph().waitAsync(request);
490 writeState.except(e);
493 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 // // All we can do here is to log those, can't really pass them anywhere.
495 // if (callback != null) {
496 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 // else callback.run(new DatabaseException(e));
499 // } catch (Throwable e2) {
500 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
502 // boolean empty = clusterStream.reallyFlush();
503 // int callerThread = -1;
504 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
507 // // Send all local changes to server so it can calculate correct reverse change set.
508 // // This will call clusterStream.accept().
509 // state.cancelCommit(context, clusterStream);
511 // if (!context.isOk()) // this is a blocking operation
512 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
513 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
515 // state.cancelCommit2(context, clusterStream);
516 // } catch (DatabaseException e3) {
517 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
519 // clusterStream.setOff(false);
520 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
525 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
531 if(Development.DEVELOPMENT) {
533 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534 System.err.println("finish write '" + request + "'");
535 } catch (Throwable t) {
536 Logger.defaultLogError(t);
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547 scheduleRequest(request, procedure, notify, null);
551 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
554 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
556 assert (request != null);
558 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
560 requestManager.scheduleWrite(new SessionTask(request, thread) {
563 public void run(int thread) {
565 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
567 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
570 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
572 VirtualGraph vg = getProvider(request.getProvider());
573 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
576 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
577 writeState = writeStateT;
579 assert (null != writer);
580 // writer.state.barrier.inc();
581 writeStateT.setResult(request.perform(writer));
582 assert (null != writer);
584 // writer.state.barrier.dec();
585 // writer.waitAsync(null);
587 } catch (Throwable e) {
589 // writer.state.barrier.dec();
590 // writer.waitAsync(null);
592 writeState.except(e);
594 // state.stopWriteTransaction(clusterStream);
596 // } catch (Throwable e) {
597 // // Log it first, just to be safe that the error is always logged.
598 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
601 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
602 // // All we can do here is to log those, can't really pass them anywhere.
603 // if (procedure != null) {
604 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
605 // else procedure.exception(new DatabaseException(e));
607 // } catch (Throwable e2) {
608 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
611 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
613 // state.stopWriteTransaction(clusterStream);
616 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
619 // if(notify != null) notify.release();
629 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
630 scheduleRequest(request, callback, notify, null);
634 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
637 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
639 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
641 assert (request != null);
643 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
645 requestManager.scheduleWrite(new SessionTask(request, thread) {
648 public void run(int thread) {
649 fireSessionVariableChange(SessionVariables.QUEUED_READS);
651 Procedure<Object> stateProcedure = new Procedure<Object>() {
653 public void execute(Object result) {
654 if (callback != null)
658 public void exception(Throwable t) {
659 if (callback != null) {
660 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
661 else callback.run(new DatabaseException(t));
663 Logger.defaultLogError("Unhandled exception", t);
667 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
668 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
669 DelayedWriteGraph dwg = null;
670 // newGraph.state.barrier.inc();
673 dwg = new DelayedWriteGraph(newGraph);
674 request.perform(dwg);
675 } catch (Throwable e) {
676 delayedWriteState.except(e);
681 // newGraph.state.barrier.dec();
682 // newGraph.waitAsync(request);
683 fireSessionVariableChange(SessionVariables.QUEUED_READS);
686 delayedWriteState = null;
688 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
689 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
692 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
696 VirtualGraph vg = getProvider(request.getProvider());
697 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
699 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
701 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
703 assert (null != writer);
704 // writer.state.barrier.inc();
708 dwg.commit(writer, request);
710 if(defaultClusterSet != null) {
711 XSupport xs = getService(XSupport.class);
712 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
713 ClusteringSupport cs = getService(ClusteringSupport.class);
714 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
717 // This makes clusters available from server
718 clusterStream.reallyFlush();
720 releaseWriteOnly(writer);
721 handleUpdatesAndMetadata(writer);
723 } catch (ServiceException e) {
724 // writer.state.barrier.dec();
725 // writer.waitAsync(null);
727 // These shall be requested from server
728 clusterTable.removeWriteOnlyClusters();
729 // This makes clusters available from server
730 clusterStream.reallyFlush();
732 releaseWriteOnly(writer);
733 writeState.except(e);
735 // Debugging & Profiling
736 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
746 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
747 scheduleRequest(request, procedure, notify, null);
751 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
754 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
755 throw new Error("Not implemented");
758 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
759 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
760 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
761 createdClusters.add(cluster.clusterId);
766 class WriteOnlySupport implements WriteSupport {
768 ClusterStream stream;
769 ClusterImpl currentCluster;
771 public WriteOnlySupport() {
772 this.stream = clusterStream;
776 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
777 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
781 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
783 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
785 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
786 if(s != queryProvider2.getRootLibrary())
787 throw new ImmutableException("Trying to modify immutable resource key=" + s);
790 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
791 } catch (DatabaseException e) {
792 Logger.defaultLogError(e);
796 clientChanges.invalidate(s);
798 if (cluster.isWriteOnly())
800 queryProvider2.updateStatements(s, p);
805 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
806 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
810 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
812 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
814 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
815 } catch (DatabaseException e) {
816 Logger.defaultLogError(e);
819 clientChanges.invalidate(rid);
821 if (cluster.isWriteOnly())
823 queryProvider2.updateValue(rid);
828 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
831 claimValue(provider,resource, reader.readBytes(null, amount));
835 byte[] bytes = new byte[65536];
837 int rid = ((ResourceImpl)resource).id;
838 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
842 int block = Math.min(left, 65536);
843 reader.readBytes(bytes, block);
844 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
847 } catch (DatabaseException e) {
848 Logger.defaultLogError(e);
851 clientChanges.invalidate(rid);
853 if (cluster.isWriteOnly())
855 queryProvider2.updateValue(rid);
859 private void maintainCluster(ClusterImpl before, ClusterI after_) {
860 if(after_ != null && after_ != before) {
861 ClusterImpl after = (ClusterImpl)after_;
862 if(currentCluster == before) currentCluster = after;
863 clusterTable.replaceCluster(after);
867 public int createResourceKey(int foreignCounter) throws DatabaseException {
868 if(currentCluster == null)
869 currentCluster = getNewResourceCluster();
870 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
871 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
872 newCluster.foreignLookup = new byte[foreignCounter];
873 currentCluster = newCluster;
875 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
877 return currentCluster.createResource(clusterTranslator);
881 public Resource createResource(VirtualGraph provider) throws DatabaseException {
882 if(currentCluster == null) {
883 if (null != defaultClusterSet) {
884 ResourceImpl result = getNewResource(defaultClusterSet);
885 currentCluster = clusterTable.getClusterByResourceKey(result.id);
888 currentCluster = getNewResourceCluster();
891 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
892 if (null != defaultClusterSet) {
893 ResourceImpl result = getNewResource(defaultClusterSet);
894 currentCluster = clusterTable.getClusterByResourceKey(result.id);
897 currentCluster = getNewResourceCluster();
900 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
904 public Resource createResource(VirtualGraph provider, long clusterId)
905 throws DatabaseException {
906 return getNewResource(clusterId);
910 public Resource createResource(VirtualGraph provider, Resource clusterSet)
911 throws DatabaseException {
912 return getNewResource(clusterSet);
916 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
917 throws DatabaseException {
918 getNewClusterSet(clusterSet);
922 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
923 throws ServiceException {
924 return containsClusterSet(clusterSet);
927 public void selectCluster(long cluster) {
928 currentCluster = clusterTable.getClusterByClusterId(cluster);
929 long setResourceId = clusterSetsSupport.getSet(cluster);
930 clusterSetsSupport.put(setResourceId, cluster);
934 public Resource setDefaultClusterSet(Resource clusterSet)
935 throws ServiceException {
936 Resource result = setDefaultClusterSet4NewResource(clusterSet);
937 if(clusterSet != null) {
938 long id = clusterSetsSupport.get(clusterSet.getResourceId());
939 currentCluster = clusterTable.getClusterByClusterId(id);
942 currentCluster = null;
948 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
950 provider = getProvider(provider);
951 if (null == provider) {
952 int key = ((ResourceImpl)resource).id;
956 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
957 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
958 // if(key != queryProvider2.getRootLibrary())
959 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
962 // cluster.removeValue(key, clusterTranslator);
963 // } catch (DatabaseException e) {
964 // Logger.defaultLogError(e);
968 clusterTable.writeOnlyInvalidate(cluster);
971 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
972 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
973 clusterTranslator.removeValue(cluster);
974 } catch (DatabaseException e) {
975 Logger.defaultLogError(e);
978 queryProvider2.invalidateResource(key);
979 clientChanges.invalidate(key);
982 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
983 queryProvider2.updateValue(querySupport.getId(resource));
984 clientChanges.claimValue(resource);
991 public void flush(boolean intermediate) {
992 throw new UnsupportedOperationException();
996 public void flushCluster() {
997 clusterTable.flushCluster(graphSession);
998 if(defaultClusterSet != null) {
999 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
1001 currentCluster = null;
1005 public void flushCluster(Resource r) {
1006 throw new UnsupportedOperationException("flushCluster resource " + r);
1014 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1017 int s = ((ResourceImpl)subject).id;
1018 int p = ((ResourceImpl)predicate).id;
1019 int o = ((ResourceImpl)object).id;
1021 provider = getProvider(provider);
1022 if (null == provider) {
1024 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1025 clusterTable.writeOnlyInvalidate(cluster);
1029 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1030 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1031 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1033 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1034 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1036 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1037 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1038 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1039 clusterTranslator.removeStatement(cluster);
1041 queryProvider2.invalidateResource(s);
1042 clientChanges.invalidate(s);
1044 } catch (DatabaseException e) {
1046 Logger.defaultLogError(e);
1054 ((VirtualGraphImpl)provider).deny(s, p, o);
1055 queryProvider2.invalidateResource(s);
1056 clientChanges.invalidate(s);
1065 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1066 throw new UnsupportedOperationException();
1070 public boolean writeOnly() {
1075 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1076 writeSupport.performWriteRequest(graph, request);
1080 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1081 throw new UnsupportedOperationException();
1085 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1086 throw new UnsupportedOperationException();
1090 public <T> void addMetadata(Metadata data) throws ServiceException {
1091 writeSupport.addMetadata(data);
1095 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1096 return writeSupport.getMetadata(clazz);
1100 public TreeMap<String, byte[]> getMetadata() {
1101 return writeSupport.getMetadata();
1105 public void commitDone(WriteTraits writeTraits, long csid) {
1106 writeSupport.commitDone(writeTraits, csid);
1110 public void clearUndoList(WriteTraits writeTraits) {
1111 writeSupport.clearUndoList(writeTraits);
1114 public int clearMetadata() {
1115 return writeSupport.clearMetadata();
1119 public void startUndo() {
1120 writeSupport.startUndo();
1124 class VirtualWriteOnlySupport implements WriteSupport {
1127 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1128 // Resource object) {
1129 // throw new UnsupportedOperationException();
1133 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1135 TransientGraph impl = (TransientGraph)provider;
1136 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1137 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1138 clientChanges.claim(subject, predicate, object);
1143 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1145 TransientGraph impl = (TransientGraph)provider;
1146 impl.claim(subject, predicate, object);
1147 getQueryProvider2().updateStatements(subject, predicate);
1148 clientChanges.claim(subject, predicate, object);
1153 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1154 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1158 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1159 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1160 getQueryProvider2().updateValue(resource);
1161 clientChanges.claimValue(resource);
1165 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1166 byte[] value = reader.readBytes(null, amount);
1167 claimValue(provider, resource, value);
1171 public Resource createResource(VirtualGraph provider) {
1172 TransientGraph impl = (TransientGraph)provider;
1173 return impl.getResource(impl.newResource(false));
1177 public Resource createResource(VirtualGraph provider, long clusterId) {
1178 throw new UnsupportedOperationException();
1182 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1183 throws DatabaseException {
1184 throw new UnsupportedOperationException();
1188 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1189 throws DatabaseException {
1190 throw new UnsupportedOperationException();
1194 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1195 throws ServiceException {
1196 throw new UnsupportedOperationException();
1200 public Resource setDefaultClusterSet(Resource clusterSet)
1201 throws ServiceException {
1206 public void denyValue(VirtualGraph provider, Resource resource) {
1207 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1208 getQueryProvider2().updateValue(querySupport.getId(resource));
1209 // NOTE: this only keeps track of value changes by-resource.
1210 clientChanges.claimValue(resource);
1214 public void flush(boolean intermediate) {
1215 throw new UnsupportedOperationException();
1219 public void flushCluster() {
1220 throw new UnsupportedOperationException();
1224 public void flushCluster(Resource r) {
1225 throw new UnsupportedOperationException("Resource " + r);
1233 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1234 TransientGraph impl = (TransientGraph) provider;
1235 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1236 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1237 clientChanges.deny(subject, predicate, object);
1242 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1243 throw new UnsupportedOperationException();
1247 public boolean writeOnly() {
1252 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1253 throw new UnsupportedOperationException();
1257 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1258 throw new UnsupportedOperationException();
1262 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1263 throw new UnsupportedOperationException();
1267 public <T> void addMetadata(Metadata data) throws ServiceException {
1268 throw new UnsupportedOperationException();
1272 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1273 throw new UnsupportedOperationException();
1277 public TreeMap<String, byte[]> getMetadata() {
1278 throw new UnsupportedOperationException();
1282 public void commitDone(WriteTraits writeTraits, long csid) {
1286 public void clearUndoList(WriteTraits writeTraits) {
1290 public int clearMetadata() {
1295 public void startUndo() {
1299 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1303 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1305 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1308 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1312 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1314 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1316 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1317 writeState = writeStateT;
1319 assert (null != writer);
1320 // writer.state.barrier.inc();
1321 long start = System.nanoTime();
1322 T result = request.perform(writer);
1323 long duration = System.nanoTime() - start;
1325 System.err.println("################");
1326 System.err.println("WriteOnly duration " + 1e-9*duration);
1328 writeStateT.setResult(result);
1330 // This makes clusters available from server
1331 clusterStream.reallyFlush();
1333 // This will trigger query updates
1334 releaseWriteOnly(writer);
1335 assert (null != writer);
1337 handleUpdatesAndMetadata(writer);
1339 } catch (CancelTransactionException e) {
1341 releaseWriteOnly(writeState.getGraph());
1343 clusterTable.removeWriteOnlyClusters();
1344 state.stopWriteTransaction(clusterStream);
1346 } catch (Throwable e) {
1348 e.printStackTrace();
1350 releaseWriteOnly(writeState.getGraph());
1352 clusterTable.removeWriteOnlyClusters();
1354 if (callback != null)
1355 callback.exception(new DatabaseException(e));
1357 state.stopWriteTransaction(clusterStream);
1358 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1362 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1368 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1369 scheduleRequest(request, callback, notify, null);
1373 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1377 assert (request != null);
1379 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1381 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1384 public void run(int thread) {
1386 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1390 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1393 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1397 VirtualGraph vg = getProvider(request.getProvider());
1398 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1400 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1402 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1405 public void execute(Object result) {
1406 if(callback != null) callback.run(null);
1410 public void exception(Throwable t) {
1411 if(callback != null) callback.run((DatabaseException)t);
1416 assert (null != writer);
1417 // writer.state.barrier.inc();
1421 request.perform(writer);
1423 } catch (Throwable e) {
1425 // writer.state.barrier.dec();
1426 // writer.waitAsync(null);
1428 releaseWriteOnly(writer);
1430 clusterTable.removeWriteOnlyClusters();
1432 if(!(e instanceof CancelTransactionException)) {
1433 if (callback != null)
1434 callback.run(new DatabaseException(e));
1437 writeState.except(e);
1443 // This makes clusters available from server
1444 boolean empty = clusterStream.reallyFlush();
1445 // This was needed to make WO requests call metadata listeners.
1446 // NOTE: the calling event does not contain clientChanges information.
1447 if (!empty && clientChanges.isEmpty())
1448 clientChanges.setNotEmpty(true);
1449 releaseWriteOnly(writer);
1450 assert (null != writer);
1453 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1466 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1467 scheduleRequest(request, callback, notify, null);
1471 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1474 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1476 assert (request != null);
1478 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1480 requestManager.scheduleWrite(new SessionTask(request, thread) {
1483 public void run(int thread) {
1485 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1487 performWriteOnly(request, notify, callback);
1497 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1499 assert (request != null);
1500 assert (procedure != null);
1502 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1504 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1507 public void run(int thread) {
1509 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1511 ListenerBase listener = getListenerBase(procedure);
1513 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1517 if (listener != null) {
1520 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1523 public void exception(AsyncReadGraph graph, Throwable t) {
1524 procedure.exception(graph, t);
1525 if(throwable != null) {
1528 // ErrorLogger.defaultLogError("Unhandled exception", t);
1533 public void execute(AsyncReadGraph graph, T t) {
1534 if(result != null) result.set(t);
1535 procedure.execute(graph, t);
1539 } catch (Throwable t) {
1540 // This is handled by the AsyncProcedure
1541 //Logger.defaultLogError("Internal error", t);
1548 // newGraph.state.barrier.inc();
1550 T t = request.perform(newGraph);
1554 if(result != null) result.set(t);
1555 procedure.execute(newGraph, t);
1557 } catch (Throwable th) {
1559 if(throwable != null) {
1562 Logger.defaultLogError("Unhandled exception", th);
1567 } catch (Throwable t) {
1570 t.printStackTrace();
1572 if(throwable != null) {
1575 Logger.defaultLogError("Unhandled exception", t);
1580 procedure.exception(newGraph, t);
1582 } catch (Throwable t2) {
1584 if(throwable != null) {
1587 Logger.defaultLogError("Unhandled exception", t2);
1594 // newGraph.state.barrier.dec();
1595 // newGraph.waitAsync(request);
1601 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1611 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1613 assert (request != null);
1614 assert (procedure != null);
1616 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1618 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1621 public void run(int thread) {
1623 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1625 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1629 if (listener != null) {
1631 newGraph.processor.query(newGraph, request, null, procedure, listener);
1633 // newGraph.waitAsync(request);
1637 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1638 procedure, "request");
1642 // newGraph.state.barrier.inc();
1644 request.perform(newGraph, wrapper);
1646 // newGraph.waitAsync(request);
1648 } catch (Throwable t) {
1650 wrapper.exception(newGraph, t);
1651 // newGraph.waitAsync(request);
1660 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1670 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1672 assert (request != null);
1673 assert (procedure != null);
1675 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1677 int sync = notify != null ? thread : -1;
1679 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1682 public void run(int thread) {
1684 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1686 ListenerBase listener = getListenerBase(procedure);
1688 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1692 if (listener != null) {
1694 newGraph.processor.query(newGraph, request, null, procedure, listener);
1696 // newGraph.waitAsync(request);
1700 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1704 request.perform(newGraph, wrapper);
1706 } catch (Throwable t) {
1708 t.printStackTrace();
1716 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1726 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1728 assert (request != null);
1729 assert (procedure != null);
1731 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1733 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1736 public void run(int thread) {
1738 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1740 ListenerBase listener = getListenerBase(procedure);
1742 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1746 if (listener != null) {
1748 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1751 public void exception(Throwable t) {
1752 procedure.exception(t);
1753 if(throwable != null) {
1759 public void execute(T t) {
1760 if(result != null) result.set(t);
1761 procedure.execute(t);
1766 // newGraph.waitAsync(request);
1770 // newGraph.state.barrier.inc();
1772 request.register(newGraph, new Listener<T>() {
1775 public void exception(Throwable t) {
1776 if(throwable != null) throwable.set(t);
1777 procedure.exception(t);
1778 // newGraph.state.barrier.dec();
1782 public void execute(T t) {
1783 if(result != null) result.set(t);
1784 procedure.execute(t);
1785 // newGraph.state.barrier.dec();
1789 public boolean isDisposed() {
1795 // newGraph.waitAsync(request);
1801 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1813 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1815 scheduleRequest(request, procedure, null, null, null);
1820 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1822 Semaphore notify = new Semaphore(0);
1823 DataContainer<Throwable> container = new DataContainer<Throwable>();
1824 DataContainer<T> result = new DataContainer<T>();
1825 scheduleRequest(request, procedure, notify, container, result);
1826 acquire(notify, request);
1827 Throwable throwable = container.get();
1828 if(throwable != null) {
1829 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1830 else throw new DatabaseException("Unexpected exception", throwable);
1832 return result.get();
1836 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1838 Semaphore notify = new Semaphore(0);
1839 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1840 final DataContainer<T> resultContainer = new DataContainer<T>();
1841 scheduleRequest(request, new AsyncProcedure<T>() {
1843 public void exception(AsyncReadGraph graph, Throwable throwable) {
1844 exceptionContainer.set(throwable);
1845 procedure.exception(graph, throwable);
1848 public void execute(AsyncReadGraph graph, T result) {
1849 resultContainer.set(result);
1850 procedure.execute(graph, result);
1852 }, getListenerBase(procedure), notify);
1853 acquire(notify, request, procedure);
1854 Throwable throwable = exceptionContainer.get();
1855 if (throwable != null) {
1856 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1857 else throw new DatabaseException("Unexpected exception", throwable);
1859 return resultContainer.get();
1864 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1866 Semaphore notify = new Semaphore(0);
1867 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1868 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1870 public void exception(AsyncReadGraph graph, Throwable throwable) {
1871 exceptionContainer.set(throwable);
1872 procedure.exception(graph, throwable);
1875 public void execute(AsyncReadGraph graph, T result) {
1876 procedure.execute(graph, result);
1879 public void finished(AsyncReadGraph graph) {
1880 procedure.finished(graph);
1883 acquire(notify, request, procedure);
1884 Throwable throwable = exceptionContainer.get();
1885 if (throwable != null) {
1886 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1887 else throw new DatabaseException("Unexpected exception", throwable);
1889 // TODO: implement return value
1890 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1895 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1896 return syncRequest(request, new ProcedureAdapter<T>());
1899 // assert(request != null);
1901 // final DataContainer<T> result = new DataContainer<T>();
1902 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1904 // syncRequest(request, new Procedure<T>() {
1907 // public void execute(T t) {
1912 // public void exception(Throwable t) {
1913 // exception.set(t);
1918 // Throwable t = exception.get();
1920 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1921 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1924 // return result.get();
1929 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1930 return syncRequest(request, (Procedure<T>)procedure);
1935 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1936 // assertNotSession();
1937 // Semaphore notify = new Semaphore(0);
1938 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1939 // DataContainer<T> result = new DataContainer<T>();
1940 // scheduleRequest(request, procedure, notify, container, result);
1941 // acquire(notify, request);
1942 // Throwable throwable = container.get();
1943 // if(throwable != null) {
1944 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1945 // else throw new DatabaseException("Unexpected exception", throwable);
1947 // return result.get();
1952 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1954 Semaphore notify = new Semaphore(0);
1955 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1956 final DataContainer<T> result = new DataContainer<T>();
1957 scheduleRequest(request, procedure, notify, container, result);
1958 acquire(notify, request);
1959 Throwable throwable = container.get();
1960 if (throwable != null) {
1961 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1962 else throw new DatabaseException("Unexpected exception", throwable);
1964 return result.get();
1968 public void syncRequest(Write request) throws DatabaseException {
1971 Semaphore notify = new Semaphore(0);
1972 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1973 scheduleRequest(request, new Callback<DatabaseException>() {
1976 public void run(DatabaseException e) {
1981 acquire(notify, request);
1982 if(exception.get() != null) throw exception.get();
1986 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1988 Semaphore notify = new Semaphore(0);
1989 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1990 final DataContainer<T> result = new DataContainer<T>();
1991 scheduleRequest(request, new Procedure<T>() {
1994 public void exception(Throwable t) {
1999 public void execute(T t) {
2004 acquire(notify, request);
2005 if(exception.get() != null) {
2006 Throwable t = exception.get();
2007 if(t instanceof DatabaseException) throw (DatabaseException)t;
2008 else throw new DatabaseException(t);
2010 return result.get();
2014 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2016 Semaphore notify = new Semaphore(0);
2017 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2018 final DataContainer<T> result = new DataContainer<T>();
2019 scheduleRequest(request, new Procedure<T>() {
2022 public void exception(Throwable t) {
2027 public void execute(T t) {
2032 acquire(notify, request);
2033 if(exception.get() != null) {
2034 Throwable t = exception.get();
2035 if(t instanceof DatabaseException) throw (DatabaseException)t;
2036 else throw new DatabaseException(t);
2038 return result.get();
2042 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2044 Semaphore notify = new Semaphore(0);
2045 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2046 final DataContainer<T> result = new DataContainer<T>();
2047 scheduleRequest(request, new Procedure<T>() {
2050 public void exception(Throwable t) {
2055 public void execute(T t) {
2060 acquire(notify, request);
2061 if(exception.get() != null) {
2062 Throwable t = exception.get();
2063 if(t instanceof DatabaseException) throw (DatabaseException)t;
2064 else throw new DatabaseException(t);
2066 return result.get();
2070 public void syncRequest(DelayedWrite request) throws DatabaseException {
2072 Semaphore notify = new Semaphore(0);
2073 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2074 scheduleRequest(request, new Callback<DatabaseException>() {
2076 public void run(DatabaseException e) {
2080 acquire(notify, request);
2081 if(exception.get() != null) throw exception.get();
2085 public void syncRequest(WriteOnly request) throws DatabaseException {
2088 Semaphore notify = new Semaphore(0);
2089 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2090 scheduleRequest(request, new Callback<DatabaseException>() {
2092 public void run(DatabaseException e) {
2096 acquire(notify, request);
2097 if(exception.get() != null) throw exception.get();
2102 * GraphRequestProcessor interface
2106 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2108 scheduleRequest(request, procedure, null, null);
2113 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2115 scheduleRequest(request, procedure, null);
2120 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2122 scheduleRequest(request, procedure, null, null, null);
2127 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2129 scheduleRequest(request, callback, null);
2134 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2136 scheduleRequest(request, procedure, null);
2141 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2143 scheduleRequest(request, procedure, null);
2148 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2150 scheduleRequest(request, procedure, null);
2155 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2157 scheduleRequest(request, callback, null);
2162 public void asyncRequest(final Write r) {
2163 asyncRequest(r, null);
2167 public void asyncRequest(final DelayedWrite r) {
2168 asyncRequest(r, null);
2172 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2174 scheduleRequest(request, callback, null);
2179 public void asyncRequest(final WriteOnly request) {
2181 asyncRequest(request, null);
2186 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2187 r.request(this, procedure);
2191 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2192 r.request(this, procedure);
2196 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2197 r.request(this, procedure);
2201 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2202 r.request(this, procedure);
2206 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2207 r.request(this, procedure);
2211 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2212 r.request(this, procedure);
2216 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2217 return r.request(this);
2221 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2222 return r.request(this);
2226 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2227 r.request(this, procedure);
2231 public <T> void async(WriteInterface<T> r) {
2232 r.request(this, new ProcedureAdapter<T>());
2236 public void incAsync() {
2241 public void decAsync() {
2245 public long getCluster(ResourceImpl resource) {
2246 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2247 return cluster.getClusterId();
2250 public long getCluster(int id) {
2251 if (clusterTable == null)
2252 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2253 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2256 public ResourceImpl getResource(int id) {
2257 return new ResourceImpl(resourceSupport, id);
2260 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2261 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2262 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2263 int key = proxy.getClusterKey();
2264 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2265 return new ResourceImpl(resourceSupport, resourceKey);
2268 public ResourceImpl getResource2(int id) {
2270 return new ResourceImpl(resourceSupport, id);
2273 final public int getId(ResourceImpl impl) {
2277 public static final Charset UTF8 = Charset.forName("utf-8");
2282 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2283 for(TransientGraph g : support.providers) {
2284 if(g.isPending(subject)) return false;
2289 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2290 for(TransientGraph g : support.providers) {
2291 if(g.isPending(subject, predicate)) return false;
2296 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2298 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2300 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2303 public void run(ReadGraphImpl graph) {
2304 if(ready.decrementAndGet() == 0) {
2305 runnable.run(graph);
2311 for(TransientGraph g : support.providers) {
2312 if(g.isPending(subject)) {
2314 g.load(graph, subject, composite);
2315 } catch (DatabaseException e) {
2316 e.printStackTrace();
2319 composite.run(graph);
2323 composite.run(graph);
2327 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2329 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2331 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2334 public void run(ReadGraphImpl graph) {
2335 if(ready.decrementAndGet() == 0) {
2336 runnable.run(graph);
2342 for(TransientGraph g : support.providers) {
2343 if(g.isPending(subject, predicate)) {
2345 g.load(graph, subject, predicate, composite);
2346 } catch (DatabaseException e) {
2347 e.printStackTrace();
2350 composite.run(graph);
2354 composite.run(graph);
2358 // void dumpHeap() {
2361 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2362 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2363 // "d:/heap" + flushCounter + ".txt", true);
2364 // } catch (IOException e) {
2365 // e.printStackTrace();
2370 void fireReactionsToSynchronize(ChangeSet cs) {
2372 // Do not fire empty events
2376 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2377 // g.state.barrier.inc();
2381 if (!cs.isEmpty()) {
2382 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2383 for (ChangeListener l : changeListeners2) {
2386 } catch (Exception ex) {
2387 ex.printStackTrace();
2394 // g.state.barrier.dec();
2395 // g.waitAsync(null);
2401 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2404 // Do not fire empty events
2409 // graph.state.barrier.inc();
2413 if (!cs2.isEmpty()) {
2415 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2416 for (ChangeListener l : changeListeners2) {
2419 // System.out.println("changelistener " + l);
2420 } catch (Exception ex) {
2421 ex.printStackTrace();
2429 // graph.state.barrier.dec();
2430 // graph.waitAsync(null);
2433 } catch (Throwable t) {
2434 t.printStackTrace();
2438 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2442 // Do not fire empty events
2446 // Do not fire on virtual requests
2447 if(graph.getProvider() != null)
2450 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2454 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2455 for (ChangeListener l : metadataListeners) {
2458 } catch (Throwable ex) {
2459 ex.printStackTrace();
2467 } catch (Throwable t) {
2468 t.printStackTrace();
2477 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2480 public <T> T getService(Class<T> api) {
2481 T t = peekService(api);
2483 if (state.isClosed())
2484 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2485 throw new ServiceNotFoundException(this, api);
2493 protected abstract ServerInformation getCachedServerInformation();
2495 private Class<?> serviceKey1 = null;
2496 private Class<?> serviceKey2 = null;
2497 private Object service1 = null;
2498 private Object service2 = null;
2504 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2506 @SuppressWarnings("unchecked")
2508 public synchronized <T> T peekService(Class<T> api) {
2510 if(serviceKey1 == api) {
2512 } else if (serviceKey2 == api) {
2514 Object result = service2;
2515 service2 = service1;
2516 serviceKey2 = serviceKey1;
2522 if (Layer0.class == api)
2524 if (ServerInformation.class == api)
2525 return (T) getCachedServerInformation();
2526 else if (WriteGraphImpl.class == api)
2527 return (T) writeState.getGraph();
2528 else if (ClusterBuilder.class == api)
2529 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2530 else if (ClusterBuilderFactory.class == api)
2531 return (T)new ClusterBuilderFactoryImpl(this);
2533 service2 = service1;
2534 serviceKey2 = serviceKey1;
2536 service1 = serviceLocator.peekService(api);
2547 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2550 public boolean hasService(Class<?> api) {
2551 return serviceLocator.hasService(api);
2555 * @param api the api that must be implemented by the specified service
2556 * @param service the service implementation
2559 public <T> void registerService(Class<T> api, T service) {
2560 if(Layer0.class == api) {
2561 L0 = (Layer0)service;
2564 serviceLocator.registerService(api, service);
2565 if (TransactionPolicySupport.class == api) {
2566 transactionPolicy = (TransactionPolicySupport)service;
2567 state.resetTransactionPolicy();
2569 if (api == serviceKey1)
2571 else if (api == serviceKey2)
2579 void fireSessionVariableChange(String variable) {
2580 for (MonitorHandler h : monitorHandlers) {
2581 MonitorContext ctx = monitorContexts.getLeft(h);
2583 // SafeRunner functionality repeated here to avoid dependency.
2585 h.valuesChanged(ctx);
2586 } catch (Exception e) {
2587 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2588 } catch (LinkageError e) {
2589 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2595 class ResourceSerializerImpl implements ResourceSerializer {
2597 public long createRandomAccessId(int id) throws DatabaseException {
2600 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2601 long cluster = getCluster(id);
2603 return 0; // Better to return 0 then invalid id.
2604 long result = ClusterTraitsBase.createResourceId(cluster, index);
2609 public long getRandomAccessId(Resource resource) throws DatabaseException {
2611 ResourceImpl resourceImpl = (ResourceImpl) resource;
2612 return createRandomAccessId(resourceImpl.id);
2617 public int getTransientId(Resource resource) throws DatabaseException {
2619 ResourceImpl resourceImpl = (ResourceImpl) resource;
2620 return resourceImpl.id;
2625 public String createRandomAccessId(Resource resource)
2626 throws InvalidResourceReferenceException {
2628 if(resource == null) throw new IllegalArgumentException();
2630 ResourceImpl resourceImpl = (ResourceImpl) resource;
2632 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2636 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2637 } catch (DatabaseException e1) {
2638 throw new InvalidResourceReferenceException(e1);
2641 // Serialize as '<resource index>_<cluster id>'
2642 return "" + r + "_" + getCluster(resourceImpl);
2643 } catch (Throwable e) {
2644 e.printStackTrace();
2645 throw new InvalidResourceReferenceException(e);
2650 public int getTransientId(long serialized) throws DatabaseException {
2651 if (serialized <= 0)
2652 return (int)serialized;
2653 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2654 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2655 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2656 if (cluster == null)
2657 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2658 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2663 public Resource getResource(long randomAccessId) throws DatabaseException {
2664 return getResourceByKey(getTransientId(randomAccessId));
2668 public Resource getResource(int transientId) throws DatabaseException {
2669 return getResourceByKey(transientId);
2673 public Resource getResource(String randomAccessId)
2674 throws InvalidResourceReferenceException {
2676 int i = randomAccessId.indexOf('_');
2678 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2679 + randomAccessId + "'");
2680 int r = Integer.parseInt(randomAccessId.substring(0, i));
2681 if(r < 0) return getResourceByKey(r);
2682 long c = Long.parseLong(randomAccessId.substring(i + 1));
2683 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2684 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2685 if (cluster.hasResource(key, clusterTranslator))
2686 return getResourceByKey(key);
2687 } catch (InvalidResourceReferenceException e) {
2689 } catch (NumberFormatException e) {
2690 throw new InvalidResourceReferenceException(e);
2691 } catch (Throwable e) {
2692 e.printStackTrace();
2693 throw new InvalidResourceReferenceException(e);
2696 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2700 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2703 } catch (Throwable e) {
2704 e.printStackTrace();
2705 throw new InvalidResourceReferenceException(e);
2711 // Copied from old SessionImpl
2714 if (state.isClosed())
2715 throw new Error("Session closed.");
2720 public ResourceImpl getNewResource(long clusterId) {
2721 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2724 newId = cluster.createResource(clusterTranslator);
2725 } catch (DatabaseException e) {
2726 Logger.defaultLogError(e);
2729 return new ResourceImpl(resourceSupport, newId);
2732 public ResourceImpl getNewResource(Resource clusterSet)
2733 throws DatabaseException {
2734 long resourceId = clusterSet.getResourceId();
2735 Long clusterId = clusterSetsSupport.get(resourceId);
2736 if (null == clusterId)
2737 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2738 if (Constants.NewClusterId == clusterId) {
2739 clusterId = getService(ClusteringSupport.class).createCluster();
2740 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2741 createdClusters.add(clusterId);
2742 clusterSetsSupport.put(resourceId, clusterId);
2743 return getNewResource(clusterId);
2745 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2746 ResourceImpl result;
2747 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2748 clusterId = getService(ClusteringSupport.class).createCluster();
2749 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2750 createdClusters.add(clusterId);
2751 clusterSetsSupport.put(resourceId, clusterId);
2752 return getNewResource(clusterId);
2754 result = getNewResource(clusterId);
2755 int resultKey = querySupport.getId(result);
2756 long resultCluster = querySupport.getClusterId(resultKey);
2757 if (clusterId != resultCluster)
2758 clusterSetsSupport.put(resourceId, resultCluster);
2764 public void getNewClusterSet(Resource clusterSet)
2765 throws DatabaseException {
2767 System.out.println("new cluster set=" + clusterSet);
2768 long resourceId = clusterSet.getResourceId();
2769 if (clusterSetsSupport.containsKey(resourceId))
2770 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2771 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2773 public boolean containsClusterSet(Resource clusterSet)
2774 throws ServiceException {
2775 long resourceId = clusterSet.getResourceId();
2776 return clusterSetsSupport.containsKey(resourceId);
2778 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2779 Resource r = defaultClusterSet;
2780 defaultClusterSet = clusterSet;
2783 void printDiagnostics() {
2787 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2788 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2789 for(ClusterI cluster : clusterTable.getClusters()) {
2791 if (cluster.isLoaded() && !cluster.isEmpty()) {
2792 residentClusters.set(residentClusters.get() + 1);
2793 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2795 } catch (DatabaseException e) {
2796 Logger.defaultLogError(e);
2800 System.out.println("--------------------------------");
2801 System.out.println("Cluster information:");
2802 System.out.println("-amount of resident clusters=" + residentClusters.get());
2803 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2805 for(ClusterI cluster : clusterTable.getClusters()) {
2806 System.out.print("Cluster " + cluster.getClusterId() + " [");
2808 if (!cluster.isLoaded())
2809 System.out.println("not loaded]");
2810 else if (cluster.isEmpty())
2811 System.out.println("is empty]");
2813 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2814 System.out.print(",references=" + cluster.getReferenceCount());
2815 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2817 } catch (DatabaseException e) {
2818 Logger.defaultLogError(e);
2819 System.out.println("is corrupted]");
2823 queryProvider2.printDiagnostics();
2824 System.out.println("--------------------------------");
2829 public void fireStartReadTransaction() {
2832 System.out.println("StartReadTransaction");
2834 for (SessionEventListener listener : eventListeners) {
2836 listener.readTransactionStarted();
2837 } catch (Throwable t) {
2838 t.printStackTrace();
2844 public void fireFinishReadTransaction() {
2847 System.out.println("FinishReadTransaction");
2849 for (SessionEventListener listener : eventListeners) {
2851 listener.readTransactionFinished();
2852 } catch (Throwable t) {
2853 t.printStackTrace();
2859 public void fireStartWriteTransaction() {
2862 System.out.println("StartWriteTransaction");
2864 for (SessionEventListener listener : eventListeners) {
2866 listener.writeTransactionStarted();
2867 } catch (Throwable t) {
2868 t.printStackTrace();
2874 public void fireFinishWriteTransaction() {
2877 System.out.println("FinishWriteTransaction");
2879 for (SessionEventListener listener : eventListeners) {
2881 listener.writeTransactionFinished();
2882 } catch (Throwable t) {
2883 t.printStackTrace();
2887 Indexing.resetDependenciesIndexingDisabled();
2891 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2892 throw new Error("Not supported at the moment.");
2899 ClusterTable getClusterTable() {
2900 return clusterTable;
2903 public GraphSession getGraphSession() {
2904 return graphSession;
2907 public QueryProcessor getQueryProvider2() {
2908 return queryProvider2;
2911 ClientChangesImpl getClientChanges() {
2912 return clientChanges;
2915 boolean getWriteOnly() {
2919 static int counter = 0;
2921 public void onClusterLoaded(long clusterId) {
2923 clusterTable.updateSize();
2931 * Implementation of the interface RequestProcessor
2935 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2940 assert(request != null);
2942 final DataContainer<T> result = new DataContainer<T>();
2943 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2945 syncRequest(request, new AsyncProcedure<T>() {
2948 public void execute(AsyncReadGraph graph, T t) {
2953 public void exception(AsyncReadGraph graph, Throwable t) {
2959 Throwable t = exception.get();
2961 if(t instanceof DatabaseException) throw (DatabaseException)t;
2962 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2965 return result.get();
2970 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2971 // assertNotSession();
2972 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2976 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2978 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2982 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2984 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2988 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2990 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2994 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2996 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3000 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3004 assert(request != null);
3006 final DataContainer<T> result = new DataContainer<T>();
3007 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3009 syncRequest(request, new AsyncProcedure<T>() {
3012 public void execute(AsyncReadGraph graph, T t) {
3017 public void exception(AsyncReadGraph graph, Throwable t) {
3023 Throwable t = exception.get();
3025 if(t instanceof DatabaseException) throw (DatabaseException)t;
3026 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3029 return result.get();
3034 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3036 return syncRequest(request, (AsyncProcedure<T>)procedure);
3040 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3042 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3046 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3048 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3052 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3054 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3058 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3060 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3064 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3068 assert(request != null);
3070 final ArrayList<T> result = new ArrayList<T>();
3071 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3073 syncRequest(request, new AsyncMultiProcedure<T>() {
3076 public void execute(AsyncReadGraph graph, T t) {
3077 synchronized(result) {
3083 public void finished(AsyncReadGraph graph) {
3087 public void exception(AsyncReadGraph graph, Throwable t) {
3094 Throwable t = exception.get();
3096 if(t instanceof DatabaseException) throw (DatabaseException)t;
3097 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3105 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3107 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3111 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3113 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3117 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3119 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3123 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3125 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3129 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3131 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3135 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3139 assert(request != null);
3141 final ArrayList<T> result = new ArrayList<T>();
3142 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3144 syncRequest(request, new AsyncMultiProcedure<T>() {
3147 public void execute(AsyncReadGraph graph, T t) {
3148 synchronized(result) {
3154 public void finished(AsyncReadGraph graph) {
3158 public void exception(AsyncReadGraph graph, Throwable t) {
3165 Throwable t = exception.get();
3167 if(t instanceof DatabaseException) throw (DatabaseException)t;
3168 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3176 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3178 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3182 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3184 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3188 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3190 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3194 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3196 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3200 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3202 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3207 public <T> void asyncRequest(Read<T> request) {
3209 asyncRequest(request, new ProcedureAdapter<T>() {
3211 public void exception(Throwable t) {
3212 t.printStackTrace();
3219 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3220 asyncRequest(request, (AsyncProcedure<T>)procedure);
3224 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3225 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3229 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3230 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3234 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3235 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3239 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3240 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3244 final public <T> void asyncRequest(final AsyncRead<T> request) {
3246 assert(request != null);
3248 asyncRequest(request, new ProcedureAdapter<T>() {
3250 public void exception(Throwable t) {
3251 t.printStackTrace();
3258 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3259 scheduleRequest(request, procedure, procedure, null);
3263 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3264 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3268 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3269 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3273 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3274 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3278 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3279 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3283 public <T> void asyncRequest(MultiRead<T> request) {
3285 assert(request != null);
3287 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3289 public void exception(AsyncReadGraph graph, Throwable t) {
3290 t.printStackTrace();
3297 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3298 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3302 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3303 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3307 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3308 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3312 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3313 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3317 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3318 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3322 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3324 assert(request != null);
3326 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3328 public void exception(AsyncReadGraph graph, Throwable t) {
3329 t.printStackTrace();
3336 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3337 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3341 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3342 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3346 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3347 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3351 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3352 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3356 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3357 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3361 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3363 throw new Error("Not implemented!");
3367 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3368 throw new Error("Not implemented!");
3372 final public <T> void asyncRequest(final ExternalRead<T> request) {
3374 assert(request != null);
3376 asyncRequest(request, new ProcedureAdapter<T>() {
3378 public void exception(Throwable t) {
3379 t.printStackTrace();
3386 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3387 asyncRequest(request, (Procedure<T>)procedure);
3392 void check(Throwable t) throws DatabaseException {
3394 if(t instanceof DatabaseException) throw (DatabaseException)t;
3395 else throw new DatabaseException("Unexpected exception", t);
3399 void check(DataContainer<Throwable> container) throws DatabaseException {
3400 Throwable t = container.get();
3402 if(t instanceof DatabaseException) throw (DatabaseException)t;
3403 else throw new DatabaseException("Unexpected exception", t);
3412 boolean sameProvider(Write request) {
3413 if(writeState.getGraph().provider != null) {
3414 return writeState.getGraph().provider.equals(request.getProvider());
3416 return request.getProvider() == null;
3420 boolean plainWrite(WriteGraphImpl graph) {
3421 if(graph == null) return false;
3422 if(graph.writeSupport.writeOnly()) return false;
3427 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3428 private void assertNotSession() throws DatabaseException {
3429 Thread current = Thread.currentThread();
3430 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3433 void assertAlive() {
3434 if (!state.isAlive())
3435 throw new RuntimeDatabaseException("Session has been shut down.");
3438 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3439 return querySupport.getValueStream(graph, querySupport.getId(resource));
3442 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3443 return querySupport.getValue(graph, querySupport.getId(resource));
3446 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3447 acquire(semaphore, request, null);
3450 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3454 // Loop until the semaphore is acquired reporting requests that take a long time.
3457 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3461 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3463 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3464 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3466 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3467 + (waitTime) + " ms. (procedure=" + procedure + ")");
3474 } catch (InterruptedException e) {
3475 e.printStackTrace();
3476 // FIXME: Should perhaps do something else in this case ??
3483 // public boolean holdOnToTransactionAfterCancel() {
3484 // return transactionPolicy.holdOnToTransactionAfterCancel();
3488 // public boolean holdOnToTransactionAfterCommit() {
3489 // return transactionPolicy.holdOnToTransactionAfterCommit();
3493 // public boolean holdOnToTransactionAfterRead() {
3494 // return transactionPolicy.holdOnToTransactionAfterRead();
3498 // public void onRelinquish() {
3499 // transactionPolicy.onRelinquish();
3503 // public void onRelinquishDone() {
3504 // transactionPolicy.onRelinquishDone();
3508 // public void onRelinquishError() {
3509 // transactionPolicy.onRelinquishError();
3514 public Session getSession() {
3519 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3520 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3522 public void ceased(int thread) {
3524 requestManager.ceased(thread);
3528 public int getAmountOfQueryThreads() {
3529 // This must be a power of two
3531 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3534 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3536 return new ResourceImpl(resourceSupport, key);
3540 public void acquireWriteOnly() {
3544 public void releaseWriteOnly(ReadGraphImpl graph) {
3546 queryProvider2.releaseWrite(graph);
3549 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3551 long start = System.nanoTime();
3553 while(dirtyPrimitives) {
3554 dirtyPrimitives = false;
3555 getQueryProvider2().performDirtyUpdates(writer);
3556 getQueryProvider2().performScheduledUpdates(writer);
3559 fireMetadataListeners(writer, clientChanges);
3561 if(DebugPolicy.PERFORMANCE_DATA) {
3562 long end = System.nanoTime();
3563 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3568 void removeTemporaryData() {
3569 File platform = Platform.getLocation().toFile();
3570 File tempFiles = new File(platform, "tempFiles");
3571 File temp = new File(tempFiles, "db");
3575 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3577 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3580 } catch (IOException e) {
3581 Logger.defaultLogError(e);
3583 return FileVisitResult.CONTINUE;
3586 } catch (IOException e) {
3587 Logger.defaultLogError(e);
3591 public void handleCreatedClusters() {
3592 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3593 createdClusters.forEach(new TLongProcedure() {
3596 public boolean execute(long value) {
3597 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3598 cluster.setImmutable(true, clusterTranslator);
3603 createdClusters.clear();
3607 public Object getModificationCounter() {
3608 return queryProvider2.modificationCounter;
3612 public void markUndoPoint() {
3613 state.setCombine(false);