1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.db.common.processor;
\r
14 import java.util.ArrayList;
\r
15 import java.util.Collection;
\r
16 import java.util.HashMap;
\r
17 import java.util.HashSet;
\r
18 import java.util.LinkedList;
\r
19 import java.util.Set;
\r
20 import java.util.UUID;
\r
21 import java.util.concurrent.Semaphore;
\r
23 import org.simantics.db.AsyncReadGraph;
\r
24 import org.simantics.db.AsyncRequestProcessor;
\r
25 import org.simantics.db.ReadGraph;
\r
26 import org.simantics.db.RequestProcessor;
\r
27 import org.simantics.db.Resource;
\r
28 import org.simantics.db.Session;
\r
29 import org.simantics.db.VirtualGraph;
\r
30 import org.simantics.db.WriteGraph;
\r
31 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
\r
32 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
\r
33 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
\r
34 import org.simantics.db.common.request.ReadRequest;
\r
35 import org.simantics.db.common.request.WriteRequest;
\r
36 import org.simantics.db.common.utils.Logger;
\r
37 import org.simantics.db.exception.CancelTransactionException;
\r
38 import org.simantics.db.exception.DatabaseException;
\r
39 import org.simantics.db.procedure.AsyncListener;
\r
40 import org.simantics.db.procedure.AsyncMultiListener;
\r
41 import org.simantics.db.procedure.AsyncMultiProcedure;
\r
42 import org.simantics.db.procedure.AsyncProcedure;
\r
43 import org.simantics.db.procedure.Listener;
\r
44 import org.simantics.db.procedure.MultiListener;
\r
45 import org.simantics.db.procedure.MultiProcedure;
\r
46 import org.simantics.db.procedure.Procedure;
\r
47 import org.simantics.db.procedure.SyncListener;
\r
48 import org.simantics.db.procedure.SyncMultiListener;
\r
49 import org.simantics.db.procedure.SyncMultiProcedure;
\r
50 import org.simantics.db.procedure.SyncProcedure;
\r
51 import org.simantics.db.request.AsyncMultiRead;
\r
52 import org.simantics.db.request.AsyncRead;
\r
53 import org.simantics.db.request.DelayedWrite;
\r
54 import org.simantics.db.request.DelayedWriteResult;
\r
55 import org.simantics.db.request.ExternalRead;
\r
56 import org.simantics.db.request.MultiRead;
\r
57 import org.simantics.db.request.Read;
\r
58 import org.simantics.db.request.ReadInterface;
\r
59 import org.simantics.db.request.UndoTraits;
\r
60 import org.simantics.db.request.Write;
\r
61 import org.simantics.db.request.WriteInterface;
\r
62 import org.simantics.db.request.WriteOnly;
\r
63 import org.simantics.db.request.WriteOnlyResult;
\r
64 import org.simantics.db.request.WriteResult;
\r
65 import org.simantics.utils.DataContainer;
\r
66 import org.simantics.utils.datastructures.Callback;
\r
67 import org.simantics.utils.datastructures.Pair;
\r
69 public class MergingGraphRequestProcessor implements RequestProcessor {
\r
71 private static class SyncWriteRequestAdapter implements Write {
\r
73 private Semaphore semaphore = new Semaphore(0);
\r
74 private Object request;
\r
75 private Throwable exception;
\r
76 SyncWriteRequestAdapter(Write r) {
\r
79 SyncWriteRequestAdapter(WriteOnly r) {
\r
83 // public GraphRequestStatus perform(Graph g) throws Exception {
\r
84 // return perform((ReadGraph)g);
\r
87 public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {
\r
88 if(request instanceof Write) {
\r
89 ((Write)request).perform(g);
\r
90 } else if(request instanceof DelayedWrite) {
\r
91 ((DelayedWrite)request).perform(g);
\r
93 ((WriteOnly)request).perform(g);
\r
97 // public String getId() {
\r
98 // if(request instanceof WriteGraphRequest) {
\r
99 // return ((WriteGraphRequest)request).getId();
\r
105 // public void requestCompleted(GraphRequestStatus status) {
\r
106 // if(request instanceof WriteGraphRequest) {
\r
107 // ((WriteGraphRequest)request).requestCompleted(status);
\r
112 // public void handleException(Throwable e) {
\r
113 // this.exception = e;
\r
114 // if(request instanceof WriteGraphRequest) {
\r
115 // ((WriteGraphRequest)request).handleException(e);
\r
119 public void throwOrWrapException() {
\r
120 if (exception == null)
\r
122 if (exception instanceof RuntimeException)
\r
123 throw (RuntimeException) exception;
\r
124 if (exception instanceof Error)
\r
125 throw (Error) exception;
\r
126 throw new RuntimeException("See cause for the real exception.", exception);
\r
130 public VirtualGraph getProvider() {
\r
135 // public void fillMetadata(Map<String, String> metadata) {
\r
138 public void acquire() {
\r
140 semaphore.acquire();
\r
141 } catch (InterruptedException e) {
\r
142 Logger.defaultLogError(e);
\r
146 public void release() {
\r
147 semaphore.release();
\r
151 public UndoTraits getUndoTraits() {
\r
156 public String toString() {
\r
157 return "SyncWriteRequestAdapter " + request;
\r
// Timeout handed to Object.wait(long) by the merged transactions when the queue is empty;
// the wait is only attempted when this value is > 0.
162 long transactionKeepalivePeriod;
\r
165 * Synchronization object for implementing {@link #synchronize()}.
\r
166 * {@link Object#notifyAll()} is invoked for this lock object every time a
\r
167 * single transaction is completed, thereby releasing all waiters in
\r
168 * {@link #synchronize()}.
\r
// NOTE(review): in the visible code this lock is referenced only from commented-out lines.
170 Object barrier = new Object();
\r
// (request, procedure-or-callback) pairs that are currently queued; kept in step with requestQueue.
172 Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();
// FIFO of queued pairs; the merged transactions inspect and remove the head.
\r
173 LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();
\r
// Set when a request is queued while no merged transaction is marked as running;
// cleared by a transaction that finishes with an empty queue.
174 boolean hasAlreadyRequest = false;
\r
177 * A set of requests which {@link #synchronize()} is depending on at the
\r
178 * moment. Every time a request within this set is completed, some thread in
\r
179 * {@link #synchronize()} should be released.
\r
181 // Set<Object> barrierRequests = new HashSet<Object>();
\r
// Requests registered by the syncRequest(...) read overloads; MergedRead runs these
// synchronously and then notifies on the request object.
182 Set<Object> syncRequests = new HashSet<Object>();
\r
// Label reported by toString().
184 private String name;
\r
// Underlying processor on which the merged transactions are started; also supplies the session.
186 private AsyncRequestProcessor processor;
\r
/**
 * Creates a merging processor on top of {@code processor}.
 *
 * @param name label for this processor.
 *        NOTE(review): the assignment to the name field is not visible in this listing - confirm it is stored.
 * @param processor the processor on which merged transactions are started
 * @param transactionKeepalivePeriod timeout used when a merged transaction waits for more
 *        requests on an empty queue; the wait is only attempted for values > 0
 */
188 public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
\r
190 this.processor = processor;
\r
191 this.transactionKeepalivePeriod = transactionKeepalivePeriod;
\r
/**
 * Creates a merging processor with a generated name: the fixed prefix
 * "MergingGraphRequestProcessor" followed by a random UUID.
 *
 * @param processor the processor on which merged transactions are started
 * @param transactionKeepalivePeriod timeout used when a merged transaction waits for more
 *        requests on an empty queue; the wait is only attempted for values > 0
 */
194 public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
\r
195 this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString();
\r
196 this.processor = processor;
\r
197 this.transactionKeepalivePeriod = transactionKeepalivePeriod;
\r
/**
 * Read transaction body that serves queued read requests from {@code requestQueue}
 * against the {@link ReadGraph} of a single transaction.
 *
 * Raw request/procedure types are used on purpose: the queue stores untyped
 * (request, procedure) pairs, hence the class-level warning suppression.
 */
200 @SuppressWarnings({"unchecked", "rawtypes"})
\r
201 protected class MergedRead extends ReadRequest {
\r
// The (request, procedure) pair most recently removed from the queue.
203 Pair<Object, Object> currentRequest;
\r
205 // RunnerReadGraphRequest(GraphRequestProcessor processor) {
\r
206 // super(processor);
\r
210 // public void completed(boolean value) {
\r
211 //// System.out.println(this + "MGRP read completed");
\r
212 //// synchronized (MergingGraphRequestProcessor.this) {
\r
213 //// if (requestQueue.isEmpty())
\r
214 //// hasAlreadyRequest = false;
\r
216 //// newTransaction();
\r
/**
 * Takes requests from the head of the queue and executes them on {@code graph}.
 * Requests registered in {@code syncRequests} are run with {@code graph.syncRequest}
 * and their submitter is notified; all others are run with {@code graph.asyncRequest}.
 * Any Throwable from a request is logged and passed to that request's procedure.
 *
 * NOTE(review): the looping and exit statements are not visible in this listing; the
 * empty-queue check and the Write/DelayedWrite check below are presumably the exit points.
 */
221 public void run(ReadGraph graph) {
\r
223 // System.out.println(MergingGraphRequestProcessor.this + " reads");
\r
// Queue inspection and removal are done while holding the processor's monitor.
227 synchronized (MergingGraphRequestProcessor.this) {
\r
229 // Release #synchronize() invokers if necessary.
\r
230 // if (currentRequest != null && barrierRequests.contains(currentRequest)) {
\r
231 // synchronized (barrier) {
\r
232 // barrier.notifyAll();
\r
236 if(requestQueue.isEmpty()) {
\r
237 if (transactionKeepalivePeriod > 0) {
\r
238 // System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );
\r
// Keep the transaction open: wait on the processor's monitor, bounded by the keepalive period.
240 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
\r
241 } catch (InterruptedException e) {
\r
// NOTE(review): the interruption is only logged; the interrupt status is not restored here.
242 Logger.defaultLogError(e);
\r
// Re-check after the wait: nothing may have been queued in the meantime.
244 if (requestQueue.isEmpty())
\r
250 Object nextRequest = requestQueue.peekFirst().first;
\r
// A write-type request at the head is checked for before anything is removed from the queue
// (the guarded statement is not visible here; presumably it ends this read transaction).
251 if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {
\r
255 currentRequest = requestQueue.remove(0);
\r
256 requestSet.remove(currentRequest);
\r
260 // ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;
\r
// A request present in syncRequests has a submitter registered for it: run it synchronously.
262 if( syncRequests.contains(currentRequest.first)) {
\r
// Dispatch on the procedure type first, then on the request type.
266 if(currentRequest.second instanceof AsyncProcedure<?>) {
\r
267 if(currentRequest.first instanceof Read) {
\r
268 Read req = (Read)currentRequest.first;
\r
269 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
\r
271 AsyncRead req = (AsyncRead)currentRequest.first;
\r
272 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
\r
275 AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
\r
276 graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
\r
279 } catch(Throwable t) {
\r
// Failures are logged and forwarded to the request's own procedure.
281 Logger.defaultLogError(t);
\r
283 if(currentRequest.second instanceof AsyncProcedure<?>) {
\r
284 ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
\r
286 ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
\r
// Unregister the request and wake one thread waiting on the request object's monitor.
291 synchronized (currentRequest.first) {
\r
292 syncRequests.remove(currentRequest.first);
\r
293 // System.out.println("notifying " + currentRequest.first);
\r
294 currentRequest.first.notify();
\r
// Asynchronous dispatch (graph.asyncRequest), mirroring the synchronous dispatch above;
// presumably the else-branch of the syncRequests check.
302 if(currentRequest.second instanceof AsyncProcedure<?>) {
\r
303 if(currentRequest.first instanceof AsyncRead) {
\r
304 AsyncRead req = (AsyncRead)currentRequest.first;
\r
305 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
\r
307 Read req = (Read)currentRequest.first;
\r
308 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
\r
311 AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
\r
312 graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
\r
315 } catch(Throwable t) {
\r
317 Logger.defaultLogError(t);
\r
319 if(currentRequest.second instanceof AsyncProcedure<?>) {
\r
320 ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
\r
322 ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
\r
330 // System.out.println(MergingGraphRequestProcessor.this + " read completed");
\r
// With nothing left queued, clear the flag so the next queued request starts a new transaction.
332 synchronized (MergingGraphRequestProcessor.this) {
\r
333 if (requestQueue.isEmpty())
\r
334 hasAlreadyRequest = false;
\r
// Reports the current size of the shared queue, not a count of requests served by this instance.
342 public String toString() {
\r
343 return "MergedRead[" + requestQueue.size() + " requests]";
\r
/**
 * Write transaction body that serves queued write requests from {@code requestQueue}
 * against the {@link WriteGraph} of a single transaction and reports the outcome to
 * each request's callback.
 */
348 protected class RunnerWriteGraphRequest extends WriteRequest {
\r
// The (request, callback) pair most recently removed from the queue.
350 Pair<Object, Object> currentRequest;
\r
// NOTE(review): not read or written anywhere in the visible code.
351 HashMap<String, String> metadata = new HashMap<String, String>();
\r
/**
 * Takes requests from the head of the queue and executes them synchronously on
 * {@code graph}. After each request the callback, when non-null, is run with
 * {@code null} on success or with the caught Throwable on failure.
 *
 * NOTE(review): the looping and exit statements are not visible in this listing; the
 * empty-queue check and the read-request check below are presumably the exit points.
 */
354 public void perform(WriteGraph graph) throws DatabaseException {
\r
356 // System.out.println(MergingGraphRequestProcessor.this + " writes");
\r
// Queue inspection and removal are done while holding the processor's monitor.
360 synchronized (MergingGraphRequestProcessor.this) {
\r
362 // Release #synchronize() invokers if necessary.
\r
363 // if (currentRequest != null && barrierRequests.contains(currentRequest)) {
\r
364 // synchronized (barrier) {
\r
365 // barrier.notifyAll();
\r
369 if(requestQueue.isEmpty()) {
\r
370 if (transactionKeepalivePeriod > 0) {
\r
// Keep the transaction open: wait on the processor's monitor, bounded by the keepalive period.
372 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
\r
373 } catch (InterruptedException e) {
\r
// NOTE(review): the interruption is only logged; the interrupt status is not restored here.
374 Logger.defaultLogError(e);
\r
376 if (requestQueue.isEmpty())
\r
382 Object nextRequest = requestQueue.peekFirst().first;
\r
// A read-type request at the head is checked for before anything is removed from the queue
// (the guarded statement is not visible here; presumably it ends this write transaction).
383 if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {
\r
387 currentRequest = requestQueue.remove(0);
\r
388 requestSet.remove(currentRequest);
\r
// The queue stores callbacks untyped; may be null when the request was queued without one.
392 @SuppressWarnings("unchecked")
\r
393 Callback<Throwable> callback = (Callback<Throwable>)currentRequest.second;
\r
// Adapters created by the blocking syncRequest(Write/WriteOnly) overloads are matched by exact class.
395 if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
\r
397 SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;
\r
400 // System.out.println("merg.sync " + adapter);
\r
401 graph.syncRequest(adapter);
\r
402 if(callback != null) callback.run(null);
\r
403 } catch(Throwable t) {
\r
404 Logger.defaultLogError(t);
\r
405 if(callback != null) callback.run(t);
\r
409 // System.out.println("merg.sync.release " + adapter);
\r
// NOTE(review): a request that is neither Write nor DelayedWrite matches neither branch below,
// so nothing is executed for it, yet the success callback on the following line still runs.
414 if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first);
\r
415 else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first);
\r
416 if(callback != null) callback.run(null);
\r
417 } catch(Throwable t) {
\r
418 Logger.defaultLogError(t);
\r
419 if(callback != null) callback.run(t);
\r
426 // System.out.println(MergingGraphRequestProcessor.this + " write completed");
\r
// With nothing left queued, clear the flag so the next queued request starts a new transaction.
428 synchronized (MergingGraphRequestProcessor.this) {
\r
429 if (requestQueue.isEmpty())
\r
430 hasAlreadyRequest = false;
\r
/**
 * Starts a transaction on the underlying processor for the request at the head of
 * the queue. The head is inspected under this processor's monitor; the queue is
 * asserted to be non-empty. The two visible submissions start either a
 * {@link RunnerWriteGraphRequest} or a {@link MergedRead}, presumably selected by
 * the {@code write} flag (the selecting statement is not visible in this listing).
 *
 * NOTE(review): only Write and DelayedWrite count as writes here; confirm how a bare
 * WriteOnly at the head of the queue is meant to be handled.
 */
439 private void newTransaction() {
\r
441 boolean write = false;
\r
443 synchronized (MergingGraphRequestProcessor.this) {
\r
444 assert(!requestQueue.isEmpty());
\r
445 Object nextRequest = requestQueue.peekFirst().first;
\r
446 write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);
\r
450 processor.asyncRequest(new RunnerWriteGraphRequest(), null);
\r
452 processor.asyncRequest(new MergedRead());
\r
458 public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {
\r
460 // System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
\r
462 if (requestSet.contains(request))
\r
465 Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
\r
466 requestQueue.add(pair);
\r
467 requestSet.add(pair);
\r
469 if (!hasAlreadyRequest) {
\r
471 hasAlreadyRequest = true;
\r
/**
 * Queues an asynchronous read together with its procedure, holding this processor's monitor.
 * When no merged transaction is marked as running, the running flag is set.
 *
 * NOTE(review): requestSet holds Pair objects, so contains(request) on the bare request
 * looks like it can never match - confirm whether duplicate suppression is intended.
 * NOTE(review): the statements that start a transaction / wake a running one are not
 * visible in this listing.
 */
479 public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {
\r
481 // System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
\r
483 if (requestSet.contains(request))
\r
// The same pair goes into both the FIFO and the set.
486 Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
\r
487 requestQueue.add(pair);
\r
488 requestSet.add(pair);
\r
490 if (!hasAlreadyRequest) {
\r
492 hasAlreadyRequest = true;
\r
494 // System.out.println("notify " + this);
\r
/**
 * Queues a write request together with an optional completion callback, holding this
 * processor's monitor. When no merged transaction is marked as running, the running flag is set.
 *
 * @param request the write to queue
 * @param callback completion callback stored with the request; other methods in this class pass null
 *
 * NOTE(review): requestSet holds Pair objects, so contains(request) on the bare request
 * looks like it can never match - confirm whether duplicate suppression is intended.
 */
501 public synchronized void asyncRequest(Write request, Callback<DatabaseException> callback) {
\r
503 // System.out.println(this + " asyncRequest(WriteGraphRequest request)");
\r
505 if (requestSet.contains(request))
\r
// The same pair goes into both the FIFO and the set.
508 Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
\r
509 requestQueue.add(pair);
\r
510 requestSet.add(pair);
\r
512 if (!hasAlreadyRequest) {
\r
513 // System.out.println("new transaction");
\r
515 hasAlreadyRequest = true;
\r
517 // System.out.println("notify");
\r
/**
 * Queues a delayed write together with an optional completion callback, holding this
 * processor's monitor. When no merged transaction is marked as running, the running flag is set.
 *
 * @param request the delayed write to queue
 * @param callback completion callback stored with the request; other methods in this class pass null
 *
 * NOTE(review): requestSet holds Pair objects, so contains(request) on the bare request
 * looks like it can never match - confirm whether duplicate suppression is intended.
 */
524 public synchronized void asyncRequest(DelayedWrite request, Callback<DatabaseException> callback) {
\r
526 // System.out.println(this + " asyncRequest(WriteGraphRequest request)");
\r
528 if (requestSet.contains(request))
\r
// The same pair goes into both the FIFO and the set.
531 Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
\r
532 requestQueue.add(pair);
\r
533 requestSet.add(pair);
\r
535 if (!hasAlreadyRequest) {
\r
536 // System.out.println("new transaction");
\r
538 hasAlreadyRequest = true;
\r
540 // System.out.println("notify");
\r
/**
 * Queues a write-only request together with an optional completion callback, holding this
 * processor's monitor. When no merged transaction is marked as running, the running flag is set.
 *
 * @param request the write-only request to queue
 * @param callback completion callback stored with the request; other methods in this class pass null
 *
 * NOTE(review): requestSet holds Pair objects, so contains(request) on the bare request
 * looks like it can never match - confirm whether duplicate suppression is intended.
 * NOTE(review): newTransaction() and the transaction runners test only for Write and
 * DelayedWrite; confirm that a bare WriteOnly queued here is actually executed.
 */
547 public synchronized void asyncRequest(WriteOnly request, Callback<DatabaseException> callback) {
\r
549 // System.out.println(this + " asyncRequest(WriteGraphRequest request)");
\r
551 if (requestSet.contains(request))
\r
// The same pair goes into both the FIFO and the set.
554 Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
\r
555 requestQueue.add(pair);
\r
556 requestSet.add(pair);
\r
558 if (!hasAlreadyRequest) {
\r
559 // System.out.println("new transaction");
\r
561 hasAlreadyRequest = true;
\r
563 // System.out.println("notify");
\r
/**
 * Registers {@code request} in {@code syncRequests} and queues it with the caller's
 * procedure while holding the request object's monitor. The InterruptedException handler
 * and the "waiting" trace comment indicate that the caller then blocks on the request
 * (the blocking call itself is not visible in this listing).
 *
 * NOTE(review): the local {@code throwable} container is never written in the visible
 * lines (the caller's procedure is queued directly), so the error branch looks unreachable.
 * NOTE(review): the rethrow keeps only {@code t.getMessage()} and drops the original cause.
 * NOTE(review): the return statement is not visible in this listing.
 */
570 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
\r
572 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
574 // Queue the adapter up for execution.
\r
575 synchronized (request) {
\r
576 syncRequests.add(request);
\r
577 asyncRequest(request, procedure);
\r
// Still registered means the request has not been processed yet.
578 if(syncRequests.contains(request)) {
\r
580 // System.out.println("waiting " + request);
\r
582 } catch (InterruptedException e) {
\r
// Interruption is escalated as an Error wrapping the InterruptedException.
583 throw new Error(e);
\r
588 Throwable t = throwable.get();
\r
591 Logger.defaultLogError(t);
\r
592 throw new RuntimeException(t.getMessage());
\r
/**
 * Registers {@code request} in {@code syncRequests} and queues it, while holding the
 * request object's monitor, with a wrapper procedure that forwards results to
 * {@code procedure}. Returns the content of the local {@code result} container.
 * The InterruptedException handler and the "waiting" trace comment indicate that the
 * caller blocks on the request until it has been processed (the blocking call itself
 * is not visible in this listing).
 *
 * NOTE(review): the statements that store into {@code result} and {@code throwable}
 * are not visible in this listing.
 * NOTE(review): the rethrow keeps only {@code t.getMessage()} and drops the original cause.
 */
600 public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {
\r
602 // System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");
\r
604 final DataContainer<T> result = new DataContainer<T>(null);
\r
605 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
607 // Queue the adapter up for execution.
\r
608 synchronized (request) {
\r
610 syncRequests.add(request);
\r
611 asyncRequest(request, new AsyncProcedure<T>() {
\r
613 public void execute(AsyncReadGraph graph, T t) {
\r
614 synchronized(result) {
\r
// The caller's procedure still receives the value.
617 procedure.execute(graph, t);
\r
621 public void exception(AsyncReadGraph graph, Throwable t) {
\r
// Delegates to the wrapped procedure so logs name the caller's procedure, not this wrapper.
626 public String toString() {
\r
627 return procedure.toString();
\r
// Still registered means the request has not been processed yet.
631 if(syncRequests.contains(request)) {
\r
633 // System.out.println("waiting " + request);
\r
635 } catch (InterruptedException e) {
\r
// Interruption is escalated as an Error wrapping the InterruptedException.
636 throw new Error(e);
\r
641 Throwable t = throwable.get();
\r
644 Logger.defaultLogError(t);
\r
645 throw new RuntimeException(t.getMessage());
\r
648 return result.get();
\r
650 //return result.get();
\r
/**
 * Wraps {@code request} in a {@link SyncWriteRequestAdapter}, queues the adapter without
 * a callback and finally rethrows any failure recorded on the adapter.
 *
 * NOTE(review): the statement that blocks until the adapter has been executed is not
 * visible in this listing.
 */
656 public void syncRequest(Write request) {
\r
658 // System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
\r
660 SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
\r
662 asyncRequest(adapter, null);
\r
666 // Throw exception if one occurred.
\r
667 adapter.throwOrWrapException();
\r
/**
 * Wraps {@code request} in a {@link SyncWriteRequestAdapter}, queues the adapter without
 * a callback while holding the adapter's monitor, and finally rethrows any failure
 * recorded on the adapter.
 *
 * NOTE(review): the InterruptedException handler implies a blocking call inside the
 * synchronized block that is not visible in this listing; confirm it is woken by the
 * same mechanism the write runner uses to signal the adapter.
 */
672 public void syncRequest(WriteOnly request) {
\r
674 // System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
\r
676 SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
\r
678 // Queue the adapter up for execution.
\r
679 synchronized (adapter) {
\r
680 asyncRequest(adapter, null);
\r
683 } catch (InterruptedException e) {
\r
// Interruption is escalated as an Error wrapping the InterruptedException.
684 throw new Error(e);
\r
688 // Throw exception if one occurred.
\r
689 adapter.throwOrWrapException();
\r
/** Returns the session of the underlying processor. */
694 public Session getSession() {
\r
695 return processor.getSession();
\r
/** Describes this processor by its name, identity hash code and the underlying processor. */
699 public String toString() {
\r
700 return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";
\r
/**
 * Queues an asynchronous read without a caller-supplied procedure; the adapter passed
 * along logs any failure through {@code Logger.defaultLogError}.
 */
704 public <T> void asyncRequest(AsyncRead<T> request) {
\r
706 asyncRequest(request, new ProcedureAdapter<T>() {
\r
709 public void exception(Throwable t) {
\r
710 Logger.defaultLogError(t);
\r
/** Queues an asynchronous read, wrapping the plain procedure in a {@link NoneToAsyncProcedure}. */
718 public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {
\r
719 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
/** Asynchronous multi-read without a procedure is not supported: always throws {@link UnsupportedOperationException}. */
723 public <T> void asyncRequest(AsyncMultiRead<T> request) {
\r
724 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous multi-read with a {@link MultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
728 public <T> void asyncRequest(AsyncMultiRead<T> request,
\r
729 MultiProcedure<T> procedure) {
\r
730 throw new UnsupportedOperationException("Not implemented.");
\r
/**
 * Runs an asynchronous read through {@code syncRequest(AsyncRead, AsyncProcedure)} with
 * an internal procedure and returns the content of the local {@code result} container.
 *
 * NOTE(review): the bodies of the internal procedure (where result and throwable would
 * be stored) are not visible in this listing.
 * NOTE(review): the rethrow keeps only {@code t.getMessage()} and drops the original cause.
 */
734 public <T> T syncRequest(AsyncRead<T> request) {
\r
736 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
737 final DataContainer<T> result = new DataContainer<T>();
\r
739 syncRequest(request, new AsyncProcedure<T>() {
\r
741 public void execute(AsyncReadGraph graph, T t) {
\r
746 public void exception(AsyncReadGraph graph, Throwable t) {
\r
752 Throwable t = throwable.get();
\r
755 Logger.defaultLogError(t);
\r
756 throw new RuntimeException(t.getMessage());
\r
759 return result.get();
\r
/** Synchronous asynchronous-read with a plain {@link Procedure} is not supported: always throws {@link UnsupportedOperationException}. */
764 public <T> T syncRequest(AsyncRead<T> request,
\r
765 Procedure<T> procedure) {
\r
766 throw new UnsupportedOperationException("Not implemented.");
\r
/**
 * Runs an asynchronous multi-read through {@code syncRequest(AsyncMultiRead, AsyncMultiProcedure)}
 * with an internal procedure whose {@code execute} synchronizes on the local result list.
 *
 * NOTE(review): the statements that fill {@code result} and {@code throwable}, and the
 * return statement, are not visible in this listing.
 * NOTE(review): the rethrow keeps only {@code t.getMessage()} and drops the original cause.
 */
770 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {
\r
772 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
773 final ArrayList<T> result = new ArrayList<T>();
\r
775 syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
778 public void execute(AsyncReadGraph graph, T t) {
\r
// Results may arrive from several threads, hence the lock on the list.
779 synchronized(result) {
\r
785 public void exception(AsyncReadGraph graph, Throwable t) {
\r
791 Throwable t = throwable.get();
\r
794 Logger.defaultLogError(t);
\r
795 throw new RuntimeException(t.getMessage());
\r
803 public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,
\r
804 MultiProcedure<T> procedure) {
\r
805 throw new Error("Not implemented.");
\r
/**
 * Runs a read through {@code syncRequest(Read, Procedure)} with an internal procedure
 * and returns the content of the local {@code result} container.
 *
 * NOTE(review): the bodies of the internal procedure are not visible in this listing.
 * NOTE(review): a failure is rethrown as {@link Error} carrying only {@code t.getMessage()};
 * the original cause is dropped and the sibling overloads use RuntimeException instead.
 */
809 public <T> T syncRequest(Read<T> request) {
\r
811 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
812 final DataContainer<T> result = new DataContainer<T>();
\r
815 syncRequest(request, new Procedure<T>() {
\r
817 public void execute(T t) {
\r
822 public void exception(Throwable t) {
\r
828 Throwable t = throwable.get();
\r
831 throw new Error(t.getMessage());
\r
835 return result.get();
\r
/**
 * Registers {@code request} in {@code syncRequests} and queues it, while holding the
 * request object's monitor, with a wrapper procedure that forwards results to
 * {@code procedure}. Returns the content of the local {@code result} container.
 * A recorded failure is rethrown as a RuntimeException that keeps the original as its cause.
 * The InterruptedException handler and the "waiting" trace comment indicate that the
 * caller blocks on the request until it has been processed (the blocking call itself
 * is not visible in this listing).
 *
 * NOTE(review): the statements that store into {@code result} and {@code throwable}
 * are not visible in this listing.
 */
840 public <T> T syncRequest(Read<T> request,
\r
841 final AsyncProcedure<T> procedure) {
\r
843 final DataContainer<T> result = new DataContainer<T>(null);
\r
844 final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
\r
846 // Queue the adapter up for execution.
\r
847 synchronized (request) {
\r
849 syncRequests.add(request);
\r
850 asyncRequest(request, new AsyncProcedure<T>() {
\r
852 public void execute(AsyncReadGraph graph, T t) {
\r
853 synchronized(result) {
\r
// The caller's procedure still receives the value.
856 procedure.execute(graph, t);
\r
860 public void exception(AsyncReadGraph graph, Throwable t) {
\r
// Delegates to the wrapped procedure so logs name the caller's procedure, not this wrapper.
865 public String toString() {
\r
866 return procedure.toString();
\r
// Still registered means the request has not been processed yet.
870 if(syncRequests.contains(request)) {
\r
872 // System.out.println("waiting " + request);
\r
874 } catch (InterruptedException e) {
\r
// Interruption is escalated as an Error wrapping the InterruptedException.
875 throw new Error(e);
\r
880 Throwable t = throwable.get();
\r
883 throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);
\r
886 return result.get();
\r
/**
 * Queues a read without a caller-supplied procedure; the adapter passed along logs any
 * failure through {@code Logger.defaultLogError}.
 */
891 public <T> void asyncRequest(Read<T> request) {
\r
893 asyncRequest(request, new ProcedureAdapter<T>() {
\r
896 public void exception(Throwable t) {
\r
897 Logger.defaultLogError(t);
\r
/**
 * Queues a read together with its procedure, holding this processor's monitor.
 * When no merged transaction is marked as running, the running flag is set.
 *
 * NOTE(review): requestSet holds Pair objects, so contains(request) on the bare request
 * looks like it can never match - confirm whether duplicate suppression is intended.
 * NOTE(review): the statements that start a transaction / wake a running one are not
 * visible in this listing.
 */
905 public synchronized <T> void asyncRequest(Read<T> request,
\r
906 AsyncProcedure<T> procedure) {
\r
908 if (requestSet.contains(request))
\r
// The same pair goes into both the FIFO and the set.
911 Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
\r
912 requestQueue.add(pair);
\r
913 requestSet.add(pair);
\r
915 if (!hasAlreadyRequest) {
\r
917 hasAlreadyRequest = true;
\r
/** Runs a read synchronously, wrapping the plain procedure in a {@link NoneToAsyncProcedure}. */
925 public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {
\r
926 return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
/**
 * Runs a multi-read through a two-argument {@code syncRequest} overload with an internal
 * procedure whose {@code execute} synchronizes on the local result list. A recorded
 * failure is rethrown as-is when it is a {@link DatabaseException} and wrapped in one otherwise.
 *
 * NOTE(review): with a MultiRead argument the call below appears to resolve to
 * syncRequest(MultiRead, AsyncMultiProcedure), which this class implements by throwing
 * UnsupportedOperationException - confirm this method can succeed at all.
 * NOTE(review): the statements that fill {@code result} and {@code exception}, and the
 * return statement, are not visible in this listing.
 *
 * @throws DatabaseException if the request reported a failure
 */
930 public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {
\r
931 assert(request != null);
\r
933 final ArrayList<T> result = new ArrayList<T>();
\r
934 final DataContainer<Throwable> exception = new DataContainer<Throwable>();
\r
936 syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
\r
939 public void execute(AsyncReadGraph graph, T t) {
\r
// Results may arrive from several threads, hence the lock on the list.
940 synchronized(result) {
\r
946 public void exception(AsyncReadGraph graph, Throwable t) {
\r
951 public String toString() {
\r
952 return "syncRequest(MultiRead) -> " + request;
\r
957 Throwable t = exception.get();
\r
959 if(t instanceof DatabaseException) throw (DatabaseException)t;
\r
960 else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
\r
/** Synchronous multi-read with an {@link AsyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
967 public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
\r
968 throw new UnsupportedOperationException("Not implemented");
\r
/** Synchronous multi-read with a {@link MultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
972 public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
\r
973 throw new UnsupportedOperationException("Not implemented");
\r
/** Queues a read, wrapping the plain procedure in a {@link NoneToAsyncProcedure}. */
977 public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {
\r
978 asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
\r
/** Asynchronous multi-read without a procedure is not supported: always throws {@link UnsupportedOperationException}. */
982 public <T> void asyncRequest(MultiRead<T> request) {
\r
983 throw new UnsupportedOperationException("Not implemented");
\r
/** Asynchronous multi-read with an {@link AsyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
987 public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
\r
988 throw new UnsupportedOperationException("Not implemented");
\r
/** Asynchronous multi-read with a {@link MultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
992 public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
\r
993 throw new UnsupportedOperationException("Not implemented");
\r
/** Queues a write without a completion callback (passes {@code null} as the callback). */
997 public void asyncRequest(Write r) {
\r
998 asyncRequest(r, null);
\r
/** Queues a delayed write without a completion callback (passes {@code null} as the callback). */
1002 public void asyncRequest(DelayedWrite r) {
\r
1003 asyncRequest(r, null);
\r
/** Queues a write-only request without a completion callback (passes {@code null} as the callback). */
1007 public void asyncRequest(WriteOnly r) {
\r
1008 asyncRequest(r, null);
\r
/** Delegates to {@code getSession().getService(api)}. */
1012 public <T> T getService(Class<T> api) {
\r
1013 return getSession().getService(api);
\r
/** Delegates to {@code getSession().peekService(api)}. */
1017 public <T> T peekService(Class<T> api) {
\r
1018 return getSession().peekService(api);
\r
/** Delegates to {@code getSession().hasService(api)}. */
1022 public boolean hasService(Class<?> api) {
\r
1023 return getSession().hasService(api);
\r
/** Delegates to {@code getSession().registerService(api, service)}. */
1027 public <T> void registerService(Class<T> api, T service) {
\r
1028 getSession().registerService(api, service);
\r
1032 // public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {
\r
1033 // throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous read with a {@link SyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1037 public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {
\r
1038 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous read with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1042 public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {
\r
1043 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous read with a {@link SyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1047 public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
\r
1048 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncRead} with an {@link AsyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1052 public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
\r
1053 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncRead} with a {@link SyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1057 public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
\r
1058 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncRead} with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1063 public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
\r
1064 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncRead} with a {@link SyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1069 public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
\r
1070 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous read with an {@link AsyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1075 public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {
\r
1076 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous read with a {@link SyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1081 public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {
\r
1082 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous read with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1087 public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {
\r
1088 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous read with a {@link SyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1093 public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
\r
1094 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncRead} with an {@link AsyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1099 public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
\r
1100 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncRead} with a {@link SyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1105 public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
\r
1106 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncRead} with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1111 public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
\r
1112 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncRead} with a {@link SyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1117 public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
\r
1118 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous multi-read with an {@link AsyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1123 public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
\r
1124 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous multi-read with a {@link SyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1129 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
\r
1130 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous multi-read with a {@link MultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1135 public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
\r
1136 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous multi-read with a {@link SyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1141 public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
\r
1142 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncMultiRead} with an {@link AsyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1147 public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
\r
1148 AsyncMultiListener<T> arg1) {
\r
1149 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncMultiRead} with a {@link SyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1154 public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
\r
1155 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncMultiRead} with a {@link MultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1160 public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
\r
1161 MultiListener<T> arg1) {
\r
1162 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous execution of an {@link AsyncMultiRead} with a {@link SyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1167 public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
\r
1168 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous multi-read with an {@link AsyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1173 public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
\r
1174 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous multi-read with a {@link SyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1179 public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
\r
1180 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous multi-read with a {@link MultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1185 public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
\r
1186 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous multi-read with a {@link SyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1191 public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
\r
1192 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncMultiRead} with an {@link AsyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1197 public <T> void asyncRequest(AsyncMultiRead<T> arg0,
\r
1198 AsyncMultiListener<T> arg1) {
\r
1199 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncMultiRead} with a {@link SyncMultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1203 public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
\r
1204 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncMultiRead} with a {@link MultiListener} is not supported: always throws {@link UnsupportedOperationException}. */
1209 public <T> void asyncRequest(AsyncMultiRead<T> arg0,
\r
1210 MultiListener<T> arg1) {
\r
1211 throw new UnsupportedOperationException("Not implemented.");
\r
/** Queueing an {@link AsyncMultiRead} with a {@link SyncMultiProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1216 public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
\r
1217 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous {@link ExternalRead} with a {@link Procedure} is not supported: always throws {@link UnsupportedOperationException}. */
1221 public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {
\r
1222 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous {@link ExternalRead} is not supported: always throws {@link UnsupportedOperationException}. */
1226 public <T> T syncRequest(ExternalRead<T> request) {
\r
1227 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous {@link ExternalRead} with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1231 public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {
\r
1232 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous {@link ExternalRead} with a {@link Procedure} is not supported: always throws {@link UnsupportedOperationException}. */
1236 public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {
\r
1237 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous {@link ExternalRead} without a procedure is not supported: always throws {@link UnsupportedOperationException}. */
1241 public <T> void asyncRequest(ExternalRead<T> request) {
\r
1242 throw new UnsupportedOperationException("Not implemented.");
\r
/** Asynchronous {@link ExternalRead} with a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1246 public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {
\r
1247 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous {@link DelayedWrite} is not supported: always throws {@link UnsupportedOperationException}. */
1251 public void syncRequest(DelayedWrite request) throws DatabaseException {
\r
1252 throw new UnsupportedOperationException("Not implemented.");
\r
/** Synchronous {@link WriteResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1256 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
\r
1257 throw new UnsupportedOperationException();
\r
/** Synchronous {@link DelayedWriteResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1261 public <T> T syncRequest(DelayedWriteResult<T> request)
\r
1262 throws DatabaseException {
\r
1263 throw new UnsupportedOperationException();
\r
/** Synchronous {@link WriteOnlyResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1267 public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {
\r
1268 throw new UnsupportedOperationException();
\r
/** Asynchronous {@link WriteResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1272 public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {
\r
1273 throw new UnsupportedOperationException();
\r
/** Asynchronous {@link DelayedWriteResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1277 public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {
\r
1278 throw new UnsupportedOperationException();
\r
/** Asynchronous {@link WriteOnlyResult} request is not supported: always throws {@link UnsupportedOperationException}. */
1282 public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {
\r
1283 throw new UnsupportedOperationException();
\r
/** Returns the root library resource of the underlying processor. */
1287 public Resource getRootLibrary() {
\r
1288 return processor.getRootLibrary();
\r
/** {@code async} with a {@link ReadInterface} and a {@link Procedure} is not supported: always throws {@link UnsupportedOperationException}. */
1292 public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
\r
1293 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link ReadInterface} and an {@link AsyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1297 public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
\r
1298 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link ReadInterface} and a {@link SyncProcedure} is not supported: always throws {@link UnsupportedOperationException}. */
1302 public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
\r
1303 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link ReadInterface} and a {@link Listener} is not supported: always throws {@link UnsupportedOperationException}. */
1307 public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
\r
1308 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link ReadInterface} and an {@link AsyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1312 public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
\r
1313 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link ReadInterface} and a {@link SyncListener} is not supported: always throws {@link UnsupportedOperationException}. */
1317 public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
\r
1318 throw new UnsupportedOperationException();
\r
/** {@code sync} with a {@link ReadInterface} is not supported: always throws {@link UnsupportedOperationException}. */
1322 public <T> T sync(ReadInterface<T> r) throws DatabaseException {
\r
1323 throw new UnsupportedOperationException();
\r
/** {@code sync} with a {@link WriteInterface} is not supported: always throws {@link UnsupportedOperationException}. */
1327 public <T> T sync(WriteInterface<T> r) throws DatabaseException {
\r
1328 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link WriteInterface} and a {@link Procedure} is not supported: always throws {@link UnsupportedOperationException}. */
1332 public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
\r
1333 throw new UnsupportedOperationException();
\r
/** {@code async} with a {@link WriteInterface} and no procedure is not supported: always throws {@link UnsupportedOperationException}. */
1337 public <T> void async(WriteInterface<T> r) {
\r
1338 throw new UnsupportedOperationException();
\r
/** Modification counters are not supported by this processor: always throws {@link UnsupportedOperationException}. */
1342 public Object getModificationCounter() {
\r
1343 throw new UnsupportedOperationException();
\r