1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.db.AsyncReadGraph;
34 import org.simantics.db.ChangeSet;
35 import org.simantics.db.DevelopmentKeys;
36 import org.simantics.db.ExternalValueSupport;
37 import org.simantics.db.Metadata;
38 import org.simantics.db.MonitorContext;
39 import org.simantics.db.MonitorHandler;
40 import org.simantics.db.Resource;
41 import org.simantics.db.ResourceSerializer;
42 import org.simantics.db.Session;
43 import org.simantics.db.SessionManager;
44 import org.simantics.db.SessionVariables;
45 import org.simantics.db.VirtualGraph;
46 import org.simantics.db.WriteGraph;
47 import org.simantics.db.authentication.UserAuthenticationAgent;
48 import org.simantics.db.authentication.UserAuthenticator;
49 import org.simantics.db.common.Indexing;
50 import org.simantics.db.common.TransactionPolicyRelease;
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
63 import org.simantics.db.common.utils.Logger;
64 import org.simantics.db.event.ChangeEvent;
65 import org.simantics.db.event.ChangeListener;
66 import org.simantics.db.event.SessionEventListener;
67 import org.simantics.db.exception.CancelTransactionException;
68 import org.simantics.db.exception.ClusterSetExistException;
69 import org.simantics.db.exception.DatabaseException;
70 import org.simantics.db.exception.ImmutableException;
71 import org.simantics.db.exception.InvalidResourceReferenceException;
72 import org.simantics.db.exception.ResourceNotFoundException;
73 import org.simantics.db.exception.RuntimeDatabaseException;
74 import org.simantics.db.exception.ServiceException;
75 import org.simantics.db.exception.ServiceNotFoundException;
76 import org.simantics.db.impl.ClusterBase;
77 import org.simantics.db.impl.ClusterI;
78 import org.simantics.db.impl.ClusterTraitsBase;
79 import org.simantics.db.impl.ClusterTranslator;
80 import org.simantics.db.impl.ResourceImpl;
81 import org.simantics.db.impl.TransientGraph;
82 import org.simantics.db.impl.VirtualGraphImpl;
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
84 import org.simantics.db.impl.graph.ReadGraphImpl;
85 import org.simantics.db.impl.graph.WriteGraphImpl;
86 import org.simantics.db.impl.graph.WriteSupport;
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
90 import org.simantics.db.impl.query.QueryProcessor;
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
93 import org.simantics.db.impl.service.QueryDebug;
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
96 import org.simantics.db.procedure.AsyncListener;
97 import org.simantics.db.procedure.AsyncMultiListener;
98 import org.simantics.db.procedure.AsyncMultiProcedure;
99 import org.simantics.db.procedure.AsyncProcedure;
100 import org.simantics.db.procedure.Listener;
101 import org.simantics.db.procedure.ListenerBase;
102 import org.simantics.db.procedure.MultiListener;
103 import org.simantics.db.procedure.MultiProcedure;
104 import org.simantics.db.procedure.Procedure;
105 import org.simantics.db.procedure.SyncListener;
106 import org.simantics.db.procedure.SyncMultiListener;
107 import org.simantics.db.procedure.SyncMultiProcedure;
108 import org.simantics.db.procedure.SyncProcedure;
109 import org.simantics.db.procore.cluster.ClusterImpl;
110 import org.simantics.db.procore.cluster.ClusterTraits;
111 import org.simantics.db.procore.protocol.Constants;
112 import org.simantics.db.procore.protocol.DebugPolicy;
113 import org.simantics.db.request.AsyncMultiRead;
114 import org.simantics.db.request.AsyncRead;
115 import org.simantics.db.request.DelayedWrite;
116 import org.simantics.db.request.DelayedWriteResult;
117 import org.simantics.db.request.ExternalRead;
118 import org.simantics.db.request.MultiRead;
119 import org.simantics.db.request.Read;
120 import org.simantics.db.request.ReadInterface;
121 import org.simantics.db.request.Write;
122 import org.simantics.db.request.WriteInterface;
123 import org.simantics.db.request.WriteOnly;
124 import org.simantics.db.request.WriteOnlyResult;
125 import org.simantics.db.request.WriteResult;
126 import org.simantics.db.request.WriteTraits;
127 import org.simantics.db.service.ByteReader;
128 import org.simantics.db.service.ClusterBuilder;
129 import org.simantics.db.service.ClusterBuilderFactory;
130 import org.simantics.db.service.ClusterControl;
131 import org.simantics.db.service.ClusterSetsSupport;
132 import org.simantics.db.service.ClusterUID;
133 import org.simantics.db.service.ClusteringSupport;
134 import org.simantics.db.service.CollectionSupport;
135 import org.simantics.db.service.DebugSupport;
136 import org.simantics.db.service.DirectQuerySupport;
137 import org.simantics.db.service.GraphChangeListenerSupport;
138 import org.simantics.db.service.InitSupport;
139 import org.simantics.db.service.LifecycleSupport;
140 import org.simantics.db.service.ManagementSupport;
141 import org.simantics.db.service.QueryControl;
142 import org.simantics.db.service.SerialisationSupport;
143 import org.simantics.db.service.ServerInformation;
144 import org.simantics.db.service.ServiceActivityMonitor;
145 import org.simantics.db.service.SessionEventSupport;
146 import org.simantics.db.service.SessionMonitorSupport;
147 import org.simantics.db.service.SessionUserSupport;
148 import org.simantics.db.service.StatementSupport;
149 import org.simantics.db.service.TransactionPolicySupport;
150 import org.simantics.db.service.TransactionSupport;
151 import org.simantics.db.service.TransferableGraphSupport;
152 import org.simantics.db.service.UndoRedoSupport;
153 import org.simantics.db.service.VirtualGraphSupport;
154 import org.simantics.db.service.XSupport;
155 import org.simantics.layer0.Layer0;
156 import org.simantics.utils.DataContainer;
157 import org.simantics.utils.Development;
158 import org.simantics.utils.datastructures.Callback;
159 import org.simantics.utils.threads.logger.ITask;
160 import org.simantics.utils.threads.logger.ThreadLogger;
162 import gnu.trove.procedure.TLongProcedure;
163 import gnu.trove.set.hash.TLongHashSet;
166 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168 protected static final boolean DEBUG = false;
170 private static final boolean DIAGNOSTICS = false;
172 private TransactionPolicySupport transactionPolicy;
174 final private ClusterControl clusterControl;
176 final protected BuiltinSupportImpl builtinSupport;
177 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
178 final protected ClusterSetsSupport clusterSetsSupport;
179 final protected LifecycleSupportImpl lifecycleSupport;
181 protected QuerySupportImpl querySupport;
182 protected ResourceSupportImpl resourceSupport;
183 protected WriteSupport writeSupport;
184 public ClusterTranslator clusterTranslator;
186 boolean dirtyPrimitives = false;
188 public static final int SERVICE_MODE_CREATE = 2;
189 public static final int SERVICE_MODE_ALLOW = 1;
190 public int serviceMode = 0;
191 public boolean createdImmutableClusters = false;
192 public TLongHashSet createdClusters = new TLongHashSet();
197 * The service locator maintained by the workbench. These services are
198 * initialized during workbench during the <code>init</code> method.
200 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
202 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
204 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
206 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
208 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
210 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
212 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
214 final protected State state = new State();
216 protected GraphSession graphSession = null;
218 protected SessionManager sessionManagerImpl = null;
220 protected UserAuthenticationAgent authAgent = null;
222 protected UserAuthenticator authenticator = null;
224 protected Resource user = null;
226 protected ClusterStream clusterStream = null;
228 protected SessionRequestManager requestManager = null;
230 public ClusterTable clusterTable = null;
232 public QueryProcessor queryProvider2 = null;
234 ClientChangesImpl clientChanges = null;
236 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
238 // protected long newClusterId = Constants.NullClusterId;
240 protected int flushCounter = 0;
242 protected boolean writeOnly = false;
244 WriteState<?> writeState = null;
245 WriteStateBase<?> delayedWriteState = null;
246 protected Resource defaultClusterSet = null; // If not null then used for newResource().
248 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250 // if (authAgent == null)
251 // throw new IllegalArgumentException("null authentication agent");
253 File t = StaticSessionProperties.virtualGraphStoragePath;
256 this.clusterTable = new ClusterTable(this, t);
257 this.builtinSupport = new BuiltinSupportImpl(this);
258 this.sessionManagerImpl = sessionManagerImpl;
260 // this.authAgent = authAgent;
262 serviceLocator.registerService(Session.class, this);
263 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
264 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
265 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
266 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
267 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
268 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
269 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
270 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
271 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
272 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
273 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
274 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
275 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
276 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
277 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
278 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
279 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
280 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
281 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
282 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
283 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
284 ServiceActivityUpdaterForWriteTransactions.register(this);
286 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
287 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
288 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
289 this.lifecycleSupport = new LifecycleSupportImpl(this);
290 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
291 this.transactionPolicy = new TransactionPolicyRelease();
292 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
293 this.clusterControl = new ClusterControlImpl(this);
294 serviceLocator.registerService(ClusterControl.class, clusterControl);
295 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
296 this.clusterSetsSupport.setReadDirectory(t.toPath());
297 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
298 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
308 public Resource getRootLibrary() {
309 return queryProvider2.getRootLibraryResource();
312 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
313 if (!graphSession.dbSession.refreshEnabled())
316 getClusterTable().refresh(csid, this, clusterUID);
317 } catch (Throwable t) {
318 Logger.defaultLogError("Refesh failed.", t);
322 static final class TaskHelper {
323 private final String name;
324 private Object result;
325 final Semaphore sema = new Semaphore(0);
326 private Throwable throwable = null;
327 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
329 public void run(DatabaseException e) {
330 synchronized (TaskHelper.this) {
335 final Procedure<Object> proc = new Procedure<Object>() {
337 public void execute(Object result) {
341 public void exception(Throwable t) {
342 if (t instanceof DatabaseException)
343 callback.run((DatabaseException)t);
345 callback.run(new DatabaseException("" + name + "operation failed.", t));
348 final WriteTraits writeTraits = new WriteTraits() {};
349 TaskHelper(String name) {
355 void setResult(Object result) {
356 this.result = result;
358 // Throwable throwableGet() {
361 synchronized void throwableSet(Throwable t) {
364 // void throwableSet(String t) {
365 // throwable = new InternalException("" + name + " operation failed. " + t);
367 synchronized void throwableCheck()
368 throws DatabaseException {
369 if (null != throwable)
370 if (throwable instanceof DatabaseException)
371 throw (DatabaseException)throwable;
373 throw new DatabaseException("Undo operation failed.", throwable);
375 void throw_(String message)
376 throws DatabaseException {
377 throw new DatabaseException("" + name + " operation failed. " + message);
383 private ListenerBase getListenerBase(Object procedure) {
384 if (procedure instanceof ListenerBase)
385 return (ListenerBase) procedure;
390 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
391 scheduleRequest(request, callback, notify, null);
395 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
398 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400 assert (request != null);
402 if(Development.DEVELOPMENT) {
404 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405 System.err.println("schedule write '" + request + "'");
406 } catch (Throwable t) {
407 Logger.defaultLogError(t);
411 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
413 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
416 public void run(int thread) {
418 if(Development.DEVELOPMENT) {
420 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421 System.err.println("perform write '" + request + "'");
422 } catch (Throwable t) {
423 Logger.defaultLogError(t);
427 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
434 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436 VirtualGraph vg = getProvider(request.getProvider());
438 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442 public void execute(Object result) {
443 if(callback != null) callback.run(null);
447 public void exception(Throwable t) {
448 if(callback != null) callback.run((DatabaseException)t);
453 assert (null != writer);
454 // writer.state.barrier.inc();
457 request.perform(writer);
458 assert (null != writer);
459 } catch (Throwable t) {
460 if (!(t instanceof CancelTransactionException))
461 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462 writeState.except(t);
464 // writer.state.barrier.dec();
465 // writer.waitAsync(request);
469 assert(!queryProvider2.dirty);
471 } catch (Throwable e) {
473 // Log it first, just to be safe that the error is always logged.
474 if (!(e instanceof CancelTransactionException))
475 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477 // writeState.getGraph().state.barrier.dec();
478 // writeState.getGraph().waitAsync(request);
480 writeState.except(e);
483 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 // // All we can do here is to log those, can't really pass them anywhere.
485 // if (callback != null) {
486 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 // else callback.run(new DatabaseException(e));
489 // } catch (Throwable e2) {
490 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 // boolean empty = clusterStream.reallyFlush();
493 // int callerThread = -1;
494 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 // // Send all local changes to server so it can calculate correct reverse change set.
498 // // This will call clusterStream.accept().
499 // state.cancelCommit(context, clusterStream);
501 // if (!context.isOk()) // this is a blocking operation
502 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 // state.cancelCommit2(context, clusterStream);
506 // } catch (DatabaseException e3) {
507 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 // clusterStream.setOff(false);
510 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
515 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
521 if(Development.DEVELOPMENT) {
523 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524 System.err.println("finish write '" + request + "'");
525 } catch (Throwable t) {
526 Logger.defaultLogError(t);
536 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537 scheduleRequest(request, procedure, notify, null);
541 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
544 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546 assert (request != null);
548 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550 requestManager.scheduleWrite(new SessionTask(request, thread) {
553 public void run(int thread) {
555 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
560 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
562 VirtualGraph vg = getProvider(request.getProvider());
563 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
566 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
567 writeState = writeStateT;
569 assert (null != writer);
570 // writer.state.barrier.inc();
571 writeStateT.setResult(request.perform(writer));
572 assert (null != writer);
574 // writer.state.barrier.dec();
575 // writer.waitAsync(null);
577 } catch (Throwable e) {
579 // writer.state.barrier.dec();
580 // writer.waitAsync(null);
582 writeState.except(e);
584 // state.stopWriteTransaction(clusterStream);
586 // } catch (Throwable e) {
587 // // Log it first, just to be safe that the error is always logged.
588 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
591 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
592 // // All we can do here is to log those, can't really pass them anywhere.
593 // if (procedure != null) {
594 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
595 // else procedure.exception(new DatabaseException(e));
597 // } catch (Throwable e2) {
598 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
601 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
603 // state.stopWriteTransaction(clusterStream);
606 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
609 // if(notify != null) notify.release();
619 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
620 scheduleRequest(request, callback, notify, null);
624 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
627 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
629 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
631 assert (request != null);
633 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
635 requestManager.scheduleWrite(new SessionTask(request, thread) {
638 public void run(int thread) {
639 fireSessionVariableChange(SessionVariables.QUEUED_READS);
641 Procedure<Object> stateProcedure = new Procedure<Object>() {
643 public void execute(Object result) {
644 if (callback != null)
648 public void exception(Throwable t) {
649 if (callback != null) {
650 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
651 else callback.run(new DatabaseException(t));
653 Logger.defaultLogError("Unhandled exception", t);
657 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
658 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
659 DelayedWriteGraph dwg = null;
660 // newGraph.state.barrier.inc();
663 dwg = new DelayedWriteGraph(newGraph);
664 request.perform(dwg);
665 } catch (Throwable e) {
666 delayedWriteState.except(e);
671 // newGraph.state.barrier.dec();
672 // newGraph.waitAsync(request);
673 fireSessionVariableChange(SessionVariables.QUEUED_READS);
676 delayedWriteState = null;
678 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
679 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686 VirtualGraph vg = getProvider(request.getProvider());
687 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
689 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
691 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
693 assert (null != writer);
694 // writer.state.barrier.inc();
698 dwg.commit(writer, request);
700 if(defaultClusterSet != null) {
701 XSupport xs = getService(XSupport.class);
702 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
703 ClusteringSupport cs = getService(ClusteringSupport.class);
704 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
707 // This makes clusters available from server
708 clusterStream.reallyFlush();
710 releaseWriteOnly(writer);
711 handleUpdatesAndMetadata(writer);
713 } catch (ServiceException e) {
714 // writer.state.barrier.dec();
715 // writer.waitAsync(null);
717 // These shall be requested from server
718 clusterTable.removeWriteOnlyClusters();
719 // This makes clusters available from server
720 clusterStream.reallyFlush();
722 releaseWriteOnly(writer);
723 writeState.except(e);
725 // Debugging & Profiling
726 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
736 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
737 scheduleRequest(request, procedure, notify, null);
741 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
744 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
745 throw new Error("Not implemented");
748 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
749 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
750 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
751 createdClusters.add(cluster.clusterId);
756 class WriteOnlySupport implements WriteSupport {
758 ClusterStream stream;
759 ClusterImpl currentCluster;
761 public WriteOnlySupport() {
762 this.stream = clusterStream;
766 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
767 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
773 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
775 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
776 if(s != queryProvider2.getRootLibrary())
777 throw new ImmutableException("Trying to modify immutable resource key=" + s);
780 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
781 } catch (DatabaseException e) {
782 Logger.defaultLogError(e);
786 clientChanges.invalidate(s);
788 if (cluster.isWriteOnly())
790 queryProvider2.updateStatements(s, p);
795 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
796 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
802 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
804 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
805 } catch (DatabaseException e) {
806 Logger.defaultLogError(e);
809 clientChanges.invalidate(rid);
811 if (cluster.isWriteOnly())
813 queryProvider2.updateValue(rid);
818 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
821 claimValue(provider,resource, reader.readBytes(null, amount));
825 byte[] bytes = new byte[65536];
827 int rid = ((ResourceImpl)resource).id;
828 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832 int block = Math.min(left, 65536);
833 reader.readBytes(bytes, block);
834 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
837 } catch (DatabaseException e) {
838 Logger.defaultLogError(e);
841 clientChanges.invalidate(rid);
843 if (cluster.isWriteOnly())
845 queryProvider2.updateValue(rid);
849 private void maintainCluster(ClusterImpl before, ClusterI after_) {
850 if(after_ != null && after_ != before) {
851 ClusterImpl after = (ClusterImpl)after_;
852 if(currentCluster == before) {
853 currentCluster = after;
855 clusterTable.replaceCluster(after);
859 public int createResourceKey(int foreignCounter) throws DatabaseException {
860 if(currentCluster == null) {
861 currentCluster = getNewResourceCluster();
863 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
864 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
865 newCluster.foreignLookup = new byte[foreignCounter];
866 currentCluster = newCluster;
868 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
870 return currentCluster.createResource(clusterTranslator);
874 public Resource createResource(VirtualGraph provider) throws DatabaseException {
875 if(currentCluster == null) {
876 if (null != defaultClusterSet) {
877 ResourceImpl result = getNewResource(defaultClusterSet);
878 currentCluster = clusterTable.getClusterByResourceKey(result.id);
881 currentCluster = getNewResourceCluster();
884 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
885 if (null != defaultClusterSet) {
886 ResourceImpl result = getNewResource(defaultClusterSet);
887 currentCluster = clusterTable.getClusterByResourceKey(result.id);
890 currentCluster = getNewResourceCluster();
893 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
897 public Resource createResource(VirtualGraph provider, long clusterId)
898 throws DatabaseException {
899 return getNewResource(clusterId);
903 public Resource createResource(VirtualGraph provider, Resource clusterSet)
904 throws DatabaseException {
905 return getNewResource(clusterSet);
909 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
910 throws DatabaseException {
911 getNewClusterSet(clusterSet);
915 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
916 throws ServiceException {
917 return containsClusterSet(clusterSet);
920 public void selectCluster(long cluster) {
921 currentCluster = clusterTable.getClusterByClusterId(cluster);
922 long setResourceId = clusterSetsSupport.getSet(cluster);
923 clusterSetsSupport.put(setResourceId, cluster);
927 public Resource setDefaultClusterSet(Resource clusterSet)
928 throws ServiceException {
929 Resource result = setDefaultClusterSet4NewResource(clusterSet);
930 if(clusterSet != null) {
931 long id = clusterSetsSupport.get(clusterSet.getResourceId());
932 currentCluster = clusterTable.getClusterByClusterId(id);
935 currentCluster = null;
941 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
943 provider = getProvider(provider);
944 if (null == provider) {
945 int key = ((ResourceImpl)resource).id;
949 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
950 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
951 // if(key != queryProvider2.getRootLibrary())
952 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
955 // cluster.removeValue(key, clusterTranslator);
956 // } catch (DatabaseException e) {
957 // Logger.defaultLogError(e);
961 clusterTable.writeOnlyInvalidate(cluster);
964 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
965 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
966 clusterTranslator.removeValue(cluster);
967 } catch (DatabaseException e) {
968 Logger.defaultLogError(e);
971 queryProvider2.invalidateResource(key);
972 clientChanges.invalidate(key);
975 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
976 queryProvider2.updateValue(querySupport.getId(resource));
977 clientChanges.claimValue(resource);
984 public void flush(boolean intermediate) {
985 throw new UnsupportedOperationException();
989 public void flushCluster() {
990 clusterTable.flushCluster(graphSession);
991 if(defaultClusterSet != null) {
992 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
994 currentCluster = null;
998 public void flushCluster(Resource r) {
999 throw new UnsupportedOperationException("flushCluster resource " + r);
1007 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1010 int s = ((ResourceImpl)subject).id;
1011 int p = ((ResourceImpl)predicate).id;
1012 int o = ((ResourceImpl)object).id;
1014 provider = getProvider(provider);
1015 if (null == provider) {
1017 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1018 clusterTable.writeOnlyInvalidate(cluster);
1022 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1023 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1024 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1026 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1027 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1029 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1030 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1031 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032 clusterTranslator.removeStatement(cluster);
1034 queryProvider2.invalidateResource(s);
1035 clientChanges.invalidate(s);
1037 } catch (DatabaseException e) {
1039 Logger.defaultLogError(e);
1047 ((VirtualGraphImpl)provider).deny(s, p, o);
1048 queryProvider2.invalidateResource(s);
1049 clientChanges.invalidate(s);
1058 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1059 throw new UnsupportedOperationException();
1063 public boolean writeOnly() {
1068 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1069 writeSupport.performWriteRequest(graph, request);
1073 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1074 throw new UnsupportedOperationException();
1078 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1079 throw new UnsupportedOperationException();
1083 public <T> void addMetadata(Metadata data) throws ServiceException {
1084 writeSupport.addMetadata(data);
1088 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1089 return writeSupport.getMetadata(clazz);
1093 public TreeMap<String, byte[]> getMetadata() {
1094 return writeSupport.getMetadata();
1098 public void commitDone(WriteTraits writeTraits, long csid) {
1099 writeSupport.commitDone(writeTraits, csid);
1103 public void clearUndoList(WriteTraits writeTraits) {
1104 writeSupport.clearUndoList(writeTraits);
1107 public int clearMetadata() {
1108 return writeSupport.clearMetadata();
1112 public void startUndo() {
1113 writeSupport.startUndo();
1117 class VirtualWriteOnlySupport implements WriteSupport {
1120 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1121 // Resource object) {
1122 // throw new UnsupportedOperationException();
1126 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1128 TransientGraph impl = (TransientGraph)provider;
1129 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1130 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1131 clientChanges.claim(subject, predicate, object);
1136 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1138 TransientGraph impl = (TransientGraph)provider;
1139 impl.claim(subject, predicate, object);
1140 getQueryProvider2().updateStatements(subject, predicate);
1141 clientChanges.claim(subject, predicate, object);
1146 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1147 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1151 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1152 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1153 getQueryProvider2().updateValue(resource);
1154 clientChanges.claimValue(resource);
1158 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1159 byte[] value = reader.readBytes(null, amount);
1160 claimValue(provider, resource, value);
1164 public Resource createResource(VirtualGraph provider) {
1165 TransientGraph impl = (TransientGraph)provider;
1166 return impl.getResource(impl.newResource(false));
1170 public Resource createResource(VirtualGraph provider, long clusterId) {
1171 throw new UnsupportedOperationException();
1175 public Resource createResource(VirtualGraph provider, Resource clusterSet)
1176 throws DatabaseException {
1177 throw new UnsupportedOperationException();
1181 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1182 throws DatabaseException {
1183 throw new UnsupportedOperationException();
1187 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1188 throws ServiceException {
1189 throw new UnsupportedOperationException();
1193 public Resource setDefaultClusterSet(Resource clusterSet)
1194 throws ServiceException {
1199 public void denyValue(VirtualGraph provider, Resource resource) {
1200 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1201 getQueryProvider2().updateValue(querySupport.getId(resource));
1202 // NOTE: this only keeps track of value changes by-resource.
1203 clientChanges.claimValue(resource);
1207 public void flush(boolean intermediate) {
1208 throw new UnsupportedOperationException();
1212 public void flushCluster() {
1213 throw new UnsupportedOperationException();
1217 public void flushCluster(Resource r) {
1218 throw new UnsupportedOperationException("Resource " + r);
1226 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1227 TransientGraph impl = (TransientGraph) provider;
1228 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1229 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1230 clientChanges.deny(subject, predicate, object);
1235 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1236 throw new UnsupportedOperationException();
1240 public boolean writeOnly() {
1245 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1246 throw new UnsupportedOperationException();
1250 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1251 throw new UnsupportedOperationException();
1255 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1256 throw new UnsupportedOperationException();
1260 public <T> void addMetadata(Metadata data) throws ServiceException {
1261 throw new UnsupportedOperationException();
1265 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1266 throw new UnsupportedOperationException();
1270 public TreeMap<String, byte[]> getMetadata() {
1271 throw new UnsupportedOperationException();
1275 public void commitDone(WriteTraits writeTraits, long csid) {
1279 public void clearUndoList(WriteTraits writeTraits) {
1283 public int clearMetadata() {
1288 public void startUndo() {
1292 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1296 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1298 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1301 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1305 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1307 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1309 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1310 writeState = writeStateT;
1312 assert (null != writer);
1313 // writer.state.barrier.inc();
1314 long start = System.nanoTime();
1315 T result = request.perform(writer);
1316 long duration = System.nanoTime() - start;
1318 System.err.println("################");
1319 System.err.println("WriteOnly duration " + 1e-9*duration);
1321 writeStateT.setResult(result);
1323 // This makes clusters available from server
1324 clusterStream.reallyFlush();
1326 // This will trigger query updates
1327 releaseWriteOnly(writer);
1328 assert (null != writer);
1330 handleUpdatesAndMetadata(writer);
1332 } catch (CancelTransactionException e) {
1334 releaseWriteOnly(writeState.getGraph());
1336 clusterTable.removeWriteOnlyClusters();
1337 state.stopWriteTransaction(clusterStream);
1339 } catch (Throwable e) {
1341 e.printStackTrace();
1343 releaseWriteOnly(writeState.getGraph());
1345 clusterTable.removeWriteOnlyClusters();
1347 if (callback != null)
1348 callback.exception(new DatabaseException(e));
1350 state.stopWriteTransaction(clusterStream);
1351 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1355 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1361 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
1362 scheduleRequest(request, callback, notify, null);
1366 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1370 assert (request != null);
1372 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1374 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1377 public void run(int thread) {
1379 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1383 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1386 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1390 VirtualGraph vg = getProvider(request.getProvider());
1391 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1393 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1395 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1398 public void execute(Object result) {
1399 if(callback != null) callback.run(null);
1403 public void exception(Throwable t) {
1404 if(callback != null) callback.run((DatabaseException)t);
1409 assert (null != writer);
1410 // writer.state.barrier.inc();
1414 request.perform(writer);
1416 } catch (Throwable e) {
1418 // writer.state.barrier.dec();
1419 // writer.waitAsync(null);
1421 releaseWriteOnly(writer);
1423 clusterTable.removeWriteOnlyClusters();
1425 if(!(e instanceof CancelTransactionException)) {
1426 if (callback != null)
1427 callback.run(new DatabaseException(e));
1430 writeState.except(e);
1436 // This makes clusters available from server
1437 boolean empty = clusterStream.reallyFlush();
1438 // This was needed to make WO requests call metadata listeners.
1439 // NOTE: the calling event does not contain clientChanges information.
1440 if (!empty && clientChanges.isEmpty())
1441 clientChanges.setNotEmpty(true);
1442 releaseWriteOnly(writer);
1443 assert (null != writer);
1446 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1459 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1460 scheduleRequest(request, callback, notify, null);
1464 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1467 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1469 assert (request != null);
1471 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1473 requestManager.scheduleWrite(new SessionTask(request, thread) {
1476 public void run(int thread) {
1478 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1480 performWriteOnly(request, notify, callback);
1490 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1492 assert (request != null);
1493 assert (procedure != null);
1495 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1497 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1500 public void run(int thread) {
1502 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1504 ListenerBase listener = getListenerBase(procedure);
1506 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1510 if (listener != null) {
1513 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1516 public void exception(AsyncReadGraph graph, Throwable t) {
1517 procedure.exception(graph, t);
1518 if(throwable != null) {
1521 // ErrorLogger.defaultLogError("Unhandled exception", t);
1526 public void execute(AsyncReadGraph graph, T t) {
1527 if(result != null) result.set(t);
1528 procedure.execute(graph, t);
1532 } catch (Throwable t) {
1533 // This is handled by the AsyncProcedure
1534 //Logger.defaultLogError("Internal error", t);
1541 // newGraph.state.barrier.inc();
1543 T t = request.perform(newGraph);
1547 if(result != null) result.set(t);
1548 procedure.execute(newGraph, t);
1550 } catch (Throwable th) {
1552 if(throwable != null) {
1555 Logger.defaultLogError("Unhandled exception", th);
1560 } catch (Throwable t) {
1563 t.printStackTrace();
1565 if(throwable != null) {
1568 Logger.defaultLogError("Unhandled exception", t);
1573 procedure.exception(newGraph, t);
1575 } catch (Throwable t2) {
1577 if(throwable != null) {
1580 Logger.defaultLogError("Unhandled exception", t2);
1587 // newGraph.state.barrier.dec();
1588 // newGraph.waitAsync(request);
1594 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1604 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1606 assert (request != null);
1607 assert (procedure != null);
1609 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1611 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1614 public void run(int thread) {
1616 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1618 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1622 if (listener != null) {
1624 newGraph.processor.query(newGraph, request, null, procedure, listener);
1626 // newGraph.waitAsync(request);
1630 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1631 procedure, "request");
1635 // newGraph.state.barrier.inc();
1637 request.perform(newGraph, wrapper);
1639 // newGraph.waitAsync(request);
1641 } catch (Throwable t) {
1643 wrapper.exception(newGraph, t);
1644 // newGraph.waitAsync(request);
1653 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1663 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1665 assert (request != null);
1666 assert (procedure != null);
1668 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1670 int sync = notify != null ? thread : -1;
1672 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1675 public void run(int thread) {
1677 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1679 ListenerBase listener = getListenerBase(procedure);
1681 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1685 if (listener != null) {
1687 newGraph.processor.query(newGraph, request, null, procedure, listener);
1689 // newGraph.waitAsync(request);
1693 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1697 request.perform(newGraph, wrapper);
1699 } catch (Throwable t) {
1701 t.printStackTrace();
1709 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1719 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1721 assert (request != null);
1722 assert (procedure != null);
1724 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1726 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1729 public void run(int thread) {
1731 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1733 ListenerBase listener = getListenerBase(procedure);
1735 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1739 if (listener != null) {
1741 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1744 public void exception(Throwable t) {
1745 procedure.exception(t);
1746 if(throwable != null) {
1752 public void execute(T t) {
1753 if(result != null) result.set(t);
1754 procedure.execute(t);
1759 // newGraph.waitAsync(request);
1763 // newGraph.state.barrier.inc();
1765 request.register(newGraph, new Listener<T>() {
1768 public void exception(Throwable t) {
1769 if(throwable != null) throwable.set(t);
1770 procedure.exception(t);
1771 // newGraph.state.barrier.dec();
1775 public void execute(T t) {
1776 if(result != null) result.set(t);
1777 procedure.execute(t);
1778 // newGraph.state.barrier.dec();
1782 public boolean isDisposed() {
1788 // newGraph.waitAsync(request);
1794 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1806 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1808 scheduleRequest(request, procedure, null, null, null);
1813 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1815 Semaphore notify = new Semaphore(0);
1816 DataContainer<Throwable> container = new DataContainer<Throwable>();
1817 DataContainer<T> result = new DataContainer<T>();
1818 scheduleRequest(request, procedure, notify, container, result);
1819 acquire(notify, request);
1820 Throwable throwable = container.get();
1821 if(throwable != null) {
1822 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1823 else throw new DatabaseException("Unexpected exception", throwable);
1825 return result.get();
1829 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1831 Semaphore notify = new Semaphore(0);
1832 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1833 final DataContainer<T> resultContainer = new DataContainer<T>();
1834 scheduleRequest(request, new AsyncProcedure<T>() {
1836 public void exception(AsyncReadGraph graph, Throwable throwable) {
1837 exceptionContainer.set(throwable);
1838 procedure.exception(graph, throwable);
1841 public void execute(AsyncReadGraph graph, T result) {
1842 resultContainer.set(result);
1843 procedure.execute(graph, result);
1845 }, getListenerBase(procedure), notify);
1846 acquire(notify, request, procedure);
1847 Throwable throwable = exceptionContainer.get();
1848 if (throwable != null) {
1849 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1850 else throw new DatabaseException("Unexpected exception", throwable);
1852 return resultContainer.get();
1857 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1859 Semaphore notify = new Semaphore(0);
1860 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1861 scheduleRequest(request, new AsyncMultiProcedure<T>() {
1863 public void exception(AsyncReadGraph graph, Throwable throwable) {
1864 exceptionContainer.set(throwable);
1865 procedure.exception(graph, throwable);
1868 public void execute(AsyncReadGraph graph, T result) {
1869 procedure.execute(graph, result);
1872 public void finished(AsyncReadGraph graph) {
1873 procedure.finished(graph);
1876 acquire(notify, request, procedure);
1877 Throwable throwable = exceptionContainer.get();
1878 if (throwable != null) {
1879 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1880 else throw new DatabaseException("Unexpected exception", throwable);
1882 // TODO: implement return value
1883 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1888 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1889 return syncRequest(request, new ProcedureAdapter<T>());
1892 // assert(request != null);
1894 // final DataContainer<T> result = new DataContainer<T>();
1895 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1897 // syncRequest(request, new Procedure<T>() {
1900 // public void execute(T t) {
1905 // public void exception(Throwable t) {
1906 // exception.set(t);
1911 // Throwable t = exception.get();
1913 // if(t instanceof DatabaseException) throw (DatabaseException)t;
1914 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1917 // return result.get();
1922 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1923 return syncRequest(request, (Procedure<T>)procedure);
1928 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1929 // assertNotSession();
1930 // Semaphore notify = new Semaphore(0);
1931 // DataContainer<Throwable> container = new DataContainer<Throwable>();
1932 // DataContainer<T> result = new DataContainer<T>();
1933 // scheduleRequest(request, procedure, notify, container, result);
1934 // acquire(notify, request);
1935 // Throwable throwable = container.get();
1936 // if(throwable != null) {
1937 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1938 // else throw new DatabaseException("Unexpected exception", throwable);
1940 // return result.get();
1945 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1947 Semaphore notify = new Semaphore(0);
1948 final DataContainer<Throwable> container = new DataContainer<Throwable>();
1949 final DataContainer<T> result = new DataContainer<T>();
1950 scheduleRequest(request, procedure, notify, container, result);
1951 acquire(notify, request);
1952 Throwable throwable = container.get();
1953 if (throwable != null) {
1954 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1955 else throw new DatabaseException("Unexpected exception", throwable);
1957 return result.get();
1961 public void syncRequest(Write request) throws DatabaseException {
1964 Semaphore notify = new Semaphore(0);
1965 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1966 scheduleRequest(request, new Callback<DatabaseException>() {
1969 public void run(DatabaseException e) {
1974 acquire(notify, request);
1975 if(exception.get() != null) throw exception.get();
1979 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1981 Semaphore notify = new Semaphore(0);
1982 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1983 final DataContainer<T> result = new DataContainer<T>();
1984 scheduleRequest(request, new Procedure<T>() {
1987 public void exception(Throwable t) {
1992 public void execute(T t) {
1997 acquire(notify, request);
1998 if(exception.get() != null) {
1999 Throwable t = exception.get();
2000 if(t instanceof DatabaseException) throw (DatabaseException)t;
2001 else throw new DatabaseException(t);
2003 return result.get();
2007 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
2009 Semaphore notify = new Semaphore(0);
2010 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2011 final DataContainer<T> result = new DataContainer<T>();
2012 scheduleRequest(request, new Procedure<T>() {
2015 public void exception(Throwable t) {
2020 public void execute(T t) {
2025 acquire(notify, request);
2026 if(exception.get() != null) {
2027 Throwable t = exception.get();
2028 if(t instanceof DatabaseException) throw (DatabaseException)t;
2029 else throw new DatabaseException(t);
2031 return result.get();
2035 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
2037 Semaphore notify = new Semaphore(0);
2038 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2039 final DataContainer<T> result = new DataContainer<T>();
2040 scheduleRequest(request, new Procedure<T>() {
2043 public void exception(Throwable t) {
2048 public void execute(T t) {
2053 acquire(notify, request);
2054 if(exception.get() != null) {
2055 Throwable t = exception.get();
2056 if(t instanceof DatabaseException) throw (DatabaseException)t;
2057 else throw new DatabaseException(t);
2059 return result.get();
2063 public void syncRequest(DelayedWrite request) throws DatabaseException {
2065 Semaphore notify = new Semaphore(0);
2066 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2067 scheduleRequest(request, new Callback<DatabaseException>() {
2069 public void run(DatabaseException e) {
2073 acquire(notify, request);
2074 if(exception.get() != null) throw exception.get();
2078 public void syncRequest(WriteOnly request) throws DatabaseException {
2081 Semaphore notify = new Semaphore(0);
2082 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083 scheduleRequest(request, new Callback<DatabaseException>() {
2085 public void run(DatabaseException e) {
2089 acquire(notify, request);
2090 if(exception.get() != null) throw exception.get();
2095 * GraphRequestProcessor interface
2099 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2101 scheduleRequest(request, procedure, null, null);
2106 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2108 scheduleRequest(request, procedure, null);
2113 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2115 scheduleRequest(request, procedure, null, null, null);
2120 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
2122 scheduleRequest(request, callback, null);
2127 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2129 scheduleRequest(request, procedure, null);
2134 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2136 scheduleRequest(request, procedure, null);
2141 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2143 scheduleRequest(request, procedure, null);
2148 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
2150 scheduleRequest(request, callback, null);
2155 public void asyncRequest(final Write r) {
2156 asyncRequest(r, null);
2160 public void asyncRequest(final DelayedWrite r) {
2161 asyncRequest(r, null);
2165 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
2167 scheduleRequest(request, callback, null);
2172 public void asyncRequest(final WriteOnly request) {
2174 asyncRequest(request, null);
2179 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2180 r.request(this, procedure);
2184 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2185 r.request(this, procedure);
2189 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2190 r.request(this, procedure);
2194 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2195 r.request(this, procedure);
2199 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2200 r.request(this, procedure);
2204 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2205 r.request(this, procedure);
2209 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2210 return r.request(this);
2214 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2215 return r.request(this);
2219 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2220 r.request(this, procedure);
2224 public <T> void async(WriteInterface<T> r) {
2225 r.request(this, new ProcedureAdapter<T>());
2229 public void incAsync() {
2234 public void decAsync() {
2238 public long getCluster(ResourceImpl resource) {
2239 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2240 return cluster.getClusterId();
2243 public long getCluster(int id) {
2244 if (clusterTable == null)
2245 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2246 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2249 public ResourceImpl getResource(int id) {
2250 return new ResourceImpl(resourceSupport, id);
2253 public ResourceImpl getResource(int resourceIndex, long clusterId) {
2254 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2255 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2256 int key = proxy.getClusterKey();
2257 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2258 return new ResourceImpl(resourceSupport, resourceKey);
2261 public ResourceImpl getResource2(int id) {
2263 return new ResourceImpl(resourceSupport, id);
2266 final public int getId(ResourceImpl impl) {
2270 public static final Charset UTF8 = Charset.forName("utf-8");
2275 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2276 for(TransientGraph g : support.providers) {
2277 if(g.isPending(subject)) return false;
2282 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2283 for(TransientGraph g : support.providers) {
2284 if(g.isPending(subject, predicate)) return false;
2289 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
2291 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2293 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2296 public void run(ReadGraphImpl graph) {
2297 if(ready.decrementAndGet() == 0) {
2298 runnable.run(graph);
2304 for(TransientGraph g : support.providers) {
2305 if(g.isPending(subject)) {
2307 g.load(graph, subject, composite);
2308 } catch (DatabaseException e) {
2309 e.printStackTrace();
2312 composite.run(graph);
2316 composite.run(graph);
2320 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
2322 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
2324 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2327 public void run(ReadGraphImpl graph) {
2328 if(ready.decrementAndGet() == 0) {
2329 runnable.run(graph);
2335 for(TransientGraph g : support.providers) {
2336 if(g.isPending(subject, predicate)) {
2338 g.load(graph, subject, predicate, composite);
2339 } catch (DatabaseException e) {
2340 e.printStackTrace();
2343 composite.run(graph);
2347 composite.run(graph);
2351 // void dumpHeap() {
2354 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2355 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2356 // "d:/heap" + flushCounter + ".txt", true);
2357 // } catch (IOException e) {
2358 // e.printStackTrace();
2363 void fireReactionsToSynchronize(ChangeSet cs) {
2365 // Do not fire empty events
2369 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2370 // g.state.barrier.inc();
2374 if (!cs.isEmpty()) {
2375 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2376 for (ChangeListener l : changeListeners2) {
2379 } catch (Exception ex) {
2380 ex.printStackTrace();
2387 // g.state.barrier.dec();
2388 // g.waitAsync(null);
2394 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2397 // Do not fire empty events
2402 // graph.state.barrier.inc();
2406 if (!cs2.isEmpty()) {
2408 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2409 for (ChangeListener l : changeListeners2) {
2412 // System.out.println("changelistener " + l);
2413 } catch (Exception ex) {
2414 ex.printStackTrace();
2422 // graph.state.barrier.dec();
2423 // graph.waitAsync(null);
2426 } catch (Throwable t) {
2427 t.printStackTrace();
2431 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2435 // Do not fire empty events
2439 // Do not fire on virtual requests
2440 if(graph.getProvider() != null)
2443 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2447 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2448 for (ChangeListener l : metadataListeners) {
2451 } catch (Throwable ex) {
2452 ex.printStackTrace();
2460 } catch (Throwable t) {
2461 t.printStackTrace();
2470 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2473 public <T> T getService(Class<T> api) {
2474 T t = peekService(api);
2476 if (state.isClosed())
2477 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2478 throw new ServiceNotFoundException(this, api);
2486 protected abstract ServerInformation getCachedServerInformation();
2488 private Class<?> serviceKey1 = null;
2489 private Class<?> serviceKey2 = null;
2490 private Object service1 = null;
2491 private Object service2 = null;
2497 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2499 @SuppressWarnings("unchecked")
2501 public synchronized <T> T peekService(Class<T> api) {
2503 if(serviceKey1 == api) {
2505 } else if (serviceKey2 == api) {
2507 Object result = service2;
2508 service2 = service1;
2509 serviceKey2 = serviceKey1;
2515 if (Layer0.class == api)
2517 if (ServerInformation.class == api)
2518 return (T) getCachedServerInformation();
2519 else if (WriteGraphImpl.class == api)
2520 return (T) writeState.getGraph();
2521 else if (ClusterBuilder.class == api)
2522 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2523 else if (ClusterBuilderFactory.class == api)
2524 return (T)new ClusterBuilderFactoryImpl(this);
2526 service2 = service1;
2527 serviceKey2 = serviceKey1;
2529 service1 = serviceLocator.peekService(api);
2540 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2543 public boolean hasService(Class<?> api) {
2544 return serviceLocator.hasService(api);
2548 * @param api the api that must be implemented by the specified service
2549 * @param service the service implementation
2552 public <T> void registerService(Class<T> api, T service) {
2553 if(Layer0.class == api) {
2554 L0 = (Layer0)service;
2557 serviceLocator.registerService(api, service);
2558 if (TransactionPolicySupport.class == api) {
2559 transactionPolicy = (TransactionPolicySupport)service;
2560 state.resetTransactionPolicy();
2562 if (api == serviceKey1)
2564 else if (api == serviceKey2)
2572 void fireSessionVariableChange(String variable) {
2573 for (MonitorHandler h : monitorHandlers) {
2574 MonitorContext ctx = monitorContexts.getLeft(h);
2576 // SafeRunner functionality repeated here to avoid dependency.
2578 h.valuesChanged(ctx);
2579 } catch (Exception e) {
2580 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2581 } catch (LinkageError e) {
2582 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2588 class ResourceSerializerImpl implements ResourceSerializer {
2590 public long createRandomAccessId(int id) throws DatabaseException {
2593 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2594 long cluster = getCluster(id);
2596 return 0; // Better to return 0 then invalid id.
2597 long result = ClusterTraitsBase.createResourceId(cluster, index);
2602 public long getRandomAccessId(Resource resource) throws DatabaseException {
2604 ResourceImpl resourceImpl = (ResourceImpl) resource;
2605 return createRandomAccessId(resourceImpl.id);
2610 public int getTransientId(Resource resource) throws DatabaseException {
2612 ResourceImpl resourceImpl = (ResourceImpl) resource;
2613 return resourceImpl.id;
2618 public String createRandomAccessId(Resource resource)
2619 throws InvalidResourceReferenceException {
2621 if(resource == null) throw new IllegalArgumentException();
2623 ResourceImpl resourceImpl = (ResourceImpl) resource;
2625 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2629 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2630 } catch (DatabaseException e1) {
2631 throw new InvalidResourceReferenceException(e1);
2634 // Serialize as '<resource index>_<cluster id>'
2635 return "" + r + "_" + getCluster(resourceImpl);
2636 } catch (Throwable e) {
2637 e.printStackTrace();
2638 throw new InvalidResourceReferenceException(e);
2643 public int getTransientId(long serialized) throws DatabaseException {
2644 if (serialized <= 0)
2645 return (int)serialized;
2646 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2647 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2648 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2649 if (cluster == null)
2650 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2651 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2656 public Resource getResource(long randomAccessId) throws DatabaseException {
2657 return getResourceByKey(getTransientId(randomAccessId));
2661 public Resource getResource(int transientId) throws DatabaseException {
2662 return getResourceByKey(transientId);
2666 public Resource getResource(String randomAccessId)
2667 throws InvalidResourceReferenceException {
2669 int i = randomAccessId.indexOf('_');
2671 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2672 + randomAccessId + "'");
2673 int r = Integer.parseInt(randomAccessId.substring(0, i));
2674 if(r < 0) return getResourceByKey(r);
2675 long c = Long.parseLong(randomAccessId.substring(i + 1));
2676 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2677 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2678 if (cluster.hasResource(key, clusterTranslator))
2679 return getResourceByKey(key);
2680 } catch (InvalidResourceReferenceException e) {
2682 } catch (NumberFormatException e) {
2683 throw new InvalidResourceReferenceException(e);
2684 } catch (Throwable e) {
2685 e.printStackTrace();
2686 throw new InvalidResourceReferenceException(e);
2689 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2693 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2696 } catch (Throwable e) {
2697 e.printStackTrace();
2698 throw new InvalidResourceReferenceException(e);
2704 // Copied from old SessionImpl
2707 if (state.isClosed())
2708 throw new Error("Session closed.");
2713 public ResourceImpl getNewResource(long clusterId) {
2714 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2717 newId = cluster.createResource(clusterTranslator);
2718 } catch (DatabaseException e) {
2719 Logger.defaultLogError(e);
2722 return new ResourceImpl(resourceSupport, newId);
2725 public ResourceImpl getNewResource(Resource clusterSet)
2726 throws DatabaseException {
2727 long resourceId = clusterSet.getResourceId();
2728 Long clusterId = clusterSetsSupport.get(resourceId);
2729 if (null == clusterId)
2730 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2731 if (Constants.NewClusterId == clusterId) {
2732 clusterId = getService(ClusteringSupport.class).createCluster();
2733 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2734 createdClusters.add(clusterId);
2735 clusterSetsSupport.put(resourceId, clusterId);
2736 return getNewResource(clusterId);
2738 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2739 ResourceImpl result;
2740 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2741 clusterId = getService(ClusteringSupport.class).createCluster();
2742 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2743 createdClusters.add(clusterId);
2744 clusterSetsSupport.put(resourceId, clusterId);
2745 return getNewResource(clusterId);
2747 result = getNewResource(clusterId);
2748 int resultKey = querySupport.getId(result);
2749 long resultCluster = querySupport.getClusterId(resultKey);
2750 if (clusterId != resultCluster)
2751 clusterSetsSupport.put(resourceId, resultCluster);
2757 public void getNewClusterSet(Resource clusterSet)
2758 throws DatabaseException {
2760 System.out.println("new cluster set=" + clusterSet);
2761 long resourceId = clusterSet.getResourceId();
2762 if (clusterSetsSupport.containsKey(resourceId))
2763 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2764 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2766 public boolean containsClusterSet(Resource clusterSet)
2767 throws ServiceException {
2768 long resourceId = clusterSet.getResourceId();
2769 return clusterSetsSupport.containsKey(resourceId);
2771 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2772 Resource r = defaultClusterSet;
2773 defaultClusterSet = clusterSet;
2776 void printDiagnostics() {
2780 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2781 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2782 for(ClusterI cluster : clusterTable.getClusters()) {
2784 if (cluster.isLoaded() && !cluster.isEmpty()) {
2785 residentClusters.set(residentClusters.get() + 1);
2786 totalMem.set(totalMem.get() + cluster.getUsedSpace());
2788 } catch (DatabaseException e) {
2789 Logger.defaultLogError(e);
2793 System.out.println("--------------------------------");
2794 System.out.println("Cluster information:");
2795 System.out.println("-amount of resident clusters=" + residentClusters.get());
2796 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2798 for(ClusterI cluster : clusterTable.getClusters()) {
2799 System.out.print("Cluster " + cluster.getClusterId() + " [");
2801 if (!cluster.isLoaded())
2802 System.out.println("not loaded]");
2803 else if (cluster.isEmpty())
2804 System.out.println("is empty]");
2806 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2807 System.out.print(",references=" + cluster.getReferenceCount());
2808 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2810 } catch (DatabaseException e) {
2811 Logger.defaultLogError(e);
2812 System.out.println("is corrupted]");
2816 queryProvider2.printDiagnostics();
2817 System.out.println("--------------------------------");
2822 public void fireStartReadTransaction() {
2825 System.out.println("StartReadTransaction");
2827 for (SessionEventListener listener : eventListeners) {
2829 listener.readTransactionStarted();
2830 } catch (Throwable t) {
2831 t.printStackTrace();
2837 public void fireFinishReadTransaction() {
2840 System.out.println("FinishReadTransaction");
2842 for (SessionEventListener listener : eventListeners) {
2844 listener.readTransactionFinished();
2845 } catch (Throwable t) {
2846 t.printStackTrace();
2852 public void fireStartWriteTransaction() {
2855 System.out.println("StartWriteTransaction");
2857 for (SessionEventListener listener : eventListeners) {
2859 listener.writeTransactionStarted();
2860 } catch (Throwable t) {
2861 t.printStackTrace();
2867 public void fireFinishWriteTransaction() {
2870 System.out.println("FinishWriteTransaction");
2872 for (SessionEventListener listener : eventListeners) {
2874 listener.writeTransactionFinished();
2875 } catch (Throwable t) {
2876 t.printStackTrace();
2880 Indexing.resetDependenciesIndexingDisabled();
2884 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2885 throw new Error("Not supported at the moment.");
2892 ClusterTable getClusterTable() {
2893 return clusterTable;
2896 public GraphSession getGraphSession() {
2897 return graphSession;
2900 public QueryProcessor getQueryProvider2() {
2901 return queryProvider2;
2904 ClientChangesImpl getClientChanges() {
2905 return clientChanges;
2908 boolean getWriteOnly() {
2912 static int counter = 0;
2914 public void onClusterLoaded(long clusterId) {
2916 clusterTable.updateSize();
2924 * Implementation of the interface RequestProcessor
2928 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2933 assert(request != null);
2935 final DataContainer<T> result = new DataContainer<T>();
2936 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2938 syncRequest(request, new AsyncProcedure<T>() {
2941 public void execute(AsyncReadGraph graph, T t) {
2946 public void exception(AsyncReadGraph graph, Throwable t) {
2952 Throwable t = exception.get();
2954 if(t instanceof DatabaseException) throw (DatabaseException)t;
2955 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2958 return result.get();
2963 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2964 // assertNotSession();
2965 // return syncRequest(request, (AsyncProcedure<T>)procedure);
2969 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2971 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2975 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2977 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2981 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2983 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2987 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2989 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2993 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2997 assert(request != null);
2999 final DataContainer<T> result = new DataContainer<T>();
3000 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3002 syncRequest(request, new AsyncProcedure<T>() {
3005 public void execute(AsyncReadGraph graph, T t) {
3010 public void exception(AsyncReadGraph graph, Throwable t) {
3016 Throwable t = exception.get();
3018 if(t instanceof DatabaseException) throw (DatabaseException)t;
3019 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3022 return result.get();
3027 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3029 return syncRequest(request, (AsyncProcedure<T>)procedure);
3033 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3035 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3039 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3041 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3045 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3047 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3051 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3053 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3057 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3061 assert(request != null);
3063 final ArrayList<T> result = new ArrayList<T>();
3064 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3066 syncRequest(request, new AsyncMultiProcedure<T>() {
3069 public void execute(AsyncReadGraph graph, T t) {
3070 synchronized(result) {
3076 public void finished(AsyncReadGraph graph) {
3080 public void exception(AsyncReadGraph graph, Throwable t) {
3087 Throwable t = exception.get();
3089 if(t instanceof DatabaseException) throw (DatabaseException)t;
3090 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3098 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3100 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3104 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3106 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3110 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3112 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3116 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3118 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3122 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3124 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3128 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3132 assert(request != null);
3134 final ArrayList<T> result = new ArrayList<T>();
3135 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3137 syncRequest(request, new AsyncMultiProcedure<T>() {
3140 public void execute(AsyncReadGraph graph, T t) {
3141 synchronized(result) {
3147 public void finished(AsyncReadGraph graph) {
3151 public void exception(AsyncReadGraph graph, Throwable t) {
3158 Throwable t = exception.get();
3160 if(t instanceof DatabaseException) throw (DatabaseException)t;
3161 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3169 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3171 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3175 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3177 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3181 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3183 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3187 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3189 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3193 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3195 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3200 public <T> void asyncRequest(Read<T> request) {
3202 asyncRequest(request, new ProcedureAdapter<T>() {
3204 public void exception(Throwable t) {
3205 t.printStackTrace();
3212 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3213 asyncRequest(request, (AsyncProcedure<T>)procedure);
3217 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3218 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3222 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3223 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3227 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3228 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3232 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3233 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3237 final public <T> void asyncRequest(final AsyncRead<T> request) {
3239 assert(request != null);
3241 asyncRequest(request, new ProcedureAdapter<T>() {
3243 public void exception(Throwable t) {
3244 t.printStackTrace();
3251 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3252 scheduleRequest(request, procedure, procedure, null);
3256 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3257 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3261 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3262 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3266 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3267 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3271 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3272 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3276 public <T> void asyncRequest(MultiRead<T> request) {
3278 assert(request != null);
3280 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3282 public void exception(AsyncReadGraph graph, Throwable t) {
3283 t.printStackTrace();
3290 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3291 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3295 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3296 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3300 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3301 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3305 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3306 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3310 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3311 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3315 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3317 assert(request != null);
3319 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3321 public void exception(AsyncReadGraph graph, Throwable t) {
3322 t.printStackTrace();
3329 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3330 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3334 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3335 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3339 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3340 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3344 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3345 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3349 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3350 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3354 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3356 throw new Error("Not implemented!");
3360 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3361 throw new Error("Not implemented!");
3365 final public <T> void asyncRequest(final ExternalRead<T> request) {
3367 assert(request != null);
3369 asyncRequest(request, new ProcedureAdapter<T>() {
3371 public void exception(Throwable t) {
3372 t.printStackTrace();
3379 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3380 asyncRequest(request, (Procedure<T>)procedure);
3385 void check(Throwable t) throws DatabaseException {
3387 if(t instanceof DatabaseException) throw (DatabaseException)t;
3388 else throw new DatabaseException("Unexpected exception", t);
3392 void check(DataContainer<Throwable> container) throws DatabaseException {
3393 Throwable t = container.get();
3395 if(t instanceof DatabaseException) throw (DatabaseException)t;
3396 else throw new DatabaseException("Unexpected exception", t);
3405 boolean sameProvider(Write request) {
3406 if(writeState.getGraph().provider != null) {
3407 return writeState.getGraph().provider.equals(request.getProvider());
3409 return request.getProvider() == null;
3413 boolean plainWrite(WriteGraphImpl graph) {
3414 if(graph == null) return false;
3415 if(graph.writeSupport.writeOnly()) return false;
3420 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3421 private void assertNotSession() throws DatabaseException {
3422 Thread current = Thread.currentThread();
3423 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3426 void assertAlive() {
3427 if (!state.isAlive())
3428 throw new RuntimeDatabaseException("Session has been shut down.");
3431 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3432 return querySupport.getValueStream(graph, querySupport.getId(resource));
3435 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3436 return querySupport.getValue(graph, querySupport.getId(resource));
3439 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3440 acquire(semaphore, request, null);
3443 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3447 // Loop until the semaphore is acquired reporting requests that take a long time.
3450 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3454 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3456 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3457 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3459 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3460 + (waitTime) + " ms. (procedure=" + procedure + ")");
3467 } catch (InterruptedException e) {
3468 e.printStackTrace();
3469 // FIXME: Should perhaps do something else in this case ??
3476 // public boolean holdOnToTransactionAfterCancel() {
3477 // return transactionPolicy.holdOnToTransactionAfterCancel();
3481 // public boolean holdOnToTransactionAfterCommit() {
3482 // return transactionPolicy.holdOnToTransactionAfterCommit();
3486 // public boolean holdOnToTransactionAfterRead() {
3487 // return transactionPolicy.holdOnToTransactionAfterRead();
3491 // public void onRelinquish() {
3492 // transactionPolicy.onRelinquish();
3496 // public void onRelinquishDone() {
3497 // transactionPolicy.onRelinquishDone();
3501 // public void onRelinquishError() {
3502 // transactionPolicy.onRelinquishError();
3507 public Session getSession() {
3512 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3513 protected abstract ResourceImpl getNewResource() throws DatabaseException;
3515 public void ceased(int thread) {
3517 requestManager.ceased(thread);
3521 public int getAmountOfQueryThreads() {
3522 // This must be a power of two
3524 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3527 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3529 return new ResourceImpl(resourceSupport, key);
3533 public void acquireWriteOnly() {
3537 public void releaseWriteOnly(ReadGraphImpl graph) {
3539 queryProvider2.releaseWrite(graph);
3542 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3544 long start = System.nanoTime();
3546 while(dirtyPrimitives) {
3547 dirtyPrimitives = false;
3548 getQueryProvider2().performDirtyUpdates(writer);
3549 getQueryProvider2().performScheduledUpdates(writer);
3552 fireMetadataListeners(writer, clientChanges);
3554 if(DebugPolicy.PERFORMANCE_DATA) {
3555 long end = System.nanoTime();
3556 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3561 void removeTemporaryData() {
3562 File platform = Platform.getLocation().toFile();
3563 File tempFiles = new File(platform, "tempFiles");
3564 File temp = new File(tempFiles, "db");
3568 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3570 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3573 } catch (IOException e) {
3574 Logger.defaultLogError(e);
3576 return FileVisitResult.CONTINUE;
3579 } catch (IOException e) {
3580 Logger.defaultLogError(e);
3584 public void handleCreatedClusters() {
3585 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3586 createdClusters.forEach(new TLongProcedure() {
3589 public boolean execute(long value) {
3590 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3591 cluster.setImmutable(true, clusterTranslator);
3596 createdClusters.clear();
3600 public Object getModificationCounter() {
3601 return queryProvider2.modificationCounter;
3605 public void markUndoPoint() {
3606 state.setCombine(false);