]> gerrit.simantics Code Review - simantics/platform.git/blob
acd2aad02c909e40a66eb22c406923aad39c0471
[simantics/platform.git] /
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.common.processor;
13
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.LinkedList;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.Semaphore;
22
23 import org.simantics.db.AsyncReadGraph;
24 import org.simantics.db.AsyncRequestProcessor;
25 import org.simantics.db.ReadGraph;
26 import org.simantics.db.RequestProcessor;
27 import org.simantics.db.Resource;
28 import org.simantics.db.Session;
29 import org.simantics.db.VirtualGraph;
30 import org.simantics.db.WriteGraph;
31 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
32 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
33 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
34 import org.simantics.db.common.request.ReadRequest;
35 import org.simantics.db.common.request.WriteRequest;
36 import org.simantics.db.common.utils.Logger;
37 import org.simantics.db.exception.CancelTransactionException;
38 import org.simantics.db.exception.DatabaseException;
39 import org.simantics.db.procedure.AsyncListener;
40 import org.simantics.db.procedure.AsyncMultiListener;
41 import org.simantics.db.procedure.AsyncMultiProcedure;
42 import org.simantics.db.procedure.AsyncProcedure;
43 import org.simantics.db.procedure.Listener;
44 import org.simantics.db.procedure.MultiListener;
45 import org.simantics.db.procedure.MultiProcedure;
46 import org.simantics.db.procedure.Procedure;
47 import org.simantics.db.procedure.SyncListener;
48 import org.simantics.db.procedure.SyncMultiListener;
49 import org.simantics.db.procedure.SyncMultiProcedure;
50 import org.simantics.db.procedure.SyncProcedure;
51 import org.simantics.db.request.AsyncMultiRead;
52 import org.simantics.db.request.AsyncRead;
53 import org.simantics.db.request.DelayedWrite;
54 import org.simantics.db.request.DelayedWriteResult;
55 import org.simantics.db.request.ExternalRead;
56 import org.simantics.db.request.MultiRead;
57 import org.simantics.db.request.Read;
58 import org.simantics.db.request.ReadInterface;
59 import org.simantics.db.request.UndoTraits;
60 import org.simantics.db.request.Write;
61 import org.simantics.db.request.WriteInterface;
62 import org.simantics.db.request.WriteOnly;
63 import org.simantics.db.request.WriteOnlyResult;
64 import org.simantics.db.request.WriteResult;
65 import org.simantics.utils.DataContainer;
66 import org.simantics.utils.datastructures.Callback;
67 import org.simantics.utils.datastructures.Pair;
68
69 public class MergingGraphRequestProcessor implements RequestProcessor {
70
71     private static class SyncWriteRequestAdapter implements Write {
72
73         private Semaphore semaphore = new Semaphore(0);
74         private Object request;
75         private Throwable exception;
76         SyncWriteRequestAdapter(Write r) {
77             this.request = r;
78         }
79         SyncWriteRequestAdapter(WriteOnly r) {
80             this.request = r;
81         }
82 //        @Override
83 //        public GraphRequestStatus perform(Graph g) throws Exception {
84 //            return perform((ReadGraph)g);
85 //        }
86         @Override
87         public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {
88             if(request instanceof Write) {
89                 ((Write)request).perform(g);
90             } else if(request instanceof DelayedWrite) {
91                     ((DelayedWrite)request).perform(g);
92             } else {
93                 ((WriteOnly)request).perform(g);
94             }
95         }
96 //        @Override
97 //        public String getId() {
98 //            if(request instanceof WriteGraphRequest) {
99 //                return ((WriteGraphRequest)request).getId();
100 //            } else {
101 //                return null;
102 //            }
103 //        }
104 //        @Override
105 //        public void requestCompleted(GraphRequestStatus status) {
106 //            if(request instanceof WriteGraphRequest) {
107 //                ((WriteGraphRequest)request).requestCompleted(status);
108 //            } else {
109 //            }
110 //        }
111 //        @Override
112 //        public void handleException(Throwable e) {
113 //            this.exception = e;
114 //            if(request instanceof WriteGraphRequest) {
115 //                ((WriteGraphRequest)request).handleException(e);
116 //            }
117 //        }
118
119         public void throwOrWrapException() {
120             if (exception == null)
121                 return;
122             if (exception instanceof RuntimeException)
123                 throw (RuntimeException) exception;
124             if (exception instanceof Error)
125                 throw (Error) exception;
126             throw new RuntimeException("See cause for the real exception.", exception);
127         }
128
129         @Override
130         public VirtualGraph getProvider() {
131             return null;
132         }
133
134 //        @Override
135 //        public void fillMetadata(Map<String, String> metadata) {
136 //        }
137
138         public void acquire() {
139             try {
140                 semaphore.acquire();
141             } catch (InterruptedException e) {
142                         Logger.defaultLogError(e);
143             }
144         }
145
146         public void release() {
147             semaphore.release();
148         }
149
150         @Override
151         public UndoTraits getUndoTraits() {
152             return null;
153         }
154         
155         @Override
156         public String toString() {
157             return "SyncWriteRequestAdapter " + request;
158         }
159
160     }
161
162     long transactionKeepalivePeriod;
163
164     /**
165      * Synchronization object for implementing {@link #synchronize()}.
166      * {@link Object#notifyAll()} is invoked for this lock object every time a
167      * single transaction is completed, thereby releasing all waiters in
168      * {@link #synchronize()}.
169      */
170     Object barrier = new Object();
171
172     Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();
173     LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();
174     boolean hasAlreadyRequest = false;
175
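    /*
     * Disabled: a set of requests which {@link #synchronize()} was depending on.
     * Every time a request within that set completed, some thread in
     * {@link #synchronize()} was supposed to be released.
     */
//    Set<Object> barrierRequests = new HashSet<Object>();

    /**
     * Read requests submitted through the blocking {@code syncRequest} variants.
     * The submitting thread waits on the request object until {@link MergedRead}
     * has processed the request, removed it from this set and notified it.
     */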
182     Set<Object> syncRequests = new HashSet<Object>();
183
184     private String name;
185
186     private AsyncRequestProcessor processor;
187
188     public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
189         this.name = name;
190         this.processor = processor;
191         this.transactionKeepalivePeriod = transactionKeepalivePeriod;
192     }
193
194     public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
195         this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString();
196         this.processor = processor;
197         this.transactionKeepalivePeriod = transactionKeepalivePeriod;
198     }
199
200     @SuppressWarnings({"unchecked", "rawtypes"})
201     protected class MergedRead extends ReadRequest {
202
203         Pair<Object, Object> currentRequest;
204
205 //        RunnerReadGraphRequest(GraphRequestProcessor processor) {
206 //            super(processor);
207 //        }
208 //
209 //        @Override
210 //        public void completed(boolean value) {
211 ////            System.out.println(this + "MGRP read completed");
212 ////            synchronized (MergingGraphRequestProcessor.this) {
213 ////                if (requestQueue.isEmpty())
214 ////                    hasAlreadyRequest = false;
215 ////                else
216 ////                    newTransaction();
217 ////            }
218 //        }
219
220         @Override
221         public void run(ReadGraph graph) {
222
223 //            System.out.println(MergingGraphRequestProcessor.this + " reads");
224
225             while (true) {
226
227                 synchronized (MergingGraphRequestProcessor.this) {
228
229                     // Release #synchronize() invokers if necessary.
230 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
231 //                        synchronized (barrier) {
232 //                            barrier.notifyAll();
233 //                        }
234 //                    }
235
236                     if(requestQueue.isEmpty()) {
237                         if (transactionKeepalivePeriod > 0) {
238 //                            System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );
239                             try {
240                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
241                             } catch (InterruptedException e) {
242                                         Logger.defaultLogError(e);
243                             }
244                             if (requestQueue.isEmpty())
245                                 break;
246                         } else
247                             break;
248                     }
249
250                     Object nextRequest = requestQueue.peekFirst().first;
251                     if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {
252                         break;
253                     }
254
255                     currentRequest = requestQueue.remove(0);
256                     requestSet.remove(currentRequest);
257
258                 }
259
260 //                ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;
261
262                 if( syncRequests.contains(currentRequest.first)) {
263
264                     try {
265
266                         if(currentRequest.second instanceof AsyncProcedure<?>) {
267                             if(currentRequest.first instanceof Read) {
268                                 Read req = (Read)currentRequest.first;
269                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
270                             } else {
271                                 AsyncRead req = (AsyncRead)currentRequest.first;
272                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
273                             }
274                         } else {
275                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
276                             graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
277                         }
278
279                     } catch(Throwable t) {
280
281                                 Logger.defaultLogError(t);
282
283                         if(currentRequest.second instanceof AsyncProcedure<?>) {
284                             ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
285                         } else {
286                             ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
287                         }
288
289                     }
290
291                     synchronized (currentRequest.first) {
292                         syncRequests.remove(currentRequest.first);
293 //                      System.out.println("notifying " + currentRequest.first);
294                         currentRequest.first.notify();
295                     }
296
297
298                 } else {
299
300                     try{
301
302                         if(currentRequest.second instanceof AsyncProcedure<?>) {
303                             if(currentRequest.first instanceof AsyncRead) {
304                                 AsyncRead req = (AsyncRead)currentRequest.first;
305                                 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
306                             } else {
307                                 Read req = (Read)currentRequest.first;
308                                 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
309                             }
310                         } else {
311                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
312                             graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
313                         }
314
315                     } catch(Throwable t) {
316
317                                 Logger.defaultLogError(t);
318
319                         if(currentRequest.second instanceof AsyncProcedure<?>) {
320                             ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
321                         } else {
322                             ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
323                         }
324
325                     }
326                 }
327
328             }
329
330 //            System.out.println(MergingGraphRequestProcessor.this + " read completed");
331
332             synchronized (MergingGraphRequestProcessor.this) {
333                 if (requestQueue.isEmpty())
334                     hasAlreadyRequest = false;
335                 else
336                     newTransaction();
337             }
338
339         }
340
341         @Override
342         public String toString() {
343             return "MergedRead[" + requestQueue.size() + " requests]";
344         }
345
346     }
347
348     protected class RunnerWriteGraphRequest extends WriteRequest {
349
350         Pair<Object, Object> currentRequest;
351         HashMap<String, String> metadata = new HashMap<String, String>();
352
353         @Override
354         public void perform(WriteGraph graph) throws DatabaseException {
355
356 //            System.out.println(MergingGraphRequestProcessor.this + " writes");
357
358             while (true) {
359
360                 synchronized (MergingGraphRequestProcessor.this) {
361
362                     // Release #synchronize() invokers if necessary.
363 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
364 //                        synchronized (barrier) {
365 //                            barrier.notifyAll();
366 //                        }
367 //                    }
368
369                     if(requestQueue.isEmpty()) {
370                         if (transactionKeepalivePeriod > 0) {
371                             try {
372                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
373                             } catch (InterruptedException e) {
374                                         Logger.defaultLogError(e);
375                             }
376                             if (requestQueue.isEmpty())
377                                 break;
378                         } else
379                             break;
380                     }
381
382                     Object nextRequest = requestQueue.peekFirst().first;
383                     if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {
384                         break;
385                     }
386
387                     currentRequest = requestQueue.remove(0);
388                     requestSet.remove(currentRequest);
389
390                 }
391
392                 @SuppressWarnings("unchecked")
393                 Callback<Throwable> callback = (Callback<Throwable>)currentRequest.second;
394
395                 if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
396
397                     SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;
398
399                     try {
400 //                        System.out.println("merg.sync " + adapter);
401                         graph.syncRequest(adapter);
402                         if(callback != null) callback.run(null);
403                     } catch(Throwable t) {
404                                 Logger.defaultLogError(t);
405                         if(callback != null) callback.run(t);
406                     }
407
408                     adapter.release();
409 //                    System.out.println("merg.sync.release " + adapter);
410
411                 } else {
412
413                     try {
414                         if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); 
415                         else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); 
416                         if(callback != null) callback.run(null);
417                     } catch(Throwable t) {
418                                 Logger.defaultLogError(t);
419                         if(callback != null) callback.run(t);
420                     }
421
422                 }
423
424             }
425
426 //            System.out.println(MergingGraphRequestProcessor.this + " write completed");
427
428             synchronized (MergingGraphRequestProcessor.this) {
429                 if (requestQueue.isEmpty())
430                     hasAlreadyRequest = false;
431                 else
432                     newTransaction();
433             }
434
435         }
436
437     }
438
439     private void newTransaction() {
440
441         boolean write = false;
442
443         synchronized (MergingGraphRequestProcessor.this) {
444             assert(!requestQueue.isEmpty());
445             Object nextRequest = requestQueue.peekFirst().first;
446             write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);
447         }
448
449         if(write) {
450             processor.asyncRequest(new RunnerWriteGraphRequest(), null);
451         } else {
452             processor.asyncRequest(new MergedRead());
453         }
454
455     }
456
457     @Override
458     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {
459
460 //        System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
461
462         if (requestSet.contains(request))
463             return;
464
465         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
466         requestQueue.add(pair);
467         requestSet.add(pair);
468
469         if (!hasAlreadyRequest) {
470             newTransaction();
471             hasAlreadyRequest = true;
472         } else {
473             notify();
474         }
475
476     }
477
478     @Override
479     public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {
480
481 //        System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
482
483         if (requestSet.contains(request))
484             return;
485
486         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
487         requestQueue.add(pair);
488         requestSet.add(pair);
489
490         if (!hasAlreadyRequest) {
491             newTransaction();
492             hasAlreadyRequest = true;
493         } else {
494 //            System.out.println("notify " + this);
495             notify();
496         }
497
498     }
499
500     @Override
501     public synchronized void asyncRequest(Write request, Callback<DatabaseException> callback) {
502
503 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
504
505         if (requestSet.contains(request))
506             return;
507
508         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
509         requestQueue.add(pair);
510         requestSet.add(pair);
511
512         if (!hasAlreadyRequest) {
513 //            System.out.println("new transaction");
514             newTransaction();
515             hasAlreadyRequest = true;
516         } else {
517 //            System.out.println("notify");
518             notify();
519         }
520
521     }
522
523     @Override
524     public synchronized void asyncRequest(DelayedWrite request, Callback<DatabaseException> callback) {
525
526 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
527
528         if (requestSet.contains(request))
529             return;
530
531         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
532         requestQueue.add(pair);
533         requestSet.add(pair);
534
535         if (!hasAlreadyRequest) {
536 //            System.out.println("new transaction");
537             newTransaction();
538             hasAlreadyRequest = true;
539         } else {
540 //            System.out.println("notify");
541             notify();
542         }
543
544     }
545
546     @Override
547     public synchronized void asyncRequest(WriteOnly request, Callback<DatabaseException> callback) {
548
549 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
550
551         if (requestSet.contains(request))
552             return;
553
554         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
555         requestQueue.add(pair);
556         requestSet.add(pair);
557
558         if (!hasAlreadyRequest) {
559 //            System.out.println("new transaction");
560             newTransaction();
561             hasAlreadyRequest = true;
562         } else {
563 //            System.out.println("notify");
564             notify();
565         }
566
567     }
568
569     @Override
570     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
571
572         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
573
574         // Queue the adapter up for execution.
575         synchronized (request) {
576             syncRequests.add(request);
577             asyncRequest(request, procedure);
578             if(syncRequests.contains(request)) {
579                 try {
580                     //                  System.out.println("waiting " + request);
581                     request.wait();
582                 } catch (InterruptedException e) {
583                     throw new Error(e);
584                 }
585             }
586         }
587
588         Throwable t = throwable.get();
589
590         if(t != null) {
591                 Logger.defaultLogError(t);
592             throw new RuntimeException(t.getMessage());
593         }
594         
595         return null;
596
597     }
598
599     @Override
600     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {
601
602 //        System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");
603
604         final DataContainer<T> result = new DataContainer<T>(null);
605         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
606
607         // Queue the adapter up for execution.
608         synchronized (request) {
609
610             syncRequests.add(request);
611             asyncRequest(request, new AsyncProcedure<T>() {
612
613                 public void execute(AsyncReadGraph graph, T t) {
614                     synchronized(result) {
615                         result.set(t);
616                     }
617                     procedure.execute(graph, t);
618                 };
619
620                 @Override
621                 public void exception(AsyncReadGraph graph, Throwable t) {
622                     throwable.set(t);
623                 }
624
625                 @Override
626                 public String toString() {
627                     return procedure.toString();
628                 }
629
630             });
631             if(syncRequests.contains(request)) {
632                 try {
633                     //                  System.out.println("waiting " + request);
634                     request.wait();
635                 } catch (InterruptedException e) {
636                     throw new Error(e);
637                 }
638             }
639         }
640
641         Throwable t = throwable.get();
642
643         if(t != null) {
644                 Logger.defaultLogError(t);
645             throw new RuntimeException(t.getMessage());
646         }
647         
648         return result.get();
649
650         //return result.get();
651
652     }
653
654
655     @Override
656     public void syncRequest(Write request) {
657
658 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
659
660         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
661
662         asyncRequest(adapter, null);
663
664         adapter.acquire();
665
666         // Throw exception if one occurred.
667         adapter.throwOrWrapException();
668
669     }
670
671     @Override
672     public void syncRequest(WriteOnly request) {
673
674 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
675
676         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
677
678         // Queue the adapter up for execution.
679         synchronized (adapter) {
680             asyncRequest(adapter, null);
681             try {
682                 adapter.wait();
683             } catch (InterruptedException e) {
684                 throw new Error(e);
685             }
686         }
687
688         // Throw exception if one occurred.
689         adapter.throwOrWrapException();
690
691     }
692
693     @Override
694     public Session getSession() {
695         return processor.getSession();
696     }
697
698     @Override
699     public String toString() {
700         return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";
701     }
702
703     @Override
704     public <T> void asyncRequest(AsyncRead<T> request) {
705
706         asyncRequest(request, new ProcedureAdapter<T>() {
707
708             @Override
709             public void exception(Throwable t) {
710                         Logger.defaultLogError(t);
711             }
712
713         });
714
715     }
716
717     @Override
718     public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {
719         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
720     }
721
722     @Override
723     public <T> void asyncRequest(AsyncMultiRead<T> request) {
724         throw new UnsupportedOperationException("Not implemented.");
725     }
726
727     @Override
728     public <T> void asyncRequest(AsyncMultiRead<T> request,
729             MultiProcedure<T> procedure) {
730         throw new UnsupportedOperationException("Not implemented.");
731     }
732
733     @Override
734     public <T> T syncRequest(AsyncRead<T> request) {
735
736         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
737         final DataContainer<T> result = new DataContainer<T>();
738
739         syncRequest(request, new AsyncProcedure<T>() {
740
741             public void execute(AsyncReadGraph graph, T t) {
742                 result.set(t);
743             }
744
745             @Override
746             public void exception(AsyncReadGraph graph, Throwable t) {
747                 throwable.set(t);
748             }
749
750         });
751
752         Throwable t = throwable.get();
753
754         if(t != null) {
755                 Logger.defaultLogError(t);
756             throw new RuntimeException(t.getMessage());
757         }
758
759         return result.get();
760
761     }
762
763     @Override
764     public <T> T syncRequest(AsyncRead<T> request,
765             Procedure<T> procedure) {
766         throw new UnsupportedOperationException("Not implemented.");
767     }
768
769     @Override
770     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {
771
772         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
773         final ArrayList<T> result = new ArrayList<T>();
774
775         syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
776
777             @Override
778             public void execute(AsyncReadGraph graph, T t) {
779                 synchronized(result) {
780                     result.add(t);
781                 }
782             };
783
784             @Override
785             public void exception(AsyncReadGraph graph, Throwable t) {
786                 throwable.set(t);
787             }
788
789         });
790
791         Throwable t = throwable.get();
792
793         if(t != null) {
794                 Logger.defaultLogError(t);
795             throw new RuntimeException(t.getMessage());
796         }
797
798         return result;
799
800     }
801
802     @Override
803     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,
804             MultiProcedure<T> procedure) {
805         throw new Error("Not implemented.");
806     }
807
808     @Override
809     public <T> T syncRequest(Read<T> request) {
810
811         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
812         final DataContainer<T> result = new DataContainer<T>();
813
814
815         syncRequest(request, new Procedure<T>() {
816
817             public void execute(T t) {
818                 result.set(t);
819             }
820
821             @Override
822             public void exception(Throwable t) {
823                 throwable.set(t);
824             }
825
826         });
827
828         Throwable t = throwable.get();
829
830         if(t != null) {
831             throw new Error(t.getMessage());
832         }
833
834
835         return result.get();
836
837     }
838
839     @Override
840     public <T> T syncRequest(Read<T> request,
841             final AsyncProcedure<T> procedure) {
842
843         final DataContainer<T> result = new DataContainer<T>(null);
844         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
845
846         // Queue the adapter up for execution.
847         synchronized (request) {
848
849             syncRequests.add(request);
850             asyncRequest(request, new AsyncProcedure<T>() {
851
852                 public void execute(AsyncReadGraph graph, T t) {
853                     synchronized(result) {
854                         result.set(t);
855                     }
856                     procedure.execute(graph, t);
857                 };
858
859                 @Override
860                 public void exception(AsyncReadGraph graph, Throwable t) {
861                     throwable.set(t);
862                 }
863
864                 @Override
865                 public String toString() {
866                     return procedure.toString();
867                 }
868
869             });
870             if(syncRequests.contains(request)) {
871                 try {
872                     //                  System.out.println("waiting " + request);
873                     request.wait();
874                 } catch (InterruptedException e) {
875                     throw new Error(e);
876                 }
877             }
878         }
879
880         Throwable t = throwable.get();
881
882         if(t != null) {
883             throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);
884         }
885         
886         return result.get();
887
888     }
889
890     @Override
891     public <T> void asyncRequest(Read<T> request) {
892
893         asyncRequest(request, new ProcedureAdapter<T>() {
894
895             @Override
896             public void exception(Throwable t) {
897                 Logger.defaultLogError(t);
898             }
899
900         });
901
902     }
903
904     @Override
905     public synchronized <T> void asyncRequest(Read<T> request,
906             AsyncProcedure<T> procedure) {
907
908         if (requestSet.contains(request))
909             return;
910
911         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
912         requestQueue.add(pair);
913         requestSet.add(pair);
914
915         if (!hasAlreadyRequest) {
916             newTransaction();
917             hasAlreadyRequest = true;
918         } else {
919             notify();
920         }
921
922     }
923
924     @Override
925     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {
926         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
927     }
928
929     @Override
930     public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {
931         assert(request != null);
932
933         final ArrayList<T> result = new ArrayList<T>();
934         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
935
936         syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
937
938             @Override
939             public void execute(AsyncReadGraph graph, T t) {
940                 synchronized(result) {
941                     result.add(t);
942                 }
943             }
944
945             @Override
946             public void exception(AsyncReadGraph graph, Throwable t) {
947                 exception.set(t);
948             }
949
950             @Override
951             public String toString() {
952                 return "syncRequest(MultiRead) -> " + request;
953             }
954
955         });
956
957         Throwable t = exception.get();
958         if(t != null) {
959             if(t instanceof DatabaseException) throw (DatabaseException)t;
960             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
961         }
962
963         return result;
964     }
965
966     @Override
967     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure)  {
968         throw new UnsupportedOperationException("Not implemented");
969     }
970
971     @Override
972     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
973         throw new UnsupportedOperationException("Not implemented");
974     }
975
976     @Override
977     public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {
978         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
979     }
980
981     @Override
982     public <T> void asyncRequest(MultiRead<T> request) {
983         throw new UnsupportedOperationException("Not implemented");
984     }
985
986     @Override
987     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
988         throw new UnsupportedOperationException("Not implemented");
989     }
990
991     @Override
992     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
993         throw new UnsupportedOperationException("Not implemented");
994     }
995
996     @Override
997     public void asyncRequest(Write r) {
998         asyncRequest(r, null);
999     }
1000
1001     @Override
1002     public void asyncRequest(DelayedWrite r) {
1003         asyncRequest(r, null);
1004     }
1005
1006     @Override
1007     public void asyncRequest(WriteOnly r) {
1008         asyncRequest(r, null);
1009     }
1010
1011     @Override
1012     public <T> T getService(Class<T> api) {
1013         return getSession().getService(api);
1014     }
1015
1016     @Override
1017     public <T> T peekService(Class<T> api) {
1018         return getSession().peekService(api);
1019     }
1020
1021     @Override
1022     public boolean hasService(Class<?> api) {
1023         return getSession().hasService(api);
1024     }
1025
1026     @Override
1027     public <T> void registerService(Class<T> api, T service) {
1028         getSession().registerService(api, service);
1029     }
1030
1031 //    @Override
1032 //    public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1033 //        throw new UnsupportedOperationException("Not implemented.");
1034 //    }
1035
1036     @Override
1037     public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {
1038         throw new UnsupportedOperationException("Not implemented.");
1039     }
1040
1041     @Override
1042     public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {
1043         throw new UnsupportedOperationException("Not implemented.");
1044     }
1045
1046     @Override
1047     public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1048         throw new UnsupportedOperationException("Not implemented.");
1049     }
1050
1051     @Override
1052     public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1053         throw new UnsupportedOperationException("Not implemented.");
1054     }
1055
1056     @Override
1057     public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1058         throw new UnsupportedOperationException("Not implemented.");
1059
1060     }
1061
1062     @Override
1063     public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1064         throw new UnsupportedOperationException("Not implemented.");
1065
1066     }
1067
1068     @Override
1069     public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1070         throw new UnsupportedOperationException("Not implemented.");
1071
1072     }
1073
1074     @Override
1075     public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1076         throw new UnsupportedOperationException("Not implemented.");
1077
1078     }
1079
1080     @Override
1081     public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {
1082         throw new UnsupportedOperationException("Not implemented.");
1083
1084     }
1085
1086     @Override
1087     public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {
1088         throw new UnsupportedOperationException("Not implemented.");
1089
1090     }
1091
1092     @Override
1093     public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1094         throw new UnsupportedOperationException("Not implemented.");
1095
1096     }
1097
1098     @Override
1099     public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1100         throw new UnsupportedOperationException("Not implemented.");
1101
1102     }
1103
1104     @Override
1105     public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1106         throw new UnsupportedOperationException("Not implemented.");
1107
1108     }
1109
1110     @Override
1111     public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1112         throw new UnsupportedOperationException("Not implemented.");
1113
1114     }
1115
1116     @Override
1117     public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1118         throw new UnsupportedOperationException("Not implemented.");
1119
1120     }
1121
1122     @Override
1123     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
1124         throw new UnsupportedOperationException("Not implemented.");
1125
1126     }
1127
1128     @Override
1129     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1130         throw new UnsupportedOperationException("Not implemented.");
1131
1132     }
1133
1134     @Override
1135     public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1136         throw new UnsupportedOperationException("Not implemented.");
1137
1138     }
1139
1140     @Override
1141     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1142         throw new UnsupportedOperationException("Not implemented.");
1143
1144     }
1145
1146     @Override
1147     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1148             AsyncMultiListener<T> arg1) {
1149         throw new UnsupportedOperationException("Not implemented.");
1150
1151     }
1152
1153     @Override
1154     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1155         throw new UnsupportedOperationException("Not implemented.");
1156
1157     }
1158
1159     @Override
1160     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1161             MultiListener<T> arg1) {
1162         throw new UnsupportedOperationException("Not implemented.");
1163
1164     }
1165
1166     @Override
1167     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1168         throw new UnsupportedOperationException("Not implemented.");
1169
1170     }
1171
1172     @Override
1173     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
1174         throw new UnsupportedOperationException("Not implemented.");
1175
1176     }
1177
1178     @Override
1179     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1180         throw new UnsupportedOperationException("Not implemented.");
1181
1182     }
1183
1184     @Override
1185     public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1186         throw new UnsupportedOperationException("Not implemented.");
1187
1188     }
1189
1190     @Override
1191     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1192         throw new UnsupportedOperationException("Not implemented.");
1193
1194     }
1195
1196     @Override
1197     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1198             AsyncMultiListener<T> arg1) {
1199         throw new UnsupportedOperationException("Not implemented.");
1200     }
1201
1202     @Override
1203     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1204         throw new UnsupportedOperationException("Not implemented.");
1205
1206     }
1207
1208     @Override
1209     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1210             MultiListener<T> arg1) {
1211         throw new UnsupportedOperationException("Not implemented.");
1212
1213     }
1214
1215     @Override
1216     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1217         throw new UnsupportedOperationException("Not implemented.");
1218     }
1219
1220     @Override
1221     public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1222         throw new UnsupportedOperationException("Not implemented.");
1223     }
1224
1225     @Override
1226     public <T> T syncRequest(ExternalRead<T> request) {
1227         throw new UnsupportedOperationException("Not implemented.");
1228     }
1229
1230     @Override
1231     public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {
1232         throw new UnsupportedOperationException("Not implemented.");
1233     }
1234
1235     @Override
1236     public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1237         throw new UnsupportedOperationException("Not implemented.");
1238     }
1239
1240     @Override
1241     public <T> void asyncRequest(ExternalRead<T> request) {
1242         throw new UnsupportedOperationException("Not implemented.");
1243     }
1244
1245     @Override
1246     public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {
1247         throw new UnsupportedOperationException("Not implemented.");
1248     }
1249
1250     @Override
1251     public void syncRequest(DelayedWrite request) throws DatabaseException {
1252         throw new UnsupportedOperationException("Not implemented.");
1253     }
1254
1255         @Override
1256         public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1257                 throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T> T syncRequest(DelayedWriteResult<T> request)
1262                         throws DatabaseException {
1263                 throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {
1268                 throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {
1273                 throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {
1278                 throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {
1283                 throw new UnsupportedOperationException();
1284         }
1285         
1286         @Override
1287         public Resource getRootLibrary() {
1288                 return processor.getRootLibrary();
1289         }
1290
1291         @Override
1292         public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
1293                 throw new UnsupportedOperationException();
1294         }
1295
1296         @Override
1297         public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
1298                 throw new UnsupportedOperationException();
1299         }
1300
1301         @Override
1302         public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
1303                 throw new UnsupportedOperationException();
1304         }
1305
1306         @Override
1307         public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
1308                 throw new UnsupportedOperationException();
1309         }
1310
1311         @Override
1312         public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
1313                 throw new UnsupportedOperationException();
1314         }
1315
1316         @Override
1317         public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
1318                 throw new UnsupportedOperationException();
1319         }
1320         
1321         @Override
1322         public <T> T sync(ReadInterface<T> r) throws DatabaseException {
1323                 throw new UnsupportedOperationException();
1324         }
1325         
1326         @Override
1327         public <T> T sync(WriteInterface<T> r) throws DatabaseException {
1328                 throw new UnsupportedOperationException();
1329         }
1330         
1331         @Override
1332         public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
1333                 throw new UnsupportedOperationException();
1334         }
1335
1336         @Override
1337         public <T> void async(WriteInterface<T> r) {
1338                 throw new UnsupportedOperationException();
1339         }
1340         
1341         @Override
1342         public Object getModificationCounter() {
1343                 throw new UnsupportedOperationException();
1344         }
1345         
1346 }