]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java
Direct and synchronization-free access to Layer0 resource class for DB
[simantics/platform.git] / bundles / org.simantics.db.common / src / org / simantics / db / common / processor / MergingGraphRequestProcessor.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.common.processor;
13
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.LinkedList;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.Semaphore;
22 import java.util.function.Consumer;
23
24 import org.simantics.db.AsyncReadGraph;
25 import org.simantics.db.AsyncRequestProcessor;
26 import org.simantics.db.ReadGraph;
27 import org.simantics.db.Resource;
28 import org.simantics.db.Session;
29 import org.simantics.db.WriteGraph;
30 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
31 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
32 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
33 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
34 import org.simantics.db.common.request.ReadRequest;
35 import org.simantics.db.common.request.WriteRequest;
36 import org.simantics.db.exception.CancelTransactionException;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.procedure.AsyncListener;
39 import org.simantics.db.procedure.AsyncMultiListener;
40 import org.simantics.db.procedure.AsyncMultiProcedure;
41 import org.simantics.db.procedure.AsyncProcedure;
42 import org.simantics.db.procedure.Listener;
43 import org.simantics.db.procedure.MultiListener;
44 import org.simantics.db.procedure.MultiProcedure;
45 import org.simantics.db.procedure.Procedure;
46 import org.simantics.db.procedure.SyncListener;
47 import org.simantics.db.procedure.SyncMultiListener;
48 import org.simantics.db.procedure.SyncMultiProcedure;
49 import org.simantics.db.procedure.SyncProcedure;
50 import org.simantics.db.request.AsyncMultiRead;
51 import org.simantics.db.request.AsyncRead;
52 import org.simantics.db.request.DelayedWrite;
53 import org.simantics.db.request.DelayedWriteResult;
54 import org.simantics.db.request.ExternalRead;
55 import org.simantics.db.request.MultiRead;
56 import org.simantics.db.request.Read;
57 import org.simantics.db.request.ReadInterface;
58 import org.simantics.db.request.Write;
59 import org.simantics.db.request.WriteInterface;
60 import org.simantics.db.request.WriteOnly;
61 import org.simantics.db.request.WriteOnlyResult;
62 import org.simantics.db.request.WriteResult;
63 import org.simantics.utils.DataContainer;
64 import org.simantics.utils.datastructures.Pair;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 public class MergingGraphRequestProcessor implements AsyncRequestProcessor {
69
70     private static final Logger LOGGER = LoggerFactory.getLogger(MergingGraphRequestProcessor.class);
71
72     private static class SyncWriteRequestAdapter implements Write {
73
74         private Semaphore semaphore = new Semaphore(0);
75         private Object request;
76         private Throwable exception;
77         SyncWriteRequestAdapter(Write r) {
78             this.request = r;
79         }
80         SyncWriteRequestAdapter(WriteOnly r) {
81             this.request = r;
82         }
83 //        @Override
84 //        public GraphRequestStatus perform(Graph g) throws Exception {
85 //            return perform((ReadGraph)g);
86 //        }
87         @Override
88         public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {
89             if(request instanceof Write) {
90                 ((Write)request).perform(g);
91             } else if(request instanceof DelayedWrite) {
92                     ((DelayedWrite)request).perform(g);
93             } else {
94                 ((WriteOnly)request).perform(g);
95             }
96         }
97 //        @Override
98 //        public String getId() {
99 //            if(request instanceof WriteGraphRequest) {
100 //                return ((WriteGraphRequest)request).getId();
101 //            } else {
102 //                return null;
103 //            }
104 //        }
105 //        @Override
106 //        public void requestCompleted(GraphRequestStatus status) {
107 //            if(request instanceof WriteGraphRequest) {
108 //                ((WriteGraphRequest)request).requestCompleted(status);
109 //            } else {
110 //            }
111 //        }
112 //        @Override
113 //        public void handleException(Throwable e) {
114 //            this.exception = e;
115 //            if(request instanceof WriteGraphRequest) {
116 //                ((WriteGraphRequest)request).handleException(e);
117 //            }
118 //        }
119
120         public void throwOrWrapException() {
121             if (exception == null)
122                 return;
123             if (exception instanceof RuntimeException)
124                 throw (RuntimeException) exception;
125             if (exception instanceof Error)
126                 throw (Error) exception;
127             throw new RuntimeException("See cause for the real exception.", exception);
128         }
129
130         public void acquire() {
131             try {
132                 semaphore.acquire();
133             } catch (InterruptedException e) {
134                 LOGGER.error("SyncWriteRequestAdapter interrupted", e);
135             }
136         }
137
138         public void release() {
139             semaphore.release();
140         }
141
142         @Override
143         public String toString() {
144             return "SyncWriteRequestAdapter " + request;
145         }
146
147     }
148
149     long transactionKeepalivePeriod;
150
151     /**
152      * Synchronization object for implementing {@link #synchronize()}.
153      * {@link Object#notifyAll()} is invoked for this lock object every time a
154      * single transaction is completed, thereby releasing all waiters in
155      * {@link #synchronize()}.
156      */
157     Object barrier = new Object();
158
159     Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();
160     LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();
161     boolean hasAlreadyRequest = false;
162
163     /**
164      * A set of requests which {@link #synchronize()} is depending on at the
165      * moment. Every time a request within this set is completed, some thread in
166      * {@link #synchronize()} should be released.
167      */
168 //    Set<Object> barrierRequests = new HashSet<Object>();
169     Set<Object> syncRequests = new HashSet<Object>();
170
171     private String name;
172
173     private AsyncRequestProcessor processor;
174
/**
 * Creates a processor with an explicit name.
 *
 * @param name label reported by {@link #toString()}
 * @param processor processor that the merged runner requests are submitted to
 * @param transactionKeepalivePeriod timeout used when a runner waits on an empty queue
 */
public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
    this.transactionKeepalivePeriod = transactionKeepalivePeriod;
    this.processor = processor;
    this.name = name;
}
180
/**
 * Creates a processor whose name is the class name followed by a random UUID.
 *
 * @param processor processor that the merged runner requests are submitted to
 * @param transactionKeepalivePeriod timeout used when a runner waits on an empty queue
 */
