1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.io.File;
\r
15 import java.io.IOException;
\r
16 import java.io.InputStream;
\r
17 import java.nio.charset.Charset;
\r
18 import java.nio.file.FileVisitResult;
\r
19 import java.nio.file.Files;
\r
20 import java.nio.file.Path;
\r
21 import java.nio.file.SimpleFileVisitor;
\r
22 import java.nio.file.attribute.BasicFileAttributes;
\r
23 import java.util.ArrayList;
\r
24 import java.util.Collection;
\r
25 import java.util.HashSet;
\r
26 import java.util.TreeMap;
\r
27 import java.util.concurrent.CopyOnWriteArrayList;
\r
28 import java.util.concurrent.Semaphore;
\r
29 import java.util.concurrent.atomic.AtomicInteger;
\r
31 import org.eclipse.core.runtime.Platform;
\r
32 import org.simantics.databoard.Bindings;
\r
33 import org.simantics.db.AsyncReadGraph;
\r
34 import org.simantics.db.ChangeSet;
\r
35 import org.simantics.db.DevelopmentKeys;
\r
36 import org.simantics.db.ExternalValueSupport;
\r
37 import org.simantics.db.Metadata;
\r
38 import org.simantics.db.MonitorContext;
\r
39 import org.simantics.db.MonitorHandler;
\r
40 import org.simantics.db.Resource;
\r
41 import org.simantics.db.ResourceSerializer;
\r
42 import org.simantics.db.Session;
\r
43 import org.simantics.db.SessionManager;
\r
44 import org.simantics.db.SessionVariables;
\r
45 import org.simantics.db.VirtualGraph;
\r
46 import org.simantics.db.WriteGraph;
\r
47 import org.simantics.db.authentication.UserAuthenticationAgent;
\r
48 import org.simantics.db.authentication.UserAuthenticator;
\r
49 import org.simantics.db.common.Indexing;
\r
50 import org.simantics.db.common.TransactionPolicyRelease;
\r
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
\r
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
\r
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
\r
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
\r
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
\r
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
\r
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
\r
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
\r
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
\r
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
\r
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
\r
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
\r
63 import org.simantics.db.common.utils.Logger;
\r
64 import org.simantics.db.event.ChangeEvent;
\r
65 import org.simantics.db.event.ChangeListener;
\r
66 import org.simantics.db.event.SessionEventListener;
\r
67 import org.simantics.db.exception.CancelTransactionException;
\r
68 import org.simantics.db.exception.ClusterSetExistException;
\r
69 import org.simantics.db.exception.DatabaseException;
\r
70 import org.simantics.db.exception.ImmutableException;
\r
71 import org.simantics.db.exception.InvalidResourceReferenceException;
\r
72 import org.simantics.db.exception.ResourceNotFoundException;
\r
73 import org.simantics.db.exception.RuntimeDatabaseException;
\r
74 import org.simantics.db.exception.ServiceException;
\r
75 import org.simantics.db.exception.ServiceNotFoundException;
\r
76 import org.simantics.db.impl.ClusterBase;
\r
77 import org.simantics.db.impl.ClusterI;
\r
78 import org.simantics.db.impl.ClusterTraitsBase;
\r
79 import org.simantics.db.impl.ClusterTranslator;
\r
80 import org.simantics.db.impl.ResourceImpl;
\r
81 import org.simantics.db.impl.TransientGraph;
\r
82 import org.simantics.db.impl.VirtualGraphImpl;
\r
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
\r
84 import org.simantics.db.impl.graph.ReadGraphImpl;
\r
85 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
86 import org.simantics.db.impl.graph.WriteSupport;
\r
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
\r
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
\r
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
\r
90 import org.simantics.db.impl.query.QueryProcessor;
\r
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
\r
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
\r
93 import org.simantics.db.impl.service.QueryDebug;
\r
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
\r
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
\r
96 import org.simantics.db.procedure.AsyncListener;
\r
97 import org.simantics.db.procedure.AsyncMultiListener;
\r
98 import org.simantics.db.procedure.AsyncMultiProcedure;
\r
99 import org.simantics.db.procedure.AsyncProcedure;
\r
100 import org.simantics.db.procedure.Listener;
\r
101 import org.simantics.db.procedure.ListenerBase;
\r
102 import org.simantics.db.procedure.MultiListener;
\r
103 import org.simantics.db.procedure.MultiProcedure;
\r
104 import org.simantics.db.procedure.Procedure;
\r
105 import org.simantics.db.procedure.SyncListener;
\r
106 import org.simantics.db.procedure.SyncMultiListener;
\r
107 import org.simantics.db.procedure.SyncMultiProcedure;
\r
108 import org.simantics.db.procedure.SyncProcedure;
\r
109 import org.simantics.db.procore.cluster.ClusterImpl;
\r
110 import org.simantics.db.procore.cluster.ClusterTraits;
\r
111 import org.simantics.db.procore.protocol.Constants;
\r
112 import org.simantics.db.procore.protocol.DebugPolicy;
\r
113 import org.simantics.db.request.AsyncMultiRead;
\r
114 import org.simantics.db.request.AsyncRead;
\r
115 import org.simantics.db.request.DelayedWrite;
\r
116 import org.simantics.db.request.DelayedWriteResult;
\r
117 import org.simantics.db.request.ExternalRead;
\r
118 import org.simantics.db.request.MultiRead;
\r
119 import org.simantics.db.request.Read;
\r
120 import org.simantics.db.request.ReadInterface;
\r
121 import org.simantics.db.request.UndoTraits;
\r
122 import org.simantics.db.request.Write;
\r
123 import org.simantics.db.request.WriteInterface;
\r
124 import org.simantics.db.request.WriteOnly;
\r
125 import org.simantics.db.request.WriteOnlyResult;
\r
126 import org.simantics.db.request.WriteResult;
\r
127 import org.simantics.db.request.WriteTraits;
\r
128 import org.simantics.db.service.ByteReader;
\r
129 import org.simantics.db.service.ClusterBuilder;
\r
130 import org.simantics.db.service.ClusterBuilderFactory;
\r
131 import org.simantics.db.service.ClusterControl;
\r
132 import org.simantics.db.service.ClusterSetsSupport;
\r
133 import org.simantics.db.service.ClusterUID;
\r
134 import org.simantics.db.service.ClusteringSupport;
\r
135 import org.simantics.db.service.CollectionSupport;
\r
136 import org.simantics.db.service.DebugSupport;
\r
137 import org.simantics.db.service.DirectQuerySupport;
\r
138 import org.simantics.db.service.GraphChangeListenerSupport;
\r
139 import org.simantics.db.service.InitSupport;
\r
140 import org.simantics.db.service.LifecycleSupport;
\r
141 import org.simantics.db.service.ManagementSupport;
\r
142 import org.simantics.db.service.QueryControl;
\r
143 import org.simantics.db.service.SerialisationSupport;
\r
144 import org.simantics.db.service.ServerInformation;
\r
145 import org.simantics.db.service.ServiceActivityMonitor;
\r
146 import org.simantics.db.service.SessionEventSupport;
\r
147 import org.simantics.db.service.SessionMonitorSupport;
\r
148 import org.simantics.db.service.SessionUserSupport;
\r
149 import org.simantics.db.service.StatementSupport;
\r
150 import org.simantics.db.service.TransactionPolicySupport;
\r
151 import org.simantics.db.service.TransactionSupport;
\r
152 import org.simantics.db.service.TransferableGraphSupport;
\r
153 import org.simantics.db.service.UndoRedoSupport;
\r
154 import org.simantics.db.service.VirtualGraphSupport;
\r
155 import org.simantics.db.service.XSupport;
\r
156 import org.simantics.layer0.Layer0;
\r
157 import org.simantics.utils.DataContainer;
\r
158 import org.simantics.utils.Development;
\r
159 import org.simantics.utils.datastructures.Callback;
\r
160 import org.simantics.utils.threads.logger.ITask;
\r
161 import org.simantics.utils.threads.logger.ThreadLogger;
\r
163 import gnu.trove.procedure.TLongProcedure;
\r
164 import gnu.trove.set.hash.TLongHashSet;
\r
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
\r
169 protected static final boolean DEBUG = false;
\r
171 private static final boolean DIAGNOSTICS = false;
\r
173 private TransactionPolicySupport transactionPolicy;
\r
175 final private ClusterControl clusterControl;
\r
177 final protected BuiltinSupportImpl builtinSupport;
\r
178 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
\r
179 final protected ClusterSetsSupport clusterSetsSupport;
\r
180 final protected LifecycleSupportImpl lifecycleSupport;
\r
182 protected QuerySupportImpl querySupport;
\r
183 protected ResourceSupportImpl resourceSupport;
\r
184 protected WriteSupport writeSupport;
\r
185 public ClusterTranslator clusterTranslator;
\r
187 boolean dirtyPrimitives = false;
\r
189 public static final int SERVICE_MODE_CREATE = 2;
\r
190 public static final int SERVICE_MODE_ALLOW = 1;
\r
191 public int serviceMode = 0;
\r
192 public boolean createdImmutableClusters = false;
\r
193 public TLongHashSet createdClusters = new TLongHashSet();
\r
198 * The service locator maintained by the workbench. These services are
\r
199 * initialized by the workbench during the <code>init</code> method.
\r
201 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
\r
203 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
\r
205 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
\r
207 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
\r
209 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
\r
211 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
\r
213 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
\r
215 final protected State state = new State();
\r
217 protected GraphSession graphSession = null;
\r
219 protected SessionManager sessionManagerImpl = null;
\r
221 protected UserAuthenticationAgent authAgent = null;
\r
223 protected UserAuthenticator authenticator = null;
\r
225 protected Resource user = null;
\r
227 protected ClusterStream clusterStream = null;
\r
229 protected SessionRequestManager requestManager = null;
\r
231 public ClusterTable clusterTable = null;
\r
233 public QueryProcessor queryProvider2 = null;
\r
235 ClientChangesImpl clientChanges = null;
\r
237 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
\r
239 // protected long newClusterId = Constants.NullClusterId;
\r
241 protected int flushCounter = 0;
\r
243 protected boolean writeOnly = false;
\r
245 WriteState<?> writeState = null;
\r
246 WriteStateBase<?> delayedWriteState = null;
\r
247 protected Resource defaultClusterSet = null; // If not null then used for newResource().
\r
// Constructor: creates the cluster table and builtin support, stores the session manager,
// and registers this session's service implementations in serviceLocator.
// NOTE(review): authAgent is not used in the lines visible here; its null check and its
// assignment to the field are both commented out.
249 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
\r
251 // if (authAgent == null)
\r
252 // throw new IllegalArgumentException("null authentication agent");
\r
// t is the virtual graph storage path; it is also passed to the cluster table, the virtual
// graph server support and the cluster sets support below.
254 File t = StaticSessionProperties.virtualGraphStoragePath;
\r
257 this.clusterTable = new ClusterTable(this, t);
\r
258 this.builtinSupport = new BuiltinSupportImpl(this);
\r
259 this.sessionManagerImpl = sessionManagerImpl;
\r
261 // this.authAgent = authAgent;
\r
// Services that are only registered and not kept in a field of this class.
263 serviceLocator.registerService(Session.class, this);
\r
264 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
\r
265 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
\r
266 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
\r
267 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
\r
268 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
\r
269 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
\r
270 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
\r
271 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
\r
272 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
\r
273 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
\r
274 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
\r
275 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
\r
276 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
\r
277 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
\r
278 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
\r
279 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
\r
280 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
\r
281 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
\r
282 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
\r
283 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
\r
284 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
\r
285 ServiceActivityUpdaterForWriteTransactions.register(this);
\r
// Services that are also kept in final fields of this class.
// The same virtualGraphServerSupport instance is registered under two service interfaces.
287 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
\r
288 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
\r
289 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
\r
290 this.lifecycleSupport = new LifecycleSupportImpl(this);
\r
291 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
\r
292 this.transactionPolicy = new TransactionPolicyRelease();
\r
293 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
\r
294 this.clusterControl = new ClusterControlImpl(this);
\r
295 serviceLocator.registerService(ClusterControl.class, clusterControl);
\r
296 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
\r
// The same path is given as both the read and the write directory.
297 this.clusterSetsSupport.updateReadAndWriteDirectories(t.toPath(), t.toPath());
\r
298 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
\r
303 * Session interface
\r
308 public Resource getRootLibrary() {
\r
309 return queryProvider2.getRootLibraryResource();
\r
312 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
\r
313 if (!graphSession.dbSession.refreshEnabled())
\r
316 getClusterTable().refresh(csid, this, clusterUID);
\r
317 } catch (Throwable t) {
\r
318 Logger.defaultLogError("Refesh failed.", t);
\r
322 static final class TaskHelper {
\r
323 private final String name;
\r
324 private Object result;
\r
325 final Semaphore sema = new Semaphore(0);
\r
326 private Throwable throwable = null;
\r
327 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
\r
329 public void run(DatabaseException e) {
\r
330 synchronized (TaskHelper.this) {
\r
335 final Procedure<Object> proc = new Procedure<Object>() {
\r
337 public void execute(Object result) {
\r
338 callback.run(null);
\r
341 public void exception(Throwable t) {
\r
342 if (t instanceof DatabaseException)
\r
343 callback.run((DatabaseException)t);
\r
345 callback.run(new DatabaseException("" + name + "operation failed.", t));
\r
348 final WriteTraits writeTraits = new WriteTraits() {
\r
350 public UndoTraits getUndoTraits() {
\r
354 public VirtualGraph getProvider() {
\r
358 TaskHelper(String name) {
\r
361 <T> T getResult() {
\r
364 void setResult(Object result) {
\r
365 this.result = result;
\r
367 // Throwable throwableGet() {
\r
368 // return throwable;
\r
370 synchronized void throwableSet(Throwable t) {
\r
373 // void throwableSet(String t) {
\r
374 // throwable = new InternalException("" + name + " operation failed. " + t);
\r
376 synchronized void throwableCheck()
\r
377 throws DatabaseException {
\r
378 if (null != throwable)
\r
379 if (throwable instanceof DatabaseException)
\r
380 throw (DatabaseException)throwable;
\r
382 throw new DatabaseException("Undo operation failed.", throwable);
\r
384 void throw_(String message)
\r
385 throws DatabaseException {
\r
386 throw new DatabaseException("" + name + " operation failed. " + message);
\r
392 private ListenerBase getListenerBase(Object procedure) {
\r
393 if (procedure instanceof ListenerBase)
\r
394 return (ListenerBase) procedure;
\r
399 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
400 scheduleRequest(request, callback, notify, null);
\r
404 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Wraps the write request in a SessionTask and hands it to requestManager.scheduleWrite.
// The task creates a fresh ClientChangesImpl, a WriteGraphImpl and a WriteState, then performs the request.
// NOTE(review): `combine` is not referenced in the lines visible here.
407 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
409 assert (request != null);
\r
// Optional development-time trace, printed when the SESSION_LOG_WRITES property is set.
411 if(Development.DEVELOPMENT) {
\r
413 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
414 System.err.println("schedule write '" + request + "'");
\r
415 } catch (Throwable t) {
\r
416 Logger.defaultLogError(t);
\r
// Thread index derived from the request's hash code, masked with THREAD_MASK.
420 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
422 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
\r
425 public void run(int thread) {
\r
427 if(Development.DEVELOPMENT) {
\r
429 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
430 System.err.println("perform write '" + request + "'");
\r
431 } catch (Throwable t) {
\r
432 Logger.defaultLogError(t);
\r
436 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
438 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
// Fresh change record for this write transaction.
443 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
445 VirtualGraph vg = getProvider(request.getProvider());
\r
447 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
// The anonymous Procedure adapts the WriteState outcome to the optional client callback.
448 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
\r
451 public void execute(Object result) {
\r
452 if(callback != null) callback.run(null);
\r
456 public void exception(Throwable t) {
\r
// NOTE(review): unchecked cast - a Throwable that is not a DatabaseException would raise
// ClassCastException here. The DelayedWrite overload wraps such throwables in
// new DatabaseException(t) instead; confirm what WriteState can pass in.
457 if(callback != null) callback.run((DatabaseException)t);
\r
462 assert (null != writer);
\r
463 // writer.state.barrier.inc();
\r
466 request.perform(writer);
\r
467 assert (null != writer);
\r
468 } catch (Throwable t) {
\r
// Cancellation is not logged; every throwable is forwarded to the write state.
469 if (!(t instanceof CancelTransactionException))
\r
470 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
\r
471 writeState.except(t);
\r
473 // writer.state.barrier.dec();
\r
474 // writer.waitAsync(request);
\r
478 assert(!queryProvider2.dirty);
\r
480 } catch (Throwable e) {
\r
482 // Log it first, just to be safe that the error is always logged.
\r
483 if (!(e instanceof CancelTransactionException))
\r
484 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
486 // writeState.getGraph().state.barrier.dec();
\r
487 // writeState.getGraph().waitAsync(request);
\r
489 writeState.except(e);
\r
492 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
\r
493 // // All we can do here is to log those, can't really pass them anywhere.
\r
494 // if (callback != null) {
\r
495 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
\r
496 // else callback.run(new DatabaseException(e));
\r
498 // } catch (Throwable e2) {
\r
499 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
\r
501 // boolean empty = clusterStream.reallyFlush();
\r
502 // int callerThread = -1;
\r
503 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
\r
504 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
\r
506 // // Send all local changes to server so it can calculate correct reverse change set.
\r
507 // // This will call clusterStream.accept().
\r
508 // state.cancelCommit(context, clusterStream);
\r
510 // if (!context.isOk()) // this is a blocking operation
\r
511 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
\r
512 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
\r
514 // state.cancelCommit2(context, clusterStream);
\r
515 // } catch (DatabaseException e3) {
\r
516 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
\r
518 // clusterStream.setOff(false);
\r
519 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
524 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
530 if(Development.DEVELOPMENT) {
\r
532 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
533 System.err.println("finish write '" + request + "'");
\r
534 } catch (Throwable t) {
\r
535 Logger.defaultLogError(t);
\r
545 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
\r
546 scheduleRequest(request, procedure, notify, null);
\r
550 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Result-producing variant: wraps the request in a SessionTask, performs it with a
// WriteGraphImpl and stores the value returned by request.perform in the WriteState.
// NOTE(review): `combine` is not referenced in the lines visible here.
553 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
\r
555 assert (request != null);
\r
// Thread index derived from the request's hash code, masked with THREAD_MASK.
557 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
// NOTE(review): uses the two-argument SessionTask constructor, whereas the Write overload
// passes (request, thread, thread) - confirm the difference is intended.
559 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
562 public void run(int thread) {
\r
564 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
566 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
// Fresh change record for this write transaction.
569 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
571 VirtualGraph vg = getProvider(request.getProvider());
\r
572 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
// Typed local keeps the result type T; the session field writeState is WriteState<?>.
575 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
\r
576 writeState = writeStateT;
\r
578 assert (null != writer);
\r
579 // writer.state.barrier.inc();
\r
580 writeStateT.setResult(request.perform(writer));
\r
581 assert (null != writer);
\r
583 // writer.state.barrier.dec();
\r
584 // writer.waitAsync(null);
\r
586 } catch (Throwable e) {
\r
588 // writer.state.barrier.dec();
\r
589 // writer.waitAsync(null);
\r
// Any throwable is forwarded to the write state; nothing is logged here in the visible lines.
591 writeState.except(e);
\r
593 // state.stopWriteTransaction(clusterStream);
\r
595 // } catch (Throwable e) {
\r
596 // // Log it first, just to be safe that the error is always logged.
\r
597 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
600 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
\r
601 // // All we can do here is to log those, can't really pass them anywhere.
\r
602 // if (procedure != null) {
\r
603 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
\r
604 // else procedure.exception(new DatabaseException(e));
\r
606 // } catch (Throwable e2) {
\r
607 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
\r
610 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
612 // state.stopWriteTransaction(clusterStream);
\r
615 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
618 // if(notify != null) notify.release();
\r
628 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
629 scheduleRequest(request, callback, notify, null);
\r
633 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Delayed write: the request is first performed against a DelayedWriteGraph built on a read
// graph, then committed through a WriteGraphImpl backed by a write-only support.
// NOTE(review): `combine` is not referenced in the lines visible here.
636 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
638 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
\r
640 assert (request != null);
\r
// Thread index derived from the request's hash code, masked with THREAD_MASK.
642 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
644 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
647 public void run(int thread) {
\r
648 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
// Adapter shared by both phases: reports success or failure to the optional callback.
650 Procedure<Object> stateProcedure = new Procedure<Object>() {
\r
652 public void execute(Object result) {
\r
653 if (callback != null)
\r
654 callback.run(null);
\r
657 public void exception(Throwable t) {
\r
658 if (callback != null) {
\r
// Throwables that are not DatabaseExceptions are wrapped before reaching the callback.
659 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
\r
660 else callback.run(new DatabaseException(t));
\r
// presumably the no-callback branch; the branching line is not visible here - verify
662 Logger.defaultLogError("Unhandled exception", t);
\r
// Phase 1: perform the request against a delayed write graph.
666 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
667 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
\r
668 DelayedWriteGraph dwg = null;
\r
669 // newGraph.state.barrier.inc();
\r
672 dwg = new DelayedWriteGraph(newGraph);
\r
673 request.perform(dwg);
\r
674 } catch (Throwable e) {
\r
675 delayedWriteState.except(e);
\r
679 // newGraph.state.barrier.dec();
\r
680 // newGraph.waitAsync(request);
\r
681 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
684 delayedWriteState = null;
\r
// Phase 2: commit the collected changes through a write graph.
686 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
\r
687 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
690 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
692 acquireWriteOnly();
\r
694 VirtualGraph vg = getProvider(request.getProvider());
\r
// This local shadows the session's writeSupport field; which write-only support is used
// depends on whether a virtual graph provider is present.
695 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
697 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
699 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
\r
701 assert (null != writer);
\r
702 // writer.state.barrier.inc();
\r
706 dwg.commit(writer, request);
\r
// After the commit, the default cluster set is converted from a delayed resource to a
// real one and its cluster is stored in clusterSetsSupport.
708 if(defaultClusterSet != null) {
\r
709 XSupport xs = getService(XSupport.class);
\r
710 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
\r
711 ClusteringSupport cs = getService(ClusteringSupport.class);
\r
712 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
\r
715 // This makes clusters available from server
\r
716 clusterStream.reallyFlush();
\r
718 releaseWriteOnly(writer);
\r
719 handleUpdatesAndMetadata(writer);
\r
// Failure path: drop write-only clusters, flush, release write-only mode and report the error.
721 } catch (ServiceException e) {
\r
722 // writer.state.barrier.dec();
\r
723 // writer.waitAsync(null);
\r
725 // These shall be requested from server
\r
726 clusterTable.removeWriteOnlyClusters();
\r
727 // This makes clusters available from server
\r
728 clusterStream.reallyFlush();
\r
730 releaseWriteOnly(writer);
\r
731 writeState.except(e);
\r
733 // Debugging & Profiling
\r
734 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
744 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
\r
745 scheduleRequest(request, procedure, notify, null);
\r
749 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
752 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
\r
753 throw new Error("Not implemented");
\r
756 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
\r
757 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
\r
758 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
\r
759 createdClusters.add(cluster.clusterId);
\r
764 class WriteOnlySupport implements WriteSupport {
\r
766 ClusterStream stream;
\r
767 ClusterImpl currentCluster;
\r
/**
 * Creates the support and captures the enclosing session's current cluster stream.
 */
public WriteOnlySupport() {
    this.stream = SessionImplSocket.this.clusterStream;
}
\r
774 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
\r
775 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
\r
// Adds the statement (s, p, o) to the cluster that owns subject key s.
// NOTE(review): `provider` is not referenced in the lines visible here.
779 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
\r
781 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
\r
// Immutable clusters may only be modified when the SERVICE_MODE_ALLOW bit is set,
// or when the subject is the root library.
783 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
\r
784 if(s != queryProvider2.getRootLibrary())
\r
785 throw new ImmutableException("Trying to modify immutable resource key=" + s);
\r
// addRelation may return a different cluster object; maintainCluster swaps it into the table.
788 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
\r
789 } catch (DatabaseException e) {
\r
// The failure is logged; whether it is also rethrown is not visible here.
790 Logger.defaultLogError(e);
\r
794 clientChanges.invalidate(s);
\r
// presumably write-only clusters skip the query update below (the guarded statement is not visible) - verify
796 if (cluster.isWriteOnly())
\r
798 queryProvider2.updateStatements(s, p);
\r
803 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
\r
804 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
\r
// Stores `length` bytes of `value` as the value of resource key rid in its owning cluster.
// NOTE(review): `provider` is not referenced in the lines visible here.
808 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
\r
810 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
\r
// setValue may return a different cluster object; maintainCluster swaps it into the table.
812 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
\r
813 } catch (DatabaseException e) {
\r
// The failure is logged; the method signature declares no checked exception.
814 Logger.defaultLogError(e);
\r
817 clientChanges.invalidate(rid);
\r
// presumably write-only clusters skip the query update below (the guarded statement is not visible) - verify
819 if (cluster.isWriteOnly())
\r
821 queryProvider2.updateValue(rid);
\r
// Stores `amount` bytes read from `reader` as the value of `resource`.
// Amounts below 65536 are read in one piece and passed to the byte[] overload;
// larger amounts are written in blocks of at most 65536 bytes via modiValueEx.
826 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
\r
828 if(amount < 65536) {
\r
829 claimValue(provider,resource, reader.readBytes(null, amount));
\r
// Reusable transfer buffer for the block-wise path.
833 byte[] bytes = new byte[65536];
\r
835 int rid = ((ResourceImpl)resource).id;
\r
836 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
\r
// `left` is declared on a line not visible here; amount-left is used as the write offset.
840 int block = Math.min(left, 65536);
\r
841 reader.readBytes(bytes, block);
\r
// NOTE(review): the local `cluster` is not reassigned after maintainCluster, so later blocks
// and the isWriteOnly check below still use the original cluster object even if it was
// replaced in the table - confirm this is intended.
842 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
\r
845 } catch (DatabaseException e) {
\r
846 Logger.defaultLogError(e);
\r
849 clientChanges.invalidate(rid);
\r
// presumably write-only clusters skip the query update below (the guarded statement is not visible) - verify
851 if (cluster.isWriteOnly())
\r
853 queryProvider2.updateValue(rid);
\r
857 private void maintainCluster(ClusterImpl before, ClusterI after_) {
\r
858 if(after_ != null && after_ != before) {
\r
859 ClusterImpl after = (ClusterImpl)after_;
\r
860 if(currentCluster == before) currentCluster = after;
\r
861 clusterTable.replaceCluster(after);
\r
/**
 * Allocates a new resource key from the current cluster.
 * A cluster is obtained lazily when none is selected. When the current cluster holds
 * exactly ClusterTable.CLUSTER_FILL_SIZE resources, a new cluster is fetched, cast to
 * ClusterWriteOnly and given a foreignLookup array of foreignCounter bytes.
 *
 * @param foreignCounter length of the foreignLookup array for a newly created cluster
 * @return the key returned by currentCluster.createResource(clusterTranslator)
 */
865 public int createResourceKey(int foreignCounter) throws DatabaseException {
\r
866 if(currentCluster == null)
\r
867 currentCluster = getNewResourceCluster();
\r
// NOTE(review): equality test here, whereas createResource(VirtualGraph) uses ">=" -- confirm intended.
868 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
\r
869 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
\r
870 newCluster.foreignLookup = new byte[foreignCounter];
\r
871 currentCluster = newCluster;
\r
// NOTE(review): diagnostic output to stderr; any guard around it is not visible in this view.
873 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
\r
875 return currentCluster.createResource(clusterTranslator);
\r
879 public Resource createResource(VirtualGraph provider) throws DatabaseException {
\r
880 if(currentCluster == null) {
\r
881 if (null != defaultClusterSet) {
\r
882 ResourceImpl result = getNewResource(defaultClusterSet);
\r
883 currentCluster = clusterTable.getClusterByResourceKey(result.id);
\r
886 currentCluster = getNewResourceCluster();
\r
889 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
\r
890 if (null != defaultClusterSet) {
\r
891 ResourceImpl result = getNewResource(defaultClusterSet);
\r
892 currentCluster = clusterTable.getClusterByResourceKey(result.id);
\r
895 currentCluster = getNewResourceCluster();
\r
898 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
\r
902 public Resource createResource(VirtualGraph provider, long clusterId)
\r
903 throws DatabaseException {
\r
904 return getNewResource(clusterId);
\r
908 public Resource createResource(VirtualGraph provider, Resource clusterSet)
\r
909 throws DatabaseException {
\r
910 return getNewResource(clusterSet);
\r
914 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
\r
915 throws DatabaseException {
\r
916 getNewClusterSet(clusterSet);
\r
920 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
\r
921 throws ServiceException {
\r
922 return containsClusterSet(clusterSet);
\r
925 public void selectCluster(long cluster) {
\r
926 currentCluster = clusterTable.getClusterByClusterId(cluster);
\r
927 long setResourceId = clusterSetsSupport.getSet(cluster);
\r
928 clusterSetsSupport.put(setResourceId, cluster);
\r
932 public Resource setDefaultClusterSet(Resource clusterSet)
\r
933 throws ServiceException {
\r
934 Resource result = setDefaultClusterSet4NewResource(clusterSet);
\r
935 if(clusterSet != null) {
\r
936 long id = clusterSetsSupport.get(clusterSet.getResourceId());
\r
937 currentCluster = clusterTable.getClusterByClusterId(id);
\r
940 currentCluster = null;
\r
/**
 * Removes the value of {@code resource}.
 * The provider is first resolved through getProvider. With no provider the removal is
 * recorded on the cluster change stream (a DELETE_OPERATION index followed by removeValue)
 * and the resource is invalidated in the query provider and in clientChanges. Otherwise the
 * value is denied in the virtual graph and the value query is updated.
 * NOTE(review): the try/else scaffolding between the two paths is not visible in this view.
 */
946 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
\r
948 provider = getProvider(provider);
\r
// Persistent path: no virtual graph applies.
949 if (null == provider) {
\r
950 int key = ((ResourceImpl)resource).id;
\r
954 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
\r
// The immutability check and the direct cluster.removeValue call below are disabled.
955 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
\r
956 // if(key != queryProvider2.getRootLibrary())
\r
957 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
\r
960 // cluster.removeValue(key, clusterTranslator);
\r
961 // } catch (DatabaseException e) {
\r
962 // Logger.defaultLogError(e);
\r
966 clusterTable.writeOnlyInvalidate(cluster);
\r
// Record the deletion on the change stream instead of modifying the cluster directly.
969 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
\r
970 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
\r
971 clusterTranslator.removeValue(cluster);
\r
// Failures are logged and not propagated.
972 } catch (DatabaseException e) {
\r
973 Logger.defaultLogError(e);
\r
976 queryProvider2.invalidateResource(key);
\r
977 clientChanges.invalidate(key);
\r
// Virtual graph path.
980 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
\r
981 queryProvider2.updateValue(querySupport.getId(resource));
\r
982 clientChanges.claimValue(resource);
\r
989 public void flush(boolean intermediate) {
\r
990 throw new UnsupportedOperationException();
\r
994 public void flushCluster() {
\r
995 clusterTable.flushCluster(graphSession);
\r
996 if(defaultClusterSet != null) {
\r
997 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
\r
999 currentCluster = null;
\r
1003 public void flushCluster(Resource r) {
\r
1004 throw new UnsupportedOperationException("flushCluster resource " + r);
\r
1008 public void gc() {
\r
/**
 * Removes the statement (subject, predicate, object).
 * With no resolved provider the removal is recorded on the cluster change stream
 * (three statement indices followed by removeStatement) and the subject is invalidated;
 * otherwise the statement is denied in the virtual graph and the subject is invalidated.
 * NOTE(review): the return statements and branch scaffolding are not visible in this view.
 */
1012 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
\r
1013 Resource object) {
\r
1015 int s = ((ResourceImpl)subject).id;
\r
1016 int p = ((ResourceImpl)predicate).id;
\r
1017 int o = ((ResourceImpl)object).id;
\r
1019 provider = getProvider(provider);
\r
// Persistent path: no virtual graph applies.
1020 if (null == provider) {
\r
1022 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
\r
1023 clusterTable.writeOnlyInvalidate(cluster);
\r
// Translate resource keys into resource indices for the change stream.
1027 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
\r
1028 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
\r
1029 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
\r
1031 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
\r
1032 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
\r
// The subject index carries the REMOVE operation; predicate and object use NULL_OPERATION.
1034 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
\r
1035 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
\r
1036 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
\r
1037 clusterTranslator.removeStatement(cluster);
\r
1039 queryProvider2.invalidateResource(s);
\r
1040 clientChanges.invalidate(s);
\r
// Failures are logged and not propagated.
1042 } catch (DatabaseException e) {
\r
1044 Logger.defaultLogError(e);
\r
// Virtual graph path.
1052 ((VirtualGraphImpl)provider).deny(s, p, o);
\r
1053 queryProvider2.invalidateResource(s);
\r
1054 clientChanges.invalidate(s);
\r
1063 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
\r
1064 throw new UnsupportedOperationException();
\r
// Reports whether this write support is write-only.
// NOTE(review): the returned value is not visible in this view -- confirm against the full file.
1068 public boolean writeOnly() {
\r
1073 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
\r
1074 writeSupport.performWriteRequest(graph, request);
\r
1078 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
\r
1079 throw new UnsupportedOperationException();
\r
1083 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
\r
1084 throw new UnsupportedOperationException();
\r
1088 public <T> void addMetadata(Metadata data) throws ServiceException {
\r
1089 writeSupport.addMetadata(data);
\r
1093 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
\r
1094 return writeSupport.getMetadata(clazz);
\r
1098 public TreeMap<String, byte[]> getMetadata() {
\r
1099 return writeSupport.getMetadata();
\r
1103 public void commitDone(WriteTraits writeTraits, long csid) {
\r
1104 writeSupport.commitDone(writeTraits, csid);
\r
1108 public void clearUndoList(WriteTraits writeTraits) {
\r
1109 writeSupport.clearUndoList(writeTraits);
\r
1112 public int clearMetadata() {
\r
1113 return writeSupport.clearMetadata();
\r
1117 public void startUndo() {
\r
1118 writeSupport.startUndo();
\r
/**
 * WriteSupport used for write-only requests that target a virtual graph: the write-only
 * scheduling code instantiates it when the request's provider is non-null.
 * Statement and value changes are applied to the provider (cast to TransientGraph or
 * VirtualGraphImpl), followed by a query update and a clientChanges record.
 * Cluster, flush, metadata and nested write-request operations throw
 * UnsupportedOperationException.
 * NOTE(review): several method bodies have no visible statements in this view, including
 * the returns of the non-void methods setDefaultClusterSet, removeStatement, writeOnly and
 * clearMetadata.
 */
1122 class VirtualWriteOnlySupport implements WriteSupport {
\r
1125 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
\r
1126 // Resource object) {
\r
1127 // throw new UnsupportedOperationException();
\r
// Claims a statement given as resources; ids are resolved through querySupport.
1131 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
\r
1133 TransientGraph impl = (TransientGraph)provider;
\r
1134 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
\r
1135 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
\r
1136 clientChanges.claim(subject, predicate, object);
\r
// Claims a statement given as integer resource keys.
1141 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
\r
1143 TransientGraph impl = (TransientGraph)provider;
\r
1144 impl.claim(subject, predicate, object);
\r
1145 getQueryProvider2().updateStatements(subject, predicate);
\r
1146 clientChanges.claim(subject, predicate, object);
\r
// Stores the whole array as the resource's value.
1151 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
\r
1152 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
\r
// Stores the first 'length' bytes of 'value' in the virtual graph.
1156 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
\r
1157 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
\r
1158 getQueryProvider2().updateValue(resource);
\r
1159 clientChanges.claimValue(resource);
\r
// Reads 'amount' bytes in one call; no block-wise writing on this path.
1163 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
\r
1164 byte[] value = reader.readBytes(null, amount);
\r
1165 claimValue(provider, resource, value);
\r
// Creates a new resource in the transient graph.
1169 public Resource createResource(VirtualGraph provider) {
\r
1170 TransientGraph impl = (TransientGraph)provider;
\r
1171 return impl.getResource(impl.newResource(false));
\r
// Cluster-related operations are not supported on this path.
1175 public Resource createResource(VirtualGraph provider, long clusterId) {
\r
1176 throw new UnsupportedOperationException();
\r
1180 public Resource createResource(VirtualGraph provider, Resource clusterSet)
\r
1181 throws DatabaseException {
\r
1182 throw new UnsupportedOperationException();
\r
1186 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
\r
1187 throws DatabaseException {
\r
1188 throw new UnsupportedOperationException();
\r
1192 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
\r
1193 throws ServiceException {
\r
1194 throw new UnsupportedOperationException();
\r
1198 public Resource setDefaultClusterSet(Resource clusterSet)
\r
1199 throws ServiceException {
\r
// Removes the resource's value from the virtual graph.
1204 public void denyValue(VirtualGraph provider, Resource resource) {
\r
1205 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
\r
1206 getQueryProvider2().updateValue(querySupport.getId(resource));
\r
1207 // NOTE: this only keeps track of value changes by-resource.
\r
1208 clientChanges.claimValue(resource);
\r
1212 public void flush(boolean intermediate) {
\r
1213 throw new UnsupportedOperationException();
\r
1217 public void flushCluster() {
\r
1218 throw new UnsupportedOperationException();
\r
1222 public void flushCluster(Resource r) {
\r
1223 throw new UnsupportedOperationException("Resource " + r);
\r
1227 public void gc() {
\r
// Denies the statement in the transient graph and records the denial in clientChanges.
1231 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
\r
1232 TransientGraph impl = (TransientGraph) provider;
\r
1233 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
\r
1234 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
\r
1235 clientChanges.deny(subject, predicate, object);
\r
1240 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
\r
1241 throw new UnsupportedOperationException();
\r
1245 public boolean writeOnly() {
\r
// Nested write requests and metadata access are not supported on this path.
1250 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
\r
1251 throw new UnsupportedOperationException();
\r
1255 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
\r
1256 throw new UnsupportedOperationException();
\r
1260 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
\r
1261 throw new UnsupportedOperationException();
\r
1265 public <T> void addMetadata(Metadata data) throws ServiceException {
\r
1266 throw new UnsupportedOperationException();
\r
1270 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
\r
1271 throw new UnsupportedOperationException();
\r
1275 public TreeMap<String, byte[]> getMetadata() {
\r
1276 throw new UnsupportedOperationException();
\r
1280 public void commitDone(WriteTraits writeTraits, long csid) {
\r
1284 public void clearUndoList(WriteTraits writeTraits) {
\r
1288 public int clearMetadata() {
\r
1293 public void startUndo() {
\r
/**
 * Runs a write-only request that produces a result.
 * Sequence visible here: announce the queue change, reset clientChanges, acquire the
 * write-only lock, pick a WriteSupport (virtual when the request has a provider), build
 * the WriteGraphImpl and WriteState, perform the request, store the result, flush the
 * cluster stream, release the lock and handle updates/metadata.
 * On CancelTransactionException the transaction is rolled back without notifying the
 * callback; on any other Throwable the callback receives a wrapping DatabaseException.
 * NOTE(review): the try/finally scaffolding and any short statements between the visible
 * lines are not shown in this view.
 *
 * @param request the write-only request to perform
 * @param notify passed on to the WriteState
 * @param callback receives failures other than cancellation (may be null); also passed to the WriteState
 */
1297 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
\r
1301 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1303 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1306 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
1308 acquireWriteOnly();
\r
// A request bound to a virtual graph writes through VirtualWriteOnlySupport.
1310 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
1312 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
\r
1314 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
\r
1315 writeState = writeStateT;
\r
1317 assert (null != writer);
\r
1318 // writer.state.barrier.inc();
\r
// Time the request; the duration is reported in seconds below.
1319 long start = System.nanoTime();
\r
1320 T result = request.perform(writer);
\r
1321 long duration = System.nanoTime() - start;
\r
1323 System.err.println("################");
\r
1324 System.err.println("WriteOnly duration " + 1e-9*duration);
\r
1326 writeStateT.setResult(result);
\r
1328 // This makes clusters available from server
\r
1329 clusterStream.reallyFlush();
\r
1331 // This will trigger query updates
\r
1332 releaseWriteOnly(writer);
\r
1333 assert (null != writer);
\r
1335 handleUpdatesAndMetadata(writer);
\r
// Cancellation: release, drop write-only clusters and stop the transaction; callback is not invoked here.
1337 } catch (CancelTransactionException e) {
\r
1339 releaseWriteOnly(writeState.getGraph());
\r
1341 clusterTable.removeWriteOnlyClusters();
\r
1342 state.stopWriteTransaction(clusterStream);
\r
// Any other failure: same rollback, plus the callback is told and the error is logged.
1344 } catch (Throwable e) {
\r
1346 e.printStackTrace();
\r
1348 releaseWriteOnly(writeState.getGraph());
\r
1350 clusterTable.removeWriteOnlyClusters();
\r
1352 if (callback != null)
\r
1353 callback.exception(new DatabaseException(e));
\r
1355 state.stopWriteTransaction(clusterStream);
\r
1356 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
1360 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1366 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
1367 scheduleRequest(request, callback, notify, null);
\r
/**
 * Queues a write-only request on the request manager.
 * The scheduled task resets clientChanges, acquires the write-only lock, builds the write
 * graph (virtual when getProvider resolves a provider) and performs the request. On
 * failure it releases the lock, removes write-only clusters and reports through the
 * callback unless the failure is a CancelTransactionException. On the success path it
 * flushes the cluster stream and releases the lock.
 * NOTE(review): the try/finally scaffolding is not visible in this view; 'combine' is not
 * referenced by any visible line.
 *
 * @param callback run with null on success or a DatabaseException on failure (may be null)
 * @param notify passed on to the WriteState
 */
1371 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
1375 assert (request != null);
\r
// The request's hash masked with THREAD_MASK is passed to the SessionTask.
1377 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1379 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
\r
1382 public void run(int thread) {
\r
1384 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
1388 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1391 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
1393 acquireWriteOnly();
\r
1395 VirtualGraph vg = getProvider(request.getProvider());
\r
1396 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
1398 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
// Adapts the Callback to the Procedure interface expected by WriteState.
1400 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
\r
1403 public void execute(Object result) {
\r
1404 if(callback != null) callback.run(null);
\r
1408 public void exception(Throwable t) {
\r
// NOTE(review): unchecked cast; a non-DatabaseException Throwable would raise ClassCastException here.
1409 if(callback != null) callback.run((DatabaseException)t);
\r
1414 assert (null != writer);
\r
1415 // writer.state.barrier.inc();
\r
1419 request.perform(writer);
\r
1421 } catch (Throwable e) {
\r
1423 // writer.state.barrier.dec();
\r
1424 // writer.waitAsync(null);
\r
1426 releaseWriteOnly(writer);
\r
1428 clusterTable.removeWriteOnlyClusters();
\r
// Cancellation is not reported to the callback as an error.
1430 if(!(e instanceof CancelTransactionException)) {
\r
1431 if (callback != null)
\r
1432 callback.run(new DatabaseException(e));
\r
1435 writeState.except(e);
\r
1441 // This makes clusters available from server
\r
1442 boolean empty = clusterStream.reallyFlush();
\r
1443 // This was needed to make WO requests call metadata listeners.
\r
1444 // NOTE: the calling event does not contain clientChanges information.
\r
1445 if (!empty && clientChanges.isEmpty())
\r
1446 clientChanges.setNotEmpty(true);
\r
1447 releaseWriteOnly(writer);
\r
1448 assert (null != writer);
\r
1451 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1464 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
\r
1465 scheduleRequest(request, callback, notify, null);
\r
1469 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Queues a result-producing write-only request; the scheduled task delegates to
// performWriteOnly(request, notify, callback).
// NOTE(review): 'combine' is not referenced by any visible line, and what happens to the
// ITask started below is not visible in this view.
1472 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
\r
1474 assert (request != null);
\r
// The request's hash masked with THREAD_MASK is passed to the SessionTask.
1476 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1478 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
1481 public void run(int thread) {
\r
1483 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
1485 performWriteOnly(request, notify, callback);
\r
/**
 * Queues a synchronous-style Read request on the request manager.
 * When the procedure has a listener base, the request goes through the query processor
 * (queryRead) with a wrapping AsyncProcedure that forwards to {@code procedure} and stores
 * the result in {@code result} when that container is non-null. Otherwise the request is
 * performed directly on a fresh ReadGraphImpl and the outcome is forwarded to the
 * procedure; failures are stored in {@code throwable} when present and logged otherwise.
 * NOTE(review): the try/else/finally scaffolding is not visible in this view.
 *
 * @param throwable optional container for a failure (may be null)
 * @param result optional container for the result (may be null)
 */
1495 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
\r
1497 assert (request != null);
\r
1498 assert (procedure != null);
\r
1500 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1502 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
\r
1505 public void run(int thread) {
\r
1507 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1509 ListenerBase listener = getListenerBase(procedure);
\r
1511 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: route through the query processor.
1515 if (listener != null) {
\r
1518 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
\r
1521 public void exception(AsyncReadGraph graph, Throwable t) {
\r
1522 procedure.exception(graph, t);
\r
1523 if(throwable != null) {
\r
1526 // ErrorLogger.defaultLogError("Unhandled exception", t);
\r
1531 public void execute(AsyncReadGraph graph, T t) {
\r
1532 if(result != null) result.set(t);
\r
1533 procedure.execute(graph, t);
\r
1537 } catch (Throwable t) {
\r
1538 // This is handled by the AsyncProcedure
\r
1539 //Logger.defaultLogError("Internal error", t);
\r
1546 // newGraph.state.barrier.inc();
\r
// Direct path: perform the request on the new graph.
1548 T t = request.perform(newGraph);
\r
1552 if(result != null) result.set(t);
\r
1553 procedure.execute(newGraph, t);
\r
// A failure raised by procedure.execute: stored in 'throwable' when present, logged as seen below.
1555 } catch (Throwable th) {
\r
1557 if(throwable != null) {
\r
1558 throwable.set(th);
\r
1560 Logger.defaultLogError("Unhandled exception", th);
\r
// A failure raised by the request itself.
1565 } catch (Throwable t) {
\r
1568 t.printStackTrace();
\r
1570 if(throwable != null) {
\r
1573 Logger.defaultLogError("Unhandled exception", t);
\r
1578 procedure.exception(newGraph, t);
\r
// A failure raised by procedure.exception.
1580 } catch (Throwable t2) {
\r
1582 if(throwable != null) {
\r
1583 throwable.set(t2);
\r
1585 Logger.defaultLogError("Unhandled exception", t2);
\r
1592 // newGraph.state.barrier.dec();
\r
1593 // newGraph.waitAsync(request);
\r
1599 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Queues an AsyncRead request on the request manager.
 * With a non-null listener the request goes through the query processor; otherwise it is
 * performed directly with a ResultCallWrappedSingleQueryProcedure4 around the procedure,
 * and a Throwable from perform is forwarded to that wrapper.
 * NOTE(review): the try/else/finally scaffolding is not visible in this view.
 */
1609 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
\r
1611 assert (request != null);
\r
1612 assert (procedure != null);
\r
1614 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1616 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
\r
1619 public void run(int thread) {
\r
1621 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1623 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: route through the query processor.
1627 if (listener != null) {
\r
1629 newGraph.processor.query(newGraph, request, null, procedure, listener);
\r
1631 // newGraph.waitAsync(request);
\r
// Direct path: wrap the procedure and perform the request.
1635 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
\r
1636 procedure, "request");
\r
1640 // newGraph.state.barrier.inc();
\r
1642 request.perform(newGraph, wrapper);
\r
1644 // newGraph.waitAsync(request);
\r
1646 } catch (Throwable t) {
\r
1648 wrapper.exception(newGraph, t);
\r
1649 // newGraph.waitAsync(request);
\r
1658 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Queues an AsyncMultiRead request on the request manager.
 * With a listener base the request goes through the query processor; otherwise it is
 * performed directly with a ResultCallWrappedQueryProcedure4 around the procedure.
 * NOTE(review): on the direct path a Throwable from perform is only printed on the visible
 * lines; whether the procedure is notified is not visible in this view.
 */
1668 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
\r
1670 assert (request != null);
\r
1671 assert (procedure != null);
\r
1673 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
// The last SessionRead argument is the thread index when a semaphore is supplied, -1 otherwise.
1675 int sync = notify != null ? thread : -1;
\r
1677 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
\r
1680 public void run(int thread) {
\r
1682 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1684 ListenerBase listener = getListenerBase(procedure);
\r
1686 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: route through the query processor.
1690 if (listener != null) {
\r
1692 newGraph.processor.query(newGraph, request, null, procedure, listener);
\r
1694 // newGraph.waitAsync(request);
\r
// Direct path: wrap the procedure and perform the request.
1698 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
\r
1702 request.perform(newGraph, wrapper);
\r
1704 } catch (Throwable t) {
\r
1706 t.printStackTrace();
\r
1714 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Queues an ExternalRead request on the request manager.
 * With a listener base the request goes through the query processor with a wrapping
 * Procedure; otherwise the request is registered with an ad-hoc Listener. Both wrappers
 * forward to {@code procedure} and fill {@code result} when that container is non-null.
 * NOTE(review): the else/finally scaffolding and the body of isDisposed are not visible in
 * this view.
 *
 * @param throwable optional container for a failure (may be null)
 * @param result optional container for the result (may be null)
 */
1724 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
\r
1726 assert (request != null);
\r
1727 assert (procedure != null);
\r
1729 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1731 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
\r
1734 public void run(int thread) {
\r
1736 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1738 ListenerBase listener = getListenerBase(procedure);
\r
1740 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: route through the query processor.
1744 if (listener != null) {
\r
1746 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
\r
// Here the procedure is notified before the 'throwable' check.
1749 public void exception(Throwable t) {
\r
1750 procedure.exception(t);
\r
1751 if(throwable != null) {
\r
1757 public void execute(T t) {
\r
1758 if(result != null) result.set(t);
\r
1759 procedure.execute(t);
\r
1764 // newGraph.waitAsync(request);
\r
1768 // newGraph.state.barrier.inc();
\r
// Direct path: register a listener on the external read.
1770 request.register(newGraph, new Listener<T>() {
\r
// Here the failure is stored before the procedure is notified.
1773 public void exception(Throwable t) {
\r
1774 if(throwable != null) throwable.set(t);
\r
1775 procedure.exception(t);
\r
1776 // newGraph.state.barrier.dec();
\r
1780 public void execute(T t) {
\r
1781 if(result != null) result.set(t);
\r
1782 procedure.execute(t);
\r
1783 // newGraph.state.barrier.dec();
\r
1787 public boolean isDisposed() {
\r
1793 // newGraph.waitAsync(request);
\r
1799 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1811 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
\r
1813 scheduleRequest(request, procedure, null, null, null);
\r
1818 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
\r
1819 assertNotSession();
\r
1820 Semaphore notify = new Semaphore(0);
\r
1821 DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1822 DataContainer<T> result = new DataContainer<T>();
\r
1823 scheduleRequest(request, procedure, notify, container, result);
\r
1824 acquire(notify, request);
\r
1825 Throwable throwable = container.get();
\r
1826 if(throwable != null) {
\r
1827 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1828 else throw new DatabaseException("Unexpected exception", throwable);
\r
1830 return result.get();
\r
1834 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
\r
1835 assertNotSession();
\r
1836 Semaphore notify = new Semaphore(0);
\r
1837 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
\r
1838 final DataContainer<T> resultContainer = new DataContainer<T>();
\r
1839 scheduleRequest(request, new AsyncProcedure<T>() {
\r
1841 public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
1842 exceptionContainer.set(throwable);
\r
1843 procedure.exception(graph, throwable);
\r
1846 public void execute(AsyncReadGraph graph, T result) {
\r
1847 resultContainer.set(result);
\r
1848 procedure.execute(graph, result);
\r
1850 }, getListenerBase(procedure), notify);
\r
1851 acquire(notify, request, procedure);
\r
1852 Throwable throwable = exceptionContainer.get();
\r
1853 if (throwable != null) {
\r
1854 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1855 else throw new DatabaseException("Unexpected exception", throwable);
\r
1857 return resultContainer.get();
\r
1862 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
\r
1863 assertNotSession();
\r
1864 Semaphore notify = new Semaphore(0);
\r
1865 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
\r
1866 scheduleRequest(request, new AsyncMultiProcedure<T>() {
\r
1868 public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
1869 exceptionContainer.set(throwable);
\r
1870 procedure.exception(graph, throwable);
\r
1873 public void execute(AsyncReadGraph graph, T result) {
\r
1874 procedure.execute(graph, result);
\r
1877 public void finished(AsyncReadGraph graph) {
\r
1878 procedure.finished(graph);
\r
1881 acquire(notify, request, procedure);
\r
1882 Throwable throwable = exceptionContainer.get();
\r
1883 if (throwable != null) {
\r
1884 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1885 else throw new DatabaseException("Unexpected exception", throwable);
\r
1887 // TODO: implement return value
\r
1888 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
\r
1893 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
\r
1894 return syncRequest(request, new ProcedureAdapter<T>());
\r
1897 // assert(request != null);
\r
1899 // final DataContainer<T> result = new DataContainer<T>();
\r
1900 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
1902 // syncRequest(request, new Procedure<T>() {
\r
1905 // public void execute(T t) {
\r
1910 // public void exception(Throwable t) {
\r
1911 // exception.set(t);
\r
1916 // Throwable t = exception.get();
\r
1917 // if(t != null) {
\r
1918 // if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
1919 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
\r
1922 // return result.get();
\r
1927 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
\r
1928 return syncRequest(request, (Procedure<T>)procedure);
\r
1933 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
\r
1934 // assertNotSession();
\r
1935 // Semaphore notify = new Semaphore(0);
\r
1936 // DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1937 // DataContainer<T> result = new DataContainer<T>();
\r
1938 // scheduleRequest(request, procedure, notify, container, result);
\r
1939 // acquire(notify, request);
\r
1940 // Throwable throwable = container.get();
\r
1941 // if(throwable != null) {
\r
1942 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1943 // else throw new DatabaseException("Unexpected exception", throwable);
\r
1945 // return result.get();
\r
1950 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
\r
1951 assertNotSession();
\r
1952 Semaphore notify = new Semaphore(0);
\r
1953 final DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1954 final DataContainer<T> result = new DataContainer<T>();
\r
1955 scheduleRequest(request, procedure, notify, container, result);
\r
1956 acquire(notify, request);
\r
1957 Throwable throwable = container.get();
\r
1958 if (throwable != null) {
\r
1959 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1960 else throw new DatabaseException("Unexpected exception", throwable);
\r
1962 return result.get();
\r
1966 public void syncRequest(Write request) throws DatabaseException {
\r
1967 assertNotSession();
\r
1969 Semaphore notify = new Semaphore(0);
\r
1970 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
1971 scheduleRequest(request, new Callback<DatabaseException>() {
\r
1974 public void run(DatabaseException e) {
\r
1979 acquire(notify, request);
\r
1980 if(exception.get() != null) throw exception.get();
\r
1984 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
\r
1985 assertNotSession();
\r
1986 Semaphore notify = new Semaphore(0);
\r
1987 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
1988 final DataContainer<T> result = new DataContainer<T>();
\r
1989 scheduleRequest(request, new Procedure<T>() {
\r
1992 public void exception(Throwable t) {
\r
1997 public void execute(T t) {
\r
2002 acquire(notify, request);
\r
2003 if(exception.get() != null) {
\r
2004 Throwable t = exception.get();
\r
2005 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2006 else throw new DatabaseException(t);
\r
2008 return result.get();
\r
2012 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
\r
2013 assertNotSession();
\r
2014 Semaphore notify = new Semaphore(0);
\r
2015 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2016 final DataContainer<T> result = new DataContainer<T>();
\r
2017 scheduleRequest(request, new Procedure<T>() {
\r
2020 public void exception(Throwable t) {
\r
2025 public void execute(T t) {
\r
2030 acquire(notify, request);
\r
2031 if(exception.get() != null) {
\r
2032 Throwable t = exception.get();
\r
2033 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2034 else throw new DatabaseException(t);
\r
2036 return result.get();
\r
2040 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
\r
2041 assertNotSession();
\r
2042 Semaphore notify = new Semaphore(0);
\r
2043 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2044 final DataContainer<T> result = new DataContainer<T>();
\r
2045 scheduleRequest(request, new Procedure<T>() {
\r
2048 public void exception(Throwable t) {
\r
2053 public void execute(T t) {
\r
2058 acquire(notify, request);
\r
2059 if(exception.get() != null) {
\r
2060 Throwable t = exception.get();
\r
2061 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2062 else throw new DatabaseException(t);
\r
2064 return result.get();
\r
2068 public void syncRequest(DelayedWrite request) throws DatabaseException {
\r
2069 assertNotSession();
\r
2070 Semaphore notify = new Semaphore(0);
\r
2071 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
2072 scheduleRequest(request, new Callback<DatabaseException>() {
\r
2074 public void run(DatabaseException e) {
\r
2078 acquire(notify, request);
\r
2079 if(exception.get() != null) throw exception.get();
\r
2083 public void syncRequest(WriteOnly request) throws DatabaseException {
\r
2084 assertNotSession();
\r
2086 Semaphore notify = new Semaphore(0);
\r
2087 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
2088 scheduleRequest(request, new Callback<DatabaseException>() {
\r
2090 public void run(DatabaseException e) {
\r
2094 acquire(notify, request);
\r
2095 if(exception.get() != null) throw exception.get();
\r
2100 * GraphRequestProcessor interface
\r
2104 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
\r
2106 scheduleRequest(request, procedure, null, null);
\r
2111 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
\r
2113 scheduleRequest(request, procedure, null);
\r
2118 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
\r
2120 scheduleRequest(request, procedure, null, null, null);
\r
2125 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
\r
2127 scheduleRequest(request, callback, null);
\r
2132 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
\r
2134 scheduleRequest(request, procedure, null);
\r
2139 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
\r
2141 scheduleRequest(request, procedure, null);
\r
2146 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
\r
2148 scheduleRequest(request, procedure, null);
\r
2153 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
\r
2155 scheduleRequest(request, callback, null);
\r
2160 public void asyncRequest(final Write r) {
\r
2161 asyncRequest(r, null);
\r
2165 public void asyncRequest(final DelayedWrite r) {
\r
2166 asyncRequest(r, null);
\r
2170 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
\r
2172 scheduleRequest(request, callback, null);
\r
2177 public void asyncRequest(final WriteOnly request) {
\r
2179 asyncRequest(request, null);
\r
2184 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
\r
2185 r.request(this, procedure);
\r
2189 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
\r
2190 r.request(this, procedure);
\r
2194 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
\r
2195 r.request(this, procedure);
\r
2199 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
\r
2200 r.request(this, procedure);
\r
2204 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
\r
2205 r.request(this, procedure);
\r
2209 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
\r
2210 r.request(this, procedure);
\r
2214 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
\r
2215 return r.request(this);
\r
2219 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
\r
2220 return r.request(this);
\r
2224 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
\r
2225 r.request(this, procedure);
\r
2229 public <T> void async(WriteInterface<T> r) {
\r
2230 r.request(this, new ProcedureAdapter<T>());
\r
2234 public void incAsync() {
\r
2239 public void decAsync() {
\r
2243 public long getCluster(ResourceImpl resource) {
\r
2244 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
\r
2245 return cluster.getClusterId();
\r
2248 public long getCluster(int id) {
\r
2249 if (clusterTable == null)
\r
2250 System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
\r
2251 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
\r
2254 public ResourceImpl getResource(int id) {
\r
2255 return new ResourceImpl(resourceSupport, id);
\r
2258 public ResourceImpl getResource(int resourceIndex, long clusterId) {
\r
2259 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
\r
2260 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
\r
2261 int key = proxy.getClusterKey();
\r
2262 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
\r
2263 return new ResourceImpl(resourceSupport, resourceKey);
\r
/** Creates a new ResourceImpl for the given id, bound to this session's resourceSupport. */
2266 public ResourceImpl getResource2(int id) {
\r
2268 return new ResourceImpl(resourceSupport, id);
\r
2271 final public int getId(ResourceImpl impl) {
\r
2275 public static final Charset UTF8 = Charset.forName("utf-8");
\r
2280 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
\r
2281 for(TransientGraph g : support.providers) {
\r
2282 if(g.isPending(subject)) return false;
\r
2287 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
\r
2288 for(TransientGraph g : support.providers) {
\r
2289 if(g.isPending(subject, predicate)) return false;
\r
2294 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
\r
2296 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
\r
2298 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
\r
2301 public void run(ReadGraphImpl graph) {
\r
2302 if(ready.decrementAndGet() == 0) {
\r
2303 runnable.run(graph);
\r
2309 for(TransientGraph g : support.providers) {
\r
2310 if(g.isPending(subject)) {
\r
2312 g.load(graph, subject, composite);
\r
2313 } catch (DatabaseException e) {
\r
2314 e.printStackTrace();
\r
2317 composite.run(graph);
\r
2321 composite.run(graph);
\r
2325 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
\r
2327 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
\r
2329 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
\r
2332 public void run(ReadGraphImpl graph) {
\r
2333 if(ready.decrementAndGet() == 0) {
\r
2334 runnable.run(graph);
\r
2340 for(TransientGraph g : support.providers) {
\r
2341 if(g.isPending(subject, predicate)) {
\r
2343 g.load(graph, subject, predicate, composite);
\r
2344 } catch (DatabaseException e) {
\r
2345 e.printStackTrace();
\r
2348 composite.run(graph);
\r
2352 composite.run(graph);
\r
2356 // void dumpHeap() {
\r
2359 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
\r
2360 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
\r
2361 // "d:/heap" + flushCounter + ".txt", true);
\r
2362 // } catch (IOException e) {
\r
2363 // e.printStackTrace();
\r
/**
 * Delivers a ChangeEvent built from the given change set to every listener in
 * changeListeners2, using a ReadGraphImpl created from getQueryProvider2().
 * Nothing is delivered for an empty change set; an Exception thrown by a
 * listener is caught and its stack trace printed.
 */
2368 void fireReactionsToSynchronize(ChangeSet cs) {
\r
2370 // Do not fire empty events
\r
2374 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
\r
2375 // g.state.barrier.inc();
\r
2379 if (!cs.isEmpty()) {
\r
2380 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
\r
2381 for (ChangeListener l : changeListeners2) {
\r
2383 l.graphChanged(e2);
\r
2384 } catch (Exception ex) {
\r
2385 ex.printStackTrace();
\r
2392 // g.state.barrier.dec();
\r
2393 // g.waitAsync(null);
\r
2399 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
\r
2402 // Do not fire empty events
\r
2403 if (cs2.isEmpty())
\r
2406 // graph.restart();
\r
2407 // graph.state.barrier.inc();
\r
2411 if (!cs2.isEmpty()) {
\r
2413 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
\r
2414 for (ChangeListener l : changeListeners2) {
\r
2416 l.graphChanged(e2);
\r
2417 // System.out.println("changelistener " + l);
\r
2418 } catch (Exception ex) {
\r
2419 ex.printStackTrace();
\r
2427 // graph.state.barrier.dec();
\r
2428 // graph.waitAsync(null);
\r
2431 } catch (Throwable t) {
\r
2432 t.printStackTrace();
\r
2436 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
\r
2440 // Do not fire empty events
\r
2441 if (cs2.isEmpty())
\r
2444 // Do not fire on virtual requests
\r
2445 if(graph.getProvider() != null)
\r
2448 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
\r
2452 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
\r
2453 for (ChangeListener l : metadataListeners) {
\r
2455 l.graphChanged(e2);
\r
2456 } catch (Throwable ex) {
\r
2457 ex.printStackTrace();
\r
2465 } catch (Throwable t) {
\r
2466 t.printStackTrace();
\r
2475 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
\r
2478 public <T> T getService(Class<T> api) {
\r
2479 T t = peekService(api);
\r
2481 if (state.isClosed())
\r
2482 throw new ServiceNotFoundException(this, api, "Session has been shut down");
\r
2483 throw new ServiceNotFoundException(this, api);
\r
2491 protected abstract ServerInformation getCachedServerInformation();
\r
// Two-slot cache of the services most recently resolved by peekService(Class).
// Slot 1 is checked first; a hit in slot 2 is swapped into slot 1.
// registerService(...) refreshes a slot when its key is registered again.
2493 private Class<?> serviceKey1 = null;
\r
2494 private Class<?> serviceKey2 = null;
\r
2495 private Object service1 = null;
\r
2496 private Object service2 = null;
\r
2502 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
\r
/**
 * Looks up a service without throwing when it is missing.
 * The two-slot cache (serviceKey1/service1, serviceKey2/service2) is checked
 * first. A few APIs are then answered directly by this session. Everything
 * else is fetched from serviceLocator and stored in slot 1, after the
 * previous slot 1 entry has been moved to slot 2.
 * The method is synchronized, so cache reads and updates made here run under
 * this object's monitor.
 */
2504 @SuppressWarnings("unchecked")
\r
2506 public synchronized <T> T peekService(Class<T> api) {
\r
// Cache lookup: slot 1 first, then slot 2.
2508 if(serviceKey1 == api) {
\r
2509 return (T)service1;
\r
2510 } else if (serviceKey2 == api) {
\r
2511 // Promote this key
\r
2512 Object result = service2;
\r
2513 service2 = service1;
\r
2514 serviceKey2 = serviceKey1;
\r
2515 service1 = result;
\r
2516 serviceKey1 = api;
\r
// APIs handled by the session itself; these branches return without touching the cache.
2520 if (Layer0.class == api)
\r
2522 if (ServerInformation.class == api)
\r
2523 return (T) getCachedServerInformation();
\r
2524 else if (WriteGraphImpl.class == api)
\r
2525 return (T) writeState.getGraph();
\r
2526 else if (ClusterBuilder.class == api)
\r
2527 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
\r
2528 else if (ClusterBuilderFactory.class == api)
\r
2529 return (T)new ClusterBuilderFactoryImpl(this);
\r
// Cache miss: demote slot 1 to slot 2, then store the locator's answer in slot 1.
2531 service2 = service1;
\r
2532 serviceKey2 = serviceKey1;
\r
2534 service1 = serviceLocator.peekService(api);
\r
2535 serviceKey1 = api;
\r
2537 return (T)service1;
\r
2545 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
\r
2548 public boolean hasService(Class<?> api) {
\r
2549 return serviceLocator.hasService(api);
\r
2553 * @param api the api that must be implemented by the specified service
\r
2554 * @param service the service implementation
\r
2557 public <T> void registerService(Class<T> api, T service) {
\r
2558 if(Layer0.class == api) {
\r
2559 L0 = (Layer0)service;
\r
2562 serviceLocator.registerService(api, service);
\r
2563 if (TransactionPolicySupport.class == api) {
\r
2564 transactionPolicy = (TransactionPolicySupport)service;
\r
2565 state.resetTransactionPolicy();
\r
2567 if (api == serviceKey1)
\r
2568 service1 = service;
\r
2569 else if (api == serviceKey2)
\r
2570 service2 = service;
\r
2573 // ----------------
\r
2575 // ----------------
\r
2577 void fireSessionVariableChange(String variable) {
\r
2578 for (MonitorHandler h : monitorHandlers) {
\r
2579 MonitorContext ctx = monitorContexts.getLeft(h);
\r
2580 assert ctx != null;
\r
2581 // SafeRunner functionality repeated here to avoid dependency.
\r
2583 h.valuesChanged(ctx);
\r
2584 } catch (Exception e) {
\r
2585 Logger.defaultLogError("monitor handler notification produced the following exception", e);
\r
2586 } catch (LinkageError e) {
\r
2587 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
\r
/**
 * ResourceSerializer implementation that converts between Resource objects,
 * their int keys, long ids and string ids of the form
 * '<resource index>_<cluster id>', using the enclosing session's cluster lookups.
 */
2593 class ResourceSerializerImpl implements ResourceSerializer {
\r
// Builds a long id from the resource index of the given key and the id of its cluster.
2595 public long createRandomAccessId(int id) throws DatabaseException {
\r
2598 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
\r
2599 long cluster = getCluster(id);
\r
2601 return 0; // Better to return 0 than invalid id.
\r
2602 long result = ClusterTraitsBase.createResourceId(cluster, index);
\r
// Same as createRandomAccessId(int), applied to the id of the given ResourceImpl.
2607 public long getRandomAccessId(Resource resource) throws DatabaseException {
\r
2609 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
2610 return createRandomAccessId(resourceImpl.id);
\r
// Returns the int id stored in the given ResourceImpl.
2615 public int getTransientId(Resource resource) throws DatabaseException {
\r
2617 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
2618 return resourceImpl.id;
\r
// Serializes a resource to a string id; a null resource is rejected with IllegalArgumentException.
2623 public String createRandomAccessId(Resource resource)
\r
2624 throws InvalidResourceReferenceException {
\r
2626 if(resource == null) throw new IllegalArgumentException();
\r
2628 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
// Negative ids are written as "<id>_0" without consulting any cluster.
2630 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
\r
2634 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
\r
2635 } catch (DatabaseException e1) {
\r
2636 throw new InvalidResourceReferenceException(e1);
\r
2639 // Serialize as '<resource index>_<cluster id>'
\r
2640 return "" + r + "_" + getCluster(resourceImpl);
\r
2641 } catch (Throwable e) {
\r
2642 e.printStackTrace();
\r
2643 throw new InvalidResourceReferenceException(e);
\r
// Converts a long id back to an int key; values <= 0 are returned as-is, narrowed to int.
2648 public int getTransientId(long serialized) throws DatabaseException {
\r
2649 if (serialized <= 0)
\r
2650 return (int)serialized;
\r
2651 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
\r
2652 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
\r
2653 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
\r
2654 if (cluster == null)
\r
2655 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
\r
2656 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
\r
// Resolves a long id by first converting it to an int key.
2661 public Resource getResource(long randomAccessId) throws DatabaseException {
\r
2662 return getResourceByKey(getTransientId(randomAccessId));
\r
// Resolves an int key directly.
2666 public Resource getResource(int transientId) throws DatabaseException {
\r
2667 return getResourceByKey(transientId);
\r
// Parses a string id: the part before the first '_' is the resource index
// (or a negative key, resolved directly), the part after it is the cluster id.
2671 public Resource getResource(String randomAccessId)
\r
2672 throws InvalidResourceReferenceException {
\r
2674 int i = randomAccessId.indexOf('_');
\r
2676 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
\r
2677 + randomAccessId + "'");
\r
2678 int r = Integer.parseInt(randomAccessId.substring(0, i));
\r
2679 if(r < 0) return getResourceByKey(r);
\r
2680 long c = Long.parseLong(randomAccessId.substring(i + 1));
\r
2681 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
\r
2682 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
\r
// Only return the resource when the cluster reports that it has this key.
2683 if (cluster.hasResource(key, clusterTranslator))
\r
2684 return getResourceByKey(key);
\r
2685 } catch (InvalidResourceReferenceException e) {
\r
2687 } catch (NumberFormatException e) {
\r
2688 throw new InvalidResourceReferenceException(e);
\r
2689 } catch (Throwable e) {
\r
2690 e.printStackTrace();
\r
2691 throw new InvalidResourceReferenceException(e);
\r
2694 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
\r
2698 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
\r
2701 } catch (Throwable e) {
\r
2702 e.printStackTrace();
\r
2703 throw new InvalidResourceReferenceException(e);
\r
2709 // Copied from old SessionImpl
\r
2712 if (state.isClosed())
\r
2713 throw new Error("Session closed.");
\r
/**
 * Creates a new resource in the cluster with the given id and wraps the
 * resulting id in a ResourceImpl bound to resourceSupport.
 * The cluster is fetched with getClusterByClusterIdOrThrow; a
 * DatabaseException from cluster.createResource(...) is logged.
 */
2718 public ResourceImpl getNewResource(long clusterId) {
\r
2719 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
\r
2722 newId = cluster.createResource(clusterTranslator);
\r
2723 } catch (DatabaseException e) {
\r
2724 Logger.defaultLogError(e);
\r
2727 return new ResourceImpl(resourceSupport, newId);
\r
2730 public ResourceImpl getNewResource(Resource clusterSet)
\r
2731 throws DatabaseException {
\r
2732 long resourceId = clusterSet.getResourceId();
\r
2733 Long clusterId = clusterSetsSupport.get(resourceId);
\r
2734 if (null == clusterId)
\r
2735 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
\r
2736 if (Constants.NewClusterId == clusterId) {
\r
2737 clusterId = getService(ClusteringSupport.class).createCluster();
\r
2738 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
\r
2739 createdClusters.add(clusterId);
\r
2740 clusterSetsSupport.put(resourceId, clusterId);
\r
2741 return getNewResource(clusterId);
\r
2743 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
\r
2744 ResourceImpl result;
\r
2745 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
\r
2746 clusterId = getService(ClusteringSupport.class).createCluster();
\r
2747 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
\r
2748 createdClusters.add(clusterId);
\r
2749 clusterSetsSupport.put(resourceId, clusterId);
\r
2750 return getNewResource(clusterId);
\r
2752 result = getNewResource(clusterId);
\r
2753 int resultKey = querySupport.getId(result);
\r
2754 long resultCluster = querySupport.getClusterId(resultKey);
\r
2755 if (clusterId != resultCluster)
\r
2756 clusterSetsSupport.put(resourceId, resultCluster);
\r
/**
 * Registers the given resource as a new cluster set by mapping its resource
 * id to Constants.NewClusterId in clusterSetsSupport.
 * Throws ClusterSetExistException when a mapping for that id already exists.
 */
2762 public void getNewClusterSet(Resource clusterSet)
\r
2763 throws DatabaseException {
\r
2765 System.out.println("new cluster set=" + clusterSet);
\r
2766 long resourceId = clusterSet.getResourceId();
\r
2767 if (clusterSetsSupport.containsKey(resourceId))
\r
2768 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
\r
2769 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
\r
2771 public boolean containsClusterSet(Resource clusterSet)
\r
2772 throws ServiceException {
\r
2773 long resourceId = clusterSet.getResourceId();
\r
2774 return clusterSetsSupport.containsKey(resourceId);
\r
2776 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
\r
2777 Resource r = defaultClusterSet;
\r
2778 defaultClusterSet = clusterSet;
\r
2781 void printDiagnostics() {
\r
2783 if (DIAGNOSTICS) {
\r
2785 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
\r
2786 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
\r
2787 for(ClusterI cluster : clusterTable.getClusters()) {
\r
2789 if (cluster.isLoaded() && !cluster.isEmpty()) {
\r
2790 residentClusters.set(residentClusters.get() + 1);
\r
2791 totalMem.set(totalMem.get() + cluster.getUsedSpace());
\r
2793 } catch (DatabaseException e) {
\r
2794 Logger.defaultLogError(e);
\r
2798 System.out.println("--------------------------------");
\r
2799 System.out.println("Cluster information:");
\r
2800 System.out.println("-amount of resident clusters=" + residentClusters.get());
\r
2801 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
\r
2803 for(ClusterI cluster : clusterTable.getClusters()) {
\r
2804 System.out.print("Cluster " + cluster.getClusterId() + " [");
\r
2806 if (!cluster.isLoaded())
\r
2807 System.out.println("not loaded]");
\r
2808 else if (cluster.isEmpty())
\r
2809 System.out.println("is empty]");
\r
2811 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
\r
2812 System.out.print(",references=" + cluster.getReferenceCount());
\r
2813 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
\r
2815 } catch (DatabaseException e) {
\r
2816 Logger.defaultLogError(e);
\r
2817 System.out.println("is corrupted]");
\r
2821 queryProvider2.printDiagnostics();
\r
2822 System.out.println("--------------------------------");
\r
/**
 * Calls readTransactionStarted() on every listener in eventListeners.
 * A Throwable raised by a listener is caught and its stack trace printed.
 */
2827 public void fireStartReadTransaction() {
\r
2830 System.out.println("StartReadTransaction");
\r
2832 for (SessionEventListener listener : eventListeners) {
\r
2834 listener.readTransactionStarted();
\r
2835 } catch (Throwable t) {
\r
2836 t.printStackTrace();
\r
/**
 * Calls readTransactionFinished() on every listener in eventListeners.
 * A Throwable raised by a listener is caught and its stack trace printed.
 */
2842 public void fireFinishReadTransaction() {
\r
2845 System.out.println("FinishReadTransaction");
\r
2847 for (SessionEventListener listener : eventListeners) {
\r
2849 listener.readTransactionFinished();
\r
2850 } catch (Throwable t) {
\r
2851 t.printStackTrace();
\r
/**
 * Calls writeTransactionStarted() on every listener in eventListeners.
 * A Throwable raised by a listener is caught and its stack trace printed.
 */
2857 public void fireStartWriteTransaction() {
\r
2860 System.out.println("StartWriteTransaction");
\r
2862 for (SessionEventListener listener : eventListeners) {
\r
2864 listener.writeTransactionStarted();
\r
2865 } catch (Throwable t) {
\r
2866 t.printStackTrace();
\r
/**
 * Calls writeTransactionFinished() on every listener in eventListeners and
 * then calls Indexing.resetDependenciesIndexingDisabled().
 * A Throwable raised by a listener is caught and its stack trace printed.
 */
2872 public void fireFinishWriteTransaction() {
\r
2875 System.out.println("FinishWriteTransaction");
\r
2877 for (SessionEventListener listener : eventListeners) {
\r
2879 listener.writeTransactionFinished();
\r
2880 } catch (Throwable t) {
\r
2881 t.printStackTrace();
\r
2885 Indexing.resetDependenciesIndexingDisabled();
\r
2889 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
\r
2890 throw new Error("Not supported at the moment.");
\r
2893 State getState() {
\r
2897 ClusterTable getClusterTable() {
\r
2898 return clusterTable;
\r
2901 public GraphSession getGraphSession() {
\r
2902 return graphSession;
\r
2905 public QueryProcessor getQueryProvider2() {
\r
2906 return queryProvider2;
\r
2909 ClientChangesImpl getClientChanges() {
\r
2910 return clientChanges;
\r
2913 boolean getWriteOnly() {
\r
2917 static int counter = 0;
\r
// Takes a cluster id (presumably called after that cluster has been loaded —
// confirm with callers); the visible part of the body refreshes the cluster
// table's size bookkeeping via updateSize().
2919 public void onClusterLoaded(long clusterId) {
\r
2921 clusterTable.updateSize();
\r
2929 * Implementation of the interface RequestProcessor
\r
2933 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
\r
2935 assertNotSession();
\r
2938 assert(request != null);
\r
2940 final DataContainer<T> result = new DataContainer<T>();
\r
2941 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2943 syncRequest(request, new AsyncProcedure<T>() {
\r
2946 public void execute(AsyncReadGraph graph, T t) {
\r
2951 public void exception(AsyncReadGraph graph, Throwable t) {
\r
2957 Throwable t = exception.get();
\r
2959 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2960 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
2963 return result.get();
\r
2968 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
\r
2969 // assertNotSession();
\r
2970 // return syncRequest(request, (AsyncProcedure<T>)procedure);
\r
2974 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
\r
2975 assertNotSession();
\r
2976 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
2980 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
\r
2981 assertNotSession();
\r
2982 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
2986 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
\r
2987 assertNotSession();
\r
2988 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
2992 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
\r
2993 assertNotSession();
\r
2994 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
2998 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
\r
3000 assertNotSession();
\r
3002 assert(request != null);
\r
3004 final DataContainer<T> result = new DataContainer<T>();
\r
3005 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3007 syncRequest(request, new AsyncProcedure<T>() {
\r
3010 public void execute(AsyncReadGraph graph, T t) {
\r
3015 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3021 Throwable t = exception.get();
\r
3023 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3024 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
3027 return result.get();
\r
3032 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
\r
3033 assertNotSession();
\r
3034 return syncRequest(request, (AsyncProcedure<T>)procedure);
\r
3038 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
\r
3039 assertNotSession();
\r
3040 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3044 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
\r
3045 assertNotSession();
\r
3046 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3050 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
\r
3051 assertNotSession();
\r
3052 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3056 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
\r
3057 assertNotSession();
\r
3058 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
3062 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
\r
3064 assertNotSession();
\r
3066 assert(request != null);
\r
3068 final ArrayList<T> result = new ArrayList<T>();
\r
3069 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3071 syncRequest(request, new AsyncMultiProcedure<T>() {
\r
3074 public void execute(AsyncReadGraph graph, T t) {
\r
3075 synchronized(result) {
\r
3081 public void finished(AsyncReadGraph graph) {
\r
3085 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3092 Throwable t = exception.get();
\r
3094 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3095 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
3103 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
\r
3104 assertNotSession();
\r
3105 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3109 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
\r
3110 assertNotSession();
\r
3111 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3115 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
\r
3116 assertNotSession();
\r
3117 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3121 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
\r
3122 assertNotSession();
\r
3123 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3127 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
\r
3128 assertNotSession();
\r
3129 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3133 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
\r
3135 assertNotSession();
\r
3137 assert(request != null);
\r
3139 final ArrayList<T> result = new ArrayList<T>();
\r
3140 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3142 syncRequest(request, new AsyncMultiProcedure<T>() {
\r
3145 public void execute(AsyncReadGraph graph, T t) {
\r
3146 synchronized(result) {
\r
3152 public void finished(AsyncReadGraph graph) {
\r
3156 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3163 Throwable t = exception.get();
\r
3165 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3166 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
3174 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
\r
3175 assertNotSession();
\r
3176 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3180 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
\r
3181 assertNotSession();
\r
3182 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3186 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
\r
3187 assertNotSession();
\r
3188 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3192 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
\r
3193 assertNotSession();
\r
3194 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3198 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
\r
3199 assertNotSession();
\r
3200 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3205 public <T> void asyncRequest(Read<T> request) {
\r
3207 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3209 public void exception(Throwable t) {
\r
3210 t.printStackTrace();
\r
3217 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
\r
3218 asyncRequest(request, (AsyncProcedure<T>)procedure);
\r
3222 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
\r
3223 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3227 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
\r
3228 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3232 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
\r
3233 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3237 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
\r
3238 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
3242 final public <T> void asyncRequest(final AsyncRead<T> request) {
\r
3244 assert(request != null);
\r
3246 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3248 public void exception(Throwable t) {
\r
3249 t.printStackTrace();
\r
3256 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
\r
3257 scheduleRequest(request, procedure, procedure, null);
\r
3261 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
\r
3262 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3266 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
\r
3267 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3271 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
\r
3272 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3276 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
\r
3277 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
3281 public <T> void asyncRequest(MultiRead<T> request) {
\r
3283 assert(request != null);
\r
3285 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
3287 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3288 t.printStackTrace();
\r
3295 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
\r
3296 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3300 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
\r
3301 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3305 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
\r
3306 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3310 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
\r
3311 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3315 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
\r
3316 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3320 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
\r
3322 assert(request != null);
\r
3324 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
3326 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3327 t.printStackTrace();
\r
3334 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
\r
3335 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3339 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
\r
3340 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3344 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
\r
3345 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3349 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
\r
3350 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3354 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
\r
3355 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3359 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
\r
3360 assertNotSession();
\r
3361 throw new Error("Not implemented!");
\r
3365 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
\r
3366 throw new Error("Not implemented!");
\r
3370 final public <T> void asyncRequest(final ExternalRead<T> request) {
\r
3372 assert(request != null);
\r
3374 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3376 public void exception(Throwable t) {
\r
3377 t.printStackTrace();
\r
3384 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
\r
3385 asyncRequest(request, (Procedure<T>)procedure);
\r
/**
 * Rethrows the given throwable as a DatabaseException.
 *
 * A throwable that already is a DatabaseException is rethrown unchanged;
 * anything else is wrapped in a new DatabaseException with the message
 * "Unexpected exception" and the original as its cause.
 *
 * NOTE(review): the embedded line numbering skips a line directly after the
 * signature, so a guard (for example a null check on t) may be missing from
 * this excerpt - confirm against the complete file.
 */
3390 void check(Throwable t) throws DatabaseException {
\r
3392 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3393 else throw new DatabaseException("Unexpected exception", t);
\r
/**
 * Reads the throwable held by the container and rethrows it as a
 * DatabaseException.
 *
 * A stored DatabaseException is rethrown unchanged; anything else is wrapped
 * in a new DatabaseException with the message "Unexpected exception" and the
 * original as its cause.
 *
 * NOTE(review): the embedded line numbering skips a line between the get()
 * call and the instanceof test, so a guard (for example a null check) may be
 * missing from this excerpt - confirm against the complete file.
 */
3397 void check(DataContainer<Throwable> container) throws DatabaseException {
\r
3398 Throwable t = container.get();
\r
3400 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3401 else throw new DatabaseException("Unexpected exception", t);
\r
3410 boolean sameProvider(Write request) {
\r
3411 if(writeState.getGraph().provider != null) {
\r
3412 return writeState.getGraph().provider.equals(request.getProvider());
\r
3414 return request.getProvider() == null;
\r
/**
 * Classifies a write graph: returns false for a null graph and for a graph
 * whose writeSupport reports writeOnly().
 *
 * NOTE(review): the statement that produces the result for the remaining case
 * is not visible in this excerpt (presumably "return true") - confirm against
 * the complete file.
 */
3418 boolean plainWrite(WriteGraphImpl graph) {
\r
3419 if(graph == null) return false;
\r
3420 if(graph.writeSupport.writeOnly()) return false;
\r
/** Shared thread group, created with the name "Session Thread Group". */
3425 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
\r
3426 private void assertNotSession() throws DatabaseException {
\r
3427 Thread current = Thread.currentThread();
\r
3428 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
\r
3431 void assertAlive() {
\r
3432 if (!state.isAlive())
\r
3433 throw new RuntimeDatabaseException("Session has been shut down.");
\r
3436 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
\r
3437 return querySupport.getValueStream(graph, querySupport.getId(resource));
\r
3440 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
\r
3441 return querySupport.getValue(graph, querySupport.getId(resource));
\r
3444 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
\r
3445 acquire(semaphore, request, null);
\r
/**
 * Waits for the semaphore in timed attempts, using
 * DebugPolicy.LONG_EXECUTION_REPORT_PERIOD and its unit as the timeout of each
 * tryAcquire call.
 *
 * When DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS is set, a message naming the
 * request, the wait time in milliseconds and the procedure is written to
 * System.err. An InterruptedException has its stack trace printed.
 *
 * NOTE(review): several lines of this method are missing from the excerpt
 * (the loop header, the try opener, the branch taken once the semaphore is
 * acquired, the remainder of the waitTime expression and any update of
 * tryCount, and whatever follows the catch block) - confirm the exact control
 * flow against the complete file.
 */
3448 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
\r
3451 long tryCount = 0;
\r
3452 // Loop until the semaphore is acquired reporting requests that take a long time.
\r
3455 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
\r
3459 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
\r
3461 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
\r
3462 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
\r
3464 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
\r
3465 + (waitTime) + " ms. (procedure=" + procedure + ")");
\r
3472 } catch (InterruptedException e) {
\r
3473 e.printStackTrace();
\r
3474 // FIXME: Should perhaps do something else in this case ??
\r
3481 // public boolean holdOnToTransactionAfterCancel() {
\r
3482 // return transactionPolicy.holdOnToTransactionAfterCancel();
\r
3486 // public boolean holdOnToTransactionAfterCommit() {
\r
3487 // return transactionPolicy.holdOnToTransactionAfterCommit();
\r
3491 // public boolean holdOnToTransactionAfterRead() {
\r
3492 // return transactionPolicy.holdOnToTransactionAfterRead();
\r
3496 // public void onRelinquish() {
\r
3497 // transactionPolicy.onRelinquish();
\r
3501 // public void onRelinquishDone() {
\r
3502 // transactionPolicy.onRelinquishDone();
\r
3506 // public void onRelinquishError() {
\r
3507 // transactionPolicy.onRelinquishError();
\r
/**
 * Returns a Session.
 *
 * NOTE(review): the method body is not visible in this excerpt, so which
 * session is returned cannot be stated here - confirm against the complete
 * file.
 */
3512 public Session getSession() {
\r
/**
 * Subclass hook that maps a VirtualGraph argument to a VirtualGraph result.
 * The mapping itself is left entirely to the implementing subclass.
 */
3517 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
\r
/**
 * Subclass hook that supplies a ResourceImpl; implementations may fail with a
 * DatabaseException.
 */
3518 protected abstract ResourceImpl getNewResource() throws DatabaseException;
\r
3520 public void ceased(int thread) {
\r
3522 requestManager.ceased(thread);
\r
/**
 * Reports the number of query threads as an int.
 *
 * NOTE(review): the active return statement is not visible in this excerpt;
 * only the power-of-two remark and a commented-out alternative based on the
 * number of available processors are shown - confirm the returned value
 * against the complete file.
 */
3526 public int getAmountOfQueryThreads() {
\r
3527 // This must be a power of two
\r
3529 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
\r
3532 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
\r
3534 return new ResourceImpl(resourceSupport, key);
\r
/**
 * Entry point for acquiring write-only access.
 *
 * NOTE(review): the method body is not visible in this excerpt; presumably it
 * is the counterpart of releaseWriteOnly, which clears the writeOnly flag -
 * confirm against the complete file.
 */
3538 public void acquireWriteOnly() {
\r
3542 public void releaseWriteOnly(ReadGraphImpl graph) {
\r
3543 writeOnly = false;
\r
3544 queryProvider2.releaseWrite(graph);
\r
3547 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
\r
3549 long start = System.nanoTime();
\r
3551 while(dirtyPrimitives) {
\r
3552 dirtyPrimitives = false;
\r
3553 getQueryProvider2().performDirtyUpdates(writer);
\r
3554 getQueryProvider2().performScheduledUpdates(writer);
\r
3557 fireMetadataListeners(writer, clientChanges);
\r
3559 if(DebugPolicy.PERFORMANCE_DATA) {
\r
3560 long end = System.nanoTime();
\r
3561 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
\r
/**
 * Deletes the files under the "tempFiles/db" directory of the platform
 * location.
 *
 * The directory is resolved from Platform.getLocation() and walked with
 * Files.walkFileTree; every visited file is deleted. An IOException from a
 * single delete, or from the walk as a whole, is passed to
 * Logger.defaultLogError, and the walk continues after a failed delete.
 *
 * NOTE(review): the statement guarded by the exists() test is not visible in
 * this excerpt (presumably an early return), nor are the try openers.
 * NOTE(review): only visitFile is overridden in the visible lines, so the
 * directories themselves do not appear to be removed - confirm whether that
 * is intended.
 */
3566 void removeTemporaryData() {
\r
3567 File platform = Platform.getLocation().toFile();
\r
3568 File tempFiles = new File(platform, "tempFiles");
\r
3569 File temp = new File(tempFiles, "db");
\r
3570 if(!temp.exists())
\r
3573 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
\r
3575 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
\r
3577 Files.delete(file);
\r
3578 } catch (IOException e) {
\r
3579 Logger.defaultLogError(e);
\r
3581 return FileVisitResult.CONTINUE;
\r
3584 } catch (IOException e) {
\r
3585 Logger.defaultLogError(e);
\r
/**
 * Post-processes the clusters recorded in createdClusters.
 *
 * When the SERVICE_MODE_CREATE bit is set in serviceMode, each recorded
 * cluster id is looked up in clusterTable and the cluster is marked immutable
 * via setImmutable(true, clusterTranslator). createdClusters is cleared
 * afterwards.
 *
 * NOTE(review): the callback's return statement and the closing braces are
 * not visible in this excerpt, so whether clear() sits inside or outside the
 * service-mode check cannot be confirmed here.
 * NOTE(review): the bit test uses "> 0"; that differs from "!= 0" only if the
 * flag occupies the sign bit - confirm the constant's value.
 */
3589 public void handleCreatedClusters() {
\r
3590 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
\r
3591 createdClusters.forEach(new TLongProcedure() {
\r
3594 public boolean execute(long value) {
\r
3595 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
\r
3596 cluster.setImmutable(true, clusterTranslator);
\r
3601 createdClusters.clear();
\r
3605 public Object getModificationCounter() {
\r
3606 return queryProvider2.modificationCounter;
\r
3610 public void markUndoPoint() {
\r
3611 state.setCombine(false);
\r