1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168 protected static final boolean DEBUG = false;
170 private static final boolean DIAGNOSTICS = false;
172 private TransactionPolicySupport transactionPolicy;
174 final private ClusterControl clusterControl;
176 final protected BuiltinSupportImpl builtinSupport;
177 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178 final protected ClusterSetsSupport clusterSetsSupport;
179 final protected LifecycleSupportImpl lifecycleSupport;
181 protected QuerySupportImpl querySupport;
182 protected ResourceSupportImpl resourceSupport;
183 protected WriteSupport writeSupport;
184 public ClusterTranslator clusterTranslator;
186 boolean dirtyPrimitives = false;
188 public static final int SERVICE_MODE_CREATE = 2;
189 public static final int SERVICE_MODE_ALLOW = 1;
190 public int serviceMode = 0;
191 public boolean createdImmutableClusters = false;
192 public TLongHashSet createdClusters = new TLongHashSet();
197 * The service locator maintained by the workbench. These services are
198 * initialized during workbench during the <code>init</code> method.
200 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
202 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
204 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
206 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
208 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
210 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
212 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
214 final protected State state = new State();
216 protected GraphSession graphSession = null;
218 protected SessionManager sessionManagerImpl = null;
220 protected UserAuthenticationAgent authAgent = null;
222 protected UserAuthenticator authenticator = null;
224 protected Resource user = null;
226 protected ClusterStream clusterStream = null;
228 protected SessionRequestManager requestManager = null;
230 public ClusterTable clusterTable = null;
232 public QueryProcessor queryProvider2 = null;
234 ClientChangesImpl clientChanges = null;
236 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
238 // protected long newClusterId = Constants.NullClusterId;
240 protected int flushCounter = 0;
242 protected boolean writeOnly = false;
244 WriteState<?> writeState = null;
245 WriteStateBase<?> delayedWriteState = null;
246 protected Resource defaultClusterSet = null; // If not null then used for newResource().
248 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250 // if (authAgent == null)
251 // throw new IllegalArgumentException("null authentication agent");
253 File t = StaticSessionProperties.virtualGraphStoragePath;
256 this.clusterTable = new ClusterTable(this, t);
257 this.builtinSupport = new BuiltinSupportImpl(this);
258 this.sessionManagerImpl = sessionManagerImpl;
260 // this.authAgent = authAgent;
262 serviceLocator.registerService(Session.class, this);
263 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284 ServiceActivityUpdaterForWriteTransactions.register(this);
286 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289 this.lifecycleSupport = new LifecycleSupportImpl(this);
290 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291 this.transactionPolicy = new TransactionPolicyRelease();
292 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293 this.clusterControl = new ClusterControlImpl(this);
294 serviceLocator.registerService(ClusterControl.class, clusterControl);
295 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296 this.clusterSetsSupport.setReadDirectory(t.toPath());
297 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
308 public Resource getRootLibrary() {
309 return queryProvider2.getRootLibraryResource();
312 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313 if (!graphSession.dbSession.refreshEnabled())
316 getClusterTable().refresh(csid, this, clusterUID);
317 } catch (Throwable t) {
318 Logger.defaultLogError("Refesh failed.", t);
322 static final class TaskHelper {
323 private final String name;
324 private Object result;
325 final Semaphore sema = new Semaphore(0);
326 private Throwable throwable = null;
327 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
329 public void run(DatabaseException e) {
330 synchronized (TaskHelper.this) {
335 final Procedure<Object> proc = new Procedure<Object>() {
337 public void execute(Object result) {
341 public void exception(Throwable t) {
342 if (t instanceof DatabaseException)
343 callback.run((DatabaseException)t);
345 callback.run(new DatabaseException("" + name + "operation failed.", t));
348 final WriteTraits writeTraits = new WriteTraits() {};
349 TaskHelper(String name) {
355 void setResult(Object result) {
356 this.result = result;
358 // Throwable throwableGet() {
361 synchronized void throwableSet(Throwable t) {
364 // void throwableSet(String t) {
365 // throwable = new InternalException("" + name + " operation failed. " + t);
367 synchronized void throwableCheck()
368 throws DatabaseException {
369 if (null != throwable)
370 if (throwable instanceof DatabaseException)
371 throw (DatabaseException)throwable;
373 throw new DatabaseException("Undo operation failed.", throwable);
375 void throw_(String message)
376 throws DatabaseException {
377 throw new DatabaseException("" + name + " operation failed. " + message);
383 private ListenerBase getListenerBase(Object procedure) {
384 if (procedure instanceof ListenerBase)
385 return (ListenerBase) procedure;
390 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391 scheduleRequest(request, callback, notify, null);
395 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
398 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400 assert (request != null);
402 if(Development.DEVELOPMENT) {
404 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405 System.err.println("schedule write '" + request + "'");
406 } catch (Throwable t) {
407 Logger.defaultLogError(t);
411 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
413 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
416 public void run(int thread) {
418 if(Development.DEVELOPMENT) {
420 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421 System.err.println("perform write '" + request + "'");
422 } catch (Throwable t) {
423 Logger.defaultLogError(t);
427 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
434 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436 VirtualGraph vg = getProvider(request.getProvider());
438 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442 public void execute(Object result) {
443 if(callback != null) callback.run(null);
447 public void exception(Throwable t) {
448 if(callback != null) callback.run((DatabaseException)t);
453 assert (null != writer);
454 // writer.state.barrier.inc();
457 request.perform(writer);
458 assert (null != writer);
459 } catch (Throwable t) {
460 if (!(t instanceof CancelTransactionException))
461 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462 writeState.except(t);
464 // writer.state.barrier.dec();
465 // writer.waitAsync(request);
469 assert(!queryProvider2.dirty);
471 } catch (Throwable e) {
473 // Log it first, just to be safe that the error is always logged.
474 if (!(e instanceof CancelTransactionException))
475 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477 // writeState.getGraph().state.barrier.dec();
478 // writeState.getGraph().waitAsync(request);
480 writeState.except(e);
483 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 // // All we can do here is to log those, can't really pass them anywhere.
485 // if (callback != null) {
486 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 // else callback.run(new DatabaseException(e));
489 // } catch (Throwable e2) {
490 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 // boolean empty = clusterStream.reallyFlush();
493 // int callerThread = -1;
494 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 // // Send all local changes to server so it can calculate correct reverse change set.
498 // // This will call clusterStream.accept().
499 // state.cancelCommit(context, clusterStream);
501 // if (!context.isOk()) // this is a blocking operation
502 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 // state.cancelCommit2(context, clusterStream);
506 // } catch (DatabaseException e3) {
507 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 // clusterStream.setOff(false);
510 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
515 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
521 if(Development.DEVELOPMENT) {
523 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524 System.err.println("finish write '" + request + "'");
525 } catch (Throwable t) {
526 Logger.defaultLogError(t);
536 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537 scheduleRequest(request, procedure, notify, null);
541 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
544 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546 assert (request != null);
548 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550 requestManager.scheduleWrite(new SessionTask(request, thread) {
553 public void run(int thread) {
555 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
560 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
562 VirtualGraph vg = getProvider(request.getProvider());
563 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
566 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567 writeState = writeStateT;
569 assert (null != writer);
570 // writer.state.barrier.inc();
571 writeStateT.setResult(request.perform(writer));
572 assert (null != writer);
574 // writer.state.barrier.dec();
575 // writer.waitAsync(null);
577 } catch (Throwable e) {
579 // writer.state.barrier.dec();
580 // writer.waitAsync(null);
582 writeState.except(e);
584 // state.stopWriteTransaction(clusterStream);
586 // } catch (Throwable e) {
587 // // Log it first, just to be safe that the error is always logged.
588 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
591 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 // // All we can do here is to log those, can't really pass them anywhere.
593 // if (procedure != null) {
594 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 // else procedure.exception(new DatabaseException(e));
597 // } catch (Throwable e2) {
598 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
601 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
603 // state.stopWriteTransaction(clusterStream);
606 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
609 // if(notify != null) notify.release();
619 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620 scheduleRequest(request, callback, notify, null);
624 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
627 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
629 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
631 assert (request != null);
633 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
635 requestManager.scheduleWrite(new SessionTask(request, thread) {
638 public void run(int thread) {
639 fireSessionVariableChange(SessionVariables.QUEUED_READS);
641 Procedure<Object> stateProcedure = new Procedure<Object>() {
643 public void execute(Object result) {
644 if (callback != null)
648 public void exception(Throwable t) {
649 if (callback != null) {
650 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651 else callback.run(new DatabaseException(t));
653 Logger.defaultLogError("Unhandled exception", t);
657 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659 DelayedWriteGraph dwg = null;
660 // newGraph.state.barrier.inc();
663 dwg = new DelayedWriteGraph(newGraph);
664 request.perform(dwg);
665 } catch (Throwable e) {
666 delayedWriteState.except(e);
671 // newGraph.state.barrier.dec();
672 // newGraph.waitAsync(request);
673 fireSessionVariableChange(SessionVariables.QUEUED_READS);
676 delayedWriteState = null;
678 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
679 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686 VirtualGraph vg = getProvider(request.getProvider());
687 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
689 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
691 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
693 assert (null != writer);
694 // writer.state.barrier.inc();
698 dwg.commit(writer, request);
700 if(defaultClusterSet != null) {
701 XSupport xs = getService(XSupport.class);
702 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
703 ClusteringSupport cs = getService(ClusteringSupport.class);
704 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
707 // This makes clusters available from server
708 clusterStream.reallyFlush();
710 releaseWriteOnly(writer);
711 handleUpdatesAndMetadata(writer);
713 } catch (ServiceException e) {
714 // writer.state.barrier.dec();
715 // writer.waitAsync(null);
717 // These shall be requested from server
718 clusterTable.removeWriteOnlyClusters();
719 // This makes clusters available from server
720 clusterStream.reallyFlush();
722 releaseWriteOnly(writer);
723 writeState.except(e);
725 // Debugging & Profiling
726 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
737 scheduleRequest(request, procedure, notify, null);
741 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
744 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
745 throw new Error("Not implemented");
748 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
749 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
750 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
751 createdClusters.add(cluster.clusterId);
756 class WriteOnlySupport implements WriteSupport {
758 ClusterStream stream;
759 ClusterImpl currentCluster;
761 public WriteOnlySupport() {
762 this.stream = clusterStream;
766 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
767 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
773 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
775 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
776 if(s != queryProvider2.getRootLibrary())
777 throw new ImmutableException("Trying to modify immutable resource key=" + s);
780 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
781 } catch (DatabaseException e) {
782 Logger.defaultLogError(e);
786 clientChanges.invalidate(s);
788 if (cluster.isWriteOnly())
790 queryProvider2.updateStatements(s, p);
795 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
796 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
802 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
804 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
805 } catch (DatabaseException e) {
806 Logger.defaultLogError(e);
809 clientChanges.invalidate(rid);
811 if (cluster.isWriteOnly())
813 queryProvider2.updateValue(rid);
818 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
821 claimValue(provider,resource, reader.readBytes(null, amount));
825 byte[] bytes = new byte[65536];
827 int rid = ((ResourceImpl)resource).id;
828 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832 int block = Math.min(left, 65536);
833 reader.readBytes(bytes, block);
834 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
837 } catch (DatabaseException e) {
838 Logger.defaultLogError(e);
841 clientChanges.invalidate(rid);
843 if (cluster.isWriteOnly())
845 queryProvider2.updateValue(rid);
849 private void maintainCluster(ClusterImpl before, ClusterI after_) {
850 if(after_ != null && after_ != before) {
851 ClusterImpl after = (ClusterImpl)after_;
852 if(currentCluster == before) currentCluster = after;
853 clusterTable.replaceCluster(after);
857 public int createResourceKey(int foreignCounter) throws DatabaseException {
858 if(currentCluster == null)
859 currentCluster = getNewResourceCluster();
860 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
861 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
862 newCluster.foreignLookup = new byte[foreignCounter];
863 currentCluster = newCluster;
865 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
867 return currentCluster.createResource(clusterTranslator);
871 public Resource createResource(VirtualGraph provider) throws DatabaseException {
872 if(currentCluster == null) {
873 if (null != defaultClusterSet) {
874 ResourceImpl result = getNewResource(defaultClusterSet);
875 currentCluster = clusterTable.getClusterByResourceKey(result.id);
878 currentCluster = getNewResourceCluster();
881 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
882 if (null != defaultClusterSet) {
883 ResourceImpl result = getNewResource(defaultClusterSet);
884 currentCluster = clusterTable.getClusterByResourceKey(result.id);
887 currentCluster = getNewResourceCluster();
890 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
894 public Resource createResource(VirtualGraph provider, long clusterId)
895 throws DatabaseException {
896 return getNewResource(clusterId);
900 public Resource createResource(VirtualGraph provider, Resource clusterSet)
901 throws DatabaseException {
902 return getNewResource(clusterSet);
906 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
907 throws DatabaseException {
908 getNewClusterSet(clusterSet);
912 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
913 throws ServiceException {
914 return containsClusterSet(clusterSet);
917 public void selectCluster(long cluster) {
918 currentCluster = clusterTable.getClusterByClusterId(cluster);
919 long setResourceId = clusterSetsSupport.getSet(cluster);
920 clusterSetsSupport.put(setResourceId, cluster);
924 public Resource setDefaultClusterSet(Resource clusterSet)
925 throws ServiceException {
926 Resource result = setDefaultClusterSet4NewResource(clusterSet);
927 if(clusterSet != null) {
928 long id = clusterSetsSupport.get(clusterSet.getResourceId());
929 currentCluster = clusterTable.getClusterByClusterId(id);
932 currentCluster = null;
938 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
940 provider = getProvider(provider);
941 if (null == provider) {
942 int key = ((ResourceImpl)resource).id;
946 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
947 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
948 // if(key != queryProvider2.getRootLibrary())
949 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
952 // cluster.removeValue(key, clusterTranslator);
953 // } catch (DatabaseException e) {
954 // Logger.defaultLogError(e);
958 clusterTable.writeOnlyInvalidate(cluster);
961 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
962 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
963 clusterTranslator.removeValue(cluster);
964 } catch (DatabaseException e) {
965 Logger.defaultLogError(e);
968 queryProvider2.invalidateResource(key);
969 clientChanges.invalidate(key);
972 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
973 queryProvider2.updateValue(querySupport.getId(resource));
974 clientChanges.claimValue(resource);
981 public void flush(boolean intermediate) {
982 throw new UnsupportedOperationException();
986 public void flushCluster() {
987 clusterTable.flushCluster(graphSession);
988 if(defaultClusterSet != null) {
989 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
991 currentCluster = null;
995 public void flushCluster(Resource r) {
996 throw new UnsupportedOperationException("flushCluster resource " + r);
1004 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1007 int s = ((ResourceImpl)subject).id;
1008 int p = ((ResourceImpl)predicate).id;
1009 int o = ((ResourceImpl)object).id;
1011 provider = getProvider(provider);
1012 if (null == provider) {
1014 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1015 clusterTable.writeOnlyInvalidate(cluster);
1019 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1020 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1021 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1023 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1024 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1026 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1027 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1028 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1029 clusterTranslator.removeStatement(cluster);
1031 queryProvider2.invalidateResource(s);
1032 clientChanges.invalidate(s);
1034 } catch (DatabaseException e) {
1036 Logger.defaultLogError(e);
1044 ((VirtualGraphImpl)provider).deny(s, p, o);
1045 queryProvider2.invalidateResource(s);
1046 clientChanges.invalidate(s);
1055 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1056 throw new UnsupportedOperationException();
1060 public boolean writeOnly() {
1065 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1066 writeSupport.performWriteRequest(graph, request);
1070 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1071 throw new UnsupportedOperationException();
1075 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1076 throw new UnsupportedOperationException();
1080 public <T> void addMetadata(Metadata data) throws ServiceException {
1081 writeSupport.addMetadata(data);
1085 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1086 return writeSupport.getMetadata(clazz);
1090 public TreeMap<String, byte[]> getMetadata() {
1091 return writeSupport.getMetadata();
1095 public void commitDone(WriteTraits writeTraits, long csid) {
1096 writeSupport.commitDone(writeTraits, csid);
1100 public void clearUndoList(WriteTraits writeTraits) {
1101 writeSupport.clearUndoList(writeTraits);
1104 public int clearMetadata() {
1105 return writeSupport.clearMetadata();
1109 public void startUndo() {
1110 writeSupport.startUndo();
1114 class VirtualWriteOnlySupport implements WriteSupport {
1117 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1118 // Resource object) {
1119 // throw new UnsupportedOperationException();
1123 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1125 TransientGraph impl = (TransientGraph)provider;
1126 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1127 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1128 clientChanges.claim(subject, predicate, object);
1133 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1135 TransientGraph impl = (TransientGraph)provider;
1136 impl.claim(subject, predicate, object);
1137 getQueryProvider2().updateStatements(subject, predicate);
1138 clientChanges.claim(subject, predicate, object);
1143 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1144 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1148 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1149 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1150 getQueryProvider2().updateValue(resource);
1151 clientChanges.claimValue(resource);
1155 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1156 byte[] value = reader.readBytes(null, amount);
1157 claimValue(provider, resource, value);
1161 public Resource createResource(VirtualGraph provider) {
1162 TransientGraph impl = (TransientGraph)provider;
1163 return impl.getResource(impl.newResource(false));
1167 public Resource createResource(VirtualGraph provider, long clusterId) {
1168 throw new UnsupportedOperationException();
1172 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1173 throws DatabaseException {
1174 throw new UnsupportedOperationException();
1178 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1179 throws DatabaseException {
1180 throw new UnsupportedOperationException();
1184 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1185 throws ServiceException {
1186 throw new UnsupportedOperationException();
1190 public Resource setDefaultClusterSet(Resource clusterSet)
1191 throws ServiceException {
1196 public void denyValue(VirtualGraph provider, Resource resource) {
1197 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1198 getQueryProvider2().updateValue(querySupport.getId(resource));
1199 // NOTE: this only keeps track of value changes by-resource.
1200 clientChanges.claimValue(resource);
1204 public void flush(boolean intermediate) {
1205 throw new UnsupportedOperationException();
1209 public void flushCluster() {
1210 throw new UnsupportedOperationException();
1214 public void flushCluster(Resource r) {
1215 throw new UnsupportedOperationException("Resource " + r);
1223 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1224 TransientGraph impl = (TransientGraph) provider;
1225 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1226 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1227 clientChanges.deny(subject, predicate, object);
1232 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1233 throw new UnsupportedOperationException();
1237 public boolean writeOnly() {
1242 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1243 throw new UnsupportedOperationException();
1247 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1248 throw new UnsupportedOperationException();
1252 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1253 throw new UnsupportedOperationException();
1257 public <T> void addMetadata(Metadata data) throws ServiceException {
1258 throw new UnsupportedOperationException();
1262 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1263 throw new UnsupportedOperationException();
1267 public TreeMap<String, byte[]> getMetadata() {
1268 throw new UnsupportedOperationException();
1272 public void commitDone(WriteTraits writeTraits, long csid) {
1276 public void clearUndoList(WriteTraits writeTraits) {
1280 public int clearMetadata() {
1285 public void startUndo() {
1289 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1293 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1295 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1298 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1302 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1304 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1306 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1307 writeState = writeStateT;
1309 assert (null != writer);
1310 // writer.state.barrier.inc();
1311 long start = System.nanoTime();
1312 T result = request.perform(writer);
1313 long duration = System.nanoTime() - start;
1315 System.err.println("################");
1316 System.err.println("WriteOnly duration " + 1e-9*duration);
1318 writeStateT.setResult(result);
1320 // This makes clusters available from server
1321 clusterStream.reallyFlush();
1323 // This will trigger query updates
1324 releaseWriteOnly(writer);
1325 assert (null != writer);
1327 handleUpdatesAndMetadata(writer);
1329 } catch (CancelTransactionException e) {
1331 releaseWriteOnly(writeState.getGraph());
1333 clusterTable.removeWriteOnlyClusters();
1334 state.stopWriteTransaction(clusterStream);
1336 } catch (Throwable e) {
1338 e.printStackTrace();
1340 releaseWriteOnly(writeState.getGraph());
1342 clusterTable.removeWriteOnlyClusters();
1344 if (callback != null)
1345 callback.exception(new DatabaseException(e));
1347 state.stopWriteTransaction(clusterStream);
1348 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1352 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1359 scheduleRequest(request, callback, notify, null);
1363 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1367 assert (request != null);
1369 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1371 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1374 public void run(int thread) {
1376 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1380 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1383 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1387 VirtualGraph vg = getProvider(request.getProvider());
1388 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1390 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1392 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1395 public void execute(Object result) {
1396 if(callback != null) callback.run(null);
1400 public void exception(Throwable t) {
1401 if(callback != null) callback.run((DatabaseException)t);
1406 assert (null != writer);
1407 // writer.state.barrier.inc();
1411 request.perform(writer);
1413 } catch (Throwable e) {
1415 // writer.state.barrier.dec();
1416 // writer.waitAsync(null);
1418 releaseWriteOnly(writer);
1420 clusterTable.removeWriteOnlyClusters();
1422 if(!(e instanceof CancelTransactionException)) {
1423 if (callback != null)
1424 callback.run(new DatabaseException(e));
1427 writeState.except(e);
1433 // This makes clusters available from server
1434 boolean empty = clusterStream.reallyFlush();
1435 // This was needed to make WO requests call metadata listeners.
1436 // NOTE: the calling event does not contain clientChanges information.
1437 if (!empty && clientChanges.isEmpty())
1438 clientChanges.setNotEmpty(true);
1439 releaseWriteOnly(writer);
1440 assert (null != writer);
1443 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1456 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1457 scheduleRequest(request, callback, notify, null);
1461 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1464 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1466 assert (request != null);
1468 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1470 requestManager.scheduleWrite(new SessionTask(request, thread) {
1473 public void run(int thread) {
1475 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1477 performWriteOnly(request, notify, callback);
1487 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1489 assert (request != null);
1490 assert (procedure != null);
1492 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1494 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1497 public void run(int thread) {
1499 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1501 ListenerBase listener = getListenerBase(procedure);
1503 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1507 if (listener != null) {
1510 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1513 public void exception(AsyncReadGraph graph, Throwable t) {
1514 procedure.exception(graph, t);
1515 if(throwable != null) {
1518 // ErrorLogger.defaultLogError("Unhandled exception", t);
1523 public void execute(AsyncReadGraph graph, T t) {
1524 if(result != null) result.set(t);
1525 procedure.execute(graph, t);
1529 } catch (Throwable t) {
1530 // This is handled by the AsyncProcedure
1531 //Logger.defaultLogError("Internal error", t);
1538 // newGraph.state.barrier.inc();
1540 T t = request.perform(newGraph);
1544 if(result != null) result.set(t);
1545 procedure.execute(newGraph, t);
1547 } catch (Throwable th) {
1549 if(throwable != null) {
1552 Logger.defaultLogError("Unhandled exception", th);
1557 } catch (Throwable t) {
1560 t.printStackTrace();
1562 if(throwable != null) {
1565 Logger.defaultLogError("Unhandled exception", t);
1570 procedure.exception(newGraph, t);
1572 } catch (Throwable t2) {
1574 if(throwable != null) {
1577 Logger.defaultLogError("Unhandled exception", t2);
1584 // newGraph.state.barrier.dec();
1585 // newGraph.waitAsync(request);
1591 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1601 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1603 assert (request != null);
1604 assert (procedure != null);
1606 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1608 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1611 public void run(int thread) {
1613 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1615 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1619 if (listener != null) {
1621 newGraph.processor.query(newGraph, request, null, procedure, listener);
1623 // newGraph.waitAsync(request);
1627 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1628 procedure, "request");
1632 // newGraph.state.barrier.inc();
1634 request.perform(newGraph, wrapper);
1636 // newGraph.waitAsync(request);
1638 } catch (Throwable t) {
1640 wrapper.exception(newGraph, t);
1641 // newGraph.waitAsync(request);
1650 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1660 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1662 assert (request != null);
1663 assert (procedure != null);
1665 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1667 int sync = notify != null ? thread : -1;
1669 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1672 public void run(int thread) {
1674 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1676 ListenerBase listener = getListenerBase(procedure);
1678 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1682 if (listener != null) {
1684 newGraph.processor.query(newGraph, request, null, procedure, listener);
1686 // newGraph.waitAsync(request);
1690 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1694 request.perform(newGraph, wrapper);
1696 } catch (Throwable t) {
1698 t.printStackTrace();
1706 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1716 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1718 assert (request != null);
1719 assert (procedure != null);
1721 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1723 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1726 public void run(int thread) {
1728 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1730 ListenerBase listener = getListenerBase(procedure);
1732 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1736 if (listener != null) {
1738 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1741 public void exception(Throwable t) {
1742 procedure.exception(t);
1743 if(throwable != null) {
1749 public void execute(T t) {
1750 if(result != null) result.set(t);
1751 procedure.execute(t);
1756 // newGraph.waitAsync(request);
1760 // newGraph.state.barrier.inc();
1762 request.register(newGraph, new Listener<T>() {
1765 public void exception(Throwable t) {
1766 if(throwable != null) throwable.set(t);
1767 procedure.exception(t);
1768 // newGraph.state.barrier.dec();
1772 public void execute(T t) {
1773 if(result != null) result.set(t);
1774 procedure.execute(t);
1775 // newGraph.state.barrier.dec();
1779 public boolean isDisposed() {
1785 // newGraph.waitAsync(request);
1791 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1803 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1805 scheduleRequest(request, procedure, null, null, null);
1810 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1812 Semaphore notify = new Semaphore(0);
1813 DataContainer<Throwable> container = new DataContainer<Throwable>();
1814 DataContainer<T> result = new DataContainer<T>();
1815 scheduleRequest(request, procedure, notify, container, result);
1816 acquire(notify, request);
1817 Throwable throwable = container.get();
1818 if(throwable != null) {
1819 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1820 else throw new DatabaseException("Unexpected exception", throwable);
1822 return result.get();
1826 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1828 Semaphore notify = new Semaphore(0);
1829 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1830 final DataContainer<T> resultContainer = new DataContainer<T>();
1831 scheduleRequest(request, new AsyncProcedure<T>() {
1833 public void exception(AsyncReadGraph graph, Throwable throwable) {
1834 exceptionContainer.set(throwable);
1835 procedure.exception(graph, throwable);
1838 public void execute(AsyncReadGraph graph, T result) {
1839 resultContainer.set(result);
1840 procedure.execute(graph, result);
1842 }, getListenerBase(procedure), notify);
1843 acquire(notify, request, procedure);
1844 Throwable throwable = exceptionContainer.get();
1845 if (throwable != null) {
1846 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1847 else throw new DatabaseException("Unexpected exception", throwable);
1849 return resultContainer.get();
1854 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1856 Semaphore notify = new Semaphore(0);
1857 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1858 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1860 public void exception(AsyncReadGraph graph, Throwable throwable) {
1861 exceptionContainer.set(throwable);
1862 procedure.exception(graph, throwable);
1865 public void execute(AsyncReadGraph graph, T result) {
1866 procedure.execute(graph, result);
1869 public void finished(AsyncReadGraph graph) {
1870 procedure.finished(graph);
1873 acquire(notify, request, procedure);
1874 Throwable throwable = exceptionContainer.get();
1875 if (throwable != null) {
1876 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877 else throw new DatabaseException("Unexpected exception", throwable);
1879 // TODO: implement return value
1880 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1885 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1886 return syncRequest(request, new ProcedureAdapter<T>());
1889 // assert(request != null);
1891 // final DataContainer<T> result = new DataContainer<T>();
1892 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1894 // syncRequest(request, new Procedure<T>() {
1897 // public void execute(T t) {
1902 // public void exception(Throwable t) {
1903 // exception.set(t);
1908 // Throwable t = exception.get();
1910 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1911 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1914 // return result.get();
1919 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1920 return syncRequest(request, (Procedure<T>)procedure);
1925 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1926 // assertNotSession();
1927 // Semaphore notify = new Semaphore(0);
1928 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1929 // DataContainer<T> result = new DataContainer<T>();
1930 // scheduleRequest(request, procedure, notify, container, result);
1931 // acquire(notify, request);
1932 // Throwable throwable = container.get();
1933 // if(throwable != null) {
1934 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1935 // else throw new DatabaseException("Unexpected exception", throwable);
1937 // return result.get();
1942 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1944 Semaphore notify = new Semaphore(0);
1945 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1946 final DataContainer<T> result = new DataContainer<T>();
1947 scheduleRequest(request, procedure, notify, container, result);
1948 acquire(notify, request);
1949 Throwable throwable = container.get();
1950 if (throwable != null) {
1951 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1952 else throw new DatabaseException("Unexpected exception", throwable);
1954 return result.get();
1958 public void syncRequest(Write request) throws DatabaseException {
1961 Semaphore notify = new Semaphore(0);
1962 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1963 scheduleRequest(request, new Callback<DatabaseException>() {
1966 public void run(DatabaseException e) {
1971 acquire(notify, request);
1972 if(exception.get() != null) throw exception.get();
1976 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1978 Semaphore notify = new Semaphore(0);
1979 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1980 final DataContainer<T> result = new DataContainer<T>();
1981 scheduleRequest(request, new Procedure<T>() {
1984 public void exception(Throwable t) {
1989 public void execute(T t) {
1994 acquire(notify, request);
1995 if(exception.get() != null) {
1996 Throwable t = exception.get();
1997 if(t instanceof DatabaseException) throw (DatabaseException)t;
1998 else throw new DatabaseException(t);
2000 return result.get();
2004 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2006 Semaphore notify = new Semaphore(0);
2007 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2008 final DataContainer<T> result = new DataContainer<T>();
2009 scheduleRequest(request, new Procedure<T>() {
2012 public void exception(Throwable t) {
2017 public void execute(T t) {
2022 acquire(notify, request);
2023 if(exception.get() != null) {
2024 Throwable t = exception.get();
2025 if(t instanceof DatabaseException) throw (DatabaseException)t;
2026 else throw new DatabaseException(t);
2028 return result.get();
2032 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2034 Semaphore notify = new Semaphore(0);
2035 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2036 final DataContainer<T> result = new DataContainer<T>();
2037 scheduleRequest(request, new Procedure<T>() {
2040 public void exception(Throwable t) {
2045 public void execute(T t) {
2050 acquire(notify, request);
2051 if(exception.get() != null) {
2052 Throwable t = exception.get();
2053 if(t instanceof DatabaseException) throw (DatabaseException)t;
2054 else throw new DatabaseException(t);
2056 return result.get();
2060 public void syncRequest(DelayedWrite request) throws DatabaseException {
2062 Semaphore notify = new Semaphore(0);
2063 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2064 scheduleRequest(request, new Callback<DatabaseException>() {
2066 public void run(DatabaseException e) {
2070 acquire(notify, request);
2071 if(exception.get() != null) throw exception.get();
2075 public void syncRequest(WriteOnly request) throws DatabaseException {
2078 Semaphore notify = new Semaphore(0);
2079 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2080 scheduleRequest(request, new Callback<DatabaseException>() {
2082 public void run(DatabaseException e) {
2086 acquire(notify, request);
2087 if(exception.get() != null) throw exception.get();
2092 * GraphRequestProcessor interface
2096 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2098 scheduleRequest(request, procedure, null, null);
2103 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2105 scheduleRequest(request, procedure, null);
2110 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2112 scheduleRequest(request, procedure, null, null, null);
2117 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2119 scheduleRequest(request, callback, null);
2124 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2126 scheduleRequest(request, procedure, null);
2131 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2133 scheduleRequest(request, procedure, null);
2138 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2140 scheduleRequest(request, procedure, null);
2145 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2147 scheduleRequest(request, callback, null);
2152 public void asyncRequest(final Write r) {
2153 asyncRequest(r, null);
2157 public void asyncRequest(final DelayedWrite r) {
2158 asyncRequest(r, null);
2162 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2164 scheduleRequest(request, callback, null);
2169 public void asyncRequest(final WriteOnly request) {
2171 asyncRequest(request, null);
2176 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2177 r.request(this, procedure);
2181 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2182 r.request(this, procedure);
2186 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2187 r.request(this, procedure);
2191 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2192 r.request(this, procedure);
2196 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2197 r.request(this, procedure);
2201 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2202 r.request(this, procedure);
2206 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2207 return r.request(this);
2211 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2212 return r.request(this);
2216 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2217 r.request(this, procedure);
2221 public <T> void async(WriteInterface<T> r) {
2222 r.request(this, new ProcedureAdapter<T>());
2226 public void incAsync() {
2231 public void decAsync() {
2235 public long getCluster(ResourceImpl resource) {
2236 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2237 return cluster.getClusterId();
2240 public long getCluster(int id) {
2241 if (clusterTable == null)
2242 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2243 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2246 public ResourceImpl getResource(int id) {
2247 return new ResourceImpl(resourceSupport, id);
2250 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2251 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2252 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2253 int key = proxy.getClusterKey();
2254 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2255 return new ResourceImpl(resourceSupport, resourceKey);
2258 public ResourceImpl getResource2(int id) {
2260 return new ResourceImpl(resourceSupport, id);
2263 final public int getId(ResourceImpl impl) {
2267 public static final Charset UTF8 = Charset.forName("utf-8");
2272 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2273 for(TransientGraph g : support.providers) {
2274 if(g.isPending(subject)) return false;
2279 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2280 for(TransientGraph g : support.providers) {
2281 if(g.isPending(subject, predicate)) return false;
2286 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2288 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2290 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2293 public void run(ReadGraphImpl graph) {
2294 if(ready.decrementAndGet() == 0) {
2295 runnable.run(graph);
2301 for(TransientGraph g : support.providers) {
2302 if(g.isPending(subject)) {
2304 g.load(graph, subject, composite);
2305 } catch (DatabaseException e) {
2306 e.printStackTrace();
2309 composite.run(graph);
2313 composite.run(graph);
2317 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2319 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2321 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2324 public void run(ReadGraphImpl graph) {
2325 if(ready.decrementAndGet() == 0) {
2326 runnable.run(graph);
2332 for(TransientGraph g : support.providers) {
2333 if(g.isPending(subject, predicate)) {
2335 g.load(graph, subject, predicate, composite);
2336 } catch (DatabaseException e) {
2337 e.printStackTrace();
2340 composite.run(graph);
2344 composite.run(graph);
2348 // void dumpHeap() {
2351 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2352 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2353 // "d:/heap" + flushCounter + ".txt", true);
2354 // } catch (IOException e) {
2355 // e.printStackTrace();
2360 void fireReactionsToSynchronize(ChangeSet cs) {
2362 // Do not fire empty events
2366 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2367 // g.state.barrier.inc();
2371 if (!cs.isEmpty()) {
2372 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2373 for (ChangeListener l : changeListeners2) {
2376 } catch (Exception ex) {
2377 ex.printStackTrace();
2384 // g.state.barrier.dec();
2385 // g.waitAsync(null);
2391 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2394 // Do not fire empty events
2399 // graph.state.barrier.inc();
2403 if (!cs2.isEmpty()) {
2405 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2406 for (ChangeListener l : changeListeners2) {
2409 // System.out.println("changelistener " + l);
2410 } catch (Exception ex) {
2411 ex.printStackTrace();
2419 // graph.state.barrier.dec();
2420 // graph.waitAsync(null);
2423 } catch (Throwable t) {
2424 t.printStackTrace();
2428 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2432 // Do not fire empty events
2436 // Do not fire on virtual requests
2437 if(graph.getProvider() != null)
2440 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2444 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2445 for (ChangeListener l : metadataListeners) {
2448 } catch (Throwable ex) {
2449 ex.printStackTrace();
2457 } catch (Throwable t) {
2458 t.printStackTrace();
2467 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2470 public <T> T getService(Class<T> api) {
2471 T t = peekService(api);
2473 if (state.isClosed())
2474 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2475 throw new ServiceNotFoundException(this, api);
2483 protected abstract ServerInformation getCachedServerInformation();
2485 private Class<?> serviceKey1 = null;
2486 private Class<?> serviceKey2 = null;
2487 private Object service1 = null;
2488 private Object service2 = null;
2494 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2496 @SuppressWarnings("unchecked")
2498 public synchronized <T> T peekService(Class<T> api) {
2500 if(serviceKey1 == api) {
2502 } else if (serviceKey2 == api) {
2504 Object result = service2;
2505 service2 = service1;
2506 serviceKey2 = serviceKey1;
2512 if (Layer0.class == api)
2514 if (ServerInformation.class == api)
2515 return (T) getCachedServerInformation();
2516 else if (WriteGraphImpl.class == api)
2517 return (T) writeState.getGraph();
2518 else if (ClusterBuilder.class == api)
2519 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2520 else if (ClusterBuilderFactory.class == api)
2521 return (T)new ClusterBuilderFactoryImpl(this);
2523 service2 = service1;
2524 serviceKey2 = serviceKey1;
2526 service1 = serviceLocator.peekService(api);
2537 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2540 public boolean hasService(Class<?> api) {
2541 return serviceLocator.hasService(api);
2545 * @param api the api that must be implemented by the specified service
2546 * @param service the service implementation
2549 public <T> void registerService(Class<T> api, T service) {
2550 if(Layer0.class == api) {
2551 L0 = (Layer0)service;
2554 serviceLocator.registerService(api, service);
2555 if (TransactionPolicySupport.class == api) {
2556 transactionPolicy = (TransactionPolicySupport)service;
2557 state.resetTransactionPolicy();
2559 if (api == serviceKey1)
2561 else if (api == serviceKey2)
2569 void fireSessionVariableChange(String variable) {
2570 for (MonitorHandler h : monitorHandlers) {
2571 MonitorContext ctx = monitorContexts.getLeft(h);
2573 // SafeRunner functionality repeated here to avoid dependency.
2575 h.valuesChanged(ctx);
2576 } catch (Exception e) {
2577 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2578 } catch (LinkageError e) {
2579 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2585 class ResourceSerializerImpl implements ResourceSerializer {
2587 public long createRandomAccessId(int id) throws DatabaseException {
2590 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2591 long cluster = getCluster(id);
2593 return 0; // Better to return 0 then invalid id.
2594 long result = ClusterTraitsBase.createResourceId(cluster, index);
2599 public long getRandomAccessId(Resource resource) throws DatabaseException {
2601 ResourceImpl resourceImpl = (ResourceImpl) resource;
2602 return createRandomAccessId(resourceImpl.id);
2607 public int getTransientId(Resource resource) throws DatabaseException {
2609 ResourceImpl resourceImpl = (ResourceImpl) resource;
2610 return resourceImpl.id;
2615 public String createRandomAccessId(Resource resource)
2616 throws InvalidResourceReferenceException {
2618 if(resource == null) throw new IllegalArgumentException();
2620 ResourceImpl resourceImpl = (ResourceImpl) resource;
2622 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2626 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2627 } catch (DatabaseException e1) {
2628 throw new InvalidResourceReferenceException(e1);
2631 // Serialize as '<resource index>_<cluster id>'
2632 return "" + r + "_" + getCluster(resourceImpl);
2633 } catch (Throwable e) {
2634 e.printStackTrace();
2635 throw new InvalidResourceReferenceException(e);
2640 public int getTransientId(long serialized) throws DatabaseException {
2641 if (serialized <= 0)
2642 return (int)serialized;
2643 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2644 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2645 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2646 if (cluster == null)
2647 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2648 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2653 public Resource getResource(long randomAccessId) throws DatabaseException {
2654 return getResourceByKey(getTransientId(randomAccessId));
2658 public Resource getResource(int transientId) throws DatabaseException {
2659 return getResourceByKey(transientId);
2663 public Resource getResource(String randomAccessId)
2664 throws InvalidResourceReferenceException {
2666 int i = randomAccessId.indexOf('_');
2668 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2669 + randomAccessId + "'");
2670 int r = Integer.parseInt(randomAccessId.substring(0, i));
2671 if(r < 0) return getResourceByKey(r);
2672 long c = Long.parseLong(randomAccessId.substring(i + 1));
2673 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2674 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2675 if (cluster.hasResource(key, clusterTranslator))
2676 return getResourceByKey(key);
2677 } catch (InvalidResourceReferenceException e) {
2679 } catch (NumberFormatException e) {
2680 throw new InvalidResourceReferenceException(e);
2681 } catch (Throwable e) {
2682 e.printStackTrace();
2683 throw new InvalidResourceReferenceException(e);
2686 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2690 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2693 } catch (Throwable e) {
2694 e.printStackTrace();
2695 throw new InvalidResourceReferenceException(e);
2701 // Copied from old SessionImpl
2704 if (state.isClosed())
2705 throw new Error("Session closed.");
2710 public ResourceImpl getNewResource(long clusterId) {
2711 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2714 newId = cluster.createResource(clusterTranslator);
2715 } catch (DatabaseException e) {
2716 Logger.defaultLogError(e);
2719 return new ResourceImpl(resourceSupport, newId);
2722 public ResourceImpl getNewResource(Resource clusterSet)
2723 throws DatabaseException {
2724 long resourceId = clusterSet.getResourceId();
2725 Long clusterId = clusterSetsSupport.get(resourceId);
2726 if (null == clusterId)
2727 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2728 if (Constants.NewClusterId == clusterId) {
2729 clusterId = getService(ClusteringSupport.class).createCluster();
2730 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2731 createdClusters.add(clusterId);
2732 clusterSetsSupport.put(resourceId, clusterId);
2733 return getNewResource(clusterId);
2735 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2736 ResourceImpl result;
2737 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2738 clusterId = getService(ClusteringSupport.class).createCluster();
2739 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740 createdClusters.add(clusterId);
2741 clusterSetsSupport.put(resourceId, clusterId);
2742 return getNewResource(clusterId);
2744 result = getNewResource(clusterId);
2745 int resultKey = querySupport.getId(result);
2746 long resultCluster = querySupport.getClusterId(resultKey);
2747 if (clusterId != resultCluster)
2748 clusterSetsSupport.put(resourceId, resultCluster);
2754 public void getNewClusterSet(Resource clusterSet)
2755 throws DatabaseException {
2757 System.out.println("new cluster set=" + clusterSet);
2758 long resourceId = clusterSet.getResourceId();
2759 if (clusterSetsSupport.containsKey(resourceId))
2760 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2761 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2763 public boolean containsClusterSet(Resource clusterSet)
2764 throws ServiceException {
2765 long resourceId = clusterSet.getResourceId();
2766 return clusterSetsSupport.containsKey(resourceId);
2768 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2769 Resource r = defaultClusterSet;
2770 defaultClusterSet = clusterSet;
2773 void printDiagnostics() {
2777 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2778 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2779 for(ClusterI cluster : clusterTable.getClusters()) {
2781 if (cluster.isLoaded() && !cluster.isEmpty()) {
2782 residentClusters.set(residentClusters.get() + 1);
2783 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2785 } catch (DatabaseException e) {
2786 Logger.defaultLogError(e);
2790 System.out.println("--------------------------------");
2791 System.out.println("Cluster information:");
2792 System.out.println("-amount of resident clusters=" + residentClusters.get());
2793 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2795 for(ClusterI cluster : clusterTable.getClusters()) {
2796 System.out.print("Cluster " + cluster.getClusterId() + " [");
2798 if (!cluster.isLoaded())
2799 System.out.println("not loaded]");
2800 else if (cluster.isEmpty())
2801 System.out.println("is empty]");
2803 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2804 System.out.print(",references=" + cluster.getReferenceCount());
2805 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2807 } catch (DatabaseException e) {
2808 Logger.defaultLogError(e);
2809 System.out.println("is corrupted]");
2813 queryProvider2.printDiagnostics();
2814 System.out.println("--------------------------------");
2819 public void fireStartReadTransaction() {
2822 System.out.println("StartReadTransaction");
2824 for (SessionEventListener listener : eventListeners) {
2826 listener.readTransactionStarted();
2827 } catch (Throwable t) {
2828 t.printStackTrace();
2834 public void fireFinishReadTransaction() {
2837 System.out.println("FinishReadTransaction");
2839 for (SessionEventListener listener : eventListeners) {
2841 listener.readTransactionFinished();
2842 } catch (Throwable t) {
2843 t.printStackTrace();
2849 public void fireStartWriteTransaction() {
2852 System.out.println("StartWriteTransaction");
2854 for (SessionEventListener listener : eventListeners) {
2856 listener.writeTransactionStarted();
2857 } catch (Throwable t) {
2858 t.printStackTrace();
2864 public void fireFinishWriteTransaction() {
2867 System.out.println("FinishWriteTransaction");
2869 for (SessionEventListener listener : eventListeners) {
2871 listener.writeTransactionFinished();
2872 } catch (Throwable t) {
2873 t.printStackTrace();
2877 Indexing.resetDependenciesIndexingDisabled();
2881 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2882 throw new Error("Not supported at the moment.");
2889 ClusterTable getClusterTable() {
2890 return clusterTable;
2893 public GraphSession getGraphSession() {
2894 return graphSession;
2897 public QueryProcessor getQueryProvider2() {
2898 return queryProvider2;
2901 ClientChangesImpl getClientChanges() {
2902 return clientChanges;
2905 boolean getWriteOnly() {
2909 static int counter = 0;
2911 public void onClusterLoaded(long clusterId) {
2913 clusterTable.updateSize();
2921 * Implementation of the interface RequestProcessor
2925 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2930 assert(request != null);
2932 final DataContainer<T> result = new DataContainer<T>();
2933 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2935 syncRequest(request, new AsyncProcedure<T>() {
2938 public void execute(AsyncReadGraph graph, T t) {
2943 public void exception(AsyncReadGraph graph, Throwable t) {
2949 Throwable t = exception.get();
2951 if(t instanceof DatabaseException) throw (DatabaseException)t;
2952 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2955 return result.get();
2960 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2961 // assertNotSession();
2962 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2966 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2968 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2972 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2974 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2978 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2980 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2984 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2986 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2990 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2994 assert(request != null);
2996 final DataContainer<T> result = new DataContainer<T>();
2997 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2999 syncRequest(request, new AsyncProcedure<T>() {
3002 public void execute(AsyncReadGraph graph, T t) {
3007 public void exception(AsyncReadGraph graph, Throwable t) {
3013 Throwable t = exception.get();
3015 if(t instanceof DatabaseException) throw (DatabaseException)t;
3016 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3019 return result.get();
3024 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3026 return syncRequest(request, (AsyncProcedure<T>)procedure);
3030 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3032 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3036 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3038 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3042 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3044 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3048 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3050 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3054 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3058 assert(request != null);
3060 final ArrayList<T> result = new ArrayList<T>();
3061 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3063 syncRequest(request, new AsyncMultiProcedure<T>() {
3066 public void execute(AsyncReadGraph graph, T t) {
3067 synchronized(result) {
3073 public void finished(AsyncReadGraph graph) {
3077 public void exception(AsyncReadGraph graph, Throwable t) {
3084 Throwable t = exception.get();
3086 if(t instanceof DatabaseException) throw (DatabaseException)t;
3087 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3095 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3097 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3101 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3103 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3107 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3109 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3113 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3115 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3119 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3121 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3125 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3129 assert(request != null);
3131 final ArrayList<T> result = new ArrayList<T>();
3132 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3134 syncRequest(request, new AsyncMultiProcedure<T>() {
3137 public void execute(AsyncReadGraph graph, T t) {
3138 synchronized(result) {
3144 public void finished(AsyncReadGraph graph) {
3148 public void exception(AsyncReadGraph graph, Throwable t) {
3155 Throwable t = exception.get();
3157 if(t instanceof DatabaseException) throw (DatabaseException)t;
3158 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3166 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3168 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3172 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3174 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3178 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3180 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3184 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3186 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3190 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3192 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3197 public <T> void asyncRequest(Read<T> request) {
3199 asyncRequest(request, new ProcedureAdapter<T>() {
3201 public void exception(Throwable t) {
3202 t.printStackTrace();
3209 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3210 asyncRequest(request, (AsyncProcedure<T>)procedure);
3214 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3215 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3219 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3220 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3224 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3225 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3229 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3230 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3234 final public <T> void asyncRequest(final AsyncRead<T> request) {
3236 assert(request != null);
3238 asyncRequest(request, new ProcedureAdapter<T>() {
3240 public void exception(Throwable t) {
3241 t.printStackTrace();
3248 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3249 scheduleRequest(request, procedure, procedure, null);
3253 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3254 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3258 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3259 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3263 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3264 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3268 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3269 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3273 public <T> void asyncRequest(MultiRead<T> request) {
3275 assert(request != null);
3277 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3279 public void exception(AsyncReadGraph graph, Throwable t) {
3280 t.printStackTrace();
3287 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3288 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3292 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3293 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3297 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3298 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3302 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3303 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3307 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3308 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3312 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3314 assert(request != null);
3316 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3318 public void exception(AsyncReadGraph graph, Throwable t) {
3319 t.printStackTrace();
3326 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3327 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3331 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3332 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3336 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3337 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3341 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3342 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3346 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3347 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3351 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3353 throw new Error("Not implemented!");
3357 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3358 throw new Error("Not implemented!");
3362 final public <T> void asyncRequest(final ExternalRead<T> request) {
3364 assert(request != null);
3366 asyncRequest(request, new ProcedureAdapter<T>() {
3368 public void exception(Throwable t) {
3369 t.printStackTrace();
3376 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3377 asyncRequest(request, (Procedure<T>)procedure);
3382 void check(Throwable t) throws DatabaseException {
3384 if(t instanceof DatabaseException) throw (DatabaseException)t;
3385 else throw new DatabaseException("Unexpected exception", t);
3389 void check(DataContainer<Throwable> container) throws DatabaseException {
3390 Throwable t = container.get();
3392 if(t instanceof DatabaseException) throw (DatabaseException)t;
3393 else throw new DatabaseException("Unexpected exception", t);
3402 boolean sameProvider(Write request) {
3403 if(writeState.getGraph().provider != null) {
3404 return writeState.getGraph().provider.equals(request.getProvider());
3406 return request.getProvider() == null;
3410 boolean plainWrite(WriteGraphImpl graph) {
3411 if(graph == null) return false;
3412 if(graph.writeSupport.writeOnly()) return false;
3417 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3418 private void assertNotSession() throws DatabaseException {
3419 Thread current = Thread.currentThread();
3420 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3423 void assertAlive() {
3424 if (!state.isAlive())
3425 throw new RuntimeDatabaseException("Session has been shut down.");
3428 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3429 return querySupport.getValueStream(graph, querySupport.getId(resource));
3432 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3433 return querySupport.getValue(graph, querySupport.getId(resource));
3436 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3437 acquire(semaphore, request, null);
3440 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3444 // Loop until the semaphore is acquired reporting requests that take a long time.
3447 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3451 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3453 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3454 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3456 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3457 + (waitTime) + " ms. (procedure=" + procedure + ")");
3464 } catch (InterruptedException e) {
3465 e.printStackTrace();
3466 // FIXME: Should perhaps do something else in this case ??
3473 // public boolean holdOnToTransactionAfterCancel() {
3474 // return transactionPolicy.holdOnToTransactionAfterCancel();
3478 // public boolean holdOnToTransactionAfterCommit() {
3479 // return transactionPolicy.holdOnToTransactionAfterCommit();
3483 // public boolean holdOnToTransactionAfterRead() {
3484 // return transactionPolicy.holdOnToTransactionAfterRead();
3488 // public void onRelinquish() {
3489 // transactionPolicy.onRelinquish();
3493 // public void onRelinquishDone() {
3494 // transactionPolicy.onRelinquishDone();
3498 // public void onRelinquishError() {
3499 // transactionPolicy.onRelinquishError();
3504 public Session getSession() {
3509 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3510 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3512 public void ceased(int thread) {
3514 requestManager.ceased(thread);
3518 public int getAmountOfQueryThreads() {
3519 // This must be a power of two
3521 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3524 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3526 return new ResourceImpl(resourceSupport, key);
3530 public void acquireWriteOnly() {
3534 public void releaseWriteOnly(ReadGraphImpl graph) {
3536 queryProvider2.releaseWrite(graph);
3539 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3541 long start = System.nanoTime();
3543 while(dirtyPrimitives) {
3544 dirtyPrimitives = false;
3545 getQueryProvider2().performDirtyUpdates(writer);
3546 getQueryProvider2().performScheduledUpdates(writer);
3549 fireMetadataListeners(writer, clientChanges);
3551 if(DebugPolicy.PERFORMANCE_DATA) {
3552 long end = System.nanoTime();
3553 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3558 void removeTemporaryData() {
3559 File platform = Platform.getLocation().toFile();
3560 File tempFiles = new File(platform, "tempFiles");
3561 File temp = new File(tempFiles, "db");
3565 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3567 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3570 } catch (IOException e) {
3571 Logger.defaultLogError(e);
3573 return FileVisitResult.CONTINUE;
3576 } catch (IOException e) {
3577 Logger.defaultLogError(e);
3581 public void handleCreatedClusters() {
3582 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3583 createdClusters.forEach(new TLongProcedure() {
3586 public boolean execute(long value) {
3587 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3588 cluster.setImmutable(true, clusterTranslator);
3593 createdClusters.clear();
3597 public Object getModificationCounter() {
3598 return queryProvider2.modificationCounter;
3602 public void markUndoPoint() {
3603 state.setCombine(false);