public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
    this("MergingGraphRequestProcessor" + UUID.randomUUID(), processor, transactionKeepalivePeriod);
}
186
187     @SuppressWarnings({"unchecked", "rawtypes"})
188     protected class MergedRead extends ReadRequest {
189
190         Pair<Object, Object> currentRequest;
191
192 //        RunnerReadGraphRequest(GraphRequestProcessor processor) {
193 //            super(processor);
194 //        }
195 //
196 //        @Override
197 //        public void completed(boolean value) {
198 ////            System.out.println(this + "MGRP read completed");
199 ////            synchronized (MergingGraphRequestProcessor.this) {
200 ////                if (requestQueue.isEmpty())
201 ////                    hasAlreadyRequest = false;
202 ////                else
203 ////                    newTransaction();
204 ////            }
205 //        }
206
207         @Override
208         public void run(ReadGraph graph) {
209
210 //            System.out.println(MergingGraphRequestProcessor.this + " reads");
211
212             while (true) {
213
214                 synchronized (MergingGraphRequestProcessor.this) {
215
216                     // Release #synchronize() invokers if necessary.
217 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
218 //                        synchronized (barrier) {
219 //                            barrier.notifyAll();
220 //                        }
221 //                    }
222
223                     if(requestQueue.isEmpty()) {
224                         if (transactionKeepalivePeriod > 0) {
225 //                            System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );
226                             try {
227                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
228                             } catch (InterruptedException e) {
229                                 LOGGER.error("MergedRead interrupted", e);
230                             }
231                             if (requestQueue.isEmpty())
232                                 break;
233                         } else
234                             break;
235                     }
236
237                     Object nextRequest = requestQueue.peekFirst().first;
238                     if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {
239                         break;
240                     }
241
242                     currentRequest = requestQueue.remove(0);
243                     requestSet.remove(currentRequest);
244
245                 }
246
247 //                ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;
248
249                 if( syncRequests.contains(currentRequest.first)) {
250
251                     try {
252
253                         if(currentRequest.second instanceof AsyncProcedure<?>) {
254                             if(currentRequest.first instanceof Read) {
255                                 Read req = (Read)currentRequest.first;
256                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
257                             } else {
258                                 AsyncRead req = (AsyncRead)currentRequest.first;
259                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
260                             }
261                         } else {
262                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
263                             graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
264                         }
265
266                     } catch(Throwable t) {
267
268                         LOGGER.error("MergedRead failed", t);
269
270 //                        if(currentRequest.second instanceof AsyncProcedure<?>) {
271 //                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
272 //                        } else {
273 //                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
274 //                        }
275
276                     }
277
278                     synchronized (currentRequest.first) {
279                         syncRequests.remove(currentRequest.first);
280 //                      System.out.println("notifying " + currentRequest.first);
281                         currentRequest.first.notify();
282                     }
283
284
285                 } else {
286
287                     try {
288
289                         if(currentRequest.second instanceof AsyncProcedure<?>) {
290                             if(currentRequest.first instanceof AsyncRead) {
291                                 AsyncRead req = (AsyncRead)currentRequest.first;
292                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
293                             } else {
294                                 Read req = (Read)currentRequest.first;
295                                 graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
296                             }
297                         } else {
298                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
299                             graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
300                         }
301
302                     } catch(Throwable t) {
303
304                         LOGGER.error("MergedRead failed", t);
305
306 //                        if(currentRequest.second instanceof AsyncProcedure<?>) {
307 //                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
308 //                        } else {
309 //                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
310 //                        }
311
312                     }
313                 }
314
315             }
316
317 //            System.out.println(MergingGraphRequestProcessor.this + " read completed");
318
319             synchronized (MergingGraphRequestProcessor.this) {
320                 if (requestQueue.isEmpty())
321                     hasAlreadyRequest = false;
322                 else
323                     newTransaction();
324             }
325
326         }
327
328         @Override
329         public String toString() {
330             return "MergedRead[" + requestQueue.size() + " requests]";
331         }
332
333     }
334
335     protected class RunnerWriteGraphRequest extends WriteRequest {
336
337         Pair<Object, Object> currentRequest;
338         HashMap<String, String> metadata = new HashMap<String, String>();
339
340         @Override
341         public void perform(WriteGraph graph) throws DatabaseException {
342
343 //            System.out.println(MergingGraphRequestProcessor.this + " writes");
344
345             while (true) {
346
347                 synchronized (MergingGraphRequestProcessor.this) {
348
349                     // Release #synchronize() invokers if necessary.
350 //                    if (currentRequest != null && barrierRequests.contains(currentRequest)) {
351 //                        synchronized (barrier) {
352 //                            barrier.notifyAll();
353 //                        }
354 //                    }
355
356                     if(requestQueue.isEmpty()) {
357                         if (transactionKeepalivePeriod > 0) {
358                             try {
359                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
360                             } catch (InterruptedException e) {
361                                 LOGGER.error("RunnerWriteGraphRequest interrupted", e);
362                             }
363                             if (requestQueue.isEmpty())
364                                 break;
365                         } else
366                             break;
367                     }
368
369                     Object nextRequest = requestQueue.peekFirst().first;
370                     if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {
371                         break;
372                     }
373
374                     currentRequest = requestQueue.remove(0);
375                     requestSet.remove(currentRequest);
376
377                 }
378
379                 @SuppressWarnings("unchecked")
380                 Consumer<Throwable> callback = (Consumer<Throwable>)currentRequest.second;
381
382                 if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
383
384                     SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;
385
386                     try {
387 //                        System.out.println("merg.sync " + adapter);
388                         graph.syncRequest(adapter);
389                         if(callback != null) callback.accept(null);
390                     } catch(Throwable t) {
391                         LOGGER.error("RunnerWriteGraphRequest failed", t);
392                         if(callback != null) callback.accept(t);
393                     }
394
395                     adapter.release();
396 //                    System.out.println("merg.sync.release " + adapter);
397
398                 } else {
399
400                     try {
401                         if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); 
402                         else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); 
403                         if(callback != null) callback.accept(null);
404                     } catch(Throwable t) {
405                         LOGGER.error("RunnerWriteGraphRequest failed", t);
406                         if(callback != null) callback.accept(t);
407                     }
408
409                 }
410
411             }
412
413 //            System.out.println(MergingGraphRequestProcessor.this + " write completed");
414
415             synchronized (MergingGraphRequestProcessor.this) {
416                 if (requestQueue.isEmpty())
417                     hasAlreadyRequest = false;
418                 else
419                     newTransaction();
420             }
421
422         }
423
424     }
425
426     private void newTransaction() {
427
428         boolean write = false;
429
430         synchronized (MergingGraphRequestProcessor.this) {
431             assert(!requestQueue.isEmpty());
432             Object nextRequest = requestQueue.peekFirst().first;
433             write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);
434         }
435
436         if(write) {
437             processor.asyncRequest(new RunnerWriteGraphRequest(), null);
438         } else {
439             processor.asyncRequest(new MergedRead());
440         }
441
442     }
443
444     @Override
445     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {
446
447 //        System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
448
449         Pair<Object, Object> pair = Pair.make(request, procedure);
450         if (requestSet.contains(pair))
451             return;
452
453         requestQueue.add(pair);
454         requestSet.add(pair);
455
456         if (!hasAlreadyRequest) {
457             newTransaction();
458             hasAlreadyRequest = true;
459         } else {
460             notify();
461         }
462
463     }
464
465     @Override
466     public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {
467
468 //        System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
469
470         Pair<Object, Object> pair = Pair.make(request, procedure);
471         if (requestSet.contains(pair))
472             return;
473
474         requestQueue.add(pair);
475         requestSet.add(pair);
476
477         if (!hasAlreadyRequest) {
478             newTransaction();
479             hasAlreadyRequest = true;
480         } else {
481 //            System.out.println("notify " + this);
482             notify();
483         }
484
485     }
486
487     @Override
488     public synchronized void asyncRequest(Write request, Consumer<DatabaseException> callback) {
489
490 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
491
492         Pair<Object, Object> pair = Pair.make(request, callback);
493         if (requestSet.contains(pair))
494             return;
495
496         requestQueue.add(pair);
497         requestSet.add(pair);
498
499         if (!hasAlreadyRequest) {
500 //            System.out.println("new transaction");
501             newTransaction();
502             hasAlreadyRequest = true;
503         } else {
504 //            System.out.println("notify");
505             notify();
506         }
507
508     }
509
510     @Override
511     public synchronized void asyncRequest(DelayedWrite request, Consumer<DatabaseException> callback) {
512
513 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
514
515         Pair<Object, Object> pair = Pair.make(request, callback);
516         if (requestSet.contains(pair))
517             return;
518
519         requestQueue.add(pair);
520         requestSet.add(pair);
521
522         if (!hasAlreadyRequest) {
523 //            System.out.println("new transaction");
524             newTransaction();
525             hasAlreadyRequest = true;
526         } else {
527 //            System.out.println("notify");
528             notify();
529         }
530
531     }
532
533     @Override
534     public synchronized void asyncRequest(WriteOnly request, Consumer<DatabaseException> callback) {
535
536 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
537
538         Pair<Object, Object> pair = Pair.make(request, callback);
539         if (requestSet.contains(pair))
540             return;
541
542         requestQueue.add(pair);
543         requestSet.add(pair);
544
545         if (!hasAlreadyRequest) {
546 //            System.out.println("new transaction");
547             newTransaction();
548             hasAlreadyRequest = true;
549         } else {
550 //            System.out.println("notify");
551             notify();
552         }
553
554     }
555
556     @Override
557     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
558
559         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
560
561         // Queue the adapter up for execution.
562         synchronized (request) {
563             syncRequests.add(request);
564             asyncRequest(request, procedure);
565             if(syncRequests.contains(request)) {
566                 try {
567                     //                  System.out.println("waiting " + request);
568                     request.wait();
569                 } catch (InterruptedException e) {
570                     throw new Error(e);
571                 }
572             }
573         }
574
575         Throwable t = throwable.get();
576
577         if(t != null) {
578             LOGGER.error("syncRequest(AsyncMultiRead, AsyncMultiProcedure) failed", t);
579             throw new RuntimeException(t.getMessage());
580         }
581         
582         return null;
583
584     }
585
586     @Override
587     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {
588
589 //        System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");
590
591         final DataContainer<T> result = new DataContainer<T>(null);
592         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
593
594         // Queue the adapter up for execution.
595         synchronized (request) {
596
597             syncRequests.add(request);
598             asyncRequest(request, new AsyncProcedure<T>() {
599
600                 public void execute(AsyncReadGraph graph, T t) {
601                     synchronized(result) {
602                         result.set(t);
603                     }
604                     procedure.execute(graph, t);
605                 };
606
607                 @Override
608                 public void exception(AsyncReadGraph graph, Throwable t) {
609                     throwable.set(t);
610                 }
611
612                 @Override
613                 public String toString() {
614                     return procedure.toString();
615                 }
616
617             });
618             if(syncRequests.contains(request)) {
619                 try {
620                     //                  System.out.println("waiting " + request);
621                     request.wait();
622                 } catch (InterruptedException e) {
623                     throw new Error(e);
624                 }
625             }
626         }
627
628         Throwable t = throwable.get();
629
630         if(t != null) {
631             LOGGER.error("syncRequest(AsyncRead, AsyncProcedure) failed", t);
632             throw new RuntimeException(t.getMessage());
633         }
634         
635         return result.get();
636
637         //return result.get();
638
639     }
640
641
642     @Override
643     public void syncRequest(Write request) {
644
645 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
646
647         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
648
649         asyncRequest(adapter, null);
650
651         adapter.acquire();
652
653         // Throw exception if one occurred.
654         adapter.throwOrWrapException();
655
656     }
657
658     @Override
659     public void syncRequest(WriteOnly request) {
660
661 //        System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
662
663         SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
664
665         // Queue the adapter up for execution.
666         synchronized (adapter) {
667             asyncRequest(adapter, null);
668             try {
669                 adapter.wait();
670             } catch (InterruptedException e) {
671                 throw new Error(e);
672             }
673         }
674
675         // Throw exception if one occurred.
676         adapter.throwOrWrapException();
677
678     }
679
680     @Override
681     public Session getSession() {
682         return processor.getSession();
683     }
684
685     @Override
686     public String toString() {
687         return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";
688     }
689
690     @Override
691     public <T> void asyncRequest(AsyncRead<T> request) {
692
693         asyncRequest(request, new ProcedureAdapter<T>() {
694
695             @Override
696             public void exception(Throwable t) {
697                 LOGGER.error("asyncRequest(AsyncRead) failed", t);
698             }
699
700         });
701
702     }
703
704     @Override
705     public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {
706         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
707     }
708
709     @Override
710     public <T> void asyncRequest(AsyncMultiRead<T> request) {
711         throw new UnsupportedOperationException("Not implemented.");
712     }
713
714     @Override
715     public <T> void asyncRequest(AsyncMultiRead<T> request,
716             MultiProcedure<T> procedure) {
717         throw new UnsupportedOperationException("Not implemented.");
718     }
719
720     @Override
721     public <T> T syncRequest(AsyncRead<T> request) {
722
723         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
724         final DataContainer<T> result = new DataContainer<T>();
725
726         syncRequest(request, new AsyncProcedure<T>() {
727
728             public void execute(AsyncReadGraph graph, T t) {
729                 result.set(t);
730             }
731
732             @Override
733             public void exception(AsyncReadGraph graph, Throwable t) {
734                 throwable.set(t);
735             }
736
737         });
738
739         Throwable t = throwable.get();
740
741         if(t != null) {
742             LOGGER.error("syncRequest(AsyncRead) failed", t);
743             throw new RuntimeException(t.getMessage());
744         }
745
746         return result.get();
747
748     }
749
750     @Override
751     public <T> T syncRequest(AsyncRead<T> request,
752             Procedure<T> procedure) {
753         throw new UnsupportedOperationException("Not implemented.");
754     }
755
756     @Override
757     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {
758
759         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
760         final ArrayList<T> result = new ArrayList<T>();
761
762         syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
763
764             @Override
765             public void execute(AsyncReadGraph graph, T t) {
766                 synchronized(result) {
767                     result.add(t);
768                 }
769             };
770
771             @Override
772             public void exception(AsyncReadGraph graph, Throwable t) {
773                 throwable.set(t);
774             }
775
776         });
777
778         Throwable t = throwable.get();
779
780         if(t != null) {
781             LOGGER.error("syncRequest(AsyncMultiRead) failed", t);
782             throw new RuntimeException(t.getMessage());
783         }
784
785         return result;
786
787     }
788
789     @Override
790     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,
791             MultiProcedure<T> procedure) {
792         throw new Error("Not implemented.");
793     }
794
795     @Override
796     public <T> T syncRequest(Read<T> request) {
797
798         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
799         final DataContainer<T> result = new DataContainer<T>();
800
801
802         syncRequest(request, new Procedure<T>() {
803
804             public void execute(T t) {
805                 result.set(t);
806             }
807
808             @Override
809             public void exception(Throwable t) {
810                 throwable.set(t);
811             }
812
813         });
814
815         Throwable t = throwable.get();
816
817         if(t != null) {
818             throw new Error(t.getMessage());
819         }
820
821
822         return result.get();
823
824     }
825
826     @Override
827     public <T> T syncRequest(Read<T> request,
828             final AsyncProcedure<T> procedure) {
829
830         final DataContainer<T> result = new DataContainer<T>(null);
831         final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
832
833         // Queue the adapter up for execution.
834         synchronized (request) {
835
836             syncRequests.add(request);
837             asyncRequest(request, new AsyncProcedure<T>() {
838
839                 public void execute(AsyncReadGraph graph, T t) {
840                     synchronized(result) {
841                         result.set(t);
842                     }
843                     procedure.execute(graph, t);
844                 };
845
846                 @Override
847                 public void exception(AsyncReadGraph graph, Throwable t) {
848                     throwable.set(t);
849                 }
850
851                 @Override
852                 public String toString() {
853                     return procedure.toString();
854                 }
855
856             });
857             if(syncRequests.contains(request)) {
858                 try {
859                     //                  System.out.println("waiting " + request);
860                     request.wait();
861                 } catch (InterruptedException e) {
862                     throw new Error(e);
863                 }
864             }
865         }
866
867         Throwable t = throwable.get();
868
869         if(t != null) {
870             throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);
871         }
872         
873         return result.get();
874
875     }
876
877     @Override
878     public <T> void asyncRequest(Read<T> request) {
879
880         asyncRequest(request, new ProcedureAdapter<T>() {
881
882             @Override
883             public void exception(Throwable t) {
884                 LOGGER.error("asyncRequest(Read) failed", t);
885             }
886
887         });
888
889     }
890
891     @Override
892     public synchronized <T> void asyncRequest(Read<T> request,
893             AsyncProcedure<T> procedure) {
894
895         Pair<Object, Object> pair = Pair.make(request, procedure);
896         if (requestSet.contains(pair))
897             return;
898
899         requestQueue.add(pair);
900         requestSet.add(pair);
901
902         if (!hasAlreadyRequest) {
903             newTransaction();
904             hasAlreadyRequest = true;
905         } else {
906             notify();
907         }
908
909     }
910
911     @Override
912     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {
913         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
914     }
915
916     @Override
917     public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {
918         assert(request != null);
919
920         final ArrayList<T> result = new ArrayList<T>();
921         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
922
923         syncRequest(request, new SyncMultiProcedureAdapter<T>() {
924
925             @Override
926             public void execute(ReadGraph graph, T t) {
927                 synchronized(result) {
928                     result.add(t);
929                 }
930             }
931
932             @Override
933             public void exception(ReadGraph graph, Throwable t) {
934                 exception.set(t);
935             }
936
937             @Override
938             public String toString() {
939                 return "syncRequest(MultiRead) -> " + request;
940             }
941
942         });
943
944         Throwable t = exception.get();
945         if(t != null) {
946             if(t instanceof DatabaseException) throw (DatabaseException)t;
947             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
948         }
949
950         return result;
951     }
952
953     @Override
954     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
955         throw new UnsupportedOperationException("Not implemented");
956     }
957
958     @Override
959     public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {
960         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
961     }
962
963     @Override
964     public <T> void asyncRequest(MultiRead<T> request) {
965         throw new UnsupportedOperationException("Not implemented");
966     }
967
968     @Override
969     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
970         throw new UnsupportedOperationException("Not implemented");
971     }
972
973     @Override
974     public void asyncRequest(Write r) {
975         asyncRequest(r, null);
976     }
977
978     @Override
979     public void asyncRequest(DelayedWrite r) {
980         asyncRequest(r, null);
981     }
982
983     @Override
984     public void asyncRequest(WriteOnly r) {
985         asyncRequest(r, null);
986     }
987
988     @Override
989     public <T> T getService(Class<T> api) {
990         return getSession().getService(api);
991     }
992
993     @Override
994     public <T> T peekService(Class<T> api) {
995         return getSession().peekService(api);
996     }
997
998     @Override
999     public boolean hasService(Class<?> api) {
1000         return getSession().hasService(api);
1001     }
1002
1003     @Override
1004     public <T> void registerService(Class<T> api, T service) {
1005         getSession().registerService(api, service);
1006     }
1007
1008 //    @Override
1009 //    public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1010 //        throw new UnsupportedOperationException("Not implemented.");
1011 //    }
1012
1013     @Override
1014     public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {
1015         throw new UnsupportedOperationException("Not implemented.");
1016     }
1017
1018     @Override
1019     public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {
1020         throw new UnsupportedOperationException("Not implemented.");
1021     }
1022
1023     @Override
1024     public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1025         throw new UnsupportedOperationException("Not implemented.");
1026     }
1027
1028     @Override
1029     public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1030         throw new UnsupportedOperationException("Not implemented.");
1031     }
1032
1033     @Override
1034     public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1035         throw new UnsupportedOperationException("Not implemented.");
1036
1037     }
1038
1039     @Override
1040     public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1041         throw new UnsupportedOperationException("Not implemented.");
1042
1043     }
1044
1045     @Override
1046     public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1047         throw new UnsupportedOperationException("Not implemented.");
1048
1049     }
1050
1051     @Override
1052     public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {
1053         throw new UnsupportedOperationException("Not implemented.");
1054
1055     }
1056
1057     @Override
1058     public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {
1059         throw new UnsupportedOperationException("Not implemented.");
1060
1061     }
1062
1063     @Override
1064     public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {
1065         throw new UnsupportedOperationException("Not implemented.");
1066
1067     }
1068
1069     @Override
1070     public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
1071         throw new UnsupportedOperationException("Not implemented.");
1072
1073     }
1074
1075     @Override
1076     public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
1077         throw new UnsupportedOperationException("Not implemented.");
1078
1079     }
1080
1081     @Override
1082     public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
1083         throw new UnsupportedOperationException("Not implemented.");
1084
1085     }
1086
1087     @Override
1088     public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
1089         throw new UnsupportedOperationException("Not implemented.");
1090
1091     }
1092
1093     @Override
1094     public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
1095         throw new UnsupportedOperationException("Not implemented.");
1096
1097     }
1098
1099     @Override
1100     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1101         throw new UnsupportedOperationException("Not implemented.");
1102
1103     }
1104
1105     @Override
1106     public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1107         throw new UnsupportedOperationException("Not implemented.");
1108
1109     }
1110
1111     @Override
1112     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1113         throw new UnsupportedOperationException("Not implemented.");
1114
1115     }
1116
1117     @Override
1118     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1119             AsyncMultiListener<T> arg1) {
1120         throw new UnsupportedOperationException("Not implemented.");
1121
1122     }
1123
1124     @Override
1125     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1126         throw new UnsupportedOperationException("Not implemented.");
1127
1128     }
1129
1130     @Override
1131     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
1132             MultiListener<T> arg1) {
1133         throw new UnsupportedOperationException("Not implemented.");
1134
1135     }
1136
1137     @Override
1138     public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1139         throw new UnsupportedOperationException("Not implemented.");
1140
1141     }
1142
1143     @Override
1144     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
1145         throw new UnsupportedOperationException("Not implemented.");
1146
1147     }
1148
1149     @Override
1150     public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
1151         throw new UnsupportedOperationException("Not implemented.");
1152
1153     }
1154
1155     @Override
1156     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1157         throw new UnsupportedOperationException("Not implemented.");
1158
1159     }
1160
1161     @Override
1162     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1163             AsyncMultiListener<T> arg1) {
1164         throw new UnsupportedOperationException("Not implemented.");
1165     }
1166
1167     @Override
1168     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
1169         throw new UnsupportedOperationException("Not implemented.");
1170
1171     }
1172
1173     @Override
1174     public <T> void asyncRequest(AsyncMultiRead<T> arg0,
1175             MultiListener<T> arg1) {
1176         throw new UnsupportedOperationException("Not implemented.");
1177
1178     }
1179
1180     @Override
1181     public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
1182         throw new UnsupportedOperationException("Not implemented.");
1183     }
1184
1185     @Override
1186     public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1187         throw new UnsupportedOperationException("Not implemented.");
1188     }
1189
1190     @Override
1191     public <T> T syncRequest(ExternalRead<T> request) {
1192         throw new UnsupportedOperationException("Not implemented.");
1193     }
1194
1195     @Override
1196     public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {
1197         throw new UnsupportedOperationException("Not implemented.");
1198     }
1199
1200     @Override
1201     public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {
1202         throw new UnsupportedOperationException("Not implemented.");
1203     }
1204
1205     @Override
1206     public <T> void asyncRequest(ExternalRead<T> request) {
1207         throw new UnsupportedOperationException("Not implemented.");
1208     }
1209
1210     @Override
1211     public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {
1212         throw new UnsupportedOperationException("Not implemented.");
1213     }
1214
1215     @Override
1216     public void syncRequest(DelayedWrite request) throws DatabaseException {
1217         throw new UnsupportedOperationException("Not implemented.");
1218     }
1219
1220         @Override
1221         public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1222                 throw new UnsupportedOperationException();
1223         }
1224
1225         @Override
1226         public <T> T syncRequest(DelayedWriteResult<T> request)
1227                         throws DatabaseException {
1228                 throw new UnsupportedOperationException();
1229         }
1230
1231         @Override
1232         public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {
1233                 throw new UnsupportedOperationException();
1234         }
1235
1236         @Override
1237         public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {
1238                 throw new UnsupportedOperationException();
1239         }
1240
1241         @Override
1242         public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {
1243                 throw new UnsupportedOperationException();
1244         }
1245
1246         @Override
1247         public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {
1248                 throw new UnsupportedOperationException();
1249         }
1250         
1251         @Override
1252         public Resource getRootLibrary() {
1253                 return processor.getRootLibrary();
1254         }
1255
1256         @Override
1257         public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
1258                 throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
1263                 throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
1268                 throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
1273                 throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
1278                 throw new UnsupportedOperationException();
1279         }
1280
1281         @Override
1282         public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
1283                 throw new UnsupportedOperationException();
1284         }
1285         
1286         @Override
1287         public <T> T sync(ReadInterface<T> r) throws DatabaseException {
1288                 throw new UnsupportedOperationException();
1289         }
1290         
1291         @Override
1292         public <T> T sync(WriteInterface<T> r) throws DatabaseException {
1293                 throw new UnsupportedOperationException();
1294         }
1295         
1296         @Override
1297         public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
1298                 throw new UnsupportedOperationException();
1299         }
1300
1301         @Override
1302         public <T> void async(WriteInterface<T> r) {
1303                 throw new UnsupportedOperationException();
1304         }
1305         
1306         @Override
1307         public Object getModificationCounter() {
1308                 throw new UnsupportedOperationException();
1309         }
1310         
1311         @Override
1312         public <T> T l0() {
1313                 return processor.l0();
1314         }
1315         
1316 }