1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.io.File;
\r
15 import java.io.IOException;
\r
16 import java.io.InputStream;
\r
17 import java.nio.charset.Charset;
\r
18 import java.nio.file.FileVisitResult;
\r
19 import java.nio.file.Files;
\r
20 import java.nio.file.Path;
\r
21 import java.nio.file.SimpleFileVisitor;
\r
22 import java.nio.file.attribute.BasicFileAttributes;
\r
23 import java.util.ArrayList;
\r
24 import java.util.Collection;
\r
25 import java.util.HashSet;
\r
26 import java.util.TreeMap;
\r
27 import java.util.concurrent.CopyOnWriteArrayList;
\r
28 import java.util.concurrent.Semaphore;
\r
29 import java.util.concurrent.atomic.AtomicInteger;
\r
31 import org.eclipse.core.runtime.Platform;
\r
32 import org.simantics.databoard.Bindings;
\r
33 import org.simantics.db.AsyncReadGraph;
\r
34 import org.simantics.db.ChangeSet;
\r
35 import org.simantics.db.DevelopmentKeys;
\r
36 import org.simantics.db.ExternalValueSupport;
\r
37 import org.simantics.db.Metadata;
\r
38 import org.simantics.db.MonitorContext;
\r
39 import org.simantics.db.MonitorHandler;
\r
40 import org.simantics.db.Resource;
\r
41 import org.simantics.db.ResourceSerializer;
\r
42 import org.simantics.db.Session;
\r
43 import org.simantics.db.SessionManager;
\r
44 import org.simantics.db.SessionVariables;
\r
45 import org.simantics.db.VirtualGraph;
\r
46 import org.simantics.db.WriteGraph;
\r
47 import org.simantics.db.authentication.UserAuthenticationAgent;
\r
48 import org.simantics.db.authentication.UserAuthenticator;
\r
49 import org.simantics.db.common.Indexing;
\r
50 import org.simantics.db.common.TransactionPolicyRelease;
\r
51 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
\r
52 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
\r
53 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
\r
54 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
\r
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
\r
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
\r
57 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
\r
58 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
\r
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
\r
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
\r
61 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
\r
62 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
\r
63 import org.simantics.db.common.utils.Logger;
\r
64 import org.simantics.db.event.ChangeEvent;
\r
65 import org.simantics.db.event.ChangeListener;
\r
66 import org.simantics.db.event.SessionEventListener;
\r
67 import org.simantics.db.exception.CancelTransactionException;
\r
68 import org.simantics.db.exception.ClusterSetExistException;
\r
69 import org.simantics.db.exception.DatabaseException;
\r
70 import org.simantics.db.exception.ImmutableException;
\r
71 import org.simantics.db.exception.InvalidResourceReferenceException;
\r
72 import org.simantics.db.exception.ResourceNotFoundException;
\r
73 import org.simantics.db.exception.RuntimeDatabaseException;
\r
74 import org.simantics.db.exception.ServiceException;
\r
75 import org.simantics.db.exception.ServiceNotFoundException;
\r
76 import org.simantics.db.impl.ClusterBase;
\r
77 import org.simantics.db.impl.ClusterI;
\r
78 import org.simantics.db.impl.ClusterTraitsBase;
\r
79 import org.simantics.db.impl.ClusterTranslator;
\r
80 import org.simantics.db.impl.ResourceImpl;
\r
81 import org.simantics.db.impl.TransientGraph;
\r
82 import org.simantics.db.impl.VirtualGraphImpl;
\r
83 import org.simantics.db.impl.graph.DelayedWriteGraph;
\r
84 import org.simantics.db.impl.graph.ReadGraphImpl;
\r
85 import org.simantics.db.impl.graph.WriteGraphImpl;
\r
86 import org.simantics.db.impl.graph.WriteSupport;
\r
87 import org.simantics.db.impl.internal.RandomAccessValueSupport;
\r
88 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
\r
89 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
\r
90 import org.simantics.db.impl.query.QueryProcessor;
\r
91 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
\r
92 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
\r
93 import org.simantics.db.impl.service.QueryDebug;
\r
94 import org.simantics.db.impl.support.VirtualGraphServerSupport;
\r
95 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
\r
96 import org.simantics.db.procedure.AsyncListener;
\r
97 import org.simantics.db.procedure.AsyncMultiListener;
\r
98 import org.simantics.db.procedure.AsyncMultiProcedure;
\r
99 import org.simantics.db.procedure.AsyncProcedure;
\r
100 import org.simantics.db.procedure.Listener;
\r
101 import org.simantics.db.procedure.ListenerBase;
\r
102 import org.simantics.db.procedure.MultiListener;
\r
103 import org.simantics.db.procedure.MultiProcedure;
\r
104 import org.simantics.db.procedure.Procedure;
\r
105 import org.simantics.db.procedure.SyncListener;
\r
106 import org.simantics.db.procedure.SyncMultiListener;
\r
107 import org.simantics.db.procedure.SyncMultiProcedure;
\r
108 import org.simantics.db.procedure.SyncProcedure;
\r
109 import org.simantics.db.procore.cluster.ClusterImpl;
\r
110 import org.simantics.db.procore.cluster.ClusterTraits;
\r
111 import org.simantics.db.procore.protocol.Constants;
\r
112 import org.simantics.db.procore.protocol.DebugPolicy;
\r
113 import org.simantics.db.request.AsyncMultiRead;
\r
114 import org.simantics.db.request.AsyncRead;
\r
115 import org.simantics.db.request.DelayedWrite;
\r
116 import org.simantics.db.request.DelayedWriteResult;
\r
117 import org.simantics.db.request.ExternalRead;
\r
118 import org.simantics.db.request.MultiRead;
\r
119 import org.simantics.db.request.Read;
\r
120 import org.simantics.db.request.ReadInterface;
\r
121 import org.simantics.db.request.UndoTraits;
\r
122 import org.simantics.db.request.Write;
\r
123 import org.simantics.db.request.WriteInterface;
\r
124 import org.simantics.db.request.WriteOnly;
\r
125 import org.simantics.db.request.WriteOnlyResult;
\r
126 import org.simantics.db.request.WriteResult;
\r
127 import org.simantics.db.request.WriteTraits;
\r
128 import org.simantics.db.service.ByteReader;
\r
129 import org.simantics.db.service.ClusterBuilder;
\r
130 import org.simantics.db.service.ClusterBuilderFactory;
\r
131 import org.simantics.db.service.ClusterControl;
\r
132 import org.simantics.db.service.ClusterSetsSupport;
\r
133 import org.simantics.db.service.ClusterUID;
\r
134 import org.simantics.db.service.ClusteringSupport;
\r
135 import org.simantics.db.service.CollectionSupport;
\r
136 import org.simantics.db.service.DebugSupport;
\r
137 import org.simantics.db.service.DirectQuerySupport;
\r
138 import org.simantics.db.service.GraphChangeListenerSupport;
\r
139 import org.simantics.db.service.InitSupport;
\r
140 import org.simantics.db.service.LifecycleSupport;
\r
141 import org.simantics.db.service.ManagementSupport;
\r
142 import org.simantics.db.service.QueryControl;
\r
143 import org.simantics.db.service.SerialisationSupport;
\r
144 import org.simantics.db.service.ServerInformation;
\r
145 import org.simantics.db.service.ServiceActivityMonitor;
\r
146 import org.simantics.db.service.SessionEventSupport;
\r
147 import org.simantics.db.service.SessionMonitorSupport;
\r
148 import org.simantics.db.service.SessionUserSupport;
\r
149 import org.simantics.db.service.StatementSupport;
\r
150 import org.simantics.db.service.TransactionPolicySupport;
\r
151 import org.simantics.db.service.TransactionSupport;
\r
152 import org.simantics.db.service.TransferableGraphSupport;
\r
153 import org.simantics.db.service.UndoRedoSupport;
\r
154 import org.simantics.db.service.VirtualGraphSupport;
\r
155 import org.simantics.db.service.XSupport;
\r
156 import org.simantics.layer0.Layer0;
\r
157 import org.simantics.utils.DataContainer;
\r
158 import org.simantics.utils.Development;
\r
159 import org.simantics.utils.datastructures.Callback;
\r
160 import org.simantics.utils.threads.logger.ITask;
\r
161 import org.simantics.utils.threads.logger.ThreadLogger;
\r
163 import gnu.trove.procedure.TLongProcedure;
\r
164 import gnu.trove.set.hash.TLongHashSet;
\r
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
\r
169 protected static final boolean DEBUG = false;
\r
171 private static final boolean DIAGNOSTICS = false;
\r
173 private TransactionPolicySupport transactionPolicy;
\r
175 final private ClusterControl clusterControl;
\r
177 final protected BuiltinSupportImpl builtinSupport;
\r
178 final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
\r
179 final protected ClusterSetsSupport clusterSetsSupport;
\r
180 final protected LifecycleSupportImpl lifecycleSupport;
\r
182 protected QuerySupportImpl querySupport;
\r
183 protected ResourceSupportImpl resourceSupport;
\r
184 protected WriteSupport writeSupport;
\r
185 public ClusterTranslator clusterTranslator;
\r
187 boolean dirtyPrimitives = false;
\r
189 public static final int SERVICE_MODE_CREATE = 2;
\r
190 public static final int SERVICE_MODE_ALLOW = 1;
\r
191 public int serviceMode = 0;
\r
192 public boolean createdImmutableClusters = false;
\r
193 public TLongHashSet createdClusters = new TLongHashSet();
\r
198 * The service locator maintained by the workbench. These services are
\r
199 * initialized by the workbench during the <code>init</code> method.
\r
201 final protected ServiceLocatorImpl serviceLocator = new ServiceLocatorImpl();
\r
203 final public ResourceSerializerImpl resourceSerializer = new ResourceSerializerImpl();
\r
205 final CopyOnWriteArrayList<ChangeListener> changeListeners2 = new CopyOnWriteArrayList<ChangeListener>();
\r
207 final CopyOnWriteArrayList<ChangeListener> metadataListeners = new CopyOnWriteArrayList<ChangeListener>();
\r
209 final CopyOnWriteArrayList<SessionEventListener> eventListeners = new CopyOnWriteArrayList<SessionEventListener>();
\r
211 final HashSet<Thread> sessionThreads = new HashSet<Thread>();
\r
213 final BijectionMap<MonitorContext, MonitorHandler> monitorContexts = new BijectionMap<MonitorContext, MonitorHandler>();
\r
215 final protected State state = new State();
\r
217 protected GraphSession graphSession = null;
\r
219 protected SessionManager sessionManagerImpl = null;
\r
221 protected UserAuthenticationAgent authAgent = null;
\r
223 protected UserAuthenticator authenticator = null;
\r
225 protected Resource user = null;
\r
227 protected ClusterStream clusterStream = null;
\r
229 protected SessionRequestManager requestManager = null;
\r
231 public ClusterTable clusterTable = null;
\r
233 public QueryProcessor queryProvider2 = null;
\r
235 ClientChangesImpl clientChanges = null;
\r
237 MonitorHandler[] monitorHandlers = new MonitorHandler[0];
\r
239 // protected long newClusterId = Constants.NullClusterId;
\r
241 protected int flushCounter = 0;
\r
243 protected boolean writeOnly = false;
\r
245 WriteState<?> writeState = null;
\r
246 WriteStateBase<?> delayedWriteState = null;
\r
247 protected Resource defaultClusterSet = null; // If not null then used for newResource().
\r
// Builds the session: creates the cluster table and built-in support, remembers the
// session manager, and registers this session's service implementations in serviceLocator.
//
// sessionManagerImpl - stored in the field of the same name.
// authAgent          - not stored here; the assignment below is commented out.
//
// NOTE(review): this listing skips short original lines (numbering gaps), so lines 255-256,
// which sit between reading `t` and its first use, are not visible — confirm against the full file.
249 public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
\r
251 // if (authAgent == null)
\r
252 // throw new IllegalArgumentException("null authentication agent");
\r
// `t` is handed to the cluster table, the virtual graph server support and (as a Path)
// to the cluster sets support as both read and write directory.
254 File t = StaticSessionProperties.virtualGraphStoragePath;
\r
257 this.clusterTable = new ClusterTable(this, t);
\r
258 this.builtinSupport = new BuiltinSupportImpl(this);
\r
259 this.sessionManagerImpl = sessionManagerImpl;
\r
261 // this.authAgent = authAgent;
\r
// Services that are only reachable through the locator (no field keeps a reference).
263 serviceLocator.registerService(Session.class, this);
\r
264 serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
\r
265 serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
\r
266 serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
\r
267 serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
\r
268 serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
\r
269 serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
\r
270 serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
\r
271 serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
\r
272 serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
\r
273 serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
\r
274 serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
\r
275 serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
\r
276 serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
\r
277 serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
\r
278 serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
\r
279 serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
\r
280 serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
\r
281 serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
\r
282 serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
\r
283 serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
\r
284 serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
\r
285 ServiceActivityUpdaterForWriteTransactions.register(this);
\r
// Supports below are kept in fields of this session and also registered in the locator.
// The virtual graph support instance is registered under two service interfaces.
287 this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
\r
288 serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
\r
289 serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
\r
290 this.lifecycleSupport = new LifecycleSupportImpl(this);
\r
291 serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
\r
292 this.transactionPolicy = new TransactionPolicyRelease();
\r
293 serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
\r
294 this.clusterControl = new ClusterControlImpl(this);
\r
295 serviceLocator.registerService(ClusterControl.class, clusterControl);
\r
296 this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
\r
297 this.clusterSetsSupport.setReadDirectory(t.toPath());
\r
298 this.clusterSetsSupport.updateWriteDirectory(t.toPath());
\r
299 serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
\r
304 * Session interface
\r
309 public Resource getRootLibrary() {
\r
310 return queryProvider2.getRootLibraryResource();
\r
313 void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
\r
314 if (!graphSession.dbSession.refreshEnabled())
\r
317 getClusterTable().refresh(csid, this, clusterUID);
\r
318 } catch (Throwable t) {
\r
319 Logger.defaultLogError("Refesh failed.", t);
\r
323 static final class TaskHelper {
\r
324 private final String name;
\r
325 private Object result;
\r
326 final Semaphore sema = new Semaphore(0);
\r
327 private Throwable throwable = null;
\r
328 final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
\r
330 public void run(DatabaseException e) {
\r
331 synchronized (TaskHelper.this) {
\r
336 final Procedure<Object> proc = new Procedure<Object>() {
\r
338 public void execute(Object result) {
\r
339 callback.run(null);
\r
342 public void exception(Throwable t) {
\r
343 if (t instanceof DatabaseException)
\r
344 callback.run((DatabaseException)t);
\r
346 callback.run(new DatabaseException("" + name + "operation failed.", t));
\r
349 final WriteTraits writeTraits = new WriteTraits() {
\r
351 public UndoTraits getUndoTraits() {
\r
355 public VirtualGraph getProvider() {
\r
359 TaskHelper(String name) {
\r
362 <T> T getResult() {
\r
365 void setResult(Object result) {
\r
366 this.result = result;
\r
368 // Throwable throwableGet() {
\r
369 // return throwable;
\r
371 synchronized void throwableSet(Throwable t) {
\r
374 // void throwableSet(String t) {
\r
375 // throwable = new InternalException("" + name + " operation failed. " + t);
\r
377 synchronized void throwableCheck()
\r
378 throws DatabaseException {
\r
379 if (null != throwable)
\r
380 if (throwable instanceof DatabaseException)
\r
381 throw (DatabaseException)throwable;
\r
383 throw new DatabaseException("Undo operation failed.", throwable);
\r
385 void throw_(String message)
\r
386 throws DatabaseException {
\r
387 throw new DatabaseException("" + name + " operation failed. " + message);
\r
393 private ListenerBase getListenerBase(Object procedure) {
\r
394 if (procedure instanceof ListenerBase)
\r
395 return (ListenerBase) procedure;
\r
400 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
401 scheduleRequest(request, callback, notify, null);
\r
405 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Queues a Write request on requestManager as a SessionTask. When the task runs it creates a
// fresh ClientChangesImpl, builds a WriteGraphImpl for the request's provider, installs a
// WriteState in the writeState field and calls request.perform(writer). Throwables are logged
// (unless they are CancelTransactionException) and handed to writeState.except().
//
// request  - must not be null (asserted).
// callback - may be null; otherwise run with null on success or with the failure.
// notify   - passed to the WriteState.
// combine  - not referenced anywhere in the visible body.
//
// NOTE(review): this listing skips short original lines (numbering gaps, e.g. 440-443, 476-478,
// 521-524), so `try {`, `finally` and closing-brace lines are not visible here — confirm the
// exact control flow against the full file.
408 public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
410 assert (request != null);
\r
// Optional development-time trace of scheduled writes; problems reading the property are only logged.
412 if(Development.DEVELOPMENT) {
\r
414 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
415 System.err.println("schedule write '" + request + "'");
\r
416 } catch (Throwable t) {
\r
417 Logger.defaultLogError(t);
\r
// Thread index is derived from the request's hash code masked with THREAD_MASK.
421 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
423 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
\r
426 public void run(int thread) {
\r
428 if(Development.DEVELOPMENT) {
\r
430 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
431 System.err.println("perform write '" + request + "'");
\r
432 } catch (Throwable t) {
\r
433 Logger.defaultLogError(t);
\r
437 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
439 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
444 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
446 VirtualGraph vg = getProvider(request.getProvider());
\r
448 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
// The procedure given to WriteState relays the outcome to the caller's callback, if any.
449 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
\r
452 public void execute(Object result) {
\r
453 if(callback != null) callback.run(null);
\r
// NOTE(review): unchecked cast — a Throwable that is not a DatabaseException would raise a
// ClassCastException here; the DelayedWrite overload in this file wraps such throwables in a
// DatabaseException instead. Confirm what WriteState actually passes in.
457 public void exception(Throwable t) {
\r
458 if(callback != null) callback.run((DatabaseException)t);
\r
463 assert (null != writer);
\r
464 // writer.state.barrier.inc();
\r
// Run the client's write; failures are logged (except cancellation) and recorded in writeState.
467 request.perform(writer);
\r
468 assert (null != writer);
\r
469 } catch (Throwable t) {
\r
470 if (!(t instanceof CancelTransactionException))
\r
471 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
\r
472 writeState.except(t);
\r
474 // writer.state.barrier.dec();
\r
475 // writer.waitAsync(request);
\r
479 assert(!queryProvider2.dirty);
\r
// Outer handler: covers anything thrown outside the inner catch above.
481 } catch (Throwable e) {
\r
483 // Log it first, just to be safe that the error is always logged.
\r
484 if (!(e instanceof CancelTransactionException))
\r
485 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
487 // writeState.getGraph().state.barrier.dec();
\r
488 // writeState.getGraph().waitAsync(request);
\r
490 writeState.except(e);
\r
493 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
\r
494 // // All we can do here is to log those, can't really pass them anywhere.
\r
495 // if (callback != null) {
\r
496 // if(e instanceof DatabaseException) callback.run((DatabaseException)e);
\r
497 // else callback.run(new DatabaseException(e));
\r
499 // } catch (Throwable e2) {
\r
500 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
\r
502 // boolean empty = clusterStream.reallyFlush();
\r
503 // int callerThread = -1;
\r
504 // ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
\r
505 // SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
\r
507 // // Send all local changes to server so it can calculate correct reverse change set.
\r
508 // // This will call clusterStream.accept().
\r
509 // state.cancelCommit(context, clusterStream);
\r
511 // if (!context.isOk()) // this is a blocking operation
\r
512 // throw new InternalException("Cancel failed. This should never happen. Contact application support.");
\r
513 // getQueryProvider2().performDirtyUpdates(writeState.getGraph());
\r
515 // state.cancelCommit2(context, clusterStream);
\r
516 // } catch (DatabaseException e3) {
\r
517 // Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
\r
519 // clusterStream.setOff(false);
\r
520 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
525 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
531 if(Development.DEVELOPMENT) {
\r
533 if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
\r
534 System.err.println("finish write '" + request + "'");
\r
535 } catch (Throwable t) {
\r
536 Logger.defaultLogError(t);
\r
546 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
\r
547 scheduleRequest(request, procedure, notify, null);
\r
551 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Queues a WriteResult request on requestManager as a SessionTask. When the task runs it
// creates a fresh ClientChangesImpl, builds a WriteGraphImpl for the request's provider,
// installs a WriteState<T> in the writeState field, performs the request and stores the
// returned value with setResult(). A Throwable is handed to writeState.except().
//
// request   - must not be null (asserted).
// procedure - passed to the WriteState together with notify.
// combine   - not referenced anywhere in the visible body.
//
// NOTE(review): this listing skips short original lines (numbering gaps, e.g. 568-569, 578,
// 614-615), so `try {`, `finally` and closing-brace lines are not visible here — confirm the
// exact control flow against the full file.
554 public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
\r
556 assert (request != null);
\r
// Thread index is derived from the request's hash code masked with THREAD_MASK.
558 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
560 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
563 public void run(int thread) {
\r
565 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
567 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
570 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
572 VirtualGraph vg = getProvider(request.getProvider());
\r
573 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
// Typed local keeps the WriteState<T> view so the request's result can be stored;
// the field itself is declared as WriteState<?>.
576 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
\r
577 writeState = writeStateT;
\r
579 assert (null != writer);
\r
580 // writer.state.barrier.inc();
\r
581 writeStateT.setResult(request.perform(writer));
\r
582 assert (null != writer);
\r
584 // writer.state.barrier.dec();
\r
585 // writer.waitAsync(null);
\r
// Unlike the Write overload, this handler does not log the throwable itself.
587 } catch (Throwable e) {
\r
589 // writer.state.barrier.dec();
\r
590 // writer.waitAsync(null);
\r
592 writeState.except(e);
\r
594 // state.stopWriteTransaction(clusterStream);
\r
596 // } catch (Throwable e) {
\r
597 // // Log it first, just to be safe that the error is always logged.
\r
598 // Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
601 // // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
\r
602 // // All we can do here is to log those, can't really pass them anywhere.
\r
603 // if (procedure != null) {
\r
604 // if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
\r
605 // else procedure.exception(new DatabaseException(e));
\r
607 // } catch (Throwable e2) {
\r
608 // Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
\r
611 // clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
613 // state.stopWriteTransaction(clusterStream);
\r
616 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
619 // if(notify != null) notify.release();
\r
629 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
630 scheduleRequest(request, callback, notify, null);
\r
634 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Queues a DelayedWrite request on requestManager as a SessionTask that works in two phases:
//  1. the request is performed against a DelayedWriteGraph built on a ReadGraphImpl, tracked
//     through the delayedWriteState field;
//  2. after acquireWriteOnly(), the DelayedWriteGraph is committed into a WriteGraphImpl that
//     uses a write-only WriteSupport, tracked through the writeState field.
//
// request  - must not be null (asserted).
// callback - may be null; otherwise run with null on success or with a DatabaseException
//            (non-DatabaseException throwables are wrapped).
// notify   - passed to both state objects.
// combine  - not referenced anywhere in the visible body.
//
// NOTE(review): this listing skips short original lines (numbering gaps, e.g. 671-672,
// 677-679, 704-706, 736-741), so `try {`, `finally`, `return` and closing-brace lines are not
// visible here — in particular what happens after delayedWriteState.except(e) cannot be seen.
// Confirm the exact control flow against the full file.
637 public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
639 final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
\r
641 assert (request != null);
\r
// Thread index is derived from the request's hash code masked with THREAD_MASK.
643 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
645 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
648 public void run(int thread) {
// The first phase reports itself through the QUEUED_READS session variable.
649 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
// Shared by both phases: relays the outcome to the caller's callback, if any.
651 Procedure<Object> stateProcedure = new Procedure<Object>() {
\r
653 public void execute(Object result) {
\r
654 if (callback != null)
\r
655 callback.run(null);
\r
658 public void exception(Throwable t) {
\r
659 if (callback != null) {
\r
660 if (t instanceof DatabaseException) callback.run((DatabaseException) t);
\r
661 else callback.run(new DatabaseException(t));
\r
663 Logger.defaultLogError("Unhandled exception", t);
\r
// Phase 1: perform the request against a delayed graph.
667 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
668 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
\r
669 DelayedWriteGraph dwg = null;
\r
670 // newGraph.state.barrier.inc();
\r
673 dwg = new DelayedWriteGraph(newGraph);
\r
674 request.perform(dwg);
\r
675 } catch (Throwable e) {
\r
676 delayedWriteState.except(e);
\r
680 // newGraph.state.barrier.dec();
\r
681 // newGraph.waitAsync(request);
\r
682 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
685 delayedWriteState = null;
\r
// Phase 2: commit the collected changes through a write-only graph.
687 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
\r
688 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
691 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
693 acquireWriteOnly();
\r
695 VirtualGraph vg = getProvider(request.getProvider());
// Local variable shadows the session's writeSupport field for the rest of this task;
// the virtual variant is chosen when the request names a provider.
696 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
698 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
700 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
\r
702 assert (null != writer);
\r
703 // writer.state.barrier.inc();
\r
707 dwg.commit(writer, request);
\r
// A default cluster set picked up during the delayed phase is converted to a real
// resource and recorded in clusterSetsSupport together with its cluster.
709 if(defaultClusterSet != null) {
\r
710 XSupport xs = getService(XSupport.class);
\r
711 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
\r
712 ClusteringSupport cs = getService(ClusteringSupport.class);
\r
713 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
\r
716 // This makes clusters available from server
\r
717 clusterStream.reallyFlush();
\r
719 releaseWriteOnly(writer);
\r
720 handleUpdatesAndMetadata(writer);
\r
// Only ServiceException is handled for the commit phase: write-only clusters are dropped,
// the stream is flushed, write-only mode is released and the failure is recorded.
722 } catch (ServiceException e) {
\r
723 // writer.state.barrier.dec();
\r
724 // writer.waitAsync(null);
\r
726 // These shall be requested from server
\r
727 clusterTable.removeWriteOnlyClusters();
\r
728 // This makes clusters available from server
\r
729 clusterStream.reallyFlush();
\r
731 releaseWriteOnly(writer);
\r
732 writeState.except(e);
\r
734 // Debugging & Profiling
\r
735 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
745 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
\r
746 scheduleRequest(request, procedure, notify, null);
\r
750 * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
753 public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
\r
754 throw new Error("Not implemented");
\r
757 protected ClusterImpl getNewResourceCluster() throws DatabaseException {
\r
758 ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
\r
759 if((serviceMode & SERVICE_MODE_CREATE) > 0) {
\r
760 createdClusters.add(cluster.clusterId);
\r
765 class WriteOnlySupport implements WriteSupport {
\r
767 ClusterStream stream;
\r
768 ClusterImpl currentCluster;
\r
/**
 * Creates the support and captures the enclosing session's current {@code clusterStream}.
 */
public WriteOnlySupport() {
    stream = SessionImplSocket.this.clusterStream;
}
\r
775 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
\r
776 claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
\r
// Adds the statement (s, p, o) to the cluster that owns resource key s.
//
// Throws ImmutableException when the cluster is immutable, the session's service mode does not
// have SERVICE_MODE_ALLOW set, and s is not the root library key.
// A DatabaseException from the cluster operation is logged rather than rethrown.
//
// NOTE(review): this listing skips short original lines (numbering gaps 787-788, 792-794, 798),
// so `try {`, the statement under the write-only check and closing braces are not visible —
// confirm against the full file.
780 public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
\r
782 ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
\r
784 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
\r
785 if(s != queryProvider2.getRootLibrary())
\r
786 throw new ImmutableException("Trying to modify immutable resource key=" + s);
\r
// addRelation may hand back a different cluster object; maintainCluster swaps it in.
789 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
\r
790 } catch (DatabaseException e) {
\r
791 Logger.defaultLogError(e);
\r
795 clientChanges.invalidate(s);
\r
797 if (cluster.isWriteOnly())
\r
799 queryProvider2.updateStatements(s, p);
\r
804 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
\r
805 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
\r
// Stores `length` bytes of `value` as the value of resource key rid in the cluster that owns it.
// A DatabaseException from the cluster operation is logged rather than rethrown.
//
// NOTE(review): this listing skips short original lines (numbering gaps 812, 816-817, 821), so
// `try {`, the statement under the write-only check and closing braces are not visible —
// confirm against the full file.
809 public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
\r
811 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
\r
// setValue may hand back a different cluster object; maintainCluster swaps it in.
813 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
\r
814 } catch (DatabaseException e) {
\r
815 Logger.defaultLogError(e);
\r
818 clientChanges.invalidate(rid);
\r
820 if (cluster.isWriteOnly())
\r
822 queryProvider2.updateValue(rid);
\r
// Sets a resource value from a ByteReader. Amounts below 65536 bytes are read in one call and
// passed to the byte[] overload; otherwise the value is written through modiValueEx in blocks
// of at most 65536 bytes read into a reusable buffer.
// A DatabaseException from the cluster operation is logged rather than rethrown.
//
// NOTE(review): this listing skips short original lines (numbering gaps 831-833, 838-840,
// 844-845, 853), so the declaration and update of `left`, the loop header, `try {` and the
// statements ending each branch are not visible — confirm against the full file.
827 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
\r
829 if(amount < 65536) {
\r
830 claimValue(provider,resource, reader.readBytes(null, amount));
\r
834 byte[] bytes = new byte[65536];
\r
836 int rid = ((ResourceImpl)resource).id;
\r
837 ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
\r
// amount-left is passed as the position argument for the block being written.
841 int block = Math.min(left, 65536);
\r
842 reader.readBytes(bytes, block);
\r
843 maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
\r
846 } catch (DatabaseException e) {
\r
847 Logger.defaultLogError(e);
\r
850 clientChanges.invalidate(rid);
\r
852 if (cluster.isWriteOnly())
\r
854 queryProvider2.updateValue(rid);
\r
/**
 * Installs a replacement cluster after a modifying cluster operation.
 * Does nothing when {@code after_} is null or is the same instance as {@code before}.
 *
 * @param before the cluster the operation was invoked on
 * @param after_ the cluster returned by the operation
 */
858 private void maintainCluster(ClusterImpl before, ClusterI after_) {
\r
// Identity comparison: only a different instance counts as a replacement.
859 if(after_ != null && after_ != before) {
\r
// Unchecked downcast; a ClusterI that is not a ClusterImpl would fail here.
860 ClusterImpl after = (ClusterImpl)after_;
\r
// Keep currentCluster pointing at the live instance.
861 if(currentCluster == before) currentCluster = after;
\r
862 clusterTable.replaceCluster(after);
\r
/**
 * Creates a new resource in the current cluster and returns the value of
 * currentCluster.createResource. A cluster is obtained from getNewResourceCluster()
 * when there is no current cluster or when the current one holds exactly
 * ClusterTable.CLUSTER_FILL_SIZE resources.
 *
 * @param foreignCounter size of the foreignLookup byte array allocated for a newly started cluster
 */
866 public int createResourceKey(int foreignCounter) throws DatabaseException {
\r
867 if(currentCluster == null)
\r
868 currentCluster = getNewResourceCluster();
\r
// NOTE(review): this uses '==' while createResource(VirtualGraph) in this file uses '>=';
// a cluster already above the fill size would not be rotated here - confirm that is intended.
869 if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
\r
// Unchecked downcast: fails if getNewResourceCluster() returns something other than ClusterWriteOnly.
870 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
\r
871 newCluster.foreignLookup = new byte[foreignCounter];
\r
872 currentCluster = newCluster;
\r
// NOTE(review): debug output written directly to stderr.
874 System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
\r
876 return currentCluster.createResource(clusterTranslator);
\r
/**
 * Creates a new resource in the current cluster and wraps its key in a ResourceImpl.
 * A cluster is selected first when there is no current cluster, and again when the
 * current cluster holds at least ClusterTable.CLUSTER_FILL_SIZE resources.
 * {@code provider} is not referenced in the visible lines.
 */
880 public Resource createResource(VirtualGraph provider) throws DatabaseException {
\r
881 if(currentCluster == null) {
\r
// With a default cluster set: allocate a resource in that set and adopt the cluster it landed in.
// 'result' is only used for its id in the visible lines.
882 if (null != defaultClusterSet) {
\r
883 ResourceImpl result = getNewResource(defaultClusterSet);
\r
884 currentCluster = clusterTable.getClusterByResourceKey(result.id);
\r
// NOTE(review): the 'else' introducing this line is not visible in this listing - presumably
// this is the branch taken when no default cluster set exists.
887 currentCluster = getNewResourceCluster();
\r
// Same selection again once the current cluster is full.
890 if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
\r
891 if (null != defaultClusterSet) {
\r
892 ResourceImpl result = getNewResource(defaultClusterSet);
\r
893 currentCluster = clusterTable.getClusterByResourceKey(result.id);
\r
896 currentCluster = getNewResourceCluster();
\r
899 return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
\r
/**
 * Creates a resource for the given cluster id by delegating to getNewResource(clusterId).
 * {@code provider} is not used.
 */
903 public Resource createResource(VirtualGraph provider, long clusterId)
\r
904 throws DatabaseException {
\r
905 return getNewResource(clusterId);
\r
/**
 * Creates a resource for the given cluster set by delegating to getNewResource(clusterSet).
 * {@code provider} is not used.
 */
909 public Resource createResource(VirtualGraph provider, Resource clusterSet)
\r
910 throws DatabaseException {
\r
911 return getNewResource(clusterSet);
\r
/**
 * Delegates to getNewClusterSet(clusterSet); the returned value, if any, is discarded.
 * {@code provider} is not used.
 */
915 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
\r
916 throws DatabaseException {
\r
917 getNewClusterSet(clusterSet);
\r
/**
 * Returns the result of containsClusterSet(clusterSet).
 * The first parameter is named {@code dummy} and is not used.
 */
921 public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
\r
922 throws ServiceException {
\r
923 return containsClusterSet(clusterSet);
\r
/**
 * Makes the cluster with id {@code cluster} the current cluster and stores that id
 * in clusterSetsSupport under the key returned by clusterSetsSupport.getSet(cluster).
 */
926 public void selectCluster(long cluster) {
\r
927 currentCluster = clusterTable.getClusterByClusterId(cluster);
\r
// presumably the resource id of the cluster set that 'cluster' belongs to - confirm against ClusterSetsSupport
928 long setResourceId = clusterSetsSupport.getSet(cluster);
\r
929 clusterSetsSupport.put(setResourceId, cluster);
\r
/**
 * Sets the default cluster set through setDefaultClusterSet4NewResource and updates
 * currentCluster to match.
 * NOTE(review): the return statement is not visible in this listing; presumably 'result' is returned - confirm.
 */
933 public Resource setDefaultClusterSet(Resource clusterSet)
\r
934 throws ServiceException {
\r
935 Resource result = setDefaultClusterSet4NewResource(clusterSet);
\r
// Non-null set: look up the cluster id stored for the set's resource id and make that cluster current.
936 if(clusterSet != null) {
\r
937 long id = clusterSetsSupport.get(clusterSet.getResourceId());
\r
938 currentCluster = clusterTable.getClusterByClusterId(id);
\r
// NOTE(review): the 'else' introducing this line is not visible here - presumably this is the null-set branch.
941 currentCluster = null;
\r
/**
 * Removes the value of {@code resource}. The provider is first resolved with getProvider:
 * a null result takes the cluster path, otherwise the call is forwarded to the virtual graph.
 */
947 public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
\r
949 provider = getProvider(provider);
\r
// Cluster path (resolved provider is null).
950 if (null == provider) {
\r
951 int key = ((ResourceImpl)resource).id;
\r
955 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
\r
// The immutability guard and the direct removeValue call below are commented out in the source.
956 // if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
\r
957 // if(key != queryProvider2.getRootLibrary())
\r
958 // throw new ImmutableException("Trying to modify immutable resource key=" + key);
\r
961 // cluster.removeValue(key, clusterTranslator);
\r
962 // } catch (DatabaseException e) {
\r
963 // Logger.defaultLogError(e);
\r
967 clusterTable.writeOnlyInvalidate(cluster);
\r
// The removal is issued through clusterTranslator: one statement index for the resource
// tagged DELETE_OPERATION, followed by removeValue on the cluster.
970 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
\r
971 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
\r
972 clusterTranslator.removeValue(cluster);
\r
// NOTE(review): the matching try is not visible in this listing; the exception is passed to the logger.
973 } catch (DatabaseException e) {
\r
974 Logger.defaultLogError(e);
\r
977 queryProvider2.invalidateResource(key);
\r
978 clientChanges.invalidate(key);
\r
// Virtual graph path. NOTE(review): the 'else' introducing these lines is not visible here - confirm.
981 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
\r
982 queryProvider2.updateValue(querySupport.getId(resource));
\r
// The removal is recorded through clientChanges.claimValue (there is no separate deny call here).
983 clientChanges.claimValue(resource);
\r
/** Not supported by this implementation: always throws UnsupportedOperationException. */
990 public void flush(boolean intermediate) {
\r
991 throw new UnsupportedOperationException();
\r
/**
 * Flushes through clusterTable.flushCluster(graphSession) and clears currentCluster.
 * When a default cluster set is configured, its resource id is mapped to
 * Constants.NewClusterId in clusterSetsSupport.
 */
995 public void flushCluster() {
\r
996 clusterTable.flushCluster(graphSession);
\r
997 if(defaultClusterSet != null) {
\r
998 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
\r
// NOTE(review): the closing brace of the 'if' is not visible here; presumably this reset is unconditional - confirm.
1000 currentCluster = null;
\r
/**
 * Not supported by this implementation: always throws UnsupportedOperationException
 * with a message that includes {@code r}.
 */
1004 public void flushCluster(Resource r) {
\r
1005 throw new UnsupportedOperationException("flushCluster resource " + r);
\r
// gc(): no statements are visible in the body in this listing - presumably a no-op; confirm against the full file.
1009 public void gc() {
\r
/**
 * Removes the statement (subject, predicate, object). The provider is first resolved with
 * getProvider: a null result takes the cluster path, otherwise the statement is denied
 * in the virtual graph.
 * NOTE(review): no return statement is visible in this listing, so the returned boolean
 * cannot be documented from here.
 */
1013 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
\r
1014 Resource object) {
\r
// All three resources are cast to ResourceImpl to read their integer ids.
1016 int s = ((ResourceImpl)subject).id;
\r
1017 int p = ((ResourceImpl)predicate).id;
\r
1018 int o = ((ResourceImpl)object).id;
\r
1020 provider = getProvider(provider);
\r
// Cluster path (resolved provider is null).
1021 if (null == provider) {
\r
1023 ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
\r
1024 clusterTable.writeOnlyInvalidate(cluster);
\r
1028 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
\r
1029 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
\r
1030 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
\r
1032 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
\r
1033 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
\r
// Three statement indexes are registered against the subject's cluster: the subject tagged
// REMOVE_OPERATION, predicate and object tagged NULL_OPERATION with the UIDs of their own clusters.
1035 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
\r
1036 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
\r
1037 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
\r
1038 clusterTranslator.removeStatement(cluster);
\r
1040 queryProvider2.invalidateResource(s);
\r
1041 clientChanges.invalidate(s);
\r
// NOTE(review): the matching try is not visible in this listing; the exception is passed to the logger.
1043 } catch (DatabaseException e) {
\r
1045 Logger.defaultLogError(e);
\r
// Virtual graph path. NOTE(review): the 'else' introducing these lines is not visible here - confirm.
1053 ((VirtualGraphImpl)provider).deny(s, p, o);
\r
1054 queryProvider2.invalidateResource(s);
\r
1055 clientChanges.invalidate(s);
\r
/** Not supported by this implementation: always throws UnsupportedOperationException. */
1064 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
\r
1065 throw new UnsupportedOperationException();
\r
// writeOnly(): the return statement is not visible in this listing, so the returned value
// cannot be documented from here - confirm against the full file.
1069 public boolean writeOnly() {
\r
/** Forwards the Write request to writeSupport.performWriteRequest(graph, request). */
1074 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
\r
1075 writeSupport.performWriteRequest(graph, request);
\r
/** Not supported for WriteResult requests: always throws UnsupportedOperationException. */
1079 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
\r
1080 throw new UnsupportedOperationException();
\r
/** Not supported for WriteOnly requests: always throws UnsupportedOperationException. */
1084 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
\r
1085 throw new UnsupportedOperationException();
\r
/**
 * Forwards {@code data} to writeSupport.addMetadata.
 * The type parameter {@code T} is declared but not used in the signature or body.
 */
1089 public <T> void addMetadata(Metadata data) throws ServiceException {
\r
1090 writeSupport.addMetadata(data);
\r
/** Returns writeSupport.getMetadata(clazz) for the requested metadata class. */
1094 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
\r
1095 return writeSupport.getMetadata(clazz);
\r
/** Returns the map produced by writeSupport.getMetadata() without copying it. */
1099 public TreeMap<String, byte[]> getMetadata() {
\r
1100 return writeSupport.getMetadata();
\r
/** Forwards to writeSupport.commitDone(writeTraits, csid). */
1104 public void commitDone(WriteTraits writeTraits, long csid) {
\r
1105 writeSupport.commitDone(writeTraits, csid);
\r
/** Forwards to writeSupport.clearUndoList(writeTraits). */
1109 public void clearUndoList(WriteTraits writeTraits) {
\r
1110 writeSupport.clearUndoList(writeTraits);
\r
/** Returns the int produced by writeSupport.clearMetadata(). */
1113 public int clearMetadata() {
\r
1114 return writeSupport.clearMetadata();
\r
/** Forwards to writeSupport.startUndo(). */
1118 public void startUndo() {
\r
1119 writeSupport.startUndo();
\r
/**
 * WriteSupport implementation whose write operations act on the VirtualGraph passed as
 * {@code provider} (cast to TransientGraph or VirtualGraphImpl) instead of on clusters.
 * Cluster-related and request/metadata operations throw UnsupportedOperationException.
 * Non-static inner class: it uses querySupport, clientChanges and getQueryProvider2()
 * of the enclosing instance.
 */
1123 class VirtualWriteOnlySupport implements WriteSupport {
\r
1126 // public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
\r
1127 // Resource object) {
\r
1128 // throw new UnsupportedOperationException();
\r
// Claims (subject, predicate, object) in the transient graph using ids from querySupport,
// updates the statement queries for (subject, predicate), and records the claim in clientChanges.
// provider must be a TransientGraph (unchecked cast).
1132 public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
\r
1134 TransientGraph impl = (TransientGraph)provider;
\r
1135 impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
\r
1136 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
\r
1137 clientChanges.claim(subject, predicate, object);
\r
// Same as above for integer keys; no id lookup is needed.
1142 public void claim(VirtualGraph provider, int subject, int predicate, int object) {
\r
1144 TransientGraph impl = (TransientGraph)provider;
\r
1145 impl.claim(subject, predicate, object);
\r
1146 getQueryProvider2().updateStatements(subject, predicate);
\r
1147 clientChanges.claim(subject, predicate, object);
\r
// Delegates to the int-key overload with the ResourceImpl id and the full array length.
1152 public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
\r
1153 claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
\r
// Stores the value in the virtual graph (provider cast to VirtualGraphImpl), updates the
// value query for the resource, and records the change in clientChanges.
1157 public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
\r
1158 ((VirtualGraphImpl)provider).claimValue(resource, value, length);
\r
1159 getQueryProvider2().updateValue(resource);
\r
1160 clientChanges.claimValue(resource);
\r
// Reads 'amount' bytes from the reader in one call (no block-wise writing here) and delegates.
1164 public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
\r
1165 byte[] value = reader.readBytes(null, amount);
\r
1166 claimValue(provider, resource, value);
\r
// Creates a resource in the transient graph; newResource is called with 'false'
// (meaning of the flag is defined by TransientGraph - not visible here).
1170 public Resource createResource(VirtualGraph provider) {
\r
1171 TransientGraph impl = (TransientGraph)provider;
\r
1172 return impl.getResource(impl.newResource(false));
\r
// Cluster-targeted creation is not supported for virtual graphs.
1176 public Resource createResource(VirtualGraph provider, long clusterId) {
\r
1177 throw new UnsupportedOperationException();
\r
// Cluster-set-targeted creation is not supported for virtual graphs.
1181 public Resource createResource(VirtualGraph provider, Resource clusterSet)
\r
1182 throws DatabaseException {
\r
1183 throw new UnsupportedOperationException();
\r
// Not supported.
1187 public void createClusterSet(VirtualGraph provider, Resource clusterSet)
\r
1188 throws DatabaseException {
\r
1189 throw new UnsupportedOperationException();
\r
// Not supported.
1193 public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
\r
1194 throws ServiceException {
\r
1195 throw new UnsupportedOperationException();
\r
// NOTE(review): the body of this method is not visible in this listing - confirm what it returns.
1199 public Resource setDefaultClusterSet(Resource clusterSet)
\r
1200 throws ServiceException {
\r
// Removes the value in the virtual graph, updates the value query, and records the change.
1205 public void denyValue(VirtualGraph provider, Resource resource) {
\r
1206 ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
\r
1207 getQueryProvider2().updateValue(querySupport.getId(resource));
\r
1208 // NOTE: this only keeps track of value changes by-resource.
\r
1209 clientChanges.claimValue(resource);
\r
// Not supported.
1213 public void flush(boolean intermediate) {
\r
1214 throw new UnsupportedOperationException();
\r
// Not supported.
1218 public void flushCluster() {
\r
1219 throw new UnsupportedOperationException();
\r
// Not supported; the message includes the resource.
1223 public void flushCluster(Resource r) {
\r
1224 throw new UnsupportedOperationException("Resource " + r);
\r
// NOTE(review): no statements are visible in the body here - presumably a no-op; confirm.
1228 public void gc() {
\r
// Denies the statement in the transient graph, updates the statement queries for
// (subject, predicate), and records the denial in clientChanges.
// NOTE(review): the return statement is not visible in this listing.
1232 public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
\r
1233 TransientGraph impl = (TransientGraph) provider;
\r
1234 impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
\r
1235 getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
\r
1236 clientChanges.deny(subject, predicate, object);
\r
// Not supported.
1241 public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
\r
1242 throw new UnsupportedOperationException();
\r
// NOTE(review): the return statement is not visible in this listing.
1246 public boolean writeOnly() {
\r
// The three performWriteRequest variants are not supported.
1251 public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
\r
1252 throw new UnsupportedOperationException();
\r
1256 public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
\r
1257 throw new UnsupportedOperationException();
\r
1261 public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
\r
1262 throw new UnsupportedOperationException();
\r
// Metadata access is not supported.
1266 public <T> void addMetadata(Metadata data) throws ServiceException {
\r
1267 throw new UnsupportedOperationException();
\r
1271 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
\r
1272 throw new UnsupportedOperationException();
\r
1276 public TreeMap<String, byte[]> getMetadata() {
\r
1277 throw new UnsupportedOperationException();
\r
// NOTE(review): the bodies of the four methods below are not visible in this listing.
1281 public void commitDone(WriteTraits writeTraits, long csid) {
\r
1285 public void clearUndoList(WriteTraits writeTraits) {
\r
1289 public int clearMetadata() {
\r
1294 public void startUndo() {
\r
/**
 * Executes a WriteOnlyResult request: creates fresh client changes, acquires the write-only
 * state, performs the request on a new WriteGraphImpl, stores the result in the write state,
 * flushes the cluster stream and releases the write-only state.
 * NOTE(review): the try/finally structure is only partly visible in this listing.
 *
 * @param request  the request to perform; its provider selects the WriteSupport used
 * @param notify   passed to the WriteState created for this request
 * @param callback passed to the WriteState; also receives a DatabaseException wrapper on unexpected errors (may be null)
 */
1298 private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
\r
// NOTE(review): 'thread' is not referenced in the visible lines of this method.
1302 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1304 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1307 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
1309 acquireWriteOnly();
\r
// A request with a provider writes to the virtual graph; otherwise to clusters.
// Unlike the WriteOnly scheduleRequest in this file, the provider is not passed through getProvider() here.
1311 WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
1313 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
\r
1315 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
\r
1316 writeState = writeStateT;
\r
1318 assert (null != writer);
\r
1319 // writer.state.barrier.inc();
\r
// The request is timed with System.nanoTime().
1320 long start = System.nanoTime();
\r
1321 T result = request.perform(writer);
\r
1322 long duration = System.nanoTime() - start;
\r
// NOTE(review): timing output is written directly to stderr (duration converted to seconds).
1324 System.err.println("################");
\r
1325 System.err.println("WriteOnly duration " + 1e-9*duration);
\r
1327 writeStateT.setResult(result);
\r
1329 // This makes clusters available from server
\r
1330 clusterStream.reallyFlush();
\r
1332 // This will trigger query updates
\r
1333 releaseWriteOnly(writer);
\r
// Repeats the earlier assertion on 'writer'.
1334 assert (null != writer);
\r
1336 handleUpdatesAndMetadata(writer);
\r
// Cancellation: release, drop write-only clusters and stop the write transaction; no callback in the visible lines.
1338 } catch (CancelTransactionException e) {
\r
1340 releaseWriteOnly(writeState.getGraph());
\r
1342 clusterTable.removeWriteOnlyClusters();
\r
1343 state.stopWriteTransaction(clusterStream);
\r
// Any other failure: same cleanup, plus the callback receives the cause wrapped in a DatabaseException.
1345 } catch (Throwable e) {
\r
// NOTE(review): the exception is printed here and also logged below.
1347 e.printStackTrace();
\r
1349 releaseWriteOnly(writeState.getGraph());
\r
1351 clusterTable.removeWriteOnlyClusters();
\r
1353 if (callback != null)
\r
1354 callback.exception(new DatabaseException(e));
\r
1356 state.stopWriteTransaction(clusterStream);
\r
1357 Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
\r
1361 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
/**
 * Schedules a WriteOnly request by delegating to the four-argument overload with a null {@code combine}.
 * The type parameter {@code T} is declared but not used.
 */
1367 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
\r
1368 scheduleRequest(request, callback, notify, null);
\r
/**
 * Schedules a WriteOnly request as a SessionTask on requestManager. When run, the task creates
 * fresh client changes, acquires the write-only state, performs the request on a new
 * WriteGraphImpl, flushes the cluster stream and releases the write-only state.
 * {@code combine} is not referenced in the visible lines.
 *
 * @param callback invoked with null on success or with the failure (may be null)
 * @param notify   passed to the WriteState created for the request
 */
1372 public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
\r
1376 assert (request != null);
\r
// The thread index is the request's hash code masked with THREAD_MASK.
1378 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
// This overload passes 'thread' twice to SessionTask; the WriteOnlyResult variant in this file passes it once.
1380 requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
\r
1383 public void run(int thread) {
\r
1385 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
1389 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
1392 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
\r
1394 acquireWriteOnly();
\r
// The request's provider is resolved through getProvider(); a non-null result selects the virtual-graph support.
1396 VirtualGraph vg = getProvider(request.getProvider());
\r
1397 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
\r
1399 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
\r
// The WriteState's procedure adapts the Callback: success -> run(null), failure -> run(exception).
1401 writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
\r
1404 public void execute(Object result) {
\r
1405 if(callback != null) callback.run(null);
\r
1409 public void exception(Throwable t) {
\r
// NOTE(review): unchecked cast - a Throwable that is not a DatabaseException would raise
// ClassCastException here; the catch block below wraps instead of casting. Confirm which is intended.
1410 if(callback != null) callback.run((DatabaseException)t);
\r
1415 assert (null != writer);
\r
1416 // writer.state.barrier.inc();
\r
1420 request.perform(writer);
\r
// Failure path: release, drop write-only clusters, then report.
1422 } catch (Throwable e) {
\r
1424 // writer.state.barrier.dec();
\r
1425 // writer.waitAsync(null);
\r
1427 releaseWriteOnly(writer);
\r
1429 clusterTable.removeWriteOnlyClusters();
\r
// Cancellation is not reported through the callback here.
1431 if(!(e instanceof CancelTransactionException)) {
\r
1432 if (callback != null)
\r
1433 callback.run(new DatabaseException(e));
\r
// NOTE(review): writeState.except(e) may reach the Procedure above as well, in which case the
// callback could run twice for one failure - confirm against WriteState.
1436 writeState.except(e);
\r
1442 // This makes clusters available from server
\r
1443 boolean empty = clusterStream.reallyFlush();
\r
1444 // This was needed to make WO requests call metadata listeners.
\r
1445 // NOTE: the calling event does not contain clientChanges information.
\r
1446 if (!empty && clientChanges.isEmpty())
\r
1447 clientChanges.setNotEmpty(true);
\r
1448 releaseWriteOnly(writer);
\r
1449 assert (null != writer);
\r
1452 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
\r
/** Schedules a WriteOnlyResult request by delegating to the four-argument overload with a null {@code combine}. */
1465 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
\r
1466 scheduleRequest(request, callback, notify, null);
\r
1470 * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
\r
// Schedules a WriteOnlyResult request as a SessionTask on requestManager; the task runs
// performWriteOnly(request, notify, callback). 'combine' is not referenced in the visible lines.
1473 public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
\r
1475 assert (request != null);
\r
// The thread index is the request's hash code masked with THREAD_MASK.
1477 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1479 requestManager.scheduleWrite(new SessionTask(request, thread) {
\r
1482 public void run(int thread) {
\r
// NOTE(review): no use of 'task' is visible in this listing - confirm that the logged task is finished.
1484 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
\r
1486 performWriteOnly(request, notify, callback);
\r
/**
 * Schedules a Read request as a SessionRead on requestManager. When run, the task creates a
 * new ReadGraphImpl; with a listener (derived from {@code procedure}) the request goes through
 * the query processor, otherwise it is performed directly and the procedure is invoked.
 *
 * @param procedure receives the result or the exception
 * @param notify    passed to the SessionRead
 * @param throwable optional container that receives a failure (may be null)
 * @param result    optional container that receives the result (may be null)
 */
1496 public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
\r
1498 assert (request != null);
\r
1499 assert (procedure != null);
\r
1501 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1503 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
\r
1506 public void run(int thread) {
\r
1508 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1510 ListenerBase listener = getListenerBase(procedure);
\r
1512 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: run as a query, wrapping the procedure so the optional containers are filled too.
1516 if (listener != null) {
\r
1519 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
\r
// The user procedure is notified before the container handling.
1522 public void exception(AsyncReadGraph graph, Throwable t) {
\r
1523 procedure.exception(graph, t);
\r
// NOTE(review): the statements inside this 'if' are not visible in this listing.
1524 if(throwable != null) {
\r
1527 // ErrorLogger.defaultLogError("Unhandled exception", t);
\r
// The result container is filled before the user procedure is notified.
1532 public void execute(AsyncReadGraph graph, T t) {
\r
1533 if(result != null) result.set(t);
\r
1534 procedure.execute(graph, t);
\r
1538 } catch (Throwable t) {
\r
1539 // This is handled by the AsyncProcedure
\r
1540 //Logger.defaultLogError("Internal error", t);
\r
// Direct path. NOTE(review): the 'else' introducing it is not visible in this listing.
1547 // newGraph.state.barrier.inc();
\r
1549 T t = request.perform(newGraph);
\r
1553 if(result != null) result.set(t);
\r
1554 procedure.execute(newGraph, t);
\r
// Failure of the success notification: stored in the container when one is given;
// NOTE(review): an 'else' before the log call is presumably elided - confirm.
1556 } catch (Throwable th) {
\r
1558 if(throwable != null) {
\r
1559 throwable.set(th);
\r
1561 Logger.defaultLogError("Unhandled exception", th);
\r
// Failure of the request itself.
1566 } catch (Throwable t) {
\r
// NOTE(review): printStackTrace in addition to the logger call below.
1569 t.printStackTrace();
\r
1571 if(throwable != null) {
\r
1574 Logger.defaultLogError("Unhandled exception", t);
\r
1579 procedure.exception(newGraph, t);
\r
// Failure of the exception notification.
1581 } catch (Throwable t2) {
\r
1583 if(throwable != null) {
\r
1584 throwable.set(t2);
\r
1586 Logger.defaultLogError("Unhandled exception", t2);
\r
1593 // newGraph.state.barrier.dec();
\r
1594 // newGraph.waitAsync(request);
\r
1600 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Schedules an AsyncRead request as a SessionRead on requestManager. With a listener the
 * request is run as a query; otherwise it is performed directly with {@code procedure}
 * wrapped in a ResultCallWrappedSingleQueryProcedure4.
 *
 * @param listener optional listener; selects the query path when non-null
 * @param notify   passed to the SessionRead
 */
1610 public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
\r
1612 assert (request != null);
\r
1613 assert (procedure != null);
\r
1615 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
// No throwable container is passed to SessionRead here (null).
1617 requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
\r
1620 public void run(int thread) {
\r
1622 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1624 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path.
1628 if (listener != null) {
\r
1630 newGraph.processor.query(newGraph, request, null, procedure, listener);
\r
1632 // newGraph.waitAsync(request);
\r
// Direct path. NOTE(review): the 'else' introducing it is not visible in this listing.
1636 final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
\r
1637 procedure, "request");
\r
1641 // newGraph.state.barrier.inc();
\r
1643 request.perform(newGraph, wrapper);
\r
1645 // newGraph.waitAsync(request);
\r
// A Throwable from perform is routed to the wrapper instead of propagating.
1647 } catch (Throwable t) {
\r
1649 wrapper.exception(newGraph, t);
\r
1650 // newGraph.waitAsync(request);
\r
1659 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Schedules an AsyncMultiRead request as a SessionRead on requestManager. With a listener
 * (derived from {@code procedure}) the request is run as a query; otherwise it is performed
 * directly with {@code procedure} wrapped in a ResultCallWrappedQueryProcedure4.
 */
1669 public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
\r
1671 assert (request != null);
\r
1672 assert (procedure != null);
\r
1674 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
// The last SessionRead argument is the thread index when a semaphore is given, otherwise -1.
1676 int sync = notify != null ? thread : -1;
\r
1678 requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
\r
1681 public void run(int thread) {
\r
1683 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1685 ListenerBase listener = getListenerBase(procedure);
\r
1687 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path.
1691 if (listener != null) {
\r
1693 newGraph.processor.query(newGraph, request, null, procedure, listener);
\r
1695 // newGraph.waitAsync(request);
\r
// Direct path. NOTE(review): the 'else' introducing it is not visible in this listing.
1699 final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
\r
1703 request.perform(newGraph, wrapper);
\r
// NOTE(review): in the visible lines the failure is only printed; neither the wrapper nor the
// procedure is notified, unlike the AsyncRead variant in this file - confirm this is intended.
1705 } catch (Throwable t) {
\r
1707 t.printStackTrace();
\r
1715 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Schedules an ExternalRead request as a SessionRead on requestManager. With a listener
 * (derived from {@code procedure}) the request is run as a query; otherwise a Listener is
 * registered directly on the request. In both paths the optional containers are filled.
 *
 * @param throwable optional container that receives a failure (may be null)
 * @param result    optional container that receives the result (may be null)
 */
1725 public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
\r
1727 assert (request != null);
\r
1728 assert (procedure != null);
\r
1730 int thread = request.hashCode() & queryProvider2.THREAD_MASK;
\r
1732 requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
\r
1735 public void run(int thread) {
\r
1737 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
1739 ListenerBase listener = getListenerBase(procedure);
\r
1741 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
\r
// Listener path: query with a wrapping Procedure.
1745 if (listener != null) {
\r
1747 newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
\r
// Here the user procedure is notified before the container handling ...
1750 public void exception(Throwable t) {
\r
1751 procedure.exception(t);
\r
// NOTE(review): the statements inside this 'if' are not visible in this listing.
1752 if(throwable != null) {
\r
1758 public void execute(T t) {
\r
1759 if(result != null) result.set(t);
\r
1760 procedure.execute(t);
\r
1765 // newGraph.waitAsync(request);
\r
// Direct path. NOTE(review): the 'else' introducing it is not visible in this listing.
1769 // newGraph.state.barrier.inc();
\r
1771 request.register(newGraph, new Listener<T>() {
\r
// ... whereas here the container is filled first.
1774 public void exception(Throwable t) {
\r
1775 if(throwable != null) throwable.set(t);
\r
1776 procedure.exception(t);
\r
1777 // newGraph.state.barrier.dec();
\r
1781 public void execute(T t) {
\r
1782 if(result != null) result.set(t);
\r
1783 procedure.execute(t);
\r
1784 // newGraph.state.barrier.dec();
\r
// NOTE(review): the returned value is not visible in this listing.
1788 public boolean isDisposed() {
\r
1794 // newGraph.waitAsync(request);
\r
1800 fireSessionVariableChange(SessionVariables.QUEUED_READS);
\r
/**
 * Schedules the Read request without waiting: no semaphore and no throwable/result
 * containers are passed (all three are null).
 */
1812 public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
\r
1814 scheduleRequest(request, procedure, null, null, null);
\r
/**
 * Schedules the Read request and waits on a semaphore until it is done.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the stored failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException("Unexpected exception") wrapping it
 */
1819 public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
\r
1820 assertNotSession();
\r
// Zero permits: acquire(...) below is expected to wait until the scheduled task signals 'notify'.
1821 Semaphore notify = new Semaphore(0);
\r
1822 DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1823 DataContainer<T> result = new DataContainer<T>();
\r
1824 scheduleRequest(request, procedure, notify, container, result);
\r
1825 acquire(notify, request);
\r
1826 Throwable throwable = container.get();
\r
1827 if(throwable != null) {
\r
1828 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1829 else throw new DatabaseException("Unexpected exception", throwable);
\r
1831 return result.get();
\r
/**
 * Schedules the AsyncRead request and waits on a semaphore until it is done. The given
 * procedure is wrapped so that result and failure are captured locally before being forwarded.
 *
 * @return the captured result
 * @throws DatabaseException the captured failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException("Unexpected exception") wrapping it
 */
1835 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
\r
1836 assertNotSession();
\r
1837 Semaphore notify = new Semaphore(0);
\r
1838 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
\r
1839 final DataContainer<T> resultContainer = new DataContainer<T>();
\r
1840 scheduleRequest(request, new AsyncProcedure<T>() {
\r
// Capture first, then forward to the caller's procedure.
1842 public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
1843 exceptionContainer.set(throwable);
\r
1844 procedure.exception(graph, throwable);
\r
1847 public void execute(AsyncReadGraph graph, T result) {
\r
1848 resultContainer.set(result);
\r
1849 procedure.execute(graph, result);
\r
// The listener is derived from the caller's procedure, not from the wrapper.
1851 }, getListenerBase(procedure), notify);
\r
1852 acquire(notify, request, procedure);
\r
1853 Throwable throwable = exceptionContainer.get();
\r
1854 if (throwable != null) {
\r
1855 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1856 else throw new DatabaseException("Unexpected exception", throwable);
\r
1858 return resultContainer.get();
\r
/**
 * Schedules the AsyncMultiRead request and waits on a semaphore until it is done. The given
 * procedure is wrapped so that a failure is captured locally; results are only forwarded.
 * NOTE(review): results are not collected - the code itself marks the return value as a TODO,
 * and the return statement is not visible in this listing (presumably null; confirm).
 *
 * @throws DatabaseException the captured failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException("Unexpected exception") wrapping it
 */
1863 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
\r
1864 assertNotSession();
\r
1865 Semaphore notify = new Semaphore(0);
\r
1866 final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
\r
1867 scheduleRequest(request, new AsyncMultiProcedure<T>() {
\r
1869 public void exception(AsyncReadGraph graph, Throwable throwable) {
\r
1870 exceptionContainer.set(throwable);
\r
1871 procedure.exception(graph, throwable);
\r
1874 public void execute(AsyncReadGraph graph, T result) {
\r
1875 procedure.execute(graph, result);
\r
1878 public void finished(AsyncReadGraph graph) {
\r
1879 procedure.finished(graph);
\r
1882 acquire(notify, request, procedure);
\r
1883 Throwable throwable = exceptionContainer.get();
\r
1884 if (throwable != null) {
\r
1885 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1886 else throw new DatabaseException("Unexpected exception", throwable);
\r
1888 // TODO: implement return value
\r
// NOTE(review): the reminder is also printed to stderr on every successful call.
1889 System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
\r
/** Performs the ExternalRead synchronously by delegating to the two-argument overload with a new ProcedureAdapter. */
1894 public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
\r
1895 return syncRequest(request, new ProcedureAdapter<T>());
\r
1898 // assert(request != null);
\r
1900 // final DataContainer<T> result = new DataContainer<T>();
\r
1901 // final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
1903 // syncRequest(request, new Procedure<T>() {
\r
1906 // public void execute(T t) {
\r
1911 // public void exception(Throwable t) {
\r
1912 // exception.set(t);
\r
1917 // Throwable t = exception.get();
\r
1918 // if(t != null) {
\r
1919 // if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
1920 // else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
\r
1923 // return result.get();
\r
/** Performs the ExternalRead synchronously with a Listener by delegating to the Procedure overload. */
1928 public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
\r
// The cast to Procedure<T> selects the Procedure overload instead of calling this method again.
1929 return syncRequest(request, (Procedure<T>)procedure);
\r
1934 // public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
\r
1935 // assertNotSession();
\r
1936 // Semaphore notify = new Semaphore(0);
\r
1937 // DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1938 // DataContainer<T> result = new DataContainer<T>();
\r
1939 // scheduleRequest(request, procedure, notify, container, result);
\r
1940 // acquire(notify, request);
\r
1941 // Throwable throwable = container.get();
\r
1942 // if(throwable != null) {
\r
1943 // if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1944 // else throw new DatabaseException("Unexpected exception", throwable);
\r
1946 // return result.get();
\r
/**
 * Schedules the ExternalRead request and waits on a semaphore until it is done.
 *
 * @return the value stored in the result container
 * @throws DatabaseException the stored failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException("Unexpected exception") wrapping it
 */
1951 public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
\r
1952 assertNotSession();
\r
1953 Semaphore notify = new Semaphore(0);
\r
1954 final DataContainer<Throwable> container = new DataContainer<Throwable>();
\r
1955 final DataContainer<T> result = new DataContainer<T>();
\r
// The containers are filled by the scheduled task.
1956 scheduleRequest(request, procedure, notify, container, result);
\r
1957 acquire(notify, request);
\r
1958 Throwable throwable = container.get();
\r
1959 if (throwable != null) {
\r
1960 if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
\r
1961 else throw new DatabaseException("Unexpected exception", throwable);
\r
1963 return result.get();
\r
/**
 * Schedules the Write request with a Callback and waits on a semaphore until it is done,
 * then rethrows the DatabaseException held in the local container, if any.
 * NOTE(review): the callback body and the remaining scheduleRequest arguments are not visible
 * in this listing; presumably the callback stores its argument in 'exception' - confirm.
 */
1967 public void syncRequest(Write request) throws DatabaseException {
\r
1968 assertNotSession();
\r
1970 Semaphore notify = new Semaphore(0);
\r
1971 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
1972 scheduleRequest(request, new Callback<DatabaseException>() {
\r
1975 public void run(DatabaseException e) {
\r
1980 acquire(notify, request);
\r
1981 if(exception.get() != null) throw exception.get();
\r
/**
 * Schedules the WriteResult request with a Procedure and waits on a semaphore until it is done.
 * NOTE(review): the bodies of the Procedure methods are not visible in this listing; presumably
 * they fill the 'exception' and 'result' containers - confirm.
 *
 * @return the value held in the result container
 * @throws DatabaseException the held failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException wrapping it
 */
1985 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
\r
1986 assertNotSession();
\r
1987 Semaphore notify = new Semaphore(0);
\r
1988 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
1989 final DataContainer<T> result = new DataContainer<T>();
\r
1990 scheduleRequest(request, new Procedure<T>() {
\r
1993 public void exception(Throwable t) {
\r
1998 public void execute(T t) {
\r
2003 acquire(notify, request);
\r
2004 if(exception.get() != null) {
\r
2005 Throwable t = exception.get();
\r
2006 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2007 else throw new DatabaseException(t);
\r
2009 return result.get();
\r
/**
 * Schedules the WriteOnlyResult request with a Procedure and waits on a semaphore until it is done.
 * NOTE(review): the bodies of the Procedure methods are not visible in this listing; presumably
 * they fill the 'exception' and 'result' containers - confirm.
 *
 * @return the value held in the result container
 * @throws DatabaseException the held failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException wrapping it
 */
2013 public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException {
\r
2014 assertNotSession();
\r
2015 Semaphore notify = new Semaphore(0);
\r
2016 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2017 final DataContainer<T> result = new DataContainer<T>();
\r
2018 scheduleRequest(request, new Procedure<T>() {
\r
2021 public void exception(Throwable t) {
\r
2026 public void execute(T t) {
\r
2031 acquire(notify, request);
\r
2032 if(exception.get() != null) {
\r
2033 Throwable t = exception.get();
\r
2034 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2035 else throw new DatabaseException(t);
\r
2037 return result.get();
\r
/**
 * Schedules the DelayedWriteResult request with a Procedure and waits on a semaphore until it is done.
 * NOTE(review): the bodies of the Procedure methods are not visible in this listing; presumably
 * they fill the 'exception' and 'result' containers - confirm.
 *
 * @return the value held in the result container
 * @throws DatabaseException the held failure itself when it is a DatabaseException,
 *         otherwise a new DatabaseException wrapping it
 */
2041 public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException {
\r
2042 assertNotSession();
\r
2043 Semaphore notify = new Semaphore(0);
\r
2044 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2045 final DataContainer<T> result = new DataContainer<T>();
\r
2046 scheduleRequest(request, new Procedure<T>() {
\r
2049 public void exception(Throwable t) {
\r
2054 public void execute(T t) {
\r
2059 acquire(notify, request);
\r
2060 if(exception.get() != null) {
\r
2061 Throwable t = exception.get();
\r
2062 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2063 else throw new DatabaseException(t);
\r
2065 return result.get();
\r
/**
 * Schedules the DelayedWrite request with a Callback and waits on a semaphore until it is done,
 * then rethrows the DatabaseException held in the local container, if any.
 * NOTE(review): the callback body is not visible in this listing; presumably it stores its
 * argument in 'exception' - confirm.
 */
2069 public void syncRequest(DelayedWrite request) throws DatabaseException {
\r
2070 assertNotSession();
\r
2071 Semaphore notify = new Semaphore(0);
\r
2072 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
2073 scheduleRequest(request, new Callback<DatabaseException>() {
\r
2075 public void run(DatabaseException e) {
\r
2079 acquire(notify, request);
\r
2080 if(exception.get() != null) throw exception.get();
\r
// Synchronous execution of a WriteOnly request that yields no value.
// The request is scheduled with a Callback<DatabaseException>; the caller then passes
// through acquire(notify, request) on a Semaphore created with zero permits.
// A DatabaseException found in the exception container afterwards is thrown to the caller.
2084 public void syncRequest(WriteOnly request) throws DatabaseException {
\r
2085 assertNotSession();
\r
2087 Semaphore notify = new Semaphore(0);
\r
2088 final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
\r
2089 scheduleRequest(request, new Callback<DatabaseException>() {
\r
2091 public void run(DatabaseException e) {
\r
2095 acquire(notify, request);
\r
2096 if(exception.get() != null) throw exception.get();
\r
2101 * GraphRequestProcessor interface
\r
2105 public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
\r
2107 scheduleRequest(request, procedure, null, null);
\r
2112 public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
\r
2114 scheduleRequest(request, procedure, null);
\r
2119 public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
\r
2121 scheduleRequest(request, procedure, null, null, null);
\r
2126 public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
\r
2128 scheduleRequest(request, callback, null);
\r
2133 public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
\r
2135 scheduleRequest(request, procedure, null);
\r
2140 public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
\r
2142 scheduleRequest(request, procedure, null);
\r
2147 public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
\r
2149 scheduleRequest(request, procedure, null);
\r
2154 public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
\r
2156 scheduleRequest(request, callback, null);
\r
// Convenience overload for callers that do not need a completion notification:
// forwards to the two-argument asyncRequest with null as the second argument.
2161 public void asyncRequest(final Write r) {
\r
2162 asyncRequest(r, null);
\r
2166 public void asyncRequest(final DelayedWrite r) {
\r
2167 asyncRequest(r, null);
\r
2171 public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
\r
2173 scheduleRequest(request, callback, null);
\r
2178 public void asyncRequest(final WriteOnly request) {
\r
2180 asyncRequest(request, null);
\r
2185 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
\r
2186 r.request(this, procedure);
\r
2190 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
\r
2191 r.request(this, procedure);
\r
2195 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
\r
2196 r.request(this, procedure);
\r
2200 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
\r
2201 r.request(this, procedure);
\r
2205 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
\r
2206 r.request(this, procedure);
\r
2210 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
\r
2211 r.request(this, procedure);
\r
// Executes the read by letting the ReadInterface dispatch itself against this processor
// (r.request(this)) and returns whatever that call returns; a DatabaseException raised
// by the call propagates to the caller.
2215 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
\r
2216 return r.request(this);
\r
2220 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
\r
2221 return r.request(this);
\r
2225 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
\r
2226 r.request(this, procedure);
\r
2230 public <T> void async(WriteInterface<T> r) {
\r
2231 r.request(this, new ProcedureAdapter<T>());
\r
2235 public void incAsync() {
\r
2240 public void decAsync() {
\r
// Returns the id of the cluster that clusterTable associates with the resource's key
// (resource.id).
// NOTE(review): the looked-up cluster is dereferenced without a null check — confirm
// getClusterByResourceKey never returns null.
2244 public long getCluster(ResourceImpl resource) {
\r
2245 ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
\r
2246 return cluster.getClusterId();
\r
// Returns the cluster id that clusterTable reports for the given resource key, using the
// non-throwing lookup getClusterIdByResourceKeyNoThrow.
// A null clusterTable is reported with an explicit NullPointerException. That is the
// exception type the dereference on the last line would raise anyway; throwing it here
// attaches a message to the failure instead of printing to stdout and then crashing.
2249 public long getCluster(int id) {
\r
2250 if (clusterTable == null)
\r
2251 throw new NullPointerException("SessionImplSocket.getCluster() clusterTable == null");
\r
2252 return clusterTable.getClusterIdByResourceKeyNoThrow(id);
\r
// Wraps the given resource key in a new ResourceImpl bound to resourceSupport.
// No validation of the key is performed here.
2255 public ResourceImpl getResource(int id) {
\r
2256 return new ResourceImpl(resourceSupport, id);
\r
// Builds a ResourceImpl from a resource index and the id of the cluster it belongs to.
// The cluster is looked up in clusterTable by clusterId, its cluster key is combined with
// resourceIndex through ClusterTraitsBase.createResourceKeyNoThrow, and the resulting
// resource key is wrapped together with resourceSupport.
2259 public ResourceImpl getResource(int resourceIndex, long clusterId) {
\r
// Only checked when assertions are enabled.
2260 assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
\r
2261 ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
\r
2262 int key = proxy.getClusterKey();
\r
2263 int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
\r
2264 return new ResourceImpl(resourceSupport, resourceKey);
\r
2267 public ResourceImpl getResource2(int id) {
\r
2269 return new ResourceImpl(resourceSupport, id);
\r
2272 final public int getId(ResourceImpl impl) {
\r
2276 public static final Charset UTF8 = Charset.forName("utf-8");
\r
// Checks the transient graphs in support.providers for the given subject.
// Reports false as soon as one of them answers isPending(subject).
2281 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
\r
2282 for(TransientGraph g : support.providers) {
\r
2283 if(g.isPending(subject)) return false;
\r
// Checks the transient graphs in support.providers for the given subject/predicate pair.
// Reports false as soon as one of them answers isPending(subject, predicate).
2288 static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
\r
2289 for(TransientGraph g : support.providers) {
\r
2290 if(g.isPending(subject, predicate)) return false;
\r
// Asks the transient graphs that still have the subject pending to load it, and invokes
// runnable once through a counting callback.
// The composite callback starts its counter at providers.size() + 1 and forwards to
// runnable.run(graph) on the call that brings the counter to zero.
// NOTE(review): a DatabaseException from g.load is only printed. If load does not invoke
// composite in that case, the counter cannot reach zero and runnable is never called —
// verify.
2295 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
\r
2297 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
\r
// One count per provider plus one for the trailing call at the end of this method.
2299 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
\r
2302 public void run(ReadGraphImpl graph) {
\r
2303 if(ready.decrementAndGet() == 0) {
\r
2304 runnable.run(graph);
\r
2310 for(TransientGraph g : support.providers) {
\r
2311 if(g.isPending(subject)) {
\r
2313 g.load(graph, subject, composite);
\r
2314 } catch (DatabaseException e) {
\r
2315 e.printStackTrace();
\r
2318 composite.run(graph);
\r
2322 composite.run(graph);
\r
// Subject/predicate variant: asks the transient graphs that still have the pair pending
// to load it, and invokes runnable once through a counting callback.
// The composite callback starts its counter at providers.size() + 1 and forwards to
// runnable.run(graph) on the call that brings the counter to zero.
// NOTE(review): a DatabaseException from g.load is only printed. If load does not invoke
// composite in that case, the counter cannot reach zero and runnable is never called —
// verify.
2326 static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
\r
2328 Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
\r
// One count per provider plus one for the trailing call at the end of this method.
2330 AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
\r
2333 public void run(ReadGraphImpl graph) {
\r
2334 if(ready.decrementAndGet() == 0) {
\r
2335 runnable.run(graph);
\r
2341 for(TransientGraph g : support.providers) {
\r
2342 if(g.isPending(subject, predicate)) {
\r
2344 g.load(graph, subject, predicate, composite);
\r
2345 } catch (DatabaseException e) {
\r
2346 e.printStackTrace();
\r
2349 composite.run(graph);
\r
2353 composite.run(graph);
\r
2357 // void dumpHeap() {
\r
2360 // ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
\r
2361 // "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
\r
2362 // "d:/heap" + flushCounter + ".txt", true);
\r
2363 // } catch (IOException e) {
\r
2364 // e.printStackTrace();
\r
// Delivers a ChangeEvent for the given change set to every listener in changeListeners2.
// A fresh ReadGraphImpl is created from getQueryProvider2() and used both as the source of
// the event's session and as the graph handed to the listeners.
// Nothing is delivered for an empty change set. An Exception thrown by a listener is
// printed and not propagated.
2369 void fireReactionsToSynchronize(ChangeSet cs) {
\r
2371 // Do not fire empty events
\r
2375 ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
\r
2376 // g.state.barrier.inc();
\r
2380 if (!cs.isEmpty()) {
\r
2381 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
\r
2382 for (ChangeListener l : changeListeners2) {
\r
2384 l.graphChanged(e2);
\r
2385 } catch (Exception ex) {
\r
2386 ex.printStackTrace();
\r
2393 // g.state.barrier.dec();
\r
2394 // g.waitAsync(null);
\r
// Delivers a ChangeEvent for a committed change set to every listener in changeListeners2,
// using the supplied graph both for the event's session and as the graph handed to the
// listeners.
// Empty change sets are not delivered. An Exception from a single listener is printed;
// any other Throwable raised during delivery is caught by the outer handler and printed
// as well, so nothing propagates to the caller.
2400 void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
\r
2403 // Do not fire empty events
\r
2404 if (cs2.isEmpty())
\r
2407 // graph.restart();
\r
2408 // graph.state.barrier.inc();
\r
2412 if (!cs2.isEmpty()) {
\r
2414 ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
\r
2415 for (ChangeListener l : changeListeners2) {
\r
2417 l.graphChanged(e2);
\r
2418 // System.out.println("changelistener " + l);
\r
2419 } catch (Exception ex) {
\r
2420 ex.printStackTrace();
\r
2428 // graph.state.barrier.dec();
\r
2429 // graph.waitAsync(null);
\r
2432 } catch (Throwable t) {
\r
2433 t.printStackTrace();
\r
// Delivers a ChangeEvent for the given change set to every listener in metadataListeners.
// The listeners receive a separate WriteGraphImpl (reactionGraph) created from the
// processor of the incoming graph and writeSupport; the incoming graph itself is passed
// as the third ChangeEvent argument.
// Empty change sets and graphs whose getProvider() is non-null are filtered out first.
// A Throwable from a listener, or from the delivery as a whole, is printed and not
// propagated.
2437 void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
\r
2441 // Do not fire empty events
\r
2442 if (cs2.isEmpty())
\r
2445 // Do not fire on virtual requests
\r
2446 if(graph.getProvider() != null)
\r
2449 WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
\r
2453 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
\r
2454 for (ChangeListener l : metadataListeners) {
\r
2456 l.graphChanged(e2);
\r
2457 } catch (Throwable ex) {
\r
2458 ex.printStackTrace();
\r
2466 } catch (Throwable t) {
\r
2467 t.printStackTrace();
\r
2476 * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
\r
// Looks the service up through peekService(api).
// On the failure path a ServiceNotFoundException is thrown; when state.isClosed() is true
// it carries the message "Session has been shut down", otherwise the two-argument form
// without a message is used.
2479 public <T> T getService(Class<T> api) {
\r
2480 T t = peekService(api);
\r
2482 if (state.isClosed())
\r
2483 throw new ServiceNotFoundException(this, api, "Session has been shut down");
\r
2484 throw new ServiceNotFoundException(this, api);
\r
2492 protected abstract ServerInformation getCachedServerInformation();
\r
2494 private Class<?> serviceKey1 = null;
\r
2495 private Class<?> serviceKey2 = null;
\r
2496 private Object service1 = null;
\r
2497 private Object service2 = null;
\r
2503 * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
\r
// Looks up the service registered for api, with a two-slot cache
// (serviceKey1/service1, serviceKey2/service2) in front of serviceLocator.
//  - hit in slot 1: the cached instance is returned directly
//  - hit in slot 2: the slots are swapped so the key just asked for sits in slot 1
//  - Layer0, ServerInformation, WriteGraphImpl, ClusterBuilder and ClusterBuilderFactory
//    are special-cased before the cache is touched
//  - otherwise slot 1 moves to slot 2 and slot 1 is filled from
//    serviceLocator.peekService(api)
// Keys are compared by identity (==). The method is synchronized on this instance.
// NOTE(review): the WriteGraphImpl and ClusterBuilder branches dereference writeState
// (and writeState.getGraph()) without a null check — confirm both are always set here.
2505 @SuppressWarnings("unchecked")
\r
2507 public synchronized <T> T peekService(Class<T> api) {
\r
2509 if(serviceKey1 == api) {
\r
2510 return (T)service1;
\r
2511 } else if (serviceKey2 == api) {
\r
2512 // Promote this key
\r
2513 Object result = service2;
\r
2514 service2 = service1;
\r
2515 serviceKey2 = serviceKey1;
\r
2516 service1 = result;
\r
2517 serviceKey1 = api;
\r
2521 if (Layer0.class == api)
\r
2523 if (ServerInformation.class == api)
\r
2524 return (T) getCachedServerInformation();
\r
2525 else if (WriteGraphImpl.class == api)
\r
2526 return (T) writeState.getGraph();
\r
2527 else if (ClusterBuilder.class == api)
\r
2528 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
\r
2529 else if (ClusterBuilderFactory.class == api)
\r
2530 return (T)new ClusterBuilderFactoryImpl(this);
\r
// Cache miss: demote the most recent entry to slot 2 before filling slot 1.
2532 service2 = service1;
\r
2533 serviceKey2 = serviceKey1;
\r
2535 service1 = serviceLocator.peekService(api);
\r
2536 serviceKey1 = api;
\r
2538 return (T)service1;
\r
2546 * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
\r
// Answers straight from serviceLocator; the two-slot cache and the special cases handled
// in peekService are not consulted here.
2549 public boolean hasService(Class<?> api) {
\r
2550 return serviceLocator.hasService(api);
\r
2554 * @param api the api that must be implemented by the specified service
\r
2555 * @param service the service implementation
\r
// Registers a service implementation for api.
// Layer0 is stored in the L0 field. The service is passed on to
// serviceLocator.registerService, and a TransactionPolicySupport additionally replaces
// transactionPolicy and triggers state.resetTransactionPolicy().
// If api currently occupies one of the two lookup cache slots, that slot is updated to
// the new instance so later lookups do not return the previous one.
// NOTE(review): this method is not synchronized while peekService, which reads and writes
// the same cache fields, is — confirm registration cannot race with lookups.
2558 public <T> void registerService(Class<T> api, T service) {
\r
2559 if(Layer0.class == api) {
\r
2560 L0 = (Layer0)service;
\r
2563 serviceLocator.registerService(api, service);
\r
2564 if (TransactionPolicySupport.class == api) {
\r
2565 transactionPolicy = (TransactionPolicySupport)service;
\r
2566 state.resetTransactionPolicy();
\r
2568 if (api == serviceKey1)
\r
2569 service1 = service;
\r
2570 else if (api == serviceKey2)
\r
2571 service2 = service;
\r
2574 // ----------------
\r
2576 // ----------------
\r
// Calls valuesChanged on every handler in monitorHandlers, passing the MonitorContext that
// monitorContexts maps to that handler.
// An Exception or LinkageError raised by a handler is logged through
// Logger.defaultLogError and not propagated.
// NOTE(review): the variable argument is not used by any statement shown in this method —
// confirm whether handlers are meant to learn which variable changed.
2578 void fireSessionVariableChange(String variable) {
\r
2579 for (MonitorHandler h : monitorHandlers) {
\r
2580 MonitorContext ctx = monitorContexts.getLeft(h);
\r
2581 assert ctx != null;
\r
2582 // SafeRunner functionality repeated here to avoid dependency.
\r
2584 h.valuesChanged(ctx);
\r
2585 } catch (Exception e) {
\r
2586 Logger.defaultLogError("monitor handler notification produced the following exception", e);
\r
2587 } catch (LinkageError e) {
\r
2588 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
\r
// ResourceSerializer implementation backed by the enclosing session's cluster tables.
// Three representations are converted between: the transient int resource key
// (ResourceImpl.id), a long "random access id" built from cluster id and resource index,
// and a String form "<resource index>_<cluster id>".
2594 class ResourceSerializerImpl implements ResourceSerializer {
\r
// Converts a transient resource key to a long id: the resource index is extracted from
// the key, the cluster id comes from getCluster(id), and the two are combined with
// ClusterTraitsBase.createResourceId.
2596 public long createRandomAccessId(int id) throws DatabaseException {
\r
2599 int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
\r
2600 long cluster = getCluster(id);
\r
2602 return 0; // Better to return 0 than an invalid id.
\r
2603 long result = ClusterTraitsBase.createResourceId(cluster, index);
\r
// Long id of a Resource; the argument is cast to ResourceImpl without a type check.
2608 public long getRandomAccessId(Resource resource) throws DatabaseException {
\r
2610 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
2611 return createRandomAccessId(resourceImpl.id);
\r
// Transient key of a Resource, i.e. the id field of the underlying ResourceImpl.
2616 public int getTransientId(Resource resource) throws DatabaseException {
\r
2618 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
2619 return resourceImpl.id;
\r
// String form of a Resource. Negative keys are written as "<key>_0"; other keys are
// written as resource index and cluster id joined by '_'.
// Throws IllegalArgumentException for a null resource; failures while resolving the index
// or the cluster surface as InvalidResourceReferenceException.
2624 public String createRandomAccessId(Resource resource)
\r
2625 throws InvalidResourceReferenceException {
\r
2627 if(resource == null) throw new IllegalArgumentException();
\r
2629 ResourceImpl resourceImpl = (ResourceImpl) resource;
\r
2631 if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
\r
2635 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
\r
2636 } catch (DatabaseException e1) {
\r
2637 throw new InvalidResourceReferenceException(e1);
\r
2640 // Serialize as '<resource index>_<cluster id>'
\r
2641 return "" + r + "_" + getCluster(resourceImpl);
\r
2642 } catch (Throwable e) {
\r
2643 e.printStackTrace();
\r
2644 throw new InvalidResourceReferenceException(e);
\r
// Converts a long id back to a transient key. Values <= 0 are returned as-is, narrowed to
// int. Otherwise index and cluster id are split out of the id, the cluster is fetched
// from clusterTranslator, and a key is built from the cluster key and the index.
// Throws DatabaseException when the cluster cannot be obtained.
2649 public int getTransientId(long serialized) throws DatabaseException {
\r
2650 if (serialized <= 0)
\r
2651 return (int)serialized;
\r
2652 int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
\r
2653 long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
\r
2654 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
\r
2655 if (cluster == null)
\r
2656 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
\r
2657 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
\r
// Resource for a long id: converted with getTransientId(long), then resolved by key.
2662 public Resource getResource(long randomAccessId) throws DatabaseException {
\r
2663 return getResourceByKey(getTransientId(randomAccessId));
\r
// Resource for a transient key.
2667 public Resource getResource(int transientId) throws DatabaseException {
\r
2668 return getResourceByKey(transientId);
\r
// Resource for the String form produced by createRandomAccessId(Resource).
// The text is split at the first '_': the part before it is parsed as an int, the part
// after it as a long cluster id. A negative first part is resolved directly as a key;
// otherwise the cluster is fetched and the resource is returned only if
// cluster.hasResource confirms the key.
// Malformed numbers and any other failure surface as InvalidResourceReferenceException,
// as does a key the cluster does not contain ("Resource does not exist ...").
2672 public Resource getResource(String randomAccessId)
\r
2673 throws InvalidResourceReferenceException {
\r
2675 int i = randomAccessId.indexOf('_');
\r
2677 throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
\r
2678 + randomAccessId + "'");
\r
2679 int r = Integer.parseInt(randomAccessId.substring(0, i));
\r
2680 if(r < 0) return getResourceByKey(r);
\r
2681 long c = Long.parseLong(randomAccessId.substring(i + 1));
\r
2682 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
\r
// NOTE(review): unlike getTransientId(long), cluster is not null-checked before use here;
// a null would reach the catch (Throwable) handler below and be wrapped.
2683 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
\r
2684 if (cluster.hasResource(key, clusterTranslator))
\r
2685 return getResourceByKey(key);
\r
2686 } catch (InvalidResourceReferenceException e) {
\r
2688 } catch (NumberFormatException e) {
\r
2689 throw new InvalidResourceReferenceException(e);
\r
2690 } catch (Throwable e) {
\r
2691 e.printStackTrace();
\r
2692 throw new InvalidResourceReferenceException(e);
\r
2695 throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
\r
// Failures are printed and rethrown wrapped in InvalidResourceReferenceException.
2699 public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
\r
2702 } catch (Throwable e) {
\r
2703 e.printStackTrace();
\r
2704 throw new InvalidResourceReferenceException(e);
\r
2710 // Copied from old SessionImpl
\r
2713 if (state.isClosed())
\r
2714 throw new Error("Session closed.");
\r
// Creates a new resource in the cluster with the given id and returns it wrapped in a
// ResourceImpl bound to resourceSupport.
// The cluster is fetched with getClusterByClusterIdOrThrow, so an unknown cluster id
// fails at the lookup.
// NOTE(review): a DatabaseException from cluster.createResource is only logged and the
// method still returns a ResourceImpl built from newId — confirm callers can cope with
// the value newId holds in that case.
2719 public ResourceImpl getNewResource(long clusterId) {
\r
2720 ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
\r
2723 newId = cluster.createResource(clusterTranslator);
\r
2724 } catch (DatabaseException e) {
\r
2725 Logger.defaultLogError(e);
\r
2728 return new ResourceImpl(resourceSupport, newId);
\r
// Creates a new resource inside the given cluster set.
// clusterSetsSupport maps the cluster set's resource id to the cluster currently used for
// new resources of that set; this method keeps that mapping up to date.
// Throws ClusterSetExistException when the cluster set has no mapping at all.
2731 public ResourceImpl getNewResource(Resource clusterSet)
\r
2732 throws DatabaseException {
\r
2733 long resourceId = clusterSet.getResourceId();
\r
2734 Long clusterId = clusterSetsSupport.get(resourceId);
\r
2735 if (null == clusterId)
\r
2736 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
\r
// The set is registered but has no cluster yet: create one and remember it.
2737 if (Constants.NewClusterId == clusterId) {
\r
2738 clusterId = getService(ClusteringSupport.class).createCluster();
\r
2739 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
\r
2740 createdClusters.add(clusterId);
\r
2741 clusterSetsSupport.put(resourceId, clusterId);
\r
2742 return getNewResource(clusterId);
\r
2744 ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
\r
2745 ResourceImpl result;
\r
// The current cluster has reached CLUSTER_FILL_SIZE: switch the set to a fresh cluster.
2746 if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
\r
2747 clusterId = getService(ClusteringSupport.class).createCluster();
\r
2748 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
\r
2749 createdClusters.add(clusterId);
\r
2750 clusterSetsSupport.put(resourceId, clusterId);
\r
2751 return getNewResource(clusterId);
\r
2753 result = getNewResource(clusterId);
\r
// If the resource ended up in a different cluster than requested, record that cluster as
// the set's current one.
2754 int resultKey = querySupport.getId(result);
\r
2755 long resultCluster = querySupport.getClusterId(resultKey);
\r
2756 if (clusterId != resultCluster)
\r
2757 clusterSetsSupport.put(resourceId, resultCluster);
\r
// Registers the given resource as a new cluster set.
// The set is recorded in clusterSetsSupport with the placeholder Constants.NewClusterId;
// getNewResource(Resource) tests for that placeholder before creating a cluster.
// Throws ClusterSetExistException when the resource is already registered.
2763 public void getNewClusterSet(Resource clusterSet)
\r
2764 throws DatabaseException {
\r
2766 System.out.println("new cluster set=" + clusterSet);
\r
2767 long resourceId = clusterSet.getResourceId();
\r
2768 if (clusterSetsSupport.containsKey(resourceId))
\r
2769 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
\r
2770 clusterSetsSupport.put(resourceId, Constants.NewClusterId);
\r
// Tells whether the given resource is registered as a cluster set, i.e. whether
// clusterSetsSupport has an entry for its resource id.
2772 public boolean containsClusterSet(Resource clusterSet)
\r
2773 throws ServiceException {
\r
2774 long resourceId = clusterSet.getResourceId();
\r
2775 return clusterSetsSupport.containsKey(resourceId);
\r
2777 public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
\r
2778 Resource r = defaultClusterSet;
\r
2779 defaultClusterSet = clusterSet;
\r
2782 void printDiagnostics() {
\r
2784 if (DIAGNOSTICS) {
\r
2786 final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
\r
2787 final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
\r
2788 for(ClusterI cluster : clusterTable.getClusters()) {
\r
2790 if (cluster.isLoaded() && !cluster.isEmpty()) {
\r
2791 residentClusters.set(residentClusters.get() + 1);
\r
2792 totalMem.set(totalMem.get() + cluster.getUsedSpace());
\r
2794 } catch (DatabaseException e) {
\r
2795 Logger.defaultLogError(e);
\r
2799 System.out.println("--------------------------------");
\r
2800 System.out.println("Cluster information:");
\r
2801 System.out.println("-amount of resident clusters=" + residentClusters.get());
\r
2802 System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
\r
2804 for(ClusterI cluster : clusterTable.getClusters()) {
\r
2805 System.out.print("Cluster " + cluster.getClusterId() + " [");
\r
2807 if (!cluster.isLoaded())
\r
2808 System.out.println("not loaded]");
\r
2809 else if (cluster.isEmpty())
\r
2810 System.out.println("is empty]");
\r
2812 System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
\r
2813 System.out.print(",references=" + cluster.getReferenceCount());
\r
2814 System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
\r
2816 } catch (DatabaseException e) {
\r
2817 Logger.defaultLogError(e);
\r
2818 System.out.println("is corrupted]");
\r
2822 queryProvider2.printDiagnostics();
\r
2823 System.out.println("--------------------------------");
\r
// Calls readTransactionStarted() on the listeners in eventListeners.
// A Throwable raised by a listener is caught and its stack trace printed.
2828 public void fireStartReadTransaction() {
\r
2831 System.out.println("StartReadTransaction");
\r
2833 for (SessionEventListener listener : eventListeners) {
\r
2835 listener.readTransactionStarted();
\r
2836 } catch (Throwable t) {
\r
2837 t.printStackTrace();
\r
// Calls readTransactionFinished() on the listeners in eventListeners.
// A Throwable raised by a listener is caught and its stack trace printed.
2843 public void fireFinishReadTransaction() {
\r
2846 System.out.println("FinishReadTransaction");
\r
2848 for (SessionEventListener listener : eventListeners) {
\r
2850 listener.readTransactionFinished();
\r
2851 } catch (Throwable t) {
\r
2852 t.printStackTrace();
\r
// Calls writeTransactionStarted() on the listeners in eventListeners.
// A Throwable raised by a listener is caught and its stack trace printed.
2858 public void fireStartWriteTransaction() {
\r
2861 System.out.println("StartWriteTransaction");
\r
2863 for (SessionEventListener listener : eventListeners) {
\r
2865 listener.writeTransactionStarted();
\r
2866 } catch (Throwable t) {
\r
2867 t.printStackTrace();
\r
// Calls writeTransactionFinished() on the listeners in eventListeners; a Throwable raised
// by a listener is caught and its stack trace printed.
// Also calls Indexing.resetDependenciesIndexingDisabled().
2873 public void fireFinishWriteTransaction() {
\r
2876 System.out.println("FinishWriteTransaction");
\r
2878 for (SessionEventListener listener : eventListeners) {
\r
2880 listener.writeTransactionFinished();
\r
2881 } catch (Throwable t) {
\r
2882 t.printStackTrace();
\r
2886 Indexing.resetDependenciesIndexingDisabled();
\r
// Unsupported: always throws java.lang.Error, regardless of the arguments.
// Note that this is an Error, so callers catching Exception or the declared IOException
// will not intercept it.
2890 Resource deserialize(final long resourceId, final long clusterId) throws IOException {
\r
2891 throw new Error("Not supported at the moment.");
\r
2894 State getState() {
\r
2898 ClusterTable getClusterTable() {
\r
2899 return clusterTable;
\r
2902 public GraphSession getGraphSession() {
\r
2903 return graphSession;
\r
2906 public QueryProcessor getQueryProvider2() {
\r
2907 return queryProvider2;
\r
2910 ClientChangesImpl getClientChanges() {
\r
2911 return clientChanges;
\r
2914 boolean getWriteOnly() {
\r
2918 static int counter = 0;
\r
2920 public void onClusterLoaded(long clusterId) {
\r
2922 clusterTable.updateSize();
\r
2930 * Implementation of the interface RequestProcessor
\r
// Synchronous execution of a Read request without a caller-supplied procedure.
// Delegates to the two-argument syncRequest with an anonymous AsyncProcedure, then
// inspects the exception container: a DatabaseException is rethrown as-is, any other
// recorded Throwable is wrapped in a DatabaseException.
// Returns the content of the result container otherwise.
2934 public <T> T syncRequest(final Read<T> request) throws DatabaseException {
\r
2936 assertNotSession();
\r
// Only checked when assertions are enabled.
2939 assert(request != null);
\r
2941 final DataContainer<T> result = new DataContainer<T>();
\r
2942 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
2944 syncRequest(request, new AsyncProcedure<T>() {
\r
2947 public void execute(AsyncReadGraph graph, T t) {
\r
2952 public void exception(AsyncReadGraph graph, Throwable t) {
\r
2958 Throwable t = exception.get();
\r
2960 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
2961 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
2964 return result.get();
\r
2969 // public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
\r
2970 // assertNotSession();
\r
2971 // return syncRequest(request, (AsyncProcedure<T>)procedure);
\r
2975 public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
\r
2976 assertNotSession();
\r
2977 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
2981 public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
\r
2982 assertNotSession();
\r
2983 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
2987 public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
\r
2988 assertNotSession();
\r
2989 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
2993 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
\r
2994 assertNotSession();
\r
2995 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
// Synchronous execution of an AsyncRead request without a caller-supplied procedure.
// Delegates to the two-argument syncRequest with an anonymous AsyncProcedure, then
// inspects the exception container: a DatabaseException is rethrown as-is, any other
// recorded Throwable is wrapped in a DatabaseException whose message names this
// AsyncRead overload.
// Returns the content of the result container otherwise.
2999 public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
\r
3001 assertNotSession();
\r
// Only checked when assertions are enabled.
3003 assert(request != null);
\r
3005 final DataContainer<T> result = new DataContainer<T>();
\r
3006 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3008 syncRequest(request, new AsyncProcedure<T>() {
\r
3011 public void execute(AsyncReadGraph graph, T t) {
\r
3016 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3022 Throwable t = exception.get();
\r
3024 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3025 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(AsyncRead)", t);
\r
3028 return result.get();
\r
3033 public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
\r
3034 assertNotSession();
\r
3035 return syncRequest(request, (AsyncProcedure<T>)procedure);
\r
3039 public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
\r
3040 assertNotSession();
\r
3041 return syncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3045 public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
\r
3046 assertNotSession();
\r
3047 return syncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3051 public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
\r
3052 assertNotSession();
\r
3053 return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3057 final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
\r
3058 assertNotSession();
\r
3059 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
// Synchronous execution of a MultiRead request, collecting the results into a list.
// Delegates to the two-argument syncRequest with an anonymous AsyncMultiProcedure; results
// are added under synchronized(result). A recorded DatabaseException is rethrown as-is,
// any other recorded Throwable is wrapped in a DatabaseException whose message names this
// MultiRead overload.
// NOTE(review): the overload syncRequest(MultiRead, AsyncMultiProcedure) in this class
// throws Error("Not implemented!"). If the delegation below resolves to it, this method
// cannot complete normally — verify.
3063 public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
\r
3065 assertNotSession();
\r
// Only checked when assertions are enabled.
3067 assert(request != null);
\r
3069 final ArrayList<T> result = new ArrayList<T>();
\r
3070 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3072 syncRequest(request, new AsyncMultiProcedure<T>() {
\r
3075 public void execute(AsyncReadGraph graph, T t) {
\r
3076 synchronized(result) {
\r
3082 public void finished(AsyncReadGraph graph) {
\r
3086 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3093 Throwable t = exception.get();
\r
3095 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3096 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(MultiRead)", t);
\r
3104 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
\r
3105 assertNotSession();
\r
3106 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3110 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
\r
3111 assertNotSession();
\r
3112 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3116 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
\r
3117 assertNotSession();
\r
3118 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3122 public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
\r
3123 assertNotSession();
\r
3124 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3128 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
\r
3129 assertNotSession();
\r
3130 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
// Synchronous execution of an AsyncMultiRead request, collecting the results into a list.
// Delegates to the two-argument syncRequest with an anonymous AsyncMultiProcedure; results
// are added under synchronized(result). A recorded DatabaseException is rethrown as-is,
// any other recorded Throwable is wrapped in a DatabaseException whose message names this
// AsyncMultiRead overload.
3134 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
\r
3136 assertNotSession();
\r
// Only checked when assertions are enabled.
3138 assert(request != null);
\r
3140 final ArrayList<T> result = new ArrayList<T>();
\r
3141 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
3143 syncRequest(request, new AsyncMultiProcedure<T>() {
\r
3146 public void execute(AsyncReadGraph graph, T t) {
\r
3147 synchronized(result) {
\r
3153 public void finished(AsyncReadGraph graph) {
\r
3157 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3164 Throwable t = exception.get();
\r
3166 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3167 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(AsyncMultiRead)", t);
\r
3175 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
\r
3176 assertNotSession();
\r
3177 return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3181 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
\r
3182 assertNotSession();
\r
3183 return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3187 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
\r
3188 assertNotSession();
\r
3189 return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3193 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
\r
3194 assertNotSession();
\r
3195 return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3199 final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
\r
3200 assertNotSession();
\r
3201 return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
// Asynchronous Read without a caller-supplied procedure.
// Delegates to the two-argument asyncRequest with a ProcedureAdapter whose exception
// handler prints the stack trace, so a failure is reported on stderr rather than to the
// caller.
3206 public <T> void asyncRequest(Read<T> request) {
\r
3208 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3210 public void exception(Throwable t) {
\r
3211 t.printStackTrace();
\r
3218 public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
\r
3219 asyncRequest(request, (AsyncProcedure<T>)procedure);
\r
3223 public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
\r
3224 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3228 public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
\r
3229 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3233 public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
\r
3234 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3238 public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
\r
3239 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
3243 final public <T> void asyncRequest(final AsyncRead<T> request) {
\r
3245 assert(request != null);
\r
3247 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3249 public void exception(Throwable t) {
\r
3250 t.printStackTrace();
\r
// Schedules the AsyncRead with an AsyncListener.
// The same listener object is passed to scheduleRequest twice, as the second and as the
// third argument; the fourth argument is null.
3257 public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
\r
3258 scheduleRequest(request, procedure, procedure, null);
\r
3262 public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
\r
3263 asyncRequest(request, new SyncToAsyncListener<T>(procedure));
\r
3267 public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
\r
3268 asyncRequest(request, new NoneToAsyncListener<T>(procedure));
\r
3272 public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
\r
3273 asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
\r
3277 final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
\r
3278 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
3282 public <T> void asyncRequest(MultiRead<T> request) {
\r
3284 assert(request != null);
\r
3286 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
3288 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3289 t.printStackTrace();
\r
3296 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
\r
3297 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3301 public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
\r
3302 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3306 public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
\r
3307 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3311 public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
\r
3312 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3316 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
\r
3317 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3321 final public <T> void asyncRequest(AsyncMultiRead<T> request) {
\r
3323 assert(request != null);
\r
3325 asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
3327 public void exception(AsyncReadGraph graph, Throwable t) {
\r
3328 t.printStackTrace();
\r
3335 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
\r
3336 asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
\r
3340 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
\r
3341 asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
\r
3345 public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
\r
3346 asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
\r
3350 public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
\r
3351 asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
\r
3355 final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
\r
3356 asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
\r
3360 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
\r
3361 assertNotSession();
\r
3362 throw new Error("Not implemented!");
\r
3366 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
\r
3367 throw new Error("Not implemented!");
\r
3371 final public <T> void asyncRequest(final ExternalRead<T> request) {
\r
3373 assert(request != null);
\r
3375 asyncRequest(request, new ProcedureAdapter<T>() {
\r
3377 public void exception(Throwable t) {
\r
3378 t.printStackTrace();
\r
3385 public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
\r
3386 asyncRequest(request, (Procedure<T>)procedure);
\r
3391 void check(Throwable t) throws DatabaseException {
\r
3393 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3394 else throw new DatabaseException("Unexpected exception", t);
\r
3398 void check(DataContainer<Throwable> container) throws DatabaseException {
\r
3399 Throwable t = container.get();
\r
3401 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
3402 else throw new DatabaseException("Unexpected exception", t);
\r
3411 boolean sameProvider(Write request) {
\r
3412 if(writeState.getGraph().provider != null) {
\r
3413 return writeState.getGraph().provider.equals(request.getProvider());
\r
3415 return request.getProvider() == null;
\r
3419 boolean plainWrite(WriteGraphImpl graph) {
\r
3420 if(graph == null) return false;
\r
3421 if(graph.writeSupport.writeOnly()) return false;
\r
/** Shared thread group created with the name "Session Thread Group". */
3426 public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
\r
3427 private void assertNotSession() throws DatabaseException {
\r
3428 Thread current = Thread.currentThread();
\r
3429 if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
\r
3432 void assertAlive() {
\r
3433 if (!state.isAlive())
\r
3434 throw new RuntimeDatabaseException("Session has been shut down.");
\r
3437 public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
\r
3438 return querySupport.getValueStream(graph, querySupport.getId(resource));
\r
3441 public byte[] getValue(ReadGraphImpl graph, Resource resource) {
\r
3442 return querySupport.getValue(graph, querySupport.getId(resource));
\r
3445 <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
\r
3446 acquire(semaphore, request, null);
\r
/**
 * Acquires the given semaphore, reporting requests that wait for a long time.
 *
 * NOTE(review): the embedded original line numbers jump in several places
 * (3449 -> 3452, 3453 -> 3456, 3456 -> 3460, 3460 -> 3462, 3463 -> 3465, 3466 -> 3473),
 * so statements of this method are missing from this excerpt - among them the loop
 * header, the try opener, the exit taken after a successful acquisition and the end of
 * the waitTime expression. Confirm against the complete file before editing.
 *
 * @param semaphore the semaphore to acquire
 * @param request   the request being waited for; only used in the report message
 * @param procedure the procedure associated with the request; only used in the report message
 * @throws DatabaseException declared by the signature; no throwing statement is visible here
 */
3449 private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
\r
3452 long tryCount = 0;
\r
3453 // Loop until the semaphore is acquired reporting requests that take a long time.
\r
// Each attempt waits at most DebugPolicy.LONG_EXECUTION_REPORT_PERIOD (in LONG_EXECUTION_REPORT_PERIOD_UNIT).
3456 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
\r
// Reporting happens only when DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS is set.
3460 if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
\r
// waitTime starts from the report period converted to milliseconds (the rest of the expression is not visible).
3462 long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
\r
3463 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
\r
// The report goes to standard error and names the request, the wait time and the procedure.
3465 System.err.println("DB client request '" + request + "' is taking long to execute, so far "
\r
3466 + (waitTime) + " ms. (procedure=" + procedure + ")");
\r
// An interruption is only printed; no other handling is visible in this excerpt.
3473 } catch (InterruptedException e) {
\r
3474 e.printStackTrace();
\r
3475 // FIXME: Should perhaps do something else in this case ??
\r
3482 // public boolean holdOnToTransactionAfterCancel() {
\r
3483 // return transactionPolicy.holdOnToTransactionAfterCancel();
\r
3487 // public boolean holdOnToTransactionAfterCommit() {
\r
3488 // return transactionPolicy.holdOnToTransactionAfterCommit();
\r
3492 // public boolean holdOnToTransactionAfterRead() {
\r
3493 // return transactionPolicy.holdOnToTransactionAfterRead();
\r
3497 // public void onRelinquish() {
\r
3498 // transactionPolicy.onRelinquish();
\r
3502 // public void onRelinquishDone() {
\r
3503 // transactionPolicy.onRelinquishDone();
\r
3507 // public void onRelinquishError() {
\r
3508 // transactionPolicy.onRelinquishError();
\r
/**
 * Returns a {@link Session}.
 *
 * NOTE(review): only the signature is visible in this excerpt; the body that decides which
 * session is returned is missing - confirm against the complete file.
 */
3513 public Session getSession() {
\r
/**
 * Implemented by subclasses; maps a {@link VirtualGraph} to a {@link VirtualGraph}.
 * NOTE(review): the exact mapping contract is not visible here - confirm in the implementations.
 */
3518 protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
\r
/**
 * Implemented by subclasses; supplies a {@code ResourceImpl}.
 * Presumably a newly allocated resource, judging by the name - TODO confirm.
 *
 * @throws DatabaseException declared by the signature
 */
3519 protected abstract ResourceImpl getNewResource() throws DatabaseException;
\r
3521 public void ceased(int thread) {
\r
3523 requestManager.ceased(thread);
\r
/**
 * Returns the number of query threads.
 *
 * NOTE(review): the return statement (original line 3529) is missing from this excerpt, so
 * the actual value is not visible here - confirm against the complete file.
 */
3527 public int getAmountOfQueryThreads() {
\r
3528 // This must be a power of two
\r
// Disabled alternative: the largest power of two not exceeding the available processor count.
3530 // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
\r
/**
 * Creates a resource object for the given key.
 *
 * NOTE(review): original line 3534, between the signature and the return, is missing from
 * this excerpt; it may hold a check on the key - confirm against the complete file.
 *
 * @param key the resource key
 * @return a new {@code ResourceImpl} built from {@code resourceSupport} and {@code key}
 * @throws ResourceNotFoundException declared by the signature; no throwing statement is visible here
 */
3533 public Resource getResourceByKey(int key) throws ResourceNotFoundException {
\r
3535 return new ResourceImpl(resourceSupport, key);
\r
/**
 * Counterpart of {@code releaseWriteOnly}, which resets the {@code writeOnly} flag.
 *
 * NOTE(review): only the signature is visible in this excerpt; presumably the body sets
 * {@code writeOnly} - TODO confirm against the complete file.
 */
3539 public void acquireWriteOnly() {
\r
3543 public void releaseWriteOnly(ReadGraphImpl graph) {
\r
3544 writeOnly = false;
\r
3545 queryProvider2.releaseWrite(graph);
\r
3548 public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
\r
3550 long start = System.nanoTime();
\r
3552 while(dirtyPrimitives) {
\r
3553 dirtyPrimitives = false;
\r
3554 getQueryProvider2().performDirtyUpdates(writer);
\r
3555 getQueryProvider2().performScheduledUpdates(writer);
\r
3558 fireMetadataListeners(writer, clientChanges);
\r
3560 if(DebugPolicy.PERFORMANCE_DATA) {
\r
3561 long end = System.nanoTime();
\r
3562 System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
\r
3567 void removeTemporaryData() {
\r
3568 File platform = Platform.getLocation().toFile();
\r
3569 File tempFiles = new File(platform, "tempFiles");
\r
3570 File temp = new File(tempFiles, "db");
\r
3571 if(!temp.exists())
\r
3574 Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
\r
3576 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
\r
3578 Files.delete(file);
\r
3579 } catch (IOException e) {
\r
3580 Logger.defaultLogError(e);
\r
3582 return FileVisitResult.CONTINUE;
\r
3585 } catch (IOException e) {
\r
3586 Logger.defaultLogError(e);
\r
/**
 * Marks the clusters recorded in {@code createdClusters} as immutable and empties the
 * collection.
 *
 * NOTE(review): lines are missing from this excerpt (original lines 3598-3601), so the
 * value returned by the callback and whether {@code clear()} sits inside the service-mode
 * check are not visible - confirm against the complete file.
 */
3590 public void handleCreatedClusters() {
\r
// The marking is done only when the SERVICE_MODE_CREATE bit is set in serviceMode.
3591 if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
\r
3592 createdClusters.forEach(new TLongProcedure() {
\r
// Invoked with each stored cluster id.
3595 public boolean execute(long value) {
\r
3596 ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
\r
3597 cluster.setImmutable(true, clusterTranslator);
\r
3602 createdClusters.clear();
\r
3606 public Object getModificationCounter() {
\r
3607 return queryProvider2.modificationCounter;
\r
3611 public void markUndoPoint() {
\r
3612 state.setCombine(false);
\r