]> gerrit.simantics Code Review - simantics/platform.git/blob
5e07bb034ecb924adc8567f99f09990e141e69ed
[simantics/platform.git] /
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.common.processor;
13
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.LinkedList;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.Semaphore;
22
23 import org.simantics.db.AsyncReadGraph;
24 import org.simantics.db.AsyncRequestProcessor;
25 import org.simantics.db.ReadGraph;
26 import org.simantics.db.RequestProcessor;
27 import org.simantics.db.Resource;
28 import org.simantics.db.Session;
29 import org.simantics.db.WriteGraph;
30 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
31 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
32 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
33 import org.simantics.db.common.request.ReadRequest;
34 import org.simantics.db.common.request.WriteRequest;
35 import org.simantics.db.common.utils.Logger;
36 import org.simantics.db.exception.CancelTransactionException;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.procedure.AsyncListener;
39 import org.simantics.db.procedure.AsyncMultiListener;
40 import org.simantics.db.procedure.AsyncMultiProcedure;
41 import org.simantics.db.procedure.AsyncProcedure;
42 import org.simantics.db.procedure.Listener;
43 import org.simantics.db.procedure.MultiListener;
44 import org.simantics.db.procedure.MultiProcedure;
45 import org.simantics.db.procedure.Procedure;
46 import org.simantics.db.procedure.SyncListener;
47 import org.simantics.db.procedure.SyncMultiListener;
48 import org.simantics.db.procedure.SyncMultiProcedure;
49 import org.simantics.db.procedure.SyncProcedure;
50 import org.simantics.db.request.AsyncMultiRead;
51 import org.simantics.db.request.AsyncRead;
52 import org.simantics.db.request.DelayedWrite;
53 import org.simantics.db.request.DelayedWriteResult;
54 import org.simantics.db.request.ExternalRead;
55 import org.simantics.db.request.MultiRead;
56 import org.simantics.db.request.Read;
57 import org.simantics.db.request.ReadInterface;
58 import org.simantics.db.request.Write;
59 import org.simantics.db.request.WriteInterface;
60 import org.simantics.db.request.WriteOnly;
61 import org.simantics.db.request.WriteOnlyResult;
62 import org.simantics.db.request.WriteResult;
63 import org.simantics.utils.DataContainer;
64 import org.simantics.utils.datastructures.Callback;
65 import org.simantics.utils.datastructures.Pair;
66
67 public class MergingGraphRequestProcessor implements RequestProcessor {
68
69     private static class SyncWriteRequestAdapter implements Write {
70
71         private Semaphore semaphore = new Semaphore(0);
72         private Object request;
73         private Throwable exception;
74         SyncWriteRequestAdapter(Write r) {
75             this.request = r;
76         }
77         SyncWriteRequestAdapter(WriteOnly r) {
78             this.request = r;
79         }
80 //        @Override
81 //        public GraphRequestStatus perform(Graph g) throws Exception {
82 //            return perform((ReadGraph)g);
83 //        }
84         @Override
85         public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {
86             if(request instanceof Write) {
87                 ((Write)request).perform(g);
88             } else if(request instanceof DelayedWrite) {
89                     ((DelayedWrite)request).perform(g);
90             } else {
91                 ((WriteOnly)request).perform(g);
92             }
93         }
94 //        @Override
95 //        public String getId() {
96 //            if(request instanceof WriteGraphRequest) {
97 //                return ((WriteGraphRequest)request).getId();
98 //            } else {
99 //                return null;
100 //            }
101 //        }
102 //        @Override
103 //        public void requestCompleted(GraphRequestStatus status) {
104 //            if(request instanceof WriteGraphRequest) {
105 //                ((WriteGraphRequest)request).requestCompleted(status);
106 //            } else {
107 //            }
108 //        }
109 //        @Override
110 //        public void handleException(Throwable e) {
111 //            this.exception = e;
112 //            if(request instanceof WriteGraphRequest) {
113 //                ((WriteGraphRequest)request).handleException(e);
114 //            }
115 //        }
116
117         public void throwOrWrapException() {
118             if (exception == null)
119                 return;
120             if (exception instanceof RuntimeException)
121                 throw (RuntimeException) exception;
122             if (exception instanceof Error)
123                 throw (Error) exception;
124             throw new RuntimeException("See cause for the real exception.", exception);
125         }
126
127         public void acquire() {
128             try {
129                 semaphore.acquire();
130             } catch (InterruptedException e) {
131                         Logger.defaultLogError(e);
132             }
133         }
134
135         public void release() {
136             semaphore.release();
137         }
138
139         @Override
140         public String toString() {
141             return "SyncWriteRequestAdapter " + request;
142         }
143
144     }
145
146     long transactionKeepalivePeriod;
147
148     /**
149      * Synchronization object for implementing {@link #synchronize()}.
150      * {@link Object#notifyAll()} is invoked for this lock object every time a
151      * single transaction is completed, thereby releasing all waiters in
152      * {@link #synchronize()}.
153      */
154     Object barrier = new Object();
155
156     Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();
157     LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();
158     boolean hasAlreadyRequest = false;
159
160     /**
161      * A set of requests which {@link #synchronize()} is depending on at the
162      * moment. Every time a request within this set is completed, some thread in
163      * {@link #synchronize()} should be released.
164      */
165 //    Set<Object> barrierRequests = new HashSet<Object>();
166     Set<Object> syncRequests = new HashSet<Object>();
167
168     private String name;
169
170     private AsyncRequestProcessor processor;
171
172     public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
173         this.name = name;
174         this.processor = processor;
175         this.transactionKeepalivePeriod = transactionKeepalivePeriod;
176     }
177
178     public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
179         this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString();
180         this.processor = processor;
181         this.transactionKeepalivePeriod = transactionKeepalivePeriod;
182     }
183
184     @SuppressWarnings({"unchecked", "rawtypes"})
185     protected class MergedRead extends ReadRequest {
186
187         Pair<Object, Object> currentRequest;
188
189 //        RunnerReadGraphRequest(GraphRequestProcessor processor) {
190 //            super(processor);
191 //        }
192 //
193 //        @Override
194 //        public void completed(boolean value) {
195 ////            System.out.println(this + "MGRP read completed");
196 ////            synchronized (MergingGraphRequestProcessor.this) {
197 ////                if (requestQueue.isEmpty())
198 ////                    hasAlreadyRequest = false;
199 ////                else
200 ////                    newTransaction();
201 ////            }
202 //        }
203
204         @Override
205         public void run(ReadGraph graph) {
206
207 //            System.out.println(MergingGraphRequestProcessor.this + " reads");
208
209             while (true) {
210
211                 synchronized (MergingGraphRequestProcessor.this) {
212
213                     // Release #synchronize() invokers if necessary.
214 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
215 //                        synchronized (barrier) {
216 //                            barrier.notifyAll();
217 //                        }
218 //                    }
219
220                     if(requestQueue.isEmpty()) {
221                         if (transactionKeepalivePeriod > 0) {
222 //                            System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );
223                             try {
224                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
225                             } catch (InterruptedException e) {
226                                         Logger.defaultLogError(e);
227                             }
228                             if (requestQueue.isEmpty())
229                                 break;
230                         } else
231                             break;
232                     }
233
234                     Object nextRequest = requestQueue.peekFirst().first;
235                     if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {
236                         break;
237                     }
238
239                     currentRequest = requestQueue.remove(0);
240                     requestSet.remove(currentRequest);
241
242                 }
243
244 //                ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;
245
246                 if( syncRequests.contains(currentRequest.first)) {
247
248                     try {
249
250                         if(currentRequest.second instanceof AsyncProcedure<?>) {
251                             if(currentRequest.first instanceof Read) {
252                                 Read req = (Read)currentRequest.first;
253                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
254                             } else {
255                                 AsyncRead req = (AsyncRead)currentRequest.first;
256                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
257                             }
258                         } else {
259                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
260                             graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
261                         }
262
263                     } catch(Throwable t) {
264
265                                 Logger.defaultLogError(t);
266
267                         if(currentRequest.second instanceof AsyncProcedure<?>) {
268                             ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
269                         } else {
270                             ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
271                         }
272
273                     }
274
275                     synchronized (currentRequest.first) {
276                         syncRequests.remove(currentRequest.first);
277 //                      System.out.println("notifying " + currentRequest.first);
278                         currentRequest.first.notify();
279                     }
280
281
282                 } else {
283
284                     try{
285
286                         if(currentRequest.second instanceof AsyncProcedure<?>) {
287                             if(currentRequest.first instanceof AsyncRead) {
288                                 AsyncRead req = (AsyncRead)currentRequest.first;
289                                 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
290                             } else {
291                                 Read req = (Read)currentRequest.first;
292                                 graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
293                             }
294                         } else {
295                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
296                             graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
297                         }
298
299                     } catch(Throwable t) {
300
301                                 Logger.defaultLogError(t);
302
303                         if(currentRequest.second instanceof AsyncProcedure<?>) {
304                             ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
305                         } else {
306                             ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
307                         }
308
309                     }
310                 }
311
312             }
313
314 //            System.out.println(MergingGraphRequestProcessor.this + " read completed");
315
316             synchronized (MergingGraphRequestProcessor.this) {
317                 if (requestQueue.isEmpty())
318                     hasAlreadyRequest = false;
319                 else
320                     newTransaction();
321             }
322
323         }
324
325         @Override
326         public String toString() {
327             return "MergedRead[" + requestQueue.size() + " requests]";
328         }
329
330     }
331
332     protected class RunnerWriteGraphRequest extends WriteRequest {
333
334         Pair<Object, Object> currentRequest;
335         HashMap<String, String> metadata = new HashMap<String, String>();
336
337         @Override
338         public void perform(WriteGraph graph) throws DatabaseException {
339
340 //            System.out.println(MergingGraphRequestProcessor.this + " writes");
341
342             while (true) {
343
344                 synchronized (MergingGraphRequestProcessor.this) {
345
346                     // Release #synchronize() invokers if necessary.
347 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
348 //                        synchronized (barrier) {
349 //                            barrier.notifyAll();
350 //                        }
351 //                    }
352
353                     if(requestQueue.isEmpty()) {
354                         if (transactionKeepalivePeriod > 0) {
355                             try {
356                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
357                             } catch (InterruptedException e) {
358                                         Logger.defaultLogError(e);
359                             }
360                             if (requestQueue.isEmpty())
361                                 break;
362                         } else
363                             break;
364                     }
365
366                     Object nextRequest = requestQueue.peekFirst().first;
367                     if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {
368                         break;
369                     }
370
371                     currentRequest = requestQueue.remove(0);
372                     requestSet.remove(currentRequest);
373
374                 }
375
376                 @SuppressWarnings("unchecked")
377                 Callback<Throwable> callback = (Callback<Throwable>)currentRequest.second;
378
379                 if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
380
381                     SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;
382
383                     try {
384 //                        System.out.println("merg.sync " + adapter);
385                         graph.syncRequest(adapter);
386                         if(callback != null) callback.run(null);
387                     } catch(Throwable t) {
388                                 Logger.defaultLogError(t);
389                         if(callback != null) callback.run(t);
390                     }
391
392                     adapter.release();
393 //                    System.out.println("merg.sync.release " + adapter);
394
395                 } else {
396
397                     try {
398                         if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); 
399                         else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); 
400                         if(callback != null) callback.run(null);
401                     } catch(Throwable t) {
402                                 Logger.defaultLogError(t);
403                         if(callback != null) callback.run(t);
404                     }
405
406                 }
407
408             }
409
410 //            System.out.println(MergingGraphRequestProcessor.this + " write completed");
411
412             synchronized (MergingGraphRequestProcessor.this) {
413                 if (requestQueue.isEmpty())
414                     hasAlreadyRequest = false;
415                 else
416                     newTransaction();
417             }
418
419         }
420
421     }
422
423     private void newTransaction() {
424
425         boolean write = false;
426
427         synchronized (MergingGraphRequestProcessor.this) {
428             assert(!requestQueue.isEmpty());
429             Object nextRequest = requestQueue.peekFirst().first;
430             write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);
431         }
432
433         if(write) {
434             processor.asyncRequest(new RunnerWriteGraphRequest(), null);
435         } else {
436             processor.asyncRequest(new MergedRead());
437         }
438
439     }
440
441     @Override
442     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {
443
444 //        System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
445
446         if (requestSet.contains(request))
447             return;
448
449         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
450         requestQueue.add(pair);
451         requestSet.add(pair);
452
453         if (!hasAlreadyRequest) {
454             newTransaction();
455             hasAlreadyRequest = true;
456         } else {
457             notify();
458         }
459
460     }
461
462     @Override
463     public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {
464
465 //        System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
466
467         if (requestSet.contains(request))
468             return;
469
470         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
471         requestQueue.add(pair);
472         requestSet.add(pair);
473
474         if (!hasAlreadyRequest) {
475             newTransaction();
476             hasAlreadyRequest = true;
477         } else {
478 //            System.out.println("notify " + this);
479             notify();
480         }
481
482     }
483
484     @Override
485     public synchronized void asyncRequest(Write request, Callback<DatabaseException> callback) {
486
487 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
488
489         if (requestSet.contains(request))
490             return;
491
492         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
493         requestQueue.add(pair);
494         requestSet.add(pair);
495
496         if (!hasAlreadyRequest) {
497 //            System.out.println("new transaction");
498             newTransaction();
499             hasAlreadyRequest = true;
500         } else {
501 //            System.out.println("notify");
502             notify();
503         }
504
505     }
506
507     @Override
508     public synchronized void asyncRequest(DelayedWrite request, Callback<DatabaseException> callback) {
509
510 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
511
512         if (requestSet.contains(request))
513             return;
514
515         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
516         requestQueue.add(pair);
517         requestSet.add(pair);
518
519         if (!hasAlreadyRequest) {
520 //            System.out.println("new transaction");
521             newTransaction();
522             hasAlreadyRequest = true;
523         } else {
524 //            System.out.println("notify");
525             notify();
526         }
527
528     }
529
530     @Override
531     public synchronized void asyncRequest(WriteOnly request, Callback<DatabaseException> callback) {
532
533 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
534
535         if (requestSet.contains(request))
536             return;
537
538         Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
539         requestQueue.add(pair);
540         requestSet.add(pair);
541
542         if (!hasAlreadyRequest) {
543 //            System.out.println("new transaction");
544             newTransaction();
545             hasAlreadyRequest = true;
546         } else {
547 //            System.out.println("notify");
548             notify();
549         }
550
551     }
552
553     @Override
554     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
555
556         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
557
558         // Queue the adapter up for execution.
559         synchronized (request) {
560             syncRequests.add(request);
561             asyncRequest(request, procedure);
562             if(syncRequests.contains(request)) {
563                 try {
564                     //                  System.out.println("waiting " + request);
565                     request.wait();
566                 } catch (InterruptedException e) {
567                     throw new Error(e);
568                 }
569             }
570         }
571
572         Throwable t = throwable.get();
573
574         if(t != null) {
575                 Logger.defaultLogError(t);
576             throw new RuntimeException(t.getMessage());
577         }
578         
579         return null;
580
581     }
582
583     @Override
584     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {
585
586 //        System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");
587
588         final DataContainer<T> result = new DataContainer<T>(null);
589         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
590
591         // Queue the adapter up for execution.
592         synchronized (request) {
593
594             syncRequests.add(request);
595             asyncRequest(request, new AsyncProcedure<T>() {
596
597                 public void execute(AsyncReadGraph graph, T t) {
598                     synchronized(result) {
599                         result.set(t);
600                     }
601                     procedure.execute(graph, t);
602                 };
603
604                 @Override
605                 public void exception(AsyncReadGraph graph, Throwable t) {
606                     throwable.set(t);
607                 }
608
609                 @Override
610                 public String toString() {
611                     return procedure.toString();
612                 }
613
614             });
615             if(syncRequests.contains(request)) {
616                 try {
617                     //                  System.out.println("waiting " + request);
618                     request.wait();
619                 } catch (InterruptedException e) {
620                     throw new Error(e);
621                 }
622             }
623         }
624
625         Throwable t = throwable.get();
626
627         if(t != null) {
628                 Logger.defaultLogError(t);
629             throw new RuntimeException(t.getMessage());
630         }
631         
632         return result.get();
633
634         //return result.get();
635
636     }
637
638
639     @Override
640     public void syncRequest(Write request) {
641
642 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
643
644         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
645
646         asyncRequest(adapter, null);
647
648         adapter.acquire();
649
650         // Throw exception if one occurred.
651         adapter.throwOrWrapException();
652
653     }
654
655     @Override
656     public void syncRequest(WriteOnly request) {
657
658 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
659
660         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
661
662         // Queue the adapter up for execution.
663         synchronized (adapter) {
664             asyncRequest(adapter, null);
665             try {
666                 adapter.wait();
667             } catch (InterruptedException e) {
668                 throw new Error(e);
669             }
670         }
671
672         // Throw exception if one occurred.
673         adapter.throwOrWrapException();
674
675     }
676
677     @Override
678     public Session getSession() {
679         return processor.getSession();
680     }
681
682     @Override
683     public String toString() {
684         return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";
685     }
686
687     @Override
688     public <T> void asyncRequest(AsyncRead<T> request) {
689
690         asyncRequest(request, new ProcedureAdapter<T>() {
691
692             @Override
693             public void exception(Throwable t) {
694                         Logger.defaultLogError(t);
695             }
696
697         });
698
699     }
700
701     @Override
702     public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {
703         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
704     }
705
706     @Override
707     public <T> void asyncRequest(AsyncMultiRead<T> request) {
708         throw new UnsupportedOperationException("Not implemented.");
709     }
710
711     @Override
712     public <T> void asyncRequest(AsyncMultiRead<T> request,
713             MultiProcedure<T> procedure) {
714         throw new UnsupportedOperationException("Not implemented.");
715     }
716
717     @Override
718     public <T> T syncRequest(AsyncRead<T> request) {
719
720         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
721         final DataContainer<T> result = new DataContainer<T>();
722
723         syncRequest(request, new AsyncProcedure<T>() {
724
725             public void execute(AsyncReadGraph graph, T t) {
726                 result.set(t);
727             }
728
729             @Override
730             public void exception(AsyncReadGraph graph, Throwable t) {
731                 throwable.set(t);
732             }
733
734         });
735
736         Throwable t = throwable.get();
737
738         if(t != null) {
739                 Logger.defaultLogError(t);
740             throw new RuntimeException(t.getMessage());
741         }
742
743         return result.get();
744
745     }
746
747     @Override
748     public <T> T syncRequest(AsyncRead<T> request,
749             Procedure<T> procedure) {
750         throw new UnsupportedOperationException("Not implemented.");
751     }
752
753     @Override
754     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {
755
756         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
757         final ArrayList<T> result = new ArrayList<T>();
758
759         syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
760
761             @Override
762             public void execute(AsyncReadGraph graph, T t) {
763                 synchronized(result) {
764                     result.add(t);
765                 }
766             };
767
768             @Override
769             public void exception(AsyncReadGraph graph, Throwable t) {
770                 throwable.set(t);
771             }
772
773         });
774
775         Throwable t = throwable.get();
776
777         if(t != null) {
778                 Logger.defaultLogError(t);
779             throw new RuntimeException(t.getMessage());
780         }
781
782         return result;
783
784     }
785
786     @Override
787     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,
788             MultiProcedure<T> procedure) {
789         throw new Error("Not implemented.");
790     }
791
792     @Override
793     public <T> T syncRequest(Read<T> request) {
794
795         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
796         final DataContainer<T> result = new DataContainer<T>();
797
798
799         syncRequest(request, new Procedure<T>() {
800
801             public void execute(T t) {
802                 result.set(t);
803             }
804
805             @Override
806             public void exception(Throwable t) {
807                 throwable.set(t);
808             }
809
810         });
811
812         Throwable t = throwable.get();
813
814         if(t != null) {
815             throw new Error(t.getMessage());
816         }
817
818
819         return result.get();
820
821     }
822
823     @Override
824     public <T> T syncRequest(Read<T> request,
825             final AsyncProcedure<T> procedure) {
826
827         final DataContainer<T> result = new DataContainer<T>(null);
828         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
829
830         // Queue the adapter up for execution.
831         synchronized (request) {
832
833             syncRequests.add(request);
834             asyncRequest(request, new AsyncProcedure<T>() {
835
836                 public void execute(AsyncReadGraph graph, T t) {
837                     synchronized(result) {
838                         result.set(t);
839                     }
840                     procedure.execute(graph, t);
841                 };
842
843                 @Override
844                 public void exception(AsyncReadGraph graph, Throwable t) {
845                     throwable.set(t);
846                 }
847
848                 @Override
849                 public String toString() {
850                     return procedure.toString();
851                 }
852
853             });
854             if(syncRequests.contains(request)) {
855                 try {
856                     //                  System.out.println("waiting " + request);
857                     request.wait();
858                 } catch (InterruptedException e) {
859                     throw new Error(e);
860                 }
861             }
862         }
863
864         Throwable t = throwable.get();
865
866         if(t != null) {
867             throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);
868         }
869         
870         return result.get();
871
872     }
873
874     @Override
875     public <T> void asyncRequest(Read<T> request) {
876
877         asyncRequest(request, new ProcedureAdapter<T>() {
878
879             @Override
880             public void exception(Throwable t) {
881                 Logger.defaultLogError(t);
882             }
883
884         });
885
886     }
887
888     @Override
889     public synchronized <T> void asyncRequest(Read<T> request,
890             AsyncProcedure<T> procedure) {
891
892         if (requestSet.contains(request))
893             return;
894
895         Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
896         requestQueue.add(pair);
897         requestSet.add(pair);
898
899         if (!hasAlreadyRequest) {
900             newTransaction();
901             hasAlreadyRequest = true;
902         } else {
903             notify();
904         }
905
906     }
907
908     @Override
909     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {
910         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
911     }
912
913     @Override
914     public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {
915         assert(request != null);
916
917         final ArrayList<T> result = new ArrayList<T>();
918         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
919
920         syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
921
922             @Override
923             public void execute(AsyncReadGraph graph, T t) {
924                 synchronized(result) {
925                     result.add(t);
926                 }
927             }
928
929             @Override
930             public void exception(AsyncReadGraph graph, Throwable t) {
931                 exception.set(t);
932             }
933
934             @Override
935             public String toString() {
936                 return "syncRequest(MultiRead) -> " + request;
937             }
938
939         });
940
941         Throwable t = exception.get();
942         if(t != null) {
943             if(t instanceof DatabaseException) throw (DatabaseException)t;
944             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
945         }
946
947         return result;
948     }
949
950     @Override
951     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure)  {
952         throw new UnsupportedOperationException("Not implemented");
953     }
954
955     @Override
956     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
957         throw new UnsupportedOperationException("Not implemented");
958     }
959
960     @Override
961     public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {
962         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
963     }
964
965     @Override
966     public <T> void asyncRequest(MultiRead<T> request) {
967         throw new UnsupportedOperationException("Not implemented");
968     }
969
970     @Override
971     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
972         throw new UnsupportedOperationException("Not implemented");
973     }
974
975     @Override
976     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
977         throw new UnsupportedOperationException("Not implemented");
978     }
979
980     @Override
981     public void asyncRequest(Write r) {
982         asyncRequest(r, null);
983     }
984
985     @Override
986     public void asyncRequest(DelayedWrite r) {
987         asyncRequest(r, null);
988     }
989
990     @Override
991     public void asyncRequest(WriteOnly r) {
992         asyncRequest(r, null);
993     }
994
995     @Override
996     public <T> T getService(Class<T> api) {
997         return getSession().getService(api);
998     }
999
1000     @Override
1001     public <T> T peekService(Class<T> api) {
1002         return getSession().peekService(api);
1003     }
1004
1005     @Override
1006     public boolean hasService(Class<?> api) {
1007         return getSession().hasService(api);
1008     }
1009
1010     @Override
1011     public <T> void registerService(Class<T> api, T service) {
1012         getSession().registerService(api, service);
1013     }
1014
1015 //    @Override
1016 //    public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1017 //        throw new UnsupportedOperationException("Not implemented.");
1018 //    }
1019
1020     @Override
1021     public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {
1022         throw new UnsupportedOperationException("Not implemented.");
1023     }
1024
1025     @Override
1026     public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {
1027         throw new UnsupportedOperationException("Not implemented.");
1028     }
1029
1030     @Override
1031     public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1032         throw new UnsupportedOperationException("Not implemented.");
1033     }
1034
1035     @Override
1036     public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1037         throw new UnsupportedOperationException("Not implemented.");
1038     }
1039
1040     @Override
1041     public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1042         throw new UnsupportedOperationException("Not implemented.");
1043
1044     }
1045
1046     @Override
1047     public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1048         throw new UnsupportedOperationException("Not implemented.");
1049
1050     }
1051
1052     @Override
1053     public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1054         throw new UnsupportedOperationException("Not implemented.");
1055
1056     }
1057
1058     @Override
1059     public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1060         throw new UnsupportedOperationException("Not implemented.");
1061
1062     }
1063
1064     @Override
1065     public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {
1066         throw new UnsupportedOperationException("Not implemented.");
1067
1068     }
1069
1070     @Override
1071     public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {
1072         throw new UnsupportedOperationException("Not implemented.");
1073
1074     }
1075
1076     @Override
1077     public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1078         throw new UnsupportedOperationException("Not implemented.");
1079
1080     }
1081
1082     @Override
1083     public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1084         throw new UnsupportedOperationException("Not implemented.");
1085
1086     }
1087
1088     @Override
1089     public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1090         throw new UnsupportedOperationException("Not implemented.");
1091
1092     }
1093
1094     @Override
1095     public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1096         throw new UnsupportedOperationException("Not implemented.");
1097
1098     }
1099
1100     @Override
1101     public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1102         throw new UnsupportedOperationException("Not implemented.");
1103
1104     }
1105
1106     @Override
1107     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
1108         throw new UnsupportedOperationException("Not implemented.");
1109
1110     }
1111
1112     @Override
1113     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1114         throw new UnsupportedOperationException("Not implemented.");
1115
1116     }
1117
1118     @Override
1119     public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1120         throw new UnsupportedOperationException("Not implemented.");
1121
1122     }
1123
1124     @Override
1125     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1126         throw new UnsupportedOperationException("Not implemented.");
1127
1128     }
1129
1130     @Override
1131     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1132             AsyncMultiListener<T> arg1) {
1133         throw new UnsupportedOperationException("Not implemented.");
1134
1135     }
1136
1137     @Override
1138     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1139         throw new UnsupportedOperationException("Not implemented.");
1140
1141     }
1142
1143     @Override
1144     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1145             MultiListener<T> arg1) {
1146         throw new UnsupportedOperationException("Not implemented.");
1147
1148     }
1149
1150     @Override
1151     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1152         throw new UnsupportedOperationException("Not implemented.");
1153
1154     }
1155
1156     @Override
1157     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
1158         throw new UnsupportedOperationException("Not implemented.");
1159
1160     }
1161
1162     @Override
1163     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1164         throw new UnsupportedOperationException("Not implemented.");
1165
1166     }
1167
1168     @Override
1169     public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1170         throw new UnsupportedOperationException("Not implemented.");
1171
1172     }
1173
1174     @Override
1175     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1176         throw new UnsupportedOperationException("Not implemented.");
1177
1178     }
1179
1180     @Override
1181     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1182             AsyncMultiListener<T> arg1) {
1183         throw new UnsupportedOperationException("Not implemented.");
1184     }
1185
1186     @Override
1187     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1188         throw new UnsupportedOperationException("Not implemented.");
1189
1190     }
1191
1192     @Override
1193     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1194             MultiListener<T> arg1) {
1195         throw new UnsupportedOperationException("Not implemented.");
1196
1197     }
1198
1199     @Override
1200     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1201         throw new UnsupportedOperationException("Not implemented.");
1202     }
1203
1204     @Override
1205     public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1206         throw new UnsupportedOperationException("Not implemented.");
1207     }
1208
1209     @Override
1210     public <T> T syncRequest(ExternalRead<T> request) {
1211         throw new UnsupportedOperationException("Not implemented.");
1212     }
1213
1214     @Override
1215     public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {
1216         throw new UnsupportedOperationException("Not implemented.");
1217     }
1218
1219     @Override
1220     public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1221         throw new UnsupportedOperationException("Not implemented.");
1222     }
1223
1224     @Override
1225     public <T> void asyncRequest(ExternalRead<T> request) {
1226         throw new UnsupportedOperationException("Not implemented.");
1227     }
1228
1229     @Override
1230     public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {
1231         throw new UnsupportedOperationException("Not implemented.");
1232     }
1233
1234     @Override
1235     public void syncRequest(DelayedWrite request) throws DatabaseException {
1236         throw new UnsupportedOperationException("Not implemented.");
1237     }
1238
1239         @Override
1240         public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1241                 throw new UnsupportedOperationException();
1242         }
1243
1244         @Override
1245         public <T> T syncRequest(DelayedWriteResult<T> request)
1246                         throws DatabaseException {
1247                 throw new UnsupportedOperationException();
1248         }
1249
1250         @Override
1251         public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {
1252                 throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {
1257                 throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {
1262                 throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {
1267                 throw new UnsupportedOperationException();
1268         }
1269         
1270         @Override
1271         public Resource getRootLibrary() {
1272                 return processor.getRootLibrary();
1273         }
1274
1275         @Override
1276         public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
1277                 throw new UnsupportedOperationException();
1278         }
1279
1280         @Override
1281         public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
1282                 throw new UnsupportedOperationException();
1283         }
1284
1285         @Override
1286         public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
1287                 throw new UnsupportedOperationException();
1288         }
1289
1290         @Override
1291         public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
1292                 throw new UnsupportedOperationException();
1293         }
1294
1295         @Override
1296         public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
1297                 throw new UnsupportedOperationException();
1298         }
1299
1300         @Override
1301         public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
1302                 throw new UnsupportedOperationException();
1303         }
1304         
1305         @Override
1306         public <T> T sync(ReadInterface<T> r) throws DatabaseException {
1307                 throw new UnsupportedOperationException();
1308         }
1309         
1310         @Override
1311         public <T> T sync(WriteInterface<T> r) throws DatabaseException {
1312                 throw new UnsupportedOperationException();
1313         }
1314         
1315         @Override
1316         public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
1317                 throw new UnsupportedOperationException();
1318         }
1319
1320         @Override
1321         public <T> void async(WriteInterface<T> r) {
1322                 throw new UnsupportedOperationException();
1323         }
1324         
1325         @Override
1326         public Object getModificationCounter() {
1327                 throw new UnsupportedOperationException();
1328         }
1329         
1330 }