X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.common%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fcommon%2Fprocessor%2FMergingGraphRequestProcessor.java;h=5e07bb034ecb924adc8567f99f09990e141e69ed;hp=252decbf3998d9fb21ba7585fca4d033673eb9aa;hb=d33a5a61e2c056a3d48733819257890406ee9d52;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java b/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java index 252decbf3..5e07bb034 100644 --- a/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java +++ b/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java @@ -1,1346 +1,1330 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.db.common.processor; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Semaphore; - -import org.simantics.db.AsyncReadGraph; -import org.simantics.db.AsyncRequestProcessor; -import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteGraph; -import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; -import org.simantics.db.common.procedure.adapter.ProcedureAdapter; -import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; -import org.simantics.db.common.request.ReadRequest; -import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.procedure.AsyncListener; -import org.simantics.db.procedure.AsyncMultiListener; -import org.simantics.db.procedure.AsyncMultiProcedure; -import org.simantics.db.procedure.AsyncProcedure; -import org.simantics.db.procedure.Listener; -import org.simantics.db.procedure.MultiListener; -import org.simantics.db.procedure.MultiProcedure; -import org.simantics.db.procedure.Procedure; -import org.simantics.db.procedure.SyncListener; -import org.simantics.db.procedure.SyncMultiListener; -import org.simantics.db.procedure.SyncMultiProcedure; -import org.simantics.db.procedure.SyncProcedure; -import org.simantics.db.request.AsyncMultiRead; -import org.simantics.db.request.AsyncRead; -import org.simantics.db.request.DelayedWrite; -import org.simantics.db.request.DelayedWriteResult; -import org.simantics.db.request.ExternalRead; -import org.simantics.db.request.MultiRead; -import org.simantics.db.request.Read; -import org.simantics.db.request.ReadInterface; -import org.simantics.db.request.UndoTraits; -import org.simantics.db.request.Write; -import org.simantics.db.request.WriteInterface; -import org.simantics.db.request.WriteOnly; -import org.simantics.db.request.WriteOnlyResult; -import org.simantics.db.request.WriteResult; -import org.simantics.utils.DataContainer; -import org.simantics.utils.datastructures.Callback; -import org.simantics.utils.datastructures.Pair; - -public class MergingGraphRequestProcessor implements RequestProcessor { - - private static class SyncWriteRequestAdapter implements Write { - - private Semaphore semaphore = new Semaphore(0); - private Object request; - private Throwable exception; - SyncWriteRequestAdapter(Write r) { - this.request = r; - } - SyncWriteRequestAdapter(WriteOnly r) { - this.request = r; - } -// @Override -// public GraphRequestStatus perform(Graph g) throws Exception { -// return perform((ReadGraph)g); -// } - @Override - public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException { - if(request instanceof Write) { - ((Write)request).perform(g); - } else if(request instanceof DelayedWrite) { - ((DelayedWrite)request).perform(g); - } else { - ((WriteOnly)request).perform(g); - } - } -// @Override -// public String getId() { -// if(request instanceof WriteGraphRequest) { -// return ((WriteGraphRequest)request).getId(); -// } else { -// return null; -// } -// } -// @Override -// public void requestCompleted(GraphRequestStatus status) { -// if(request instanceof WriteGraphRequest) { -// ((WriteGraphRequest)request).requestCompleted(status); -// } else { -// } -// } -// @Override -// public void handleException(Throwable e) { -// this.exception = e; -// if(request instanceof WriteGraphRequest) { -// ((WriteGraphRequest)request).handleException(e); -// } -// } - - public void throwOrWrapException() { - if (exception == null) - return; - if (exception instanceof RuntimeException) - throw (RuntimeException) exception; - if (exception instanceof Error) - throw (Error) exception; - throw new RuntimeException("See cause for the real exception.", exception); - } - - @Override - public VirtualGraph getProvider() { - return null; - } - -// @Override -// public void fillMetadata(Map metadata) { -// } - - public void acquire() { - try { - semaphore.acquire(); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - } - - public void release() { - semaphore.release(); - } - - @Override - public UndoTraits getUndoTraits() { - return null; - } - - @Override - public String toString() { - return "SyncWriteRequestAdapter " + request; - } - - } - - long transactionKeepalivePeriod; - - /** - * Synchronization object for implementing {@link #synchronize()}. - * {@link Object#notifyAll()} is invoked for this lock object every time a - * single transaction is completed, thereby releasing all waiters in - * {@link #synchronize()}. - */ - Object barrier = new Object(); - - Set> requestSet = new HashSet>(); - LinkedList> requestQueue = new LinkedList>(); - boolean hasAlreadyRequest = false; - - /** - * A set of requests which {@link #synchronize()} is depending on at the - * moment. Every time a request within this set is completed, some thread in - * {@link #synchronize()} should be released. - */ -// Set barrierRequests = new HashSet(); - Set syncRequests = new HashSet(); - - private String name; - - private AsyncRequestProcessor processor; - - public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) { - this.name = name; - this.processor = processor; - this.transactionKeepalivePeriod = transactionKeepalivePeriod; - } - - public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) { - this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString(); - this.processor = processor; - this.transactionKeepalivePeriod = transactionKeepalivePeriod; - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - protected class MergedRead extends ReadRequest { - - Pair currentRequest; - -// RunnerReadGraphRequest(GraphRequestProcessor processor) { -// super(processor); -// } -// -// @Override -// public void completed(boolean value) { -//// System.out.println(this + "MGRP read completed"); -//// synchronized (MergingGraphRequestProcessor.this) { -//// if (requestQueue.isEmpty()) -//// hasAlreadyRequest = false; -//// else -//// newTransaction(); -//// } -// } - - @Override - public void run(ReadGraph graph) { - -// System.out.println(MergingGraphRequestProcessor.this + " reads"); - - while (true) { - - synchronized (MergingGraphRequestProcessor.this) { - - // Release #synchronize() invokers if necessary. -// if (currentRequest != null && barrierRequests.contains(currentRequest)) { -// synchronized (barrier) { -// barrier.notifyAll(); -// } -// } - - if(requestQueue.isEmpty()) { - if (transactionKeepalivePeriod > 0) { -// System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() ); - try { - MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - if (requestQueue.isEmpty()) - break; - } else - break; - } - - Object nextRequest = requestQueue.peekFirst().first; - if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) { - break; - } - - currentRequest = requestQueue.remove(0); - requestSet.remove(currentRequest); - - } - -// ReadGraphRequest req = (ReadGraphRequest)currentRequest.first; - - if( syncRequests.contains(currentRequest.first)) { - - try { - - if(currentRequest.second instanceof AsyncProcedure) { - if(currentRequest.first instanceof Read) { - Read req = (Read)currentRequest.first; - graph.syncRequest(req, (AsyncProcedure)currentRequest.second); - } else { - AsyncRead req = (AsyncRead)currentRequest.first; - graph.syncRequest(req, (AsyncProcedure)currentRequest.second); - } - } else { - AsyncMultiRead req = (AsyncMultiRead)currentRequest.first; - graph.syncRequest(req, (AsyncMultiProcedure)currentRequest.second); - } - - } catch(Throwable t) { - - Logger.defaultLogError(t); - - if(currentRequest.second instanceof AsyncProcedure) { - ((AsyncProcedure)currentRequest.second).exception(graph, t); - } else { - ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); - } - - } - - synchronized (currentRequest.first) { - syncRequests.remove(currentRequest.first); -// System.out.println("notifying " + currentRequest.first); - currentRequest.first.notify(); - } - - - } else { - - try{ - - if(currentRequest.second instanceof AsyncProcedure) { - if(currentRequest.first instanceof AsyncRead) { - AsyncRead req = (AsyncRead)currentRequest.first; - graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); - } else { - Read req = (Read)currentRequest.first; - graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); - } - } else { - AsyncMultiRead req = (AsyncMultiRead)currentRequest.first; - graph.asyncRequest(req, (AsyncMultiProcedure)currentRequest.second); - } - - } catch(Throwable t) { - - Logger.defaultLogError(t); - - if(currentRequest.second instanceof AsyncProcedure) { - ((AsyncProcedure)currentRequest.second).exception(graph, t); - } else { - ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); - } - - } - } - - } - -// System.out.println(MergingGraphRequestProcessor.this + " read completed"); - - synchronized (MergingGraphRequestProcessor.this) { - if (requestQueue.isEmpty()) - hasAlreadyRequest = false; - else - newTransaction(); - } - - } - - @Override - public String toString() { - return "MergedRead[" + requestQueue.size() + " requests]"; - } - - } - - protected class RunnerWriteGraphRequest extends WriteRequest { - - Pair currentRequest; - HashMap metadata = new HashMap(); - - @Override - public void perform(WriteGraph graph) throws DatabaseException { - -// System.out.println(MergingGraphRequestProcessor.this + " writes"); - - while (true) { - - synchronized (MergingGraphRequestProcessor.this) { - - // Release #synchronize() invokers if necessary. -// if (currentRequest != null && barrierRequests.contains(currentRequest)) { -// synchronized (barrier) { -// barrier.notifyAll(); -// } -// } - - if(requestQueue.isEmpty()) { - if (transactionKeepalivePeriod > 0) { - try { - MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); - } catch (InterruptedException e) { - Logger.defaultLogError(e); - } - if (requestQueue.isEmpty()) - break; - } else - break; - } - - Object nextRequest = requestQueue.peekFirst().first; - if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) { - break; - } - - currentRequest = requestQueue.remove(0); - requestSet.remove(currentRequest); - - } - - @SuppressWarnings("unchecked") - Callback callback = (Callback)currentRequest.second; - - if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) { - - SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first; - - try { -// System.out.println("merg.sync " + adapter); - graph.syncRequest(adapter); - if(callback != null) callback.run(null); - } catch(Throwable t) { - Logger.defaultLogError(t); - if(callback != null) callback.run(t); - } - - adapter.release(); -// System.out.println("merg.sync.release " + adapter); - - } else { - - try { - if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); - else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); - if(callback != null) callback.run(null); - } catch(Throwable t) { - Logger.defaultLogError(t); - if(callback != null) callback.run(t); - } - - } - - } - -// System.out.println(MergingGraphRequestProcessor.this + " write completed"); - - synchronized (MergingGraphRequestProcessor.this) { - if (requestQueue.isEmpty()) - hasAlreadyRequest = false; - else - newTransaction(); - } - - } - - } - - private void newTransaction() { - - boolean write = false; - - synchronized (MergingGraphRequestProcessor.this) { - assert(!requestQueue.isEmpty()); - Object nextRequest = requestQueue.peekFirst().first; - write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite); - } - - if(write) { - processor.asyncRequest(new RunnerWriteGraphRequest(), null); - } else { - processor.asyncRequest(new MergedRead()); - } - - } - - @Override - public void asyncRequest(AsyncMultiRead request, AsyncMultiProcedure procedure) { - -// System.out.println(this + " asyncRequest(ReadGraphRequest> request, QueryProcedure4 procedure)"); - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, procedure); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { - newTransaction(); - hasAlreadyRequest = true; - } else { - notify(); - } - - } - - @Override - public synchronized void asyncRequest(AsyncRead request, AsyncProcedure procedure) { - -// System.out.println(this + " asyncRequest(ReadGraphRequest> request, SingleQueryProcedure4 procedure) " + this); - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, procedure); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { - newTransaction(); - hasAlreadyRequest = true; - } else { -// System.out.println("notify " + this); - notify(); - } - - } - - @Override - public synchronized void asyncRequest(Write request, Callback callback) { - -// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, callback); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { -// System.out.println("new transaction"); - newTransaction(); - hasAlreadyRequest = true; - } else { -// System.out.println("notify"); - notify(); - } - - } - - @Override - public synchronized void asyncRequest(DelayedWrite request, Callback callback) { - -// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, callback); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { -// System.out.println("new transaction"); - newTransaction(); - hasAlreadyRequest = true; - } else { -// System.out.println("notify"); - notify(); - } - - } - - @Override - public synchronized void asyncRequest(WriteOnly request, Callback callback) { - -// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, callback); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { -// System.out.println("new transaction"); - newTransaction(); - hasAlreadyRequest = true; - } else { -// System.out.println("notify"); - notify(); - } - - } - - @Override - public Collection syncRequest(AsyncMultiRead request, final AsyncMultiProcedure procedure) { - - final DataContainer throwable = new DataContainer(null); - - // Queue the adapter up for execution. - synchronized (request) { - syncRequests.add(request); - asyncRequest(request, procedure); - if(syncRequests.contains(request)) { - try { - // System.out.println("waiting " + request); - request.wait(); - } catch (InterruptedException e) { - throw new Error(e); - } - } - } - - Throwable t = throwable.get(); - - if(t != null) { - Logger.defaultLogError(t); - throw new RuntimeException(t.getMessage()); - } - - return null; - - } - - @Override - public T syncRequest(AsyncRead request, final AsyncProcedure procedure) { - -// System.out.println("syncRequest(ReadGraphRequest> request, SingleQueryProcedure4 procedure)"); - - final DataContainer result = new DataContainer(null); - final DataContainer throwable = new DataContainer(null); - - // Queue the adapter up for execution. - synchronized (request) { - - syncRequests.add(request); - asyncRequest(request, new AsyncProcedure() { - - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.set(t); - } - procedure.execute(graph, t); - }; - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - throwable.set(t); - } - - @Override - public String toString() { - return procedure.toString(); - } - - }); - if(syncRequests.contains(request)) { - try { - // System.out.println("waiting " + request); - request.wait(); - } catch (InterruptedException e) { - throw new Error(e); - } - } - } - - Throwable t = throwable.get(); - - if(t != null) { - Logger.defaultLogError(t); - throw new RuntimeException(t.getMessage()); - } - - return result.get(); - - //return result.get(); - - } - - - @Override - public void syncRequest(Write request) { - -// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)"); - - SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request); - - asyncRequest(adapter, null); - - adapter.acquire(); - - // Throw exception if one occurred. - adapter.throwOrWrapException(); - - } - - @Override - public void syncRequest(WriteOnly request) { - -// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)"); - - SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request); - - // Queue the adapter up for execution. - synchronized (adapter) { - asyncRequest(adapter, null); - try { - adapter.wait(); - } catch (InterruptedException e) { - throw new Error(e); - } - } - - // Throw exception if one occurred. - adapter.throwOrWrapException(); - - } - - @Override - public Session getSession() { - return processor.getSession(); - } - - @Override - public String toString() { - return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")"; - } - - @Override - public void asyncRequest(AsyncRead request) { - - asyncRequest(request, new ProcedureAdapter() { - - @Override - public void exception(Throwable t) { - Logger.defaultLogError(t); - } - - }); - - } - - @Override - public void asyncRequest(AsyncRead request, Procedure procedure) { - asyncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public void asyncRequest(AsyncMultiRead request) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void asyncRequest(AsyncMultiRead request, - MultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(AsyncRead request) { - - final DataContainer throwable = new DataContainer(null); - final DataContainer result = new DataContainer(); - - syncRequest(request, new AsyncProcedure() { - - public void execute(AsyncReadGraph graph, T t) { - result.set(t); - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - throwable.set(t); - } - - }); - - Throwable t = throwable.get(); - - if(t != null) { - Logger.defaultLogError(t); - throw new RuntimeException(t.getMessage()); - } - - return result.get(); - - } - - @Override - public T syncRequest(AsyncRead request, - Procedure procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public Collection syncRequest(AsyncMultiRead request) { - - final DataContainer throwable = new DataContainer(null); - final ArrayList result = new ArrayList(); - - syncRequest(request, new AsyncMultiProcedureAdapter() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.add(t); - } - }; - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - throwable.set(t); - } - - }); - - Throwable t = throwable.get(); - - if(t != null) { - Logger.defaultLogError(t); - throw new RuntimeException(t.getMessage()); - } - - return result; - - } - - @Override - public Collection syncRequest(AsyncMultiRead request, - MultiProcedure procedure) { - throw new Error("Not implemented."); - } - - @Override - public T syncRequest(Read request) { - - final DataContainer throwable = new DataContainer(null); - final DataContainer result = new DataContainer(); - - - syncRequest(request, new Procedure() { - - public void execute(T t) { - result.set(t); - } - - @Override - public void exception(Throwable t) { - throwable.set(t); - } - - }); - - Throwable t = throwable.get(); - - if(t != null) { - throw new Error(t.getMessage()); - } - - - return result.get(); - - } - - @Override - public T syncRequest(Read request, - final AsyncProcedure procedure) { - - final DataContainer result = new DataContainer(null); - final DataContainer throwable = new DataContainer(null); - - // Queue the adapter up for execution. - synchronized (request) { - - syncRequests.add(request); - asyncRequest(request, new AsyncProcedure() { - - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.set(t); - } - procedure.execute(graph, t); - }; - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - throwable.set(t); - } - - @Override - public String toString() { - return procedure.toString(); - } - - }); - if(syncRequests.contains(request)) { - try { - // System.out.println("waiting " + request); - request.wait(); - } catch (InterruptedException e) { - throw new Error(e); - } - } - } - - Throwable t = throwable.get(); - - if(t != null) { - throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t); - } - - return result.get(); - - } - - @Override - public void asyncRequest(Read request) { - - asyncRequest(request, new ProcedureAdapter() { - - @Override - public void exception(Throwable t) { - Logger.defaultLogError(t); - } - - }); - - } - - @Override - public synchronized void asyncRequest(Read request, - AsyncProcedure procedure) { - - if (requestSet.contains(request)) - return; - - Pair pair = new Pair(request, procedure); - requestQueue.add(pair); - requestSet.add(pair); - - if (!hasAlreadyRequest) { - newTransaction(); - hasAlreadyRequest = true; - } else { - notify(); - } - - } - - @Override - public T syncRequest(Read request, Procedure procedure) { - return syncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public Collection syncRequest(final MultiRead request) throws DatabaseException { - assert(request != null); - - final ArrayList result = new ArrayList(); - final DataContainer exception = new DataContainer(); - - syncRequest(request, new AsyncMultiProcedureAdapter() { - - @Override - public void execute(AsyncReadGraph graph, T t) { - synchronized(result) { - result.add(t); - } - } - - @Override - public void exception(AsyncReadGraph graph, Throwable t) { - exception.set(t); - } - - @Override - public String toString() { - return "syncRequest(MultiRead) -> " + request; - } - - }); - - Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); - } - - return result; - } - - @Override - public Collection syncRequest(MultiRead request, AsyncMultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public Collection syncRequest(MultiRead request, MultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void asyncRequest(Read request, Procedure procedure) { - asyncRequest(request, new NoneToAsyncProcedure(procedure)); - } - - @Override - public void asyncRequest(MultiRead request) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void asyncRequest(MultiRead request, AsyncMultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void asyncRequest(MultiRead request, MultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - - @Override - public void asyncRequest(Write r) { - asyncRequest(r, null); - } - - @Override - public void asyncRequest(DelayedWrite r) { - asyncRequest(r, null); - } - - @Override - public void asyncRequest(WriteOnly r) { - asyncRequest(r, null); - } - - @Override - public T getService(Class api) { - return getSession().getService(api); - } - - @Override - public T peekService(Class api) { - return getSession().peekService(api); - } - - @Override - public boolean hasService(Class api) { - return getSession().hasService(api); - } - - @Override - public void registerService(Class api, T service) { - getSession().registerService(api, service); - } - -// @Override -// public T syncRequest(Read arg0, AsyncListener arg1) { -// throw new UnsupportedOperationException("Not implemented."); -// } - - @Override - public T syncRequest(Read arg0, SyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(Read arg0, Listener arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(Read arg0, SyncProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(AsyncRead arg0, AsyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(AsyncRead arg0, SyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public T syncRequest(AsyncRead arg0, Listener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public T syncRequest(AsyncRead arg0, SyncProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(Read arg0, AsyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(Read arg0, SyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(Read arg0, Listener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(Read arg0, SyncProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncRead arg0, AsyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncRead arg0, SyncListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncRead arg0, Listener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncRead arg0, SyncProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(MultiRead arg0, SyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(MultiRead arg0, MultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(AsyncMultiRead arg0, - AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(AsyncMultiRead arg0, SyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(AsyncMultiRead arg0, - MultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public Collection syncRequest(AsyncMultiRead arg0, SyncMultiProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(MultiRead arg0, SyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(MultiRead arg0, MultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(MultiRead arg0, SyncMultiProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncMultiRead arg0, - AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void asyncRequest(AsyncMultiRead arg0, SyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncMultiRead arg0, - MultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - - @Override - public void asyncRequest(AsyncMultiRead arg0, SyncMultiProcedure arg1) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void asyncRequest(ExternalRead request, Procedure procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(ExternalRead request) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(ExternalRead request, Listener procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(ExternalRead request, Procedure procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void asyncRequest(ExternalRead request) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void asyncRequest(ExternalRead request, Listener procedure) { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public void syncRequest(DelayedWrite request) throws DatabaseException { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public T syncRequest(WriteResult request) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public T syncRequest(DelayedWriteResult request) - throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public T syncRequest(WriteOnlyResult r) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void asyncRequest(WriteResult r, Procedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void asyncRequest(DelayedWriteResult r, Procedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void asyncRequest(WriteOnlyResult r, Procedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public Resource getRootLibrary() { - return processor.getRootLibrary(); - } - - @Override - public void async(ReadInterface r, Procedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(ReadInterface r, AsyncProcedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(ReadInterface r, SyncProcedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(ReadInterface r, Listener procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(ReadInterface r, AsyncListener procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(ReadInterface r, SyncListener procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public T sync(ReadInterface r) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public T sync(WriteInterface r) throws DatabaseException { - throw new UnsupportedOperationException(); - } - - @Override - public void async(WriteInterface r, Procedure procedure) { - throw new UnsupportedOperationException(); - } - - @Override - public void async(WriteInterface r) { - throw new UnsupportedOperationException(); - } - - @Override - public Object getModificationCounter() { - throw new UnsupportedOperationException(); - } - -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.db.common.processor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Semaphore; + +import org.simantics.db.AsyncReadGraph; +import org.simantics.db.AsyncRequestProcessor; +import org.simantics.db.ReadGraph; +import org.simantics.db.RequestProcessor; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.WriteGraph; +import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; +import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.request.ReadRequest; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.procedure.AsyncListener; +import org.simantics.db.procedure.AsyncMultiListener; +import org.simantics.db.procedure.AsyncMultiProcedure; +import org.simantics.db.procedure.AsyncProcedure; +import org.simantics.db.procedure.Listener; +import org.simantics.db.procedure.MultiListener; +import org.simantics.db.procedure.MultiProcedure; +import org.simantics.db.procedure.Procedure; +import org.simantics.db.procedure.SyncListener; +import org.simantics.db.procedure.SyncMultiListener; +import org.simantics.db.procedure.SyncMultiProcedure; +import org.simantics.db.procedure.SyncProcedure; +import org.simantics.db.request.AsyncMultiRead; +import org.simantics.db.request.AsyncRead; +import org.simantics.db.request.DelayedWrite; +import org.simantics.db.request.DelayedWriteResult; +import org.simantics.db.request.ExternalRead; +import org.simantics.db.request.MultiRead; +import org.simantics.db.request.Read; +import org.simantics.db.request.ReadInterface; +import org.simantics.db.request.Write; +import org.simantics.db.request.WriteInterface; +import org.simantics.db.request.WriteOnly; +import org.simantics.db.request.WriteOnlyResult; +import org.simantics.db.request.WriteResult; +import org.simantics.utils.DataContainer; +import org.simantics.utils.datastructures.Callback; +import org.simantics.utils.datastructures.Pair; + +public class MergingGraphRequestProcessor implements RequestProcessor { + + private static class SyncWriteRequestAdapter implements Write { + + private Semaphore semaphore = new Semaphore(0); + private Object request; + private Throwable exception; + SyncWriteRequestAdapter(Write r) { + this.request = r; + } + SyncWriteRequestAdapter(WriteOnly r) { + this.request = r; + } +// @Override +// public GraphRequestStatus perform(Graph g) throws Exception { +// return perform((ReadGraph)g); +// } + @Override + public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException { + if(request instanceof Write) { + ((Write)request).perform(g); + } else if(request instanceof DelayedWrite) { + ((DelayedWrite)request).perform(g); + } else { + ((WriteOnly)request).perform(g); + } + } +// @Override +// public String getId() { +// if(request instanceof WriteGraphRequest) { +// return ((WriteGraphRequest)request).getId(); +// } else { +// return null; +// } +// } +// @Override +// public void requestCompleted(GraphRequestStatus status) { +// if(request instanceof WriteGraphRequest) { +// ((WriteGraphRequest)request).requestCompleted(status); +// } else { +// } +// } +// @Override +// public void handleException(Throwable e) { +// this.exception = e; +// if(request instanceof WriteGraphRequest) { +// ((WriteGraphRequest)request).handleException(e); +// } +// } + + public void throwOrWrapException() { + if (exception == null) + return; + if (exception instanceof RuntimeException) + throw (RuntimeException) exception; + if (exception instanceof Error) + throw (Error) exception; + throw new RuntimeException("See cause for the real exception.", exception); + } + + public void acquire() { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Logger.defaultLogError(e); + } + } + + public void release() { + semaphore.release(); + } + + @Override + public String toString() { + return "SyncWriteRequestAdapter " + request; + } + + } + + long transactionKeepalivePeriod; + + /** + * Synchronization object for implementing {@link #synchronize()}. + * {@link Object#notifyAll()} is invoked for this lock object every time a + * single transaction is completed, thereby releasing all waiters in + * {@link #synchronize()}. + */ + Object barrier = new Object(); + + Set> requestSet = new HashSet>(); + LinkedList> requestQueue = new LinkedList>(); + boolean hasAlreadyRequest = false; + + /** + * A set of requests which {@link #synchronize()} is depending on at the + * moment. Every time a request within this set is completed, some thread in + * {@link #synchronize()} should be released. + */ +// Set barrierRequests = new HashSet(); + Set syncRequests = new HashSet(); + + private String name; + + private AsyncRequestProcessor processor; + + public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) { + this.name = name; + this.processor = processor; + this.transactionKeepalivePeriod = transactionKeepalivePeriod; + } + + public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) { + this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString(); + this.processor = processor; + this.transactionKeepalivePeriod = transactionKeepalivePeriod; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + protected class MergedRead extends ReadRequest { + + Pair currentRequest; + +// RunnerReadGraphRequest(GraphRequestProcessor processor) { +// super(processor); +// } +// +// @Override +// public void completed(boolean value) { +//// System.out.println(this + "MGRP read completed"); +//// synchronized (MergingGraphRequestProcessor.this) { +//// if (requestQueue.isEmpty()) +//// hasAlreadyRequest = false; +//// else +//// newTransaction(); +//// } +// } + + @Override + public void run(ReadGraph graph) { + +// System.out.println(MergingGraphRequestProcessor.this + " reads"); + + while (true) { + + synchronized (MergingGraphRequestProcessor.this) { + + // Release #synchronize() invokers if necessary. +// if (currentRequest != null && barrierRequests.contains(currentRequest)) { +// synchronized (barrier) { +// barrier.notifyAll(); +// } +// } + + if(requestQueue.isEmpty()) { + if (transactionKeepalivePeriod > 0) { +// System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() ); + try { + MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); + } catch (InterruptedException e) { + Logger.defaultLogError(e); + } + if (requestQueue.isEmpty()) + break; + } else + break; + } + + Object nextRequest = requestQueue.peekFirst().first; + if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) { + break; + } + + currentRequest = requestQueue.remove(0); + requestSet.remove(currentRequest); + + } + +// ReadGraphRequest req = (ReadGraphRequest)currentRequest.first; + + if( syncRequests.contains(currentRequest.first)) { + + try { + + if(currentRequest.second instanceof AsyncProcedure) { + if(currentRequest.first instanceof Read) { + Read req = (Read)currentRequest.first; + graph.syncRequest(req, (AsyncProcedure)currentRequest.second); + } else { + AsyncRead req = (AsyncRead)currentRequest.first; + graph.syncRequest(req, (AsyncProcedure)currentRequest.second); + } + } else { + AsyncMultiRead req = (AsyncMultiRead)currentRequest.first; + graph.syncRequest(req, (AsyncMultiProcedure)currentRequest.second); + } + + } catch(Throwable t) { + + Logger.defaultLogError(t); + + if(currentRequest.second instanceof AsyncProcedure) { + ((AsyncProcedure)currentRequest.second).exception(graph, t); + } else { + ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); + } + + } + + synchronized (currentRequest.first) { + syncRequests.remove(currentRequest.first); +// System.out.println("notifying " + currentRequest.first); + currentRequest.first.notify(); + } + + + } else { + + try{ + + if(currentRequest.second instanceof AsyncProcedure) { + if(currentRequest.first instanceof AsyncRead) { + AsyncRead req = (AsyncRead)currentRequest.first; + graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); + } else { + Read req = (Read)currentRequest.first; + graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); + } + } else { + AsyncMultiRead req = (AsyncMultiRead)currentRequest.first; + graph.asyncRequest(req, (AsyncMultiProcedure)currentRequest.second); + } + + } catch(Throwable t) { + + Logger.defaultLogError(t); + + if(currentRequest.second instanceof AsyncProcedure) { + ((AsyncProcedure)currentRequest.second).exception(graph, t); + } else { + ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); + } + + } + } + + } + +// System.out.println(MergingGraphRequestProcessor.this + " read completed"); + + synchronized (MergingGraphRequestProcessor.this) { + if (requestQueue.isEmpty()) + hasAlreadyRequest = false; + else + newTransaction(); + } + + } + + @Override + public String toString() { + return "MergedRead[" + requestQueue.size() + " requests]"; + } + + } + + protected class RunnerWriteGraphRequest extends WriteRequest { + + Pair currentRequest; + HashMap metadata = new HashMap(); + + @Override + public void perform(WriteGraph graph) throws DatabaseException { + +// System.out.println(MergingGraphRequestProcessor.this + " writes"); + + while (true) { + + synchronized (MergingGraphRequestProcessor.this) { + + // Release #synchronize() invokers if necessary. +// if (currentRequest != null && barrierRequests.contains(currentRequest)) { +// synchronized (barrier) { +// barrier.notifyAll(); +// } +// } + + if(requestQueue.isEmpty()) { + if (transactionKeepalivePeriod > 0) { + try { + MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); + } catch (InterruptedException e) { + Logger.defaultLogError(e); + } + if (requestQueue.isEmpty()) + break; + } else + break; + } + + Object nextRequest = requestQueue.peekFirst().first; + if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) { + break; + } + + currentRequest = requestQueue.remove(0); + requestSet.remove(currentRequest); + + } + + @SuppressWarnings("unchecked") + Callback callback = (Callback)currentRequest.second; + + if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) { + + SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first; + + try { +// System.out.println("merg.sync " + adapter); + graph.syncRequest(adapter); + if(callback != null) callback.run(null); + } catch(Throwable t) { + Logger.defaultLogError(t); + if(callback != null) callback.run(t); + } + + adapter.release(); +// System.out.println("merg.sync.release " + adapter); + + } else { + + try { + if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); + else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); + if(callback != null) callback.run(null); + } catch(Throwable t) { + Logger.defaultLogError(t); + if(callback != null) callback.run(t); + } + + } + + } + +// System.out.println(MergingGraphRequestProcessor.this + " write completed"); + + synchronized (MergingGraphRequestProcessor.this) { + if (requestQueue.isEmpty()) + hasAlreadyRequest = false; + else + newTransaction(); + } + + } + + } + + private void newTransaction() { + + boolean write = false; + + synchronized (MergingGraphRequestProcessor.this) { + assert(!requestQueue.isEmpty()); + Object nextRequest = requestQueue.peekFirst().first; + write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite); + } + + if(write) { + processor.asyncRequest(new RunnerWriteGraphRequest(), null); + } else { + processor.asyncRequest(new MergedRead()); + } + + } + + @Override + public void asyncRequest(AsyncMultiRead request, AsyncMultiProcedure procedure) { + +// System.out.println(this + " asyncRequest(ReadGraphRequest> request, QueryProcedure4 procedure)"); + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, procedure); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { + newTransaction(); + hasAlreadyRequest = true; + } else { + notify(); + } + + } + + @Override + public synchronized void asyncRequest(AsyncRead request, AsyncProcedure procedure) { + +// System.out.println(this + " asyncRequest(ReadGraphRequest> request, SingleQueryProcedure4 procedure) " + this); + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, procedure); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { + newTransaction(); + hasAlreadyRequest = true; + } else { +// System.out.println("notify " + this); + notify(); + } + + } + + @Override + public synchronized void asyncRequest(Write request, Callback callback) { + +// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, callback); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { +// System.out.println("new transaction"); + newTransaction(); + hasAlreadyRequest = true; + } else { +// System.out.println("notify"); + notify(); + } + + } + + @Override + public synchronized void asyncRequest(DelayedWrite request, Callback callback) { + +// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, callback); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { +// System.out.println("new transaction"); + newTransaction(); + hasAlreadyRequest = true; + } else { +// System.out.println("notify"); + notify(); + } + + } + + @Override + public synchronized void asyncRequest(WriteOnly request, Callback callback) { + +// System.out.println(this + " asyncRequest(WriteGraphRequest request)"); + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, callback); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { +// System.out.println("new transaction"); + newTransaction(); + hasAlreadyRequest = true; + } else { +// System.out.println("notify"); + notify(); + } + + } + + @Override + public Collection syncRequest(AsyncMultiRead request, final AsyncMultiProcedure procedure) { + + final DataContainer throwable = new DataContainer(null); + + // Queue the adapter up for execution. + synchronized (request) { + syncRequests.add(request); + asyncRequest(request, procedure); + if(syncRequests.contains(request)) { + try { + // System.out.println("waiting " + request); + request.wait(); + } catch (InterruptedException e) { + throw new Error(e); + } + } + } + + Throwable t = throwable.get(); + + if(t != null) { + Logger.defaultLogError(t); + throw new RuntimeException(t.getMessage()); + } + + return null; + + } + + @Override + public T syncRequest(AsyncRead request, final AsyncProcedure procedure) { + +// System.out.println("syncRequest(ReadGraphRequest> request, SingleQueryProcedure4 procedure)"); + + final DataContainer result = new DataContainer(null); + final DataContainer throwable = new DataContainer(null); + + // Queue the adapter up for execution. + synchronized (request) { + + syncRequests.add(request); + asyncRequest(request, new AsyncProcedure() { + + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.set(t); + } + procedure.execute(graph, t); + }; + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + throwable.set(t); + } + + @Override + public String toString() { + return procedure.toString(); + } + + }); + if(syncRequests.contains(request)) { + try { + // System.out.println("waiting " + request); + request.wait(); + } catch (InterruptedException e) { + throw new Error(e); + } + } + } + + Throwable t = throwable.get(); + + if(t != null) { + Logger.defaultLogError(t); + throw new RuntimeException(t.getMessage()); + } + + return result.get(); + + //return result.get(); + + } + + + @Override + public void syncRequest(Write request) { + +// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)"); + + SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request); + + asyncRequest(adapter, null); + + adapter.acquire(); + + // Throw exception if one occurred. + adapter.throwOrWrapException(); + + } + + @Override + public void syncRequest(WriteOnly request) { + +// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)"); + + SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request); + + // Queue the adapter up for execution. + synchronized (adapter) { + asyncRequest(adapter, null); + try { + adapter.wait(); + } catch (InterruptedException e) { + throw new Error(e); + } + } + + // Throw exception if one occurred. + adapter.throwOrWrapException(); + + } + + @Override + public Session getSession() { + return processor.getSession(); + } + + @Override + public String toString() { + return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")"; + } + + @Override + public void asyncRequest(AsyncRead request) { + + asyncRequest(request, new ProcedureAdapter() { + + @Override + public void exception(Throwable t) { + Logger.defaultLogError(t); + } + + }); + + } + + @Override + public void asyncRequest(AsyncRead request, Procedure procedure) { + asyncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public void asyncRequest(AsyncMultiRead request) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void asyncRequest(AsyncMultiRead request, + MultiProcedure procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(AsyncRead request) { + + final DataContainer throwable = new DataContainer(null); + final DataContainer result = new DataContainer(); + + syncRequest(request, new AsyncProcedure() { + + public void execute(AsyncReadGraph graph, T t) { + result.set(t); + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + throwable.set(t); + } + + }); + + Throwable t = throwable.get(); + + if(t != null) { + Logger.defaultLogError(t); + throw new RuntimeException(t.getMessage()); + } + + return result.get(); + + } + + @Override + public T syncRequest(AsyncRead request, + Procedure procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Collection syncRequest(AsyncMultiRead request) { + + final DataContainer throwable = new DataContainer(null); + final ArrayList result = new ArrayList(); + + syncRequest(request, new AsyncMultiProcedureAdapter() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.add(t); + } + }; + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + throwable.set(t); + } + + }); + + Throwable t = throwable.get(); + + if(t != null) { + Logger.defaultLogError(t); + throw new RuntimeException(t.getMessage()); + } + + return result; + + } + + @Override + public Collection syncRequest(AsyncMultiRead request, + MultiProcedure procedure) { + throw new Error("Not implemented."); + } + + @Override + public T syncRequest(Read request) { + + final DataContainer throwable = new DataContainer(null); + final DataContainer result = new DataContainer(); + + + syncRequest(request, new Procedure() { + + public void execute(T t) { + result.set(t); + } + + @Override + public void exception(Throwable t) { + throwable.set(t); + } + + }); + + Throwable t = throwable.get(); + + if(t != null) { + throw new Error(t.getMessage()); + } + + + return result.get(); + + } + + @Override + public T syncRequest(Read request, + final AsyncProcedure procedure) { + + final DataContainer result = new DataContainer(null); + final DataContainer throwable = new DataContainer(null); + + // Queue the adapter up for execution. + synchronized (request) { + + syncRequests.add(request); + asyncRequest(request, new AsyncProcedure() { + + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.set(t); + } + procedure.execute(graph, t); + }; + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + throwable.set(t); + } + + @Override + public String toString() { + return procedure.toString(); + } + + }); + if(syncRequests.contains(request)) { + try { + // System.out.println("waiting " + request); + request.wait(); + } catch (InterruptedException e) { + throw new Error(e); + } + } + } + + Throwable t = throwable.get(); + + if(t != null) { + throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t); + } + + return result.get(); + + } + + @Override + public void asyncRequest(Read request) { + + asyncRequest(request, new ProcedureAdapter() { + + @Override + public void exception(Throwable t) { + Logger.defaultLogError(t); + } + + }); + + } + + @Override + public synchronized void asyncRequest(Read request, + AsyncProcedure procedure) { + + if (requestSet.contains(request)) + return; + + Pair pair = new Pair(request, procedure); + requestQueue.add(pair); + requestSet.add(pair); + + if (!hasAlreadyRequest) { + newTransaction(); + hasAlreadyRequest = true; + } else { + notify(); + } + + } + + @Override + public T syncRequest(Read request, Procedure procedure) { + return syncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public Collection syncRequest(final MultiRead request) throws DatabaseException { + assert(request != null); + + final ArrayList result = new ArrayList(); + final DataContainer exception = new DataContainer(); + + syncRequest(request, new AsyncMultiProcedureAdapter() { + + @Override + public void execute(AsyncReadGraph graph, T t) { + synchronized(result) { + result.add(t); + } + } + + @Override + public void exception(AsyncReadGraph graph, Throwable t) { + exception.set(t); + } + + @Override + public String toString() { + return "syncRequest(MultiRead) -> " + request; + } + + }); + + Throwable t = exception.get(); + if(t != null) { + if(t instanceof DatabaseException) throw (DatabaseException)t; + else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + } + + return result; + } + + @Override + public Collection syncRequest(MultiRead request, AsyncMultiProcedure procedure) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public Collection syncRequest(MultiRead request, MultiProcedure procedure) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void asyncRequest(Read request, Procedure procedure) { + asyncRequest(request, new NoneToAsyncProcedure(procedure)); + } + + @Override + public void asyncRequest(MultiRead request) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void asyncRequest(MultiRead request, AsyncMultiProcedure procedure) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void asyncRequest(MultiRead request, MultiProcedure procedure) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void asyncRequest(Write r) { + asyncRequest(r, null); + } + + @Override + public void asyncRequest(DelayedWrite r) { + asyncRequest(r, null); + } + + @Override + public void asyncRequest(WriteOnly r) { + asyncRequest(r, null); + } + + @Override + public T getService(Class api) { + return getSession().getService(api); + } + + @Override + public T peekService(Class api) { + return getSession().peekService(api); + } + + @Override + public boolean hasService(Class api) { + return getSession().hasService(api); + } + + @Override + public void registerService(Class api, T service) { + getSession().registerService(api, service); + } + +// @Override +// public T syncRequest(Read arg0, AsyncListener arg1) { +// throw new UnsupportedOperationException("Not implemented."); +// } + + @Override + public T syncRequest(Read arg0, SyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(Read arg0, Listener arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(Read arg0, SyncProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(AsyncRead arg0, AsyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(AsyncRead arg0, SyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public T syncRequest(AsyncRead arg0, Listener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public T syncRequest(AsyncRead arg0, SyncProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(Read arg0, AsyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(Read arg0, SyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(Read arg0, Listener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(Read arg0, SyncProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncRead arg0, AsyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncRead arg0, SyncListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncRead arg0, Listener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncRead arg0, SyncProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(MultiRead arg0, AsyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(MultiRead arg0, SyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(MultiRead arg0, MultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(AsyncMultiRead arg0, + AsyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(AsyncMultiRead arg0, SyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(AsyncMultiRead arg0, + MultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public Collection syncRequest(AsyncMultiRead arg0, SyncMultiProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(MultiRead arg0, AsyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(MultiRead arg0, SyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(MultiRead arg0, MultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(MultiRead arg0, SyncMultiProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncMultiRead arg0, + AsyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void asyncRequest(AsyncMultiRead arg0, SyncMultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncMultiRead arg0, + MultiListener arg1) { + throw new UnsupportedOperationException("Not implemented."); + + } + + @Override + public void asyncRequest(AsyncMultiRead arg0, SyncMultiProcedure arg1) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void asyncRequest(ExternalRead request, Procedure procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(ExternalRead request) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(ExternalRead request, Listener procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(ExternalRead request, Procedure procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void asyncRequest(ExternalRead request) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void asyncRequest(ExternalRead request, Listener procedure) { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void syncRequest(DelayedWrite request) throws DatabaseException { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public T syncRequest(WriteResult request) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public T syncRequest(DelayedWriteResult request) + throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public T syncRequest(WriteOnlyResult r) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void asyncRequest(WriteResult r, Procedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void asyncRequest(DelayedWriteResult r, Procedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void asyncRequest(WriteOnlyResult r, Procedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public Resource getRootLibrary() { + return processor.getRootLibrary(); + } + + @Override + public void async(ReadInterface r, Procedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(ReadInterface r, AsyncProcedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(ReadInterface r, SyncProcedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(ReadInterface r, Listener procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(ReadInterface r, AsyncListener procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(ReadInterface r, SyncListener procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public T sync(ReadInterface r) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public T sync(WriteInterface r) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public void async(WriteInterface r, Procedure procedure) { + throw new UnsupportedOperationException(); + } + + @Override + public void async(WriteInterface r) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getModificationCounter() { + throw new UnsupportedOperationException(); + } + +}