1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168 protected static final boolean DEBUG = false;
170 private static final boolean DIAGNOSTICS = false;
172 private TransactionPolicySupport transactionPolicy;
174 final private ClusterControl clusterControl;
176 final protected BuiltinSupportImpl builtinSupport;
177 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178 final protected ClusterSetsSupport clusterSetsSupport;
179 final protected LifecycleSupportImpl lifecycleSupport;
181 protected QuerySupportImpl querySupport;
182 protected ResourceSupportImpl resourceSupport;
183 protected WriteSupport writeSupport;
184 public ClusterTranslator clusterTranslator;
186 boolean dirtyPrimitives = false;
188 public static final int SERVICE_MODE_CREATE = 2;
189 public static final int SERVICE_MODE_ALLOW = 1;
190 public int serviceMode = 0;
191 public boolean createdImmutableClusters = false;
192 public TLongHashSet createdClusters = new TLongHashSet();
197 * The service locator maintained by the workbench. These services are
198 * initialized during workbench during the <code>init</code> method.
200 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
202 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
204 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
206 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
208 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
210 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
212 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
214 final protected State state = new State();
216 protected GraphSession graphSession = null;
218 protected SessionManager sessionManagerImpl = null;
220 protected UserAuthenticationAgent authAgent = null;
222 protected UserAuthenticator authenticator = null;
224 protected Resource user = null;
226 protected ClusterStream clusterStream = null;
228 protected SessionRequestManager requestManager = null;
230 public ClusterTable clusterTable = null;
232 public QueryProcessor queryProvider2 = null;
234 ClientChangesImpl clientChanges = null;
236 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
238 // protected long newClusterId = Constants.NullClusterId;
240 protected int flushCounter = 0;
242 protected boolean writeOnly = false;
244 WriteState<?> writeState = null;
245 WriteStateBase<?> delayedWriteState = null;
246 protected Resource defaultClusterSet = null; // If not null then used for newResource().
248 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250 // if (authAgent == null)
251 // throw new IllegalArgumentException("null authentication agent");
253 File t = StaticSessionProperties.virtualGraphStoragePath;
256 this.clusterTable = new ClusterTable(this, t);
257 this.builtinSupport = new BuiltinSupportImpl(this);
258 this.sessionManagerImpl = sessionManagerImpl;
260 // this.authAgent = authAgent;
262 serviceLocator.registerService(Session.class, this);
263 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284 ServiceActivityUpdaterForWriteTransactions.register(this);
286 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289 this.lifecycleSupport = new LifecycleSupportImpl(this);
290 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291 this.transactionPolicy = new TransactionPolicyRelease();
292 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293 this.clusterControl = new ClusterControlImpl(this);
294 serviceLocator.registerService(ClusterControl.class, clusterControl);
295 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296 this.clusterSetsSupport.setReadDirectory(t.toPath());
297 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
308 public Resource getRootLibrary() {
309 return queryProvider2.getRootLibraryResource();
312 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313 if (!graphSession.dbSession.refreshEnabled())
316 getClusterTable().refresh(csid, this, clusterUID);
317 } catch (Throwable t) {
318 Logger.defaultLogError("Refesh failed.", t);
322 static final class TaskHelper {
323 private final String name;
324 private Object result;
325 final Semaphore sema = new Semaphore(0);
326 private Throwable throwable = null;
327 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
329 public void run(DatabaseException e) {
330 synchronized (TaskHelper.this) {
335 final Procedure<Object> proc = new Procedure<Object>() {
337 public void execute(Object result) {
341 public void exception(Throwable t) {
342 if (t instanceof DatabaseException)
343 callback.run((DatabaseException)t);
345 callback.run(new DatabaseException("" + name + "operation failed.", t));
348 final WriteTraits writeTraits = new WriteTraits() {};
349 TaskHelper(String name) {
355 void setResult(Object result) {
356 this.result = result;
358 // Throwable throwableGet() {
361 synchronized void throwableSet(Throwable t) {
364 // void throwableSet(String t) {
365 // throwable = new InternalException("" + name + " operation failed. " + t);
367 synchronized void throwableCheck()
368 throws DatabaseException {
369 if (null != throwable)
370 if (throwable instanceof DatabaseException)
371 throw (DatabaseException)throwable;
373 throw new DatabaseException("Undo operation failed.", throwable);
375 void throw_(String message)
376 throws DatabaseException {
377 throw new DatabaseException("" + name + " operation failed. " + message);
383 private ListenerBase getListenerBase(Object procedure) {
384 if (procedure instanceof ListenerBase)
385 return (ListenerBase) procedure;
390 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391 scheduleRequest(request, callback, notify, null);
395 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
398 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400 assert (request != null);
402 if(Development.DEVELOPMENT) {
404 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405 System.err.println("schedule write '" + request + "'");
406 } catch (Throwable t) {
407 Logger.defaultLogError(t);
411 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
413 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
416 public void run(int thread) {
418 if(Development.DEVELOPMENT) {
420 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421 System.err.println("perform write '" + request + "'");
422 } catch (Throwable t) {
423 Logger.defaultLogError(t);
427 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
434 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436 VirtualGraph vg = getProvider(request.getProvider());
438 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442 public void execute(Object result) {
443 if(callback != null) callback.run(null);
447 public void exception(Throwable t) {
448 if(callback != null) callback.run((DatabaseException)t);
453 assert (null != writer);
454 // writer.state.barrier.inc();
457 request.perform(writer);
458 assert (null != writer);
459 } catch (Throwable t) {
460 if (!(t instanceof CancelTransactionException))
461 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462 writeState.except(t);
464 // writer.state.barrier.dec();
465 // writer.waitAsync(request);
469 assert(!queryProvider2.dirty);
471 } catch (Throwable e) {
473 // Log it first, just to be safe that the error is always logged.
474 if (!(e instanceof CancelTransactionException))
475 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477 // writeState.getGraph().state.barrier.dec();
478 // writeState.getGraph().waitAsync(request);
480 writeState.except(e);
483 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 // // All we can do here is to log those, can't really pass them anywhere.
485 // if (callback != null) {
486 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 // else callback.run(new DatabaseException(e));
489 // } catch (Throwable e2) {
490 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 // boolean empty = clusterStream.reallyFlush();
493 // int callerThread = -1;
494 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 // // Send all local changes to server so it can calculate correct reverse change set.
498 // // This will call clusterStream.accept().
499 // state.cancelCommit(context, clusterStream);
501 // if (!context.isOk()) // this is a blocking operation
502 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 // state.cancelCommit2(context, clusterStream);
506 // } catch (DatabaseException e3) {
507 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 // clusterStream.setOff(false);
510 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
515 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
521 if(Development.DEVELOPMENT) {
523 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524 System.err.println("finish write '" + request + "'");
525 } catch (Throwable t) {
526 Logger.defaultLogError(t);
536 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537 scheduleRequest(request, procedure, notify, null);
541 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
544 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546 assert (request != null);
548 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550 requestManager.scheduleWrite(new SessionTask(request, thread) {
553 public void run(int thread) {
555 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
560 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
562 VirtualGraph vg = getProvider(request.getProvider());
563 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
566 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567 writeState = writeStateT;
569 assert (null != writer);
570 // writer.state.barrier.inc();
571 writeStateT.setResult(request.perform(writer));
572 assert (null != writer);
574 // writer.state.barrier.dec();
575 // writer.waitAsync(null);
577 } catch (Throwable e) {
579 // writer.state.barrier.dec();
580 // writer.waitAsync(null);
582 writeState.except(e);
584 // state.stopWriteTransaction(clusterStream);
586 // } catch (Throwable e) {
587 // // Log it first, just to be safe that the error is always logged.
588 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
591 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 // // All we can do here is to log those, can't really pass them anywhere.
593 // if (procedure != null) {
594 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 // else procedure.exception(new DatabaseException(e));
597 // } catch (Throwable e2) {
598 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
601 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
603 // state.stopWriteTransaction(clusterStream);
606 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
609 // if(notify != null) notify.release();
619 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620 scheduleRequest(request, callback, notify, null);
624 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
627 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
629 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
631 assert (request != null);
633 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
635 requestManager.scheduleWrite(new SessionTask(request, thread) {
638 public void run(int thread) {
639 fireSessionVariableChange(SessionVariables.QUEUED_READS);
641 Procedure<Object> stateProcedure = new Procedure<Object>() {
643 public void execute(Object result) {
644 if (callback != null)
648 public void exception(Throwable t) {
649 if (callback != null) {
650 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651 else callback.run(new DatabaseException(t));
653 Logger.defaultLogError("Unhandled exception", t);
657 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659 DelayedWriteGraph dwg = null;
660 // newGraph.state.barrier.inc();
663 dwg = new DelayedWriteGraph(newGraph);
664 request.perform(dwg);
665 } catch (Throwable e) {
666 delayedWriteState.except(e);
670 // newGraph.state.barrier.dec();
671 // newGraph.waitAsync(request);
672 fireSessionVariableChange(SessionVariables.QUEUED_READS);
675 delayedWriteState = null;
677 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
678 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
681 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
685 VirtualGraph vg = getProvider(request.getProvider());
686 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
688 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
690 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
692 assert (null != writer);
693 // writer.state.barrier.inc();
697 dwg.commit(writer, request);
699 if(defaultClusterSet != null) {
700 XSupport xs = getService(XSupport.class);
701 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
702 ClusteringSupport cs = getService(ClusteringSupport.class);
703 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
706 // This makes clusters available from server
707 clusterStream.reallyFlush();
709 releaseWriteOnly(writer);
710 handleUpdatesAndMetadata(writer);
712 } catch (ServiceException e) {
713 // writer.state.barrier.dec();
714 // writer.waitAsync(null);
716 // These shall be requested from server
717 clusterTable.removeWriteOnlyClusters();
718 // This makes clusters available from server
719 clusterStream.reallyFlush();
721 releaseWriteOnly(writer);
722 writeState.except(e);
724 // Debugging & Profiling
725 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
735 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
736 scheduleRequest(request, procedure, notify, null);
740 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
743 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
744 throw new Error("Not implemented");
747 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
748 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
749 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
750 createdClusters.add(cluster.clusterId);
755 class WriteOnlySupport implements WriteSupport {
757 ClusterStream stream;
758 ClusterImpl currentCluster;
760 public WriteOnlySupport() {
761 this.stream = clusterStream;
765 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
766 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
770 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
772 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
774 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
775 if(s != queryProvider2.getRootLibrary())
776 throw new ImmutableException("Trying to modify immutable resource key=" + s);
779 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
780 } catch (DatabaseException e) {
781 Logger.defaultLogError(e);
785 clientChanges.invalidate(s);
787 if (cluster.isWriteOnly())
789 queryProvider2.updateStatements(s, p);
794 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
795 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
799 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
801 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
803 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
804 } catch (DatabaseException e) {
805 Logger.defaultLogError(e);
808 clientChanges.invalidate(rid);
810 if (cluster.isWriteOnly())
812 queryProvider2.updateValue(rid);
817 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
820 claimValue(provider,resource, reader.readBytes(null, amount));
824 byte[] bytes = new byte[65536];
826 int rid = ((ResourceImpl)resource).id;
827 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
831 int block = Math.min(left, 65536);
832 reader.readBytes(bytes, block);
833 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
836 } catch (DatabaseException e) {
837 Logger.defaultLogError(e);
840 clientChanges.invalidate(rid);
842 if (cluster.isWriteOnly())
844 queryProvider2.updateValue(rid);
848 private void maintainCluster(ClusterImpl before, ClusterI after_) {
849 if(after_ != null && after_ != before) {
850 ClusterImpl after = (ClusterImpl)after_;
851 if(currentCluster == before) currentCluster = after;
852 clusterTable.replaceCluster(after);
856 public int createResourceKey(int foreignCounter) throws DatabaseException {
857 if(currentCluster == null)
858 currentCluster = getNewResourceCluster();
859 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
860 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
861 newCluster.foreignLookup = new byte[foreignCounter];
862 currentCluster = newCluster;
864 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
866 return currentCluster.createResource(clusterTranslator);
870 public Resource createResource(VirtualGraph provider) throws DatabaseException {
871 if(currentCluster == null) {
872 if (null != defaultClusterSet) {
873 ResourceImpl result = getNewResource(defaultClusterSet);
874 currentCluster = clusterTable.getClusterByResourceKey(result.id);
877 currentCluster = getNewResourceCluster();
880 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
881 if (null != defaultClusterSet) {
882 ResourceImpl result = getNewResource(defaultClusterSet);
883 currentCluster = clusterTable.getClusterByResourceKey(result.id);
886 currentCluster = getNewResourceCluster();
889 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
893 public Resource createResource(VirtualGraph provider, long clusterId)
894 throws DatabaseException {
895 return getNewResource(clusterId);
899 public Resource createResource(VirtualGraph provider, Resource clusterSet)
900 throws DatabaseException {
901 return getNewResource(clusterSet);
905 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
906 throws DatabaseException {
907 getNewClusterSet(clusterSet);
911 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
912 throws ServiceException {
913 return containsClusterSet(clusterSet);
916 public void selectCluster(long cluster) {
917 currentCluster = clusterTable.getClusterByClusterId(cluster);
918 long setResourceId = clusterSetsSupport.getSet(cluster);
919 clusterSetsSupport.put(setResourceId, cluster);
923 public Resource setDefaultClusterSet(Resource clusterSet)
924 throws ServiceException {
925 Resource result = setDefaultClusterSet4NewResource(clusterSet);
926 if(clusterSet != null) {
927 long id = clusterSetsSupport.get(clusterSet.getResourceId());
928 currentCluster = clusterTable.getClusterByClusterId(id);
931 currentCluster = null;
937 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
939 provider = getProvider(provider);
940 if (null == provider) {
941 int key = ((ResourceImpl)resource).id;
945 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
946 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
947 // if(key != queryProvider2.getRootLibrary())
948 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
951 // cluster.removeValue(key, clusterTranslator);
952 // } catch (DatabaseException e) {
953 // Logger.defaultLogError(e);
957 clusterTable.writeOnlyInvalidate(cluster);
960 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
961 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
962 clusterTranslator.removeValue(cluster);
963 } catch (DatabaseException e) {
964 Logger.defaultLogError(e);
967 queryProvider2.invalidateResource(key);
968 clientChanges.invalidate(key);
971 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
972 queryProvider2.updateValue(querySupport.getId(resource));
973 clientChanges.claimValue(resource);
980 public void flush(boolean intermediate) {
981 throw new UnsupportedOperationException();
985 public void flushCluster() {
986 clusterTable.flushCluster(graphSession);
987 if(defaultClusterSet != null) {
988 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
990 currentCluster = null;
994 public void flushCluster(Resource r) {
995 throw new UnsupportedOperationException("flushCluster resource " + r);
1003 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1006 int s = ((ResourceImpl)subject).id;
1007 int p = ((ResourceImpl)predicate).id;
1008 int o = ((ResourceImpl)object).id;
1010 provider = getProvider(provider);
1011 if (null == provider) {
1013 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1014 clusterTable.writeOnlyInvalidate(cluster);
1018 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1019 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1020 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1022 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1023 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1025 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1026 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1027 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1028 clusterTranslator.removeStatement(cluster);
1030 queryProvider2.invalidateResource(s);
1031 clientChanges.invalidate(s);
1033 } catch (DatabaseException e) {
1035 Logger.defaultLogError(e);
1043 ((VirtualGraphImpl)provider).deny(s, p, o);
1044 queryProvider2.invalidateResource(s);
1045 clientChanges.invalidate(s);
1054 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1055 throw new UnsupportedOperationException();
1059 public boolean writeOnly() {
1064 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1065 writeSupport.performWriteRequest(graph, request);
1069 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1070 throw new UnsupportedOperationException();
1074 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1075 throw new UnsupportedOperationException();
1079 public <T> void addMetadata(Metadata data) throws ServiceException {
1080 writeSupport.addMetadata(data);
1084 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1085 return writeSupport.getMetadata(clazz);
1089 public TreeMap<String, byte[]> getMetadata() {
1090 return writeSupport.getMetadata();
1094 public void commitDone(WriteTraits writeTraits, long csid) {
1095 writeSupport.commitDone(writeTraits, csid);
1099 public void clearUndoList(WriteTraits writeTraits) {
1100 writeSupport.clearUndoList(writeTraits);
1103 public int clearMetadata() {
1104 return writeSupport.clearMetadata();
1108 public void startUndo() {
1109 writeSupport.startUndo();
1113 class VirtualWriteOnlySupport implements WriteSupport {
1116 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1117 // Resource object) {
1118 // throw new UnsupportedOperationException();
1122 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1124 TransientGraph impl = (TransientGraph)provider;
1125 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1126 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1127 clientChanges.claim(subject, predicate, object);
1132 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1134 TransientGraph impl = (TransientGraph)provider;
1135 impl.claim(subject, predicate, object);
1136 getQueryProvider2().updateStatements(subject, predicate);
1137 clientChanges.claim(subject, predicate, object);
1142 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1143 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1147 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1148 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1149 getQueryProvider2().updateValue(resource);
1150 clientChanges.claimValue(resource);
1154 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1155 byte[] value = reader.readBytes(null, amount);
1156 claimValue(provider, resource, value);
1160 public Resource createResource(VirtualGraph provider) {
1161 TransientGraph impl = (TransientGraph)provider;
1162 return impl.getResource(impl.newResource(false));
1166 public Resource createResource(VirtualGraph provider, long clusterId) {
1167 throw new UnsupportedOperationException();
1171 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1172 throws DatabaseException {
1173 throw new UnsupportedOperationException();
1177 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1178 throws DatabaseException {
1179 throw new UnsupportedOperationException();
1183 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1184 throws ServiceException {
1185 throw new UnsupportedOperationException();
1189 public Resource setDefaultClusterSet(Resource clusterSet)
1190 throws ServiceException {
1195 public void denyValue(VirtualGraph provider, Resource resource) {
1196 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1197 getQueryProvider2().updateValue(querySupport.getId(resource));
1198 // NOTE: this only keeps track of value changes by-resource.
1199 clientChanges.claimValue(resource);
1203 public void flush(boolean intermediate) {
1204 throw new UnsupportedOperationException();
1208 public void flushCluster() {
1209 throw new UnsupportedOperationException();
1213 public void flushCluster(Resource r) {
1214 throw new UnsupportedOperationException("Resource " + r);
1222 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1223 TransientGraph impl = (TransientGraph) provider;
1224 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1225 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1226 clientChanges.deny(subject, predicate, object);
1231 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1232 throw new UnsupportedOperationException();
1236 public boolean writeOnly() {
1241 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1242 throw new UnsupportedOperationException();
1246 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1247 throw new UnsupportedOperationException();
1251 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1252 throw new UnsupportedOperationException();
1256 public <T> void addMetadata(Metadata data) throws ServiceException {
1257 throw new UnsupportedOperationException();
1261 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1262 throw new UnsupportedOperationException();
1266 public TreeMap<String, byte[]> getMetadata() {
1267 throw new UnsupportedOperationException();
1271 public void commitDone(WriteTraits writeTraits, long csid) {
1275 public void clearUndoList(WriteTraits writeTraits) {
1279 public int clearMetadata() {
1284 public void startUndo() {
1288 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1292 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1294 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1297 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1301 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1303 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1305 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1306 writeState = writeStateT;
1308 assert (null != writer);
1309 // writer.state.barrier.inc();
1310 long start = System.nanoTime();
1311 T result = request.perform(writer);
1312 long duration = System.nanoTime() - start;
1314 System.err.println("################");
1315 System.err.println("WriteOnly duration " + 1e-9*duration);
1317 writeStateT.setResult(result);
1319 // This makes clusters available from server
1320 clusterStream.reallyFlush();
1322 // This will trigger query updates
1323 releaseWriteOnly(writer);
1324 assert (null != writer);
1326 handleUpdatesAndMetadata(writer);
1328 } catch (CancelTransactionException e) {
1330 releaseWriteOnly(writeState.getGraph());
1332 clusterTable.removeWriteOnlyClusters();
1333 state.stopWriteTransaction(clusterStream);
1335 } catch (Throwable e) {
1337 e.printStackTrace();
1339 releaseWriteOnly(writeState.getGraph());
1341 clusterTable.removeWriteOnlyClusters();
1343 if (callback != null)
1344 callback.exception(new DatabaseException(e));
1346 state.stopWriteTransaction(clusterStream);
1347 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1351 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1357 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1358 scheduleRequest(request, callback, notify, null);
1362 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1366 assert (request != null);
1368 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1370 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1373 public void run(int thread) {
1375 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1379 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1382 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1386 VirtualGraph vg = getProvider(request.getProvider());
1387 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1389 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1391 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1394 public void execute(Object result) {
1395 if(callback != null) callback.run(null);
1399 public void exception(Throwable t) {
1400 if(callback != null) callback.run((DatabaseException)t);
1405 assert (null != writer);
1406 // writer.state.barrier.inc();
1410 request.perform(writer);
1412 } catch (Throwable e) {
1414 // writer.state.barrier.dec();
1415 // writer.waitAsync(null);
1417 releaseWriteOnly(writer);
1419 clusterTable.removeWriteOnlyClusters();
1421 if(!(e instanceof CancelTransactionException)) {
1422 if (callback != null)
1423 callback.run(new DatabaseException(e));
1426 writeState.except(e);
1432 // This makes clusters available from server
1433 boolean empty = clusterStream.reallyFlush();
1434 // This was needed to make WO requests call metadata listeners.
1435 // NOTE: the calling event does not contain clientChanges information.
1436 if (!empty && clientChanges.isEmpty())
1437 clientChanges.setNotEmpty(true);
1438 releaseWriteOnly(writer);
1439 assert (null != writer);
1442 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1455 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1456 scheduleRequest(request, callback, notify, null);
1460 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1463 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1465 assert (request != null);
1467 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1469 requestManager.scheduleWrite(new SessionTask(request, thread) {
1472 public void run(int thread) {
1474 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1476 performWriteOnly(request, notify, callback);
1486 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1488 assert (request != null);
1489 assert (procedure != null);
1491 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1493 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1496 public void run(int thread) {
1498 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1500 ListenerBase listener = getListenerBase(procedure);
1502 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1506 if (listener != null) {
1509 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1512 public void exception(AsyncReadGraph graph, Throwable t) {
1513 procedure.exception(graph, t);
1514 if(throwable != null) {
1517 // ErrorLogger.defaultLogError("Unhandled exception", t);
1522 public void execute(AsyncReadGraph graph, T t) {
1523 if(result != null) result.set(t);
1524 procedure.execute(graph, t);
1528 } catch (Throwable t) {
1529 // This is handled by the AsyncProcedure
1530 //Logger.defaultLogError("Internal error", t);
1537 // newGraph.state.barrier.inc();
1539 T t = request.perform(newGraph);
1543 if(result != null) result.set(t);
1544 procedure.execute(newGraph, t);
1546 } catch (Throwable th) {
1548 if(throwable != null) {
1551 Logger.defaultLogError("Unhandled exception", th);
1556 } catch (Throwable t) {
1559 t.printStackTrace();
1561 if(throwable != null) {
1564 Logger.defaultLogError("Unhandled exception", t);
1569 procedure.exception(newGraph, t);
1571 } catch (Throwable t2) {
1573 if(throwable != null) {
1576 Logger.defaultLogError("Unhandled exception", t2);
1583 // newGraph.state.barrier.dec();
1584 // newGraph.waitAsync(request);
1590 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1600 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1602 assert (request != null);
1603 assert (procedure != null);
1605 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1607 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1610 public void run(int thread) {
1612 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1614 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1618 if (listener != null) {
1620 newGraph.processor.query(newGraph, request, null, procedure, listener);
1622 // newGraph.waitAsync(request);
1626 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1627 procedure, "request");
1631 // newGraph.state.barrier.inc();
1633 request.perform(newGraph, wrapper);
1635 // newGraph.waitAsync(request);
1637 } catch (Throwable t) {
1639 wrapper.exception(newGraph, t);
1640 // newGraph.waitAsync(request);
1649 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1659 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1661 assert (request != null);
1662 assert (procedure != null);
1664 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1666 int sync = notify != null ? thread : -1;
1668 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1671 public void run(int thread) {
1673 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1675 ListenerBase listener = getListenerBase(procedure);
1677 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1681 if (listener != null) {
1683 newGraph.processor.query(newGraph, request, null, procedure, listener);
1685 // newGraph.waitAsync(request);
1689 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1693 request.perform(newGraph, wrapper);
1695 } catch (Throwable t) {
1697 t.printStackTrace();
1705 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1715 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1717 assert (request != null);
1718 assert (procedure != null);
1720 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1722 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1725 public void run(int thread) {
1727 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1729 ListenerBase listener = getListenerBase(procedure);
1731 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1735 if (listener != null) {
1737 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1740 public void exception(Throwable t) {
1741 procedure.exception(t);
1742 if(throwable != null) {
1748 public void execute(T t) {
1749 if(result != null) result.set(t);
1750 procedure.execute(t);
1755 // newGraph.waitAsync(request);
1759 // newGraph.state.barrier.inc();
1761 request.register(newGraph, new Listener<T>() {
1764 public void exception(Throwable t) {
1765 if(throwable != null) throwable.set(t);
1766 procedure.exception(t);
1767 // newGraph.state.barrier.dec();
1771 public void execute(T t) {
1772 if(result != null) result.set(t);
1773 procedure.execute(t);
1774 // newGraph.state.barrier.dec();
1778 public boolean isDisposed() {
1784 // newGraph.waitAsync(request);
1790 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1802 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1804 scheduleRequest(request, procedure, null, null, null);
1809 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1811 Semaphore notify = new Semaphore(0);
1812 DataContainer<Throwable> container = new DataContainer<Throwable>();
1813 DataContainer<T> result = new DataContainer<T>();
1814 scheduleRequest(request, procedure, notify, container, result);
1815 acquire(notify, request);
1816 Throwable throwable = container.get();
1817 if(throwable != null) {
1818 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1819 else throw new DatabaseException("Unexpected exception", throwable);
1821 return result.get();
1825 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1827 Semaphore notify = new Semaphore(0);
1828 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1829 final DataContainer<T> resultContainer = new DataContainer<T>();
1830 scheduleRequest(request, new AsyncProcedure<T>() {
1832 public void exception(AsyncReadGraph graph, Throwable throwable) {
1833 exceptionContainer.set(throwable);
1834 procedure.exception(graph, throwable);
1837 public void execute(AsyncReadGraph graph, T result) {
1838 resultContainer.set(result);
1839 procedure.execute(graph, result);
1841 }, getListenerBase(procedure), notify);
1842 acquire(notify, request, procedure);
1843 Throwable throwable = exceptionContainer.get();
1844 if (throwable != null) {
1845 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1846 else throw new DatabaseException("Unexpected exception", throwable);
1848 return resultContainer.get();
1853 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1855 Semaphore notify = new Semaphore(0);
1856 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1857 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1859 public void exception(AsyncReadGraph graph, Throwable throwable) {
1860 exceptionContainer.set(throwable);
1861 procedure.exception(graph, throwable);
1864 public void execute(AsyncReadGraph graph, T result) {
1865 procedure.execute(graph, result);
1868 public void finished(AsyncReadGraph graph) {
1869 procedure.finished(graph);
1872 acquire(notify, request, procedure);
1873 Throwable throwable = exceptionContainer.get();
1874 if (throwable != null) {
1875 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1876 else throw new DatabaseException("Unexpected exception", throwable);
1878 // TODO: implement return value
1879 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1884 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1885 return syncRequest(request, new ProcedureAdapter<T>());
1888 // assert(request != null);
1890 // final DataContainer<T> result = new DataContainer<T>();
1891 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1893 // syncRequest(request, new Procedure<T>() {
1896 // public void execute(T t) {
1901 // public void exception(Throwable t) {
1902 // exception.set(t);
1907 // Throwable t = exception.get();
1909 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1910 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1913 // return result.get();
1918 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1919 return syncRequest(request, (Procedure<T>)procedure);
1924 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1925 // assertNotSession();
1926 // Semaphore notify = new Semaphore(0);
1927 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1928 // DataContainer<T> result = new DataContainer<T>();
1929 // scheduleRequest(request, procedure, notify, container, result);
1930 // acquire(notify, request);
1931 // Throwable throwable = container.get();
1932 // if(throwable != null) {
1933 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1934 // else throw new DatabaseException("Unexpected exception", throwable);
1936 // return result.get();
1941 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1943 Semaphore notify = new Semaphore(0);
1944 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1945 final DataContainer<T> result = new DataContainer<T>();
1946 scheduleRequest(request, procedure, notify, container, result);
1947 acquire(notify, request);
1948 Throwable throwable = container.get();
1949 if (throwable != null) {
1950 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1951 else throw new DatabaseException("Unexpected exception", throwable);
1953 return result.get();
1957 public void syncRequest(Write request) throws DatabaseException {
1960 Semaphore notify = new Semaphore(0);
1961 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1962 scheduleRequest(request, new Callback<DatabaseException>() {
1965 public void run(DatabaseException e) {
1970 acquire(notify, request);
1971 if(exception.get() != null) throw exception.get();
1975 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1977 Semaphore notify = new Semaphore(0);
1978 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1979 final DataContainer<T> result = new DataContainer<T>();
1980 scheduleRequest(request, new Procedure<T>() {
1983 public void exception(Throwable t) {
1988 public void execute(T t) {
1993 acquire(notify, request);
1994 if(exception.get() != null) {
1995 Throwable t = exception.get();
1996 if(t instanceof DatabaseException) throw (DatabaseException)t;
1997 else throw new DatabaseException(t);
1999 return result.get();
2003 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2005 Semaphore notify = new Semaphore(0);
2006 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2007 final DataContainer<T> result = new DataContainer<T>();
2008 scheduleRequest(request, new Procedure<T>() {
2011 public void exception(Throwable t) {
2016 public void execute(T t) {
2021 acquire(notify, request);
2022 if(exception.get() != null) {
2023 Throwable t = exception.get();
2024 if(t instanceof DatabaseException) throw (DatabaseException)t;
2025 else throw new DatabaseException(t);
2027 return result.get();
2031 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2033 Semaphore notify = new Semaphore(0);
2034 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2035 final DataContainer<T> result = new DataContainer<T>();
2036 scheduleRequest(request, new Procedure<T>() {
2039 public void exception(Throwable t) {
2044 public void execute(T t) {
2049 acquire(notify, request);
2050 if(exception.get() != null) {
2051 Throwable t = exception.get();
2052 if(t instanceof DatabaseException) throw (DatabaseException)t;
2053 else throw new DatabaseException(t);
2055 return result.get();
2059 public void syncRequest(DelayedWrite request) throws DatabaseException {
2061 Semaphore notify = new Semaphore(0);
2062 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2063 scheduleRequest(request, new Callback<DatabaseException>() {
2065 public void run(DatabaseException e) {
2069 acquire(notify, request);
2070 if(exception.get() != null) throw exception.get();
2074 public void syncRequest(WriteOnly request) throws DatabaseException {
2077 Semaphore notify = new Semaphore(0);
2078 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2079 scheduleRequest(request, new Callback<DatabaseException>() {
2081 public void run(DatabaseException e) {
2085 acquire(notify, request);
2086 if(exception.get() != null) throw exception.get();
2091 * GraphRequestProcessor interface
2095 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2097 scheduleRequest(request, procedure, null, null);
2102 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2104 scheduleRequest(request, procedure, null);
2109 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2111 scheduleRequest(request, procedure, null, null, null);
2116 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2118 scheduleRequest(request, callback, null);
2123 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2125 scheduleRequest(request, procedure, null);
2130 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2132 scheduleRequest(request, procedure, null);
2137 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2139 scheduleRequest(request, procedure, null);
2144 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2146 scheduleRequest(request, callback, null);
2151 public void asyncRequest(final Write r) {
2152 asyncRequest(r, null);
2156 public void asyncRequest(final DelayedWrite r) {
2157 asyncRequest(r, null);
2161 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2163 scheduleRequest(request, callback, null);
2168 public void asyncRequest(final WriteOnly request) {
2170 asyncRequest(request, null);
2175 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2176 r.request(this, procedure);
2180 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2181 r.request(this, procedure);
2185 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2186 r.request(this, procedure);
2190 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2191 r.request(this, procedure);
2195 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2196 r.request(this, procedure);
2200 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2201 r.request(this, procedure);
2205 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2206 return r.request(this);
2210 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2211 return r.request(this);
2215 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2216 r.request(this, procedure);
2220 public <T> void async(WriteInterface<T> r) {
2221 r.request(this, new ProcedureAdapter<T>());
2225 public void incAsync() {
2230 public void decAsync() {
2234 public long getCluster(ResourceImpl resource) {
2235 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2236 return cluster.getClusterId();
2239 public long getCluster(int id) {
2240 if (clusterTable == null)
2241 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2242 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2245 public ResourceImpl getResource(int id) {
2246 return new ResourceImpl(resourceSupport, id);
2249 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2250 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2251 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2252 int key = proxy.getClusterKey();
2253 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2254 return new ResourceImpl(resourceSupport, resourceKey);
2257 public ResourceImpl getResource2(int id) {
2259 return new ResourceImpl(resourceSupport, id);
2262 final public int getId(ResourceImpl impl) {
2266 public static final Charset UTF8 = Charset.forName("utf-8");
2271 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2272 for(TransientGraph g : support.providers) {
2273 if(g.isPending(subject)) return false;
2278 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2279 for(TransientGraph g : support.providers) {
2280 if(g.isPending(subject, predicate)) return false;
2285 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2287 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2289 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2292 public void run(ReadGraphImpl graph) {
2293 if(ready.decrementAndGet() == 0) {
2294 runnable.run(graph);
2300 for(TransientGraph g : support.providers) {
2301 if(g.isPending(subject)) {
2303 g.load(graph, subject, composite);
2304 } catch (DatabaseException e) {
2305 e.printStackTrace();
2308 composite.run(graph);
2312 composite.run(graph);
2316 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2318 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2320 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2323 public void run(ReadGraphImpl graph) {
2324 if(ready.decrementAndGet() == 0) {
2325 runnable.run(graph);
2331 for(TransientGraph g : support.providers) {
2332 if(g.isPending(subject, predicate)) {
2334 g.load(graph, subject, predicate, composite);
2335 } catch (DatabaseException e) {
2336 e.printStackTrace();
2339 composite.run(graph);
2343 composite.run(graph);
2347 // void dumpHeap() {
2350 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2351 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2352 // "d:/heap" + flushCounter + ".txt", true);
2353 // } catch (IOException e) {
2354 // e.printStackTrace();
2359 void fireReactionsToSynchronize(ChangeSet cs) {
2361 // Do not fire empty events
2365 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2366 // g.state.barrier.inc();
2370 if (!cs.isEmpty()) {
2371 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2372 for (ChangeListener l : changeListeners2) {
2375 } catch (Exception ex) {
2376 ex.printStackTrace();
2383 // g.state.barrier.dec();
2384 // g.waitAsync(null);
2390 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2393 // Do not fire empty events
2398 // graph.state.barrier.inc();
2402 if (!cs2.isEmpty()) {
2404 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2405 for (ChangeListener l : changeListeners2) {
2408 // System.out.println("changelistener " + l);
2409 } catch (Exception ex) {
2410 ex.printStackTrace();
2418 // graph.state.barrier.dec();
2419 // graph.waitAsync(null);
2422 } catch (Throwable t) {
2423 t.printStackTrace();
2427 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2431 // Do not fire empty events
2435 // Do not fire on virtual requests
2436 if(graph.getProvider() != null)
2439 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2443 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2444 for (ChangeListener l : metadataListeners) {
2447 } catch (Throwable ex) {
2448 ex.printStackTrace();
2456 } catch (Throwable t) {
2457 t.printStackTrace();
2466 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2469 public <T> T getService(Class<T> api) {
2470 T t = peekService(api);
2472 if (state.isClosed())
2473 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2474 throw new ServiceNotFoundException(this, api);
2482 protected abstract ServerInformation getCachedServerInformation();
2484 private Class<?> serviceKey1 = null;
2485 private Class<?> serviceKey2 = null;
2486 private Object service1 = null;
2487 private Object service2 = null;
2493 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2495 @SuppressWarnings("unchecked")
2497 public synchronized <T> T peekService(Class<T> api) {
2499 if(serviceKey1 == api) {
2501 } else if (serviceKey2 == api) {
2503 Object result = service2;
2504 service2 = service1;
2505 serviceKey2 = serviceKey1;
2511 if (Layer0.class == api)
2513 if (ServerInformation.class == api)
2514 return (T) getCachedServerInformation();
2515 else if (WriteGraphImpl.class == api)
2516 return (T) writeState.getGraph();
2517 else if (ClusterBuilder.class == api)
2518 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2519 else if (ClusterBuilderFactory.class == api)
2520 return (T)new ClusterBuilderFactoryImpl(this);
2522 service2 = service1;
2523 serviceKey2 = serviceKey1;
2525 service1 = serviceLocator.peekService(api);
2536 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2539 public boolean hasService(Class<?> api) {
2540 return serviceLocator.hasService(api);
2544 * @param api the api that must be implemented by the specified service
2545 * @param service the service implementation
2548 public <T> void registerService(Class<T> api, T service) {
2549 if(Layer0.class == api) {
2550 L0 = (Layer0)service;
2553 serviceLocator.registerService(api, service);
2554 if (TransactionPolicySupport.class == api) {
2555 transactionPolicy = (TransactionPolicySupport)service;
2556 state.resetTransactionPolicy();
2558 if (api == serviceKey1)
2560 else if (api == serviceKey2)
2568 void fireSessionVariableChange(String variable) {
2569 for (MonitorHandler h : monitorHandlers) {
2570 MonitorContext ctx = monitorContexts.getLeft(h);
2572 // SafeRunner functionality repeated here to avoid dependency.
2574 h.valuesChanged(ctx);
2575 } catch (Exception e) {
2576 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2577 } catch (LinkageError e) {
2578 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2584 class ResourceSerializerImpl implements ResourceSerializer {
2586 public long createRandomAccessId(int id) throws DatabaseException {
2589 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2590 long cluster = getCluster(id);
2592 return 0; // Better to return 0 then invalid id.
2593 long result = ClusterTraitsBase.createResourceId(cluster, index);
2598 public long getRandomAccessId(Resource resource) throws DatabaseException {
2600 ResourceImpl resourceImpl = (ResourceImpl) resource;
2601 return createRandomAccessId(resourceImpl.id);
2606 public int getTransientId(Resource resource) throws DatabaseException {
2608 ResourceImpl resourceImpl = (ResourceImpl) resource;
2609 return resourceImpl.id;
2614 public String createRandomAccessId(Resource resource)
2615 throws InvalidResourceReferenceException {
2617 if(resource == null) throw new IllegalArgumentException();
2619 ResourceImpl resourceImpl = (ResourceImpl) resource;
2621 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2625 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2626 } catch (DatabaseException e1) {
2627 throw new InvalidResourceReferenceException(e1);
2630 // Serialize as '<resource index>_<cluster id>'
2631 return "" + r + "_" + getCluster(resourceImpl);
2632 } catch (Throwable e) {
2633 e.printStackTrace();
2634 throw new InvalidResourceReferenceException(e);
2639 public int getTransientId(long serialized) throws DatabaseException {
2640 if (serialized <= 0)
2641 return (int)serialized;
2642 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2643 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2644 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2645 if (cluster == null)
2646 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2647 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2652 public Resource getResource(long randomAccessId) throws DatabaseException {
2653 return getResourceByKey(getTransientId(randomAccessId));
2657 public Resource getResource(int transientId) throws DatabaseException {
2658 return getResourceByKey(transientId);
2662 public Resource getResource(String randomAccessId)
2663 throws InvalidResourceReferenceException {
2665 int i = randomAccessId.indexOf('_');
2667 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2668 + randomAccessId + "'");
2669 int r = Integer.parseInt(randomAccessId.substring(0, i));
2670 if(r < 0) return getResourceByKey(r);
2671 long c = Long.parseLong(randomAccessId.substring(i + 1));
2672 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2673 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2674 if (cluster.hasResource(key, clusterTranslator))
2675 return getResourceByKey(key);
2676 } catch (InvalidResourceReferenceException e) {
2678 } catch (NumberFormatException e) {
2679 throw new InvalidResourceReferenceException(e);
2680 } catch (Throwable e) {
2681 e.printStackTrace();
2682 throw new InvalidResourceReferenceException(e);
2685 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2689 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2692 } catch (Throwable e) {
2693 e.printStackTrace();
2694 throw new InvalidResourceReferenceException(e);
2700 // Copied from old SessionImpl
2703 if (state.isClosed())
2704 throw new Error("Session closed.");
2709 public ResourceImpl getNewResource(long clusterId) {
2710 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2713 newId = cluster.createResource(clusterTranslator);
2714 } catch (DatabaseException e) {
2715 Logger.defaultLogError(e);
2718 return new ResourceImpl(resourceSupport, newId);
2721 public ResourceImpl getNewResource(Resource clusterSet)
2722 throws DatabaseException {
2723 long resourceId = clusterSet.getResourceId();
2724 Long clusterId = clusterSetsSupport.get(resourceId);
2725 if (null == clusterId)
2726 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2727 if (Constants.NewClusterId == clusterId) {
2728 clusterId = getService(ClusteringSupport.class).createCluster();
2729 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2730 createdClusters.add(clusterId);
2731 clusterSetsSupport.put(resourceId, clusterId);
2732 return getNewResource(clusterId);
2734 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2735 ResourceImpl result;
2736 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2737 clusterId = getService(ClusteringSupport.class).createCluster();
2738 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2739 createdClusters.add(clusterId);
2740 clusterSetsSupport.put(resourceId, clusterId);
2741 return getNewResource(clusterId);
2743 result = getNewResource(clusterId);
2744 int resultKey = querySupport.getId(result);
2745 long resultCluster = querySupport.getClusterId(resultKey);
2746 if (clusterId != resultCluster)
2747 clusterSetsSupport.put(resourceId, resultCluster);
2753 public void getNewClusterSet(Resource clusterSet)
2754 throws DatabaseException {
2756 System.out.println("new cluster set=" + clusterSet);
2757 long resourceId = clusterSet.getResourceId();
2758 if (clusterSetsSupport.containsKey(resourceId))
2759 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2760 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2762 public boolean containsClusterSet(Resource clusterSet)
2763 throws ServiceException {
2764 long resourceId = clusterSet.getResourceId();
2765 return clusterSetsSupport.containsKey(resourceId);
2767 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2768 Resource r = defaultClusterSet;
2769 defaultClusterSet = clusterSet;
2772 void printDiagnostics() {
2776 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2777 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2778 for(ClusterI cluster : clusterTable.getClusters()) {
2780 if (cluster.isLoaded() && !cluster.isEmpty()) {
2781 residentClusters.set(residentClusters.get() + 1);
2782 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2784 } catch (DatabaseException e) {
2785 Logger.defaultLogError(e);
2789 System.out.println("--------------------------------");
2790 System.out.println("Cluster information:");
2791 System.out.println("-amount of resident clusters=" + residentClusters.get());
2792 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2794 for(ClusterI cluster : clusterTable.getClusters()) {
2795 System.out.print("Cluster " + cluster.getClusterId() + " [");
2797 if (!cluster.isLoaded())
2798 System.out.println("not loaded]");
2799 else if (cluster.isEmpty())
2800 System.out.println("is empty]");
2802 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2803 System.out.print(",references=" + cluster.getReferenceCount());
2804 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2806 } catch (DatabaseException e) {
2807 Logger.defaultLogError(e);
2808 System.out.println("is corrupted]");
2812 queryProvider2.printDiagnostics();
2813 System.out.println("--------------------------------");
2818 public void fireStartReadTransaction() {
2821 System.out.println("StartReadTransaction");
2823 for (SessionEventListener listener : eventListeners) {
2825 listener.readTransactionStarted();
2826 } catch (Throwable t) {
2827 t.printStackTrace();
2833 public void fireFinishReadTransaction() {
2836 System.out.println("FinishReadTransaction");
2838 for (SessionEventListener listener : eventListeners) {
2840 listener.readTransactionFinished();
2841 } catch (Throwable t) {
2842 t.printStackTrace();
2848 public void fireStartWriteTransaction() {
2851 System.out.println("StartWriteTransaction");
2853 for (SessionEventListener listener : eventListeners) {
2855 listener.writeTransactionStarted();
2856 } catch (Throwable t) {
2857 t.printStackTrace();
2863 public void fireFinishWriteTransaction() {
2866 System.out.println("FinishWriteTransaction");
2868 for (SessionEventListener listener : eventListeners) {
2870 listener.writeTransactionFinished();
2871 } catch (Throwable t) {
2872 t.printStackTrace();
2876 Indexing.resetDependenciesIndexingDisabled();
2880 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2881 throw new Error("Not supported at the moment.");
2888 ClusterTable getClusterTable() {
2889 return clusterTable;
2892 public GraphSession getGraphSession() {
2893 return graphSession;
2896 public QueryProcessor getQueryProvider2() {
2897 return queryProvider2;
2900 ClientChangesImpl getClientChanges() {
2901 return clientChanges;
2904 boolean getWriteOnly() {
2908 static int counter = 0;
2910 public void onClusterLoaded(long clusterId) {
2912 clusterTable.updateSize();
2920 * Implementation of the interface RequestProcessor
2924 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2929 assert(request != null);
2931 final DataContainer<T> result = new DataContainer<T>();
2932 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2934 syncRequest(request, new AsyncProcedure<T>() {
2937 public void execute(AsyncReadGraph graph, T t) {
2942 public void exception(AsyncReadGraph graph, Throwable t) {
2948 Throwable t = exception.get();
2950 if(t instanceof DatabaseException) throw (DatabaseException)t;
2951 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2954 return result.get();
2959 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2960 // assertNotSession();
2961 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2965 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2967 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2971 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2973 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2977 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2979 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2983 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2985 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2989 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2993 assert(request != null);
2995 final DataContainer<T> result = new DataContainer<T>();
2996 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2998 syncRequest(request, new AsyncProcedure<T>() {
3001 public void execute(AsyncReadGraph graph, T t) {
3006 public void exception(AsyncReadGraph graph, Throwable t) {
3012 Throwable t = exception.get();
3014 if(t instanceof DatabaseException) throw (DatabaseException)t;
3015 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3018 return result.get();
3023 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3025 return syncRequest(request, (AsyncProcedure<T>)procedure);
3029 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3031 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3035 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3037 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3041 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3043 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3047 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3049 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3053 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3057 assert(request != null);
3059 final ArrayList<T> result = new ArrayList<T>();
3060 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3062 syncRequest(request, new AsyncMultiProcedure<T>() {
3065 public void execute(AsyncReadGraph graph, T t) {
3066 synchronized(result) {
3072 public void finished(AsyncReadGraph graph) {
3076 public void exception(AsyncReadGraph graph, Throwable t) {
3083 Throwable t = exception.get();
3085 if(t instanceof DatabaseException) throw (DatabaseException)t;
3086 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3094 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3096 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3100 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3102 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3106 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3108 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3112 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3114 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3118 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3120 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3124 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3128 assert(request != null);
3130 final ArrayList<T> result = new ArrayList<T>();
3131 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3133 syncRequest(request, new AsyncMultiProcedure<T>() {
3136 public void execute(AsyncReadGraph graph, T t) {
3137 synchronized(result) {
3143 public void finished(AsyncReadGraph graph) {
3147 public void exception(AsyncReadGraph graph, Throwable t) {
3154 Throwable t = exception.get();
3156 if(t instanceof DatabaseException) throw (DatabaseException)t;
3157 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3165 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3167 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3171 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3173 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3177 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3179 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3183 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3185 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3189 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3191 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3196 public <T> void asyncRequest(Read<T> request) {
3198 asyncRequest(request, new ProcedureAdapter<T>() {
3200 public void exception(Throwable t) {
3201 t.printStackTrace();
3208 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3209 asyncRequest(request, (AsyncProcedure<T>)procedure);
3213 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3214 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3218 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3219 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3223 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3224 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3228 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3229 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3233 final public <T> void asyncRequest(final AsyncRead<T> request) {
3235 assert(request != null);
3237 asyncRequest(request, new ProcedureAdapter<T>() {
3239 public void exception(Throwable t) {
3240 t.printStackTrace();
3247 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3248 scheduleRequest(request, procedure, procedure, null);
3252 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3253 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3257 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3258 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3262 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3263 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3267 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3268 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3272 public <T> void asyncRequest(MultiRead<T> request) {
3274 assert(request != null);
3276 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3278 public void exception(AsyncReadGraph graph, Throwable t) {
3279 t.printStackTrace();
3286 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3287 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3291 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3292 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3296 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3297 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3301 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3302 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3306 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3307 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3311 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3313 assert(request != null);
3315 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3317 public void exception(AsyncReadGraph graph, Throwable t) {
3318 t.printStackTrace();
3325 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3326 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3330 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3331 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3335 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3336 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3340 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3341 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3345 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3346 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3350 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3352 throw new Error("Not implemented!");
3356 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3357 throw new Error("Not implemented!");
3361 final public <T> void asyncRequest(final ExternalRead<T> request) {
3363 assert(request != null);
3365 asyncRequest(request, new ProcedureAdapter<T>() {
3367 public void exception(Throwable t) {
3368 t.printStackTrace();
3375 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3376 asyncRequest(request, (Procedure<T>)procedure);
3381 void check(Throwable t) throws DatabaseException {
3383 if(t instanceof DatabaseException) throw (DatabaseException)t;
3384 else throw new DatabaseException("Unexpected exception", t);
3388 void check(DataContainer<Throwable> container) throws DatabaseException {
3389 Throwable t = container.get();
3391 if(t instanceof DatabaseException) throw (DatabaseException)t;
3392 else throw new DatabaseException("Unexpected exception", t);
3401 boolean sameProvider(Write request) {
3402 if(writeState.getGraph().provider != null) {
3403 return writeState.getGraph().provider.equals(request.getProvider());
3405 return request.getProvider() == null;
3409 boolean plainWrite(WriteGraphImpl graph) {
3410 if(graph == null) return false;
3411 if(graph.writeSupport.writeOnly()) return false;
3416 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3417 private void assertNotSession() throws DatabaseException {
3418 Thread current = Thread.currentThread();
3419 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3422 void assertAlive() {
3423 if (!state.isAlive())
3424 throw new RuntimeDatabaseException("Session has been shut down.");
3427 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3428 return querySupport.getValueStream(graph, querySupport.getId(resource));
3431 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3432 return querySupport.getValue(graph, querySupport.getId(resource));
3435 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3436 acquire(semaphore, request, null);
3439 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3443 // Loop until the semaphore is acquired reporting requests that take a long time.
3446 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3450 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3452 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3453 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3455 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3456 + (waitTime) + " ms. (procedure=" + procedure + ")");
3463 } catch (InterruptedException e) {
3464 e.printStackTrace();
3465 // FIXME: Should perhaps do something else in this case ??
3472 // public boolean holdOnToTransactionAfterCancel() {
3473 // return transactionPolicy.holdOnToTransactionAfterCancel();
3477 // public boolean holdOnToTransactionAfterCommit() {
3478 // return transactionPolicy.holdOnToTransactionAfterCommit();
3482 // public boolean holdOnToTransactionAfterRead() {
3483 // return transactionPolicy.holdOnToTransactionAfterRead();
3487 // public void onRelinquish() {
3488 // transactionPolicy.onRelinquish();
3492 // public void onRelinquishDone() {
3493 // transactionPolicy.onRelinquishDone();
3497 // public void onRelinquishError() {
3498 // transactionPolicy.onRelinquishError();
3503 public Session getSession() {
3508 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3509 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3511 public void ceased(int thread) {
3513 requestManager.ceased(thread);
3517 public int getAmountOfQueryThreads() {
3518 // This must be a power of two
3520 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3523 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3525 return new ResourceImpl(resourceSupport, key);
3529 public void acquireWriteOnly() {
3533 public void releaseWriteOnly(ReadGraphImpl graph) {
3535 queryProvider2.releaseWrite(graph);
3538 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3540 long start = System.nanoTime();
3542 while(dirtyPrimitives) {
3543 dirtyPrimitives = false;
3544 getQueryProvider2().performDirtyUpdates(writer);
3545 getQueryProvider2().performScheduledUpdates(writer);
3548 fireMetadataListeners(writer, clientChanges);
3550 if(DebugPolicy.PERFORMANCE_DATA) {
3551 long end = System.nanoTime();
3552 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3557 void removeTemporaryData() {
3558 File platform = Platform.getLocation().toFile();
3559 File tempFiles = new File(platform, "tempFiles");
3560 File temp = new File(tempFiles, "db");
3564 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3566 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3569 } catch (IOException e) {
3570 Logger.defaultLogError(e);
3572 return FileVisitResult.CONTINUE;
3575 } catch (IOException e) {
3576 Logger.defaultLogError(e);
3580 public void handleCreatedClusters() {
3581 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3582 createdClusters.forEach(new TLongProcedure() {
3585 public boolean execute(long value) {
3586 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3587 cluster.setImmutable(true, clusterTranslator);
3592 createdClusters.clear();
3596 public Object getModificationCounter() {
3597 return queryProvider2.modificationCounter;
3601 public void markUndoPoint() {
3602 state.setCombine(false);