-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.db.common.processor;\r
-\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.LinkedList;\r
-import java.util.Set;\r
-import java.util.UUID;\r
-import java.util.concurrent.Semaphore;\r
-\r
-import org.simantics.db.AsyncReadGraph;\r
-import org.simantics.db.AsyncRequestProcessor;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.RequestProcessor;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.WriteGraph;\r
-import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;\r
-import org.simantics.db.common.procedure.adapter.ProcedureAdapter;\r
-import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;\r
-import org.simantics.db.common.request.ReadRequest;\r
-import org.simantics.db.common.request.WriteRequest;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.procedure.AsyncListener;\r
-import org.simantics.db.procedure.AsyncMultiListener;\r
-import org.simantics.db.procedure.AsyncMultiProcedure;\r
-import org.simantics.db.procedure.AsyncProcedure;\r
-import org.simantics.db.procedure.Listener;\r
-import org.simantics.db.procedure.MultiListener;\r
-import org.simantics.db.procedure.MultiProcedure;\r
-import org.simantics.db.procedure.Procedure;\r
-import org.simantics.db.procedure.SyncListener;\r
-import org.simantics.db.procedure.SyncMultiListener;\r
-import org.simantics.db.procedure.SyncMultiProcedure;\r
-import org.simantics.db.procedure.SyncProcedure;\r
-import org.simantics.db.request.AsyncMultiRead;\r
-import org.simantics.db.request.AsyncRead;\r
-import org.simantics.db.request.DelayedWrite;\r
-import org.simantics.db.request.DelayedWriteResult;\r
-import org.simantics.db.request.ExternalRead;\r
-import org.simantics.db.request.MultiRead;\r
-import org.simantics.db.request.Read;\r
-import org.simantics.db.request.ReadInterface;\r
-import org.simantics.db.request.UndoTraits;\r
-import org.simantics.db.request.Write;\r
-import org.simantics.db.request.WriteInterface;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.db.request.WriteOnlyResult;\r
-import org.simantics.db.request.WriteResult;\r
-import org.simantics.utils.DataContainer;\r
-import org.simantics.utils.datastructures.Callback;\r
-import org.simantics.utils.datastructures.Pair;\r
-\r
-public class MergingGraphRequestProcessor implements RequestProcessor {\r
-\r
- private static class SyncWriteRequestAdapter implements Write {\r
-\r
- private Semaphore semaphore = new Semaphore(0);\r
- private Object request;\r
- private Throwable exception;\r
- SyncWriteRequestAdapter(Write r) {\r
- this.request = r;\r
- }\r
- SyncWriteRequestAdapter(WriteOnly r) {\r
- this.request = r;\r
- }\r
-// @Override\r
-// public GraphRequestStatus perform(Graph g) throws Exception {\r
-// return perform((ReadGraph)g);\r
-// }\r
- @Override\r
- public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {\r
- if(request instanceof Write) {\r
- ((Write)request).perform(g);\r
- } else if(request instanceof DelayedWrite) {\r
- ((DelayedWrite)request).perform(g);\r
- } else {\r
- ((WriteOnly)request).perform(g);\r
- }\r
- }\r
-// @Override\r
-// public String getId() {\r
-// if(request instanceof WriteGraphRequest) {\r
-// return ((WriteGraphRequest)request).getId();\r
-// } else {\r
-// return null;\r
-// }\r
-// }\r
-// @Override\r
-// public void requestCompleted(GraphRequestStatus status) {\r
-// if(request instanceof WriteGraphRequest) {\r
-// ((WriteGraphRequest)request).requestCompleted(status);\r
-// } else {\r
-// }\r
-// }\r
-// @Override\r
-// public void handleException(Throwable e) {\r
-// this.exception = e;\r
-// if(request instanceof WriteGraphRequest) {\r
-// ((WriteGraphRequest)request).handleException(e);\r
-// }\r
-// }\r
-\r
- public void throwOrWrapException() {\r
- if (exception == null)\r
- return;\r
- if (exception instanceof RuntimeException)\r
- throw (RuntimeException) exception;\r
- if (exception instanceof Error)\r
- throw (Error) exception;\r
- throw new RuntimeException("See cause for the real exception.", exception);\r
- }\r
-\r
- @Override\r
- public VirtualGraph getProvider() {\r
- return null;\r
- }\r
-\r
-// @Override\r
-// public void fillMetadata(Map<String, String> metadata) {\r
-// }\r
-\r
- public void acquire() {\r
- try {\r
- semaphore.acquire();\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- }\r
-\r
- public void release() {\r
- semaphore.release();\r
- }\r
-\r
- @Override\r
- public UndoTraits getUndoTraits() {\r
- return null;\r
- }\r
- \r
- @Override\r
- public String toString() {\r
- return "SyncWriteRequestAdapter " + request;\r
- }\r
-\r
- }\r
-\r
- long transactionKeepalivePeriod;\r
-\r
- /**\r
- * Synchronization object for implementing {@link #synchronize()}.\r
- * {@link Object#notifyAll()} is invoked for this lock object every time a\r
- * single transaction is completed, thereby releasing all waiters in\r
- * {@link #synchronize()}.\r
- */\r
- Object barrier = new Object();\r
-\r
- Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();\r
- LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();\r
- boolean hasAlreadyRequest = false;\r
-\r
- /**\r
- * A set of requests which {@link #synchronize()} is depending on at the\r
- * moment. Every time a request within this set is completed, some thread in\r
- * {@link #synchronize()} should be released.\r
- */\r
-// Set<Object> barrierRequests = new HashSet<Object>();\r
- Set<Object> syncRequests = new HashSet<Object>();\r
-\r
- private String name;\r
-\r
- private AsyncRequestProcessor processor;\r
-\r
- public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {\r
- this.name = name;\r
- this.processor = processor;\r
- this.transactionKeepalivePeriod = transactionKeepalivePeriod;\r
- }\r
-\r
- public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {\r
- this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString();\r
- this.processor = processor;\r
- this.transactionKeepalivePeriod = transactionKeepalivePeriod;\r
- }\r
-\r
- @SuppressWarnings({"unchecked", "rawtypes"})\r
- protected class MergedRead extends ReadRequest {\r
-\r
- Pair<Object, Object> currentRequest;\r
-\r
-// RunnerReadGraphRequest(GraphRequestProcessor processor) {\r
-// super(processor);\r
-// }\r
-//\r
-// @Override\r
-// public void completed(boolean value) {\r
-//// System.out.println(this + "MGRP read completed");\r
-//// synchronized (MergingGraphRequestProcessor.this) {\r
-//// if (requestQueue.isEmpty())\r
-//// hasAlreadyRequest = false;\r
-//// else\r
-//// newTransaction();\r
-//// }\r
-// }\r
-\r
- @Override\r
- public void run(ReadGraph graph) {\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " reads");\r
-\r
- while (true) {\r
-\r
- synchronized (MergingGraphRequestProcessor.this) {\r
-\r
- // Release #synchronize() invokers if necessary.\r
-// if (currentRequest != null && barrierRequests.contains(currentRequest)) {\r
-// synchronized (barrier) {\r
-// barrier.notifyAll();\r
-// }\r
-// }\r
-\r
- if(requestQueue.isEmpty()) {\r
- if (transactionKeepalivePeriod > 0) {\r
-// System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );\r
- try {\r
- MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- if (requestQueue.isEmpty())\r
- break;\r
- } else\r
- break;\r
- }\r
-\r
- Object nextRequest = requestQueue.peekFirst().first;\r
- if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {\r
- break;\r
- }\r
-\r
- currentRequest = requestQueue.remove(0);\r
- requestSet.remove(currentRequest);\r
-\r
- }\r
-\r
-// ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;\r
-\r
- if( syncRequests.contains(currentRequest.first)) {\r
-\r
- try {\r
-\r
- if(currentRequest.second instanceof AsyncProcedure<?>) {\r
- if(currentRequest.first instanceof Read) {\r
- Read req = (Read)currentRequest.first;\r
- graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);\r
- } else {\r
- AsyncRead req = (AsyncRead)currentRequest.first;\r
- graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);\r
- }\r
- } else {\r
- AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;\r
- graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);\r
- }\r
-\r
- } catch(Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
-\r
- if(currentRequest.second instanceof AsyncProcedure<?>) {\r
- ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);\r
- } else {\r
- ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);\r
- }\r
-\r
- }\r
-\r
- synchronized (currentRequest.first) {\r
- syncRequests.remove(currentRequest.first);\r
-// System.out.println("notifying " + currentRequest.first);\r
- currentRequest.first.notify();\r
- }\r
-\r
-\r
- } else {\r
-\r
- try{\r
-\r
- if(currentRequest.second instanceof AsyncProcedure<?>) {\r
- if(currentRequest.first instanceof AsyncRead) {\r
- AsyncRead req = (AsyncRead)currentRequest.first;\r
- graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);\r
- } else {\r
- Read req = (Read)currentRequest.first;\r
- graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);\r
- }\r
- } else {\r
- AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;\r
- graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);\r
- }\r
-\r
- } catch(Throwable t) {\r
-\r
- Logger.defaultLogError(t);\r
-\r
- if(currentRequest.second instanceof AsyncProcedure<?>) {\r
- ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);\r
- } else {\r
- ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);\r
- }\r
-\r
- }\r
- }\r
-\r
- }\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " read completed");\r
-\r
- synchronized (MergingGraphRequestProcessor.this) {\r
- if (requestQueue.isEmpty())\r
- hasAlreadyRequest = false;\r
- else\r
- newTransaction();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "MergedRead[" + requestQueue.size() + " requests]";\r
- }\r
-\r
- }\r
-\r
- protected class RunnerWriteGraphRequest extends WriteRequest {\r
-\r
- Pair<Object, Object> currentRequest;\r
- HashMap<String, String> metadata = new HashMap<String, String>();\r
-\r
- @Override\r
- public void perform(WriteGraph graph) throws DatabaseException {\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " writes");\r
-\r
- while (true) {\r
-\r
- synchronized (MergingGraphRequestProcessor.this) {\r
-\r
- // Release #synchronize() invokers if necessary.\r
-// if (currentRequest != null && barrierRequests.contains(currentRequest)) {\r
-// synchronized (barrier) {\r
-// barrier.notifyAll();\r
-// }\r
-// }\r
-\r
- if(requestQueue.isEmpty()) {\r
- if (transactionKeepalivePeriod > 0) {\r
- try {\r
- MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);\r
- } catch (InterruptedException e) {\r
- Logger.defaultLogError(e);\r
- }\r
- if (requestQueue.isEmpty())\r
- break;\r
- } else\r
- break;\r
- }\r
-\r
- Object nextRequest = requestQueue.peekFirst().first;\r
- if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {\r
- break;\r
- }\r
-\r
- currentRequest = requestQueue.remove(0);\r
- requestSet.remove(currentRequest);\r
-\r
- }\r
-\r
- @SuppressWarnings("unchecked")\r
- Callback<Throwable> callback = (Callback<Throwable>)currentRequest.second;\r
-\r
- if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {\r
-\r
- SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;\r
-\r
- try {\r
-// System.out.println("merg.sync " + adapter);\r
- graph.syncRequest(adapter);\r
- if(callback != null) callback.run(null);\r
- } catch(Throwable t) {\r
- Logger.defaultLogError(t);\r
- if(callback != null) callback.run(t);\r
- }\r
-\r
- adapter.release();\r
-// System.out.println("merg.sync.release " + adapter);\r
-\r
- } else {\r
-\r
- try {\r
- if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); \r
- else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); \r
- if(callback != null) callback.run(null);\r
- } catch(Throwable t) {\r
- Logger.defaultLogError(t);\r
- if(callback != null) callback.run(t);\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " write completed");\r
-\r
- synchronized (MergingGraphRequestProcessor.this) {\r
- if (requestQueue.isEmpty())\r
- hasAlreadyRequest = false;\r
- else\r
- newTransaction();\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
- private void newTransaction() {\r
-\r
- boolean write = false;\r
-\r
- synchronized (MergingGraphRequestProcessor.this) {\r
- assert(!requestQueue.isEmpty());\r
- Object nextRequest = requestQueue.peekFirst().first;\r
- write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);\r
- }\r
-\r
- if(write) {\r
- processor.asyncRequest(new RunnerWriteGraphRequest(), null);\r
- } else {\r
- processor.asyncRequest(new MergedRead());\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {\r
-\r
-// System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {\r
-\r
-// System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
-// System.out.println("notify " + this);\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public synchronized void asyncRequest(Write request, Callback<DatabaseException> callback) {\r
-\r
-// System.out.println(this + " asyncRequest(WriteGraphRequest request)");\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
-// System.out.println("new transaction");\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
-// System.out.println("notify");\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public synchronized void asyncRequest(DelayedWrite request, Callback<DatabaseException> callback) {\r
-\r
-// System.out.println(this + " asyncRequest(WriteGraphRequest request)");\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
-// System.out.println("new transaction");\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
-// System.out.println("notify");\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public synchronized void asyncRequest(WriteOnly request, Callback<DatabaseException> callback) {\r
-\r
-// System.out.println(this + " asyncRequest(WriteGraphRequest request)");\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
-// System.out.println("new transaction");\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
-// System.out.println("notify");\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {\r
-\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
-\r
- // Queue the adapter up for execution.\r
- synchronized (request) {\r
- syncRequests.add(request);\r
- asyncRequest(request, procedure);\r
- if(syncRequests.contains(request)) {\r
- try {\r
- // System.out.println("waiting " + request);\r
- request.wait();\r
- } catch (InterruptedException e) {\r
- throw new Error(e);\r
- }\r
- }\r
- }\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- Logger.defaultLogError(t);\r
- throw new RuntimeException(t.getMessage());\r
- }\r
- \r
- return null;\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {\r
-\r
-// System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");\r
-\r
- final DataContainer<T> result = new DataContainer<T>(null);\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
-\r
- // Queue the adapter up for execution.\r
- synchronized (request) {\r
-\r
- syncRequests.add(request);\r
- asyncRequest(request, new AsyncProcedure<T>() {\r
-\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.set(t);\r
- }\r
- procedure.execute(graph, t);\r
- };\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- throwable.set(t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- });\r
- if(syncRequests.contains(request)) {\r
- try {\r
- // System.out.println("waiting " + request);\r
- request.wait();\r
- } catch (InterruptedException e) {\r
- throw new Error(e);\r
- }\r
- }\r
- }\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- Logger.defaultLogError(t);\r
- throw new RuntimeException(t.getMessage());\r
- }\r
- \r
- return result.get();\r
-\r
- //return result.get();\r
-\r
- }\r
-\r
-\r
- @Override\r
- public void syncRequest(Write request) {\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");\r
-\r
- SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);\r
-\r
- asyncRequest(adapter, null);\r
-\r
- adapter.acquire();\r
-\r
- // Throw exception if one occurred.\r
- adapter.throwOrWrapException();\r
-\r
- }\r
-\r
- @Override\r
- public void syncRequest(WriteOnly request) {\r
-\r
-// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");\r
-\r
- SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);\r
-\r
- // Queue the adapter up for execution.\r
- synchronized (adapter) {\r
- asyncRequest(adapter, null);\r
- try {\r
- adapter.wait();\r
- } catch (InterruptedException e) {\r
- throw new Error(e);\r
- }\r
- }\r
-\r
- // Throw exception if one occurred.\r
- adapter.throwOrWrapException();\r
-\r
- }\r
-\r
- @Override\r
- public Session getSession() {\r
- return processor.getSession();\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request) {\r
-\r
- asyncRequest(request, new ProcedureAdapter<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> request,\r
- MultiProcedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request) {\r
-\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
- final DataContainer<T> result = new DataContainer<T>();\r
-\r
- syncRequest(request, new AsyncProcedure<T>() {\r
-\r
- public void execute(AsyncReadGraph graph, T t) {\r
- result.set(t);\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- throwable.set(t);\r
- }\r
-\r
- });\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- Logger.defaultLogError(t);\r
- throw new RuntimeException(t.getMessage());\r
- }\r
-\r
- return result.get();\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> request,\r
- Procedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {\r
-\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
- final ArrayList<T> result = new ArrayList<T>();\r
-\r
- syncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.add(t);\r
- }\r
- };\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- throwable.set(t);\r
- }\r
-\r
- });\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- Logger.defaultLogError(t);\r
- throw new RuntimeException(t.getMessage());\r
- }\r
-\r
- return result;\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,\r
- MultiProcedure<T> procedure) {\r
- throw new Error("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request) {\r
-\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
- final DataContainer<T> result = new DataContainer<T>();\r
-\r
-\r
- syncRequest(request, new Procedure<T>() {\r
-\r
- public void execute(T t) {\r
- result.set(t);\r
- }\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- throwable.set(t);\r
- }\r
-\r
- });\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- throw new Error(t.getMessage());\r
- }\r
-\r
-\r
- return result.get();\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request,\r
- final AsyncProcedure<T> procedure) {\r
-\r
- final DataContainer<T> result = new DataContainer<T>(null);\r
- final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);\r
-\r
- // Queue the adapter up for execution.\r
- synchronized (request) {\r
-\r
- syncRequests.add(request);\r
- asyncRequest(request, new AsyncProcedure<T>() {\r
-\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.set(t);\r
- }\r
- procedure.execute(graph, t);\r
- };\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- throwable.set(t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return procedure.toString();\r
- }\r
-\r
- });\r
- if(syncRequests.contains(request)) {\r
- try {\r
- // System.out.println("waiting " + request);\r
- request.wait();\r
- } catch (InterruptedException e) {\r
- throw new Error(e);\r
- }\r
- }\r
- }\r
-\r
- Throwable t = throwable.get();\r
-\r
- if(t != null) {\r
- throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);\r
- }\r
- \r
- return result.get();\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request) {\r
-\r
- asyncRequest(request, new ProcedureAdapter<T>() {\r
-\r
- @Override\r
- public void exception(Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
-\r
- });\r
-\r
- }\r
-\r
- @Override\r
- public synchronized <T> void asyncRequest(Read<T> request,\r
- AsyncProcedure<T> procedure) {\r
-\r
- if (requestSet.contains(request))\r
- return;\r
-\r
- Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);\r
- requestQueue.add(pair);\r
- requestSet.add(pair);\r
-\r
- if (!hasAlreadyRequest) {\r
- newTransaction();\r
- hasAlreadyRequest = true;\r
- } else {\r
- notify();\r
- }\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {\r
- return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {\r
- assert(request != null);\r
-\r
- final ArrayList<T> result = new ArrayList<T>();\r
- final DataContainer<Throwable> exception = new DataContainer<Throwable>();\r
-\r
- syncRequest(request, new AsyncMultiProcedureAdapter<T>() {\r
-\r
- @Override\r
- public void execute(AsyncReadGraph graph, T t) {\r
- synchronized(result) {\r
- result.add(t);\r
- }\r
- }\r
-\r
- @Override\r
- public void exception(AsyncReadGraph graph, Throwable t) {\r
- exception.set(t);\r
- }\r
-\r
- @Override\r
- public String toString() {\r
- return "syncRequest(MultiRead) -> " + request;\r
- }\r
-\r
- });\r
-\r
- Throwable t = exception.get();\r
- if(t != null) {\r
- if(t instanceof DatabaseException) throw (DatabaseException)t;\r
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);\r
- }\r
-\r
- return result;\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {\r
- asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request) {\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented");\r
- }\r
-\r
- @Override\r
- public void asyncRequest(Write r) {\r
- asyncRequest(r, null);\r
- }\r
-\r
- @Override\r
- public void asyncRequest(DelayedWrite r) {\r
- asyncRequest(r, null);\r
- }\r
-\r
- @Override\r
- public void asyncRequest(WriteOnly r) {\r
- asyncRequest(r, null);\r
- }\r
-\r
- @Override\r
- public <T> T getService(Class<T> api) {\r
- return getSession().getService(api);\r
- }\r
-\r
- @Override\r
- public <T> T peekService(Class<T> api) {\r
- return getSession().peekService(api);\r
- }\r
-\r
- @Override\r
- public boolean hasService(Class<?> api) {\r
- return getSession().hasService(api);\r
- }\r
-\r
- @Override\r
- public <T> void registerService(Class<T> api, T service) {\r
- getSession().registerService(api, service);\r
- }\r
-\r
-// @Override\r
-// public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {\r
-// throw new UnsupportedOperationException("Not implemented.");\r
-// }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,\r
- AsyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,\r
- MultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> arg0,\r
- AsyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> arg0,\r
- MultiListener<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
-\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(ExternalRead<T> request) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(ExternalRead<T> request) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public void syncRequest(DelayedWrite request) throws DatabaseException {\r
- throw new UnsupportedOperationException("Not implemented.");\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(DelayedWriteResult<T> request)\r
- throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public Resource getRootLibrary() {\r
- return processor.getRootLibrary();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, Listener<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public <T> T sync(ReadInterface<T> r) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public <T> T sync(WriteInterface<T> r) throws DatabaseException {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {\r
- throw new UnsupportedOperationException();\r
- }\r
-\r
- @Override\r
- public <T> void async(WriteInterface<T> r) {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
- @Override\r
- public Object getModificationCounter() {\r
- throw new UnsupportedOperationException();\r
- }\r
- \r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package org.simantics.db.common.processor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.function.Consumer;
+
+import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.AsyncRequestProcessor;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.RequestProcessor;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.request.ReadRequest;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.procedure.AsyncListener;
+import org.simantics.db.procedure.AsyncMultiListener;
+import org.simantics.db.procedure.AsyncMultiProcedure;
+import org.simantics.db.procedure.AsyncProcedure;
+import org.simantics.db.procedure.Listener;
+import org.simantics.db.procedure.MultiListener;
+import org.simantics.db.procedure.MultiProcedure;
+import org.simantics.db.procedure.Procedure;
+import org.simantics.db.procedure.SyncListener;
+import org.simantics.db.procedure.SyncMultiListener;
+import org.simantics.db.procedure.SyncMultiProcedure;
+import org.simantics.db.procedure.SyncProcedure;
+import org.simantics.db.request.AsyncMultiRead;
+import org.simantics.db.request.AsyncRead;
+import org.simantics.db.request.DelayedWrite;
+import org.simantics.db.request.DelayedWriteResult;
+import org.simantics.db.request.ExternalRead;
+import org.simantics.db.request.MultiRead;
+import org.simantics.db.request.Read;
+import org.simantics.db.request.ReadInterface;
+import org.simantics.db.request.Write;
+import org.simantics.db.request.WriteInterface;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.db.request.WriteOnlyResult;
+import org.simantics.db.request.WriteResult;
+import org.simantics.utils.DataContainer;
+import org.simantics.utils.datastructures.Pair;
+
+public class MergingGraphRequestProcessor implements RequestProcessor {
+
+ private static class SyncWriteRequestAdapter implements Write {
+
+ private Semaphore semaphore = new Semaphore(0);
+ private Object request;
+ private Throwable exception;
+ SyncWriteRequestAdapter(Write r) {
+ this.request = r;
+ }
+ SyncWriteRequestAdapter(WriteOnly r) {
+ this.request = r;
+ }
+// @Override
+// public GraphRequestStatus perform(Graph g) throws Exception {
+// return perform((ReadGraph)g);
+// }
+ @Override
+ public void perform(WriteGraph g) throws DatabaseException, CancelTransactionException {
+ if(request instanceof Write) {
+ ((Write)request).perform(g);
+ } else if(request instanceof DelayedWrite) {
+ ((DelayedWrite)request).perform(g);
+ } else {
+ ((WriteOnly)request).perform(g);
+ }
+ }
+// @Override
+// public String getId() {
+// if(request instanceof WriteGraphRequest) {
+// return ((WriteGraphRequest)request).getId();
+// } else {
+// return null;
+// }
+// }
+// @Override
+// public void requestCompleted(GraphRequestStatus status) {
+// if(request instanceof WriteGraphRequest) {
+// ((WriteGraphRequest)request).requestCompleted(status);
+// } else {
+// }
+// }
+// @Override
+// public void handleException(Throwable e) {
+// this.exception = e;
+// if(request instanceof WriteGraphRequest) {
+// ((WriteGraphRequest)request).handleException(e);
+// }
+// }
+
+ public void throwOrWrapException() {
+ if (exception == null)
+ return;
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException) exception;
+ if (exception instanceof Error)
+ throw (Error) exception;
+ throw new RuntimeException("See cause for the real exception.", exception);
+ }
+
+ public void acquire() {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ }
+ }
+
+ public void release() {
+ semaphore.release();
+ }
+
+ @Override
+ public String toString() {
+ return "SyncWriteRequestAdapter " + request;
+ }
+
+ }
+
+ long transactionKeepalivePeriod;
+
+ /**
+ * Synchronization object for implementing {@link #synchronize()}.
+ * {@link Object#notifyAll()} is invoked for this lock object every time a
+ * single transaction is completed, thereby releasing all waiters in
+ * {@link #synchronize()}.
+ */
+ Object barrier = new Object();
+
+ Set<Pair<Object, Object>> requestSet = new HashSet<Pair<Object, Object>>();
+ LinkedList<Pair<Object, Object>> requestQueue = new LinkedList<Pair<Object, Object>>();
+ boolean hasAlreadyRequest = false;
+
+ /**
+ * A set of requests which {@link #synchronize()} is depending on at the
+ * moment. Every time a request within this set is completed, some thread in
+ * {@link #synchronize()} should be released.
+ */
+// Set<Object> barrierRequests = new HashSet<Object>();
+ Set<Object> syncRequests = new HashSet<Object>();
+
+ private String name;
+
+ private AsyncRequestProcessor processor;
+
+ public MergingGraphRequestProcessor(String name, AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
+ this.name = name;
+ this.processor = processor;
+ this.transactionKeepalivePeriod = transactionKeepalivePeriod;
+ }
+
+ public MergingGraphRequestProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
+ this.name = "MergingGraphRequestProcessor" + UUID.randomUUID().toString();
+ this.processor = processor;
+ this.transactionKeepalivePeriod = transactionKeepalivePeriod;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected class MergedRead extends ReadRequest {
+
+ Pair<Object, Object> currentRequest;
+
+// RunnerReadGraphRequest(GraphRequestProcessor processor) {
+// super(processor);
+// }
+//
+// @Override
+// public void completed(boolean value) {
+//// System.out.println(this + "MGRP read completed");
+//// synchronized (MergingGraphRequestProcessor.this) {
+//// if (requestQueue.isEmpty())
+//// hasAlreadyRequest = false;
+//// else
+//// newTransaction();
+//// }
+// }
+
+ @Override
+ public void run(ReadGraph graph) {
+
+// System.out.println(MergingGraphRequestProcessor.this + " reads");
+
+ while (true) {
+
+ synchronized (MergingGraphRequestProcessor.this) {
+
+ // Release #synchronize() invokers if necessary.
+// if (currentRequest != null && barrierRequests.contains(currentRequest)) {
+// synchronized (barrier) {
+// barrier.notifyAll();
+// }
+// }
+
+ if(requestQueue.isEmpty()) {
+ if (transactionKeepalivePeriod > 0) {
+// System.out.println("MGRP [" + MergingGraphRequestProcessor.this + "] waits " + transactionKeepalivePeriod + " ms. in " + Thread.currentThread() );
+ try {
+ MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ }
+ if (requestQueue.isEmpty())
+ break;
+ } else
+ break;
+ }
+
+ Object nextRequest = requestQueue.peekFirst().first;
+ if(nextRequest instanceof Write || nextRequest instanceof DelayedWrite) {
+ break;
+ }
+
+ currentRequest = requestQueue.remove(0);
+ requestSet.remove(currentRequest);
+
+ }
+
+// ReadGraphRequest req = (ReadGraphRequest)currentRequest.first;
+
+ if( syncRequests.contains(currentRequest.first)) {
+
+ try {
+
+ if(currentRequest.second instanceof AsyncProcedure<?>) {
+ if(currentRequest.first instanceof Read) {
+ Read req = (Read)currentRequest.first;
+ graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+ } else {
+ AsyncRead req = (AsyncRead)currentRequest.first;
+ graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+ }
+ } else {
+ AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
+ graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
+ }
+
+ } catch(Throwable t) {
+
+ Logger.defaultLogError(t);
+
+ if(currentRequest.second instanceof AsyncProcedure<?>) {
+ ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
+ } else {
+ ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
+ }
+
+ }
+
+ synchronized (currentRequest.first) {
+ syncRequests.remove(currentRequest.first);
+// System.out.println("notifying " + currentRequest.first);
+ currentRequest.first.notify();
+ }
+
+
+ } else {
+
+ try{
+
+ if(currentRequest.second instanceof AsyncProcedure<?>) {
+ if(currentRequest.first instanceof AsyncRead) {
+ AsyncRead req = (AsyncRead)currentRequest.first;
+ graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+ } else {
+ Read req = (Read)currentRequest.first;
+ graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+ }
+ } else {
+ AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
+ graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
+ }
+
+ } catch(Throwable t) {
+
+ Logger.defaultLogError(t);
+
+ if(currentRequest.second instanceof AsyncProcedure<?>) {
+ ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
+ } else {
+ ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
+ }
+
+ }
+ }
+
+ }
+
+// System.out.println(MergingGraphRequestProcessor.this + " read completed");
+
+ synchronized (MergingGraphRequestProcessor.this) {
+ if (requestQueue.isEmpty())
+ hasAlreadyRequest = false;
+ else
+ newTransaction();
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "MergedRead[" + requestQueue.size() + " requests]";
+ }
+
+ }
+
+ protected class RunnerWriteGraphRequest extends WriteRequest {
+
+ Pair<Object, Object> currentRequest;
+ HashMap<String, String> metadata = new HashMap<String, String>();
+
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+
+// System.out.println(MergingGraphRequestProcessor.this + " writes");
+
+ while (true) {
+
+ synchronized (MergingGraphRequestProcessor.this) {
+
+ // Release #synchronize() invokers if necessary.
+// if (currentRequest != null && barrierRequests.contains(currentRequest)) {
+// synchronized (barrier) {
+// barrier.notifyAll();
+// }
+// }
+
+ if(requestQueue.isEmpty()) {
+ if (transactionKeepalivePeriod > 0) {
+ try {
+ MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
+ } catch (InterruptedException e) {
+ Logger.defaultLogError(e);
+ }
+ if (requestQueue.isEmpty())
+ break;
+ } else
+ break;
+ }
+
+ Object nextRequest = requestQueue.peekFirst().first;
+ if(nextRequest instanceof AsyncMultiRead || nextRequest instanceof AsyncRead || nextRequest instanceof Read) {
+ break;
+ }
+
+ currentRequest = requestQueue.remove(0);
+ requestSet.remove(currentRequest);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ Consumer<Throwable> callback = (Consumer<Throwable>)currentRequest.second;
+
+ if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
+
+ SyncWriteRequestAdapter adapter = (SyncWriteRequestAdapter)currentRequest.first;
+
+ try {
+// System.out.println("merg.sync " + adapter);
+ graph.syncRequest(adapter);
+ if(callback != null) callback.accept(null);
+ } catch(Throwable t) {
+ Logger.defaultLogError(t);
+ if(callback != null) callback.accept(t);
+ }
+
+ adapter.release();
+// System.out.println("merg.sync.release " + adapter);
+
+ } else {
+
+ try {
+ if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first);
+ else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first);
+ if(callback != null) callback.accept(null);
+ } catch(Throwable t) {
+ Logger.defaultLogError(t);
+ if(callback != null) callback.accept(t);
+ }
+
+ }
+
+ }
+
+// System.out.println(MergingGraphRequestProcessor.this + " write completed");
+
+ synchronized (MergingGraphRequestProcessor.this) {
+ if (requestQueue.isEmpty())
+ hasAlreadyRequest = false;
+ else
+ newTransaction();
+ }
+
+ }
+
+ }
+
+ private void newTransaction() {
+
+ boolean write = false;
+
+ synchronized (MergingGraphRequestProcessor.this) {
+ assert(!requestQueue.isEmpty());
+ Object nextRequest = requestQueue.peekFirst().first;
+ write = (nextRequest instanceof Write || nextRequest instanceof DelayedWrite);
+ }
+
+ if(write) {
+ processor.asyncRequest(new RunnerWriteGraphRequest(), null);
+ } else {
+ processor.asyncRequest(new MergedRead());
+ }
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiProcedure<T> procedure) {
+
+// System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+ notify();
+ }
+
+ }
+
+ @Override
+ public synchronized <T> void asyncRequest(AsyncRead<T> request, AsyncProcedure<T> procedure) {
+
+// System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+// System.out.println("notify " + this);
+ notify();
+ }
+
+ }
+
+ @Override
+ public synchronized void asyncRequest(Write request, Consumer<DatabaseException> callback) {
+
+// System.out.println(this + " asyncRequest(WriteGraphRequest request)");
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+// System.out.println("new transaction");
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+// System.out.println("notify");
+ notify();
+ }
+
+ }
+
+ @Override
+ public synchronized void asyncRequest(DelayedWrite request, Consumer<DatabaseException> callback) {
+
+// System.out.println(this + " asyncRequest(WriteGraphRequest request)");
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+// System.out.println("new transaction");
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+// System.out.println("notify");
+ notify();
+ }
+
+ }
+
+ @Override
+ public synchronized void asyncRequest(WriteOnly request, Consumer<DatabaseException> callback) {
+
+// System.out.println(this + " asyncRequest(WriteGraphRequest request)");
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+// System.out.println("new transaction");
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+// System.out.println("notify");
+ notify();
+ }
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
+
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+
+ // Queue the adapter up for execution.
+ synchronized (request) {
+ syncRequests.add(request);
+ asyncRequest(request, procedure);
+ if(syncRequests.contains(request)) {
+ try {
+ // System.out.println("waiting " + request);
+ request.wait();
+ } catch (InterruptedException e) {
+ throw new Error(e);
+ }
+ }
+ }
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ Logger.defaultLogError(t);
+ throw new RuntimeException(t.getMessage());
+ }
+
+ return null;
+
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) {
+
+// System.out.println("syncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure)");
+
+ final DataContainer<T> result = new DataContainer<T>(null);
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+
+ // Queue the adapter up for execution.
+ synchronized (request) {
+
+ syncRequests.add(request);
+ asyncRequest(request, new AsyncProcedure<T>() {
+
+ public void execute(AsyncReadGraph graph, T t) {
+ synchronized(result) {
+ result.set(t);
+ }
+ procedure.execute(graph, t);
+ };
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ throwable.set(t);
+ }
+
+ @Override
+ public String toString() {
+ return procedure.toString();
+ }
+
+ });
+ if(syncRequests.contains(request)) {
+ try {
+ // System.out.println("waiting " + request);
+ request.wait();
+ } catch (InterruptedException e) {
+ throw new Error(e);
+ }
+ }
+ }
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ Logger.defaultLogError(t);
+ throw new RuntimeException(t.getMessage());
+ }
+
+ return result.get();
+
+ //return result.get();
+
+ }
+
+
+ @Override
+ public void syncRequest(Write request) {
+
+// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
+
+ SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
+
+ asyncRequest(adapter, null);
+
+ adapter.acquire();
+
+ // Throw exception if one occurred.
+ adapter.throwOrWrapException();
+
+ }
+
+ @Override
+ public void syncRequest(WriteOnly request) {
+
+// System.out.println(MergingGraphRequestProcessor.this + " syncRequest(WriteGraphRequest)");
+
+ SyncWriteRequestAdapter adapter = new SyncWriteRequestAdapter(request);
+
+ // Queue the adapter up for execution.
+ synchronized (adapter) {
+ asyncRequest(adapter, null);
+ try {
+ adapter.wait();
+ } catch (InterruptedException e) {
+ throw new Error(e);
+ }
+ }
+
+ // Throw exception if one occurred.
+ adapter.throwOrWrapException();
+
+ }
+
+ @Override
+ public Session getSession() {
+ return processor.getSession();
+ }
+
+ @Override
+ public String toString() {
+ return "MergingGraphRequestProcessor[" + name + "]@" + System.identityHashCode(this) + " (based on " + processor + ")";
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request) {
+
+ asyncRequest(request, new ProcedureAdapter<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ Logger.defaultLogError(t);
+ }
+
+ });
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> request, Procedure<T> procedure) {
+ asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> request,
+ MultiProcedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request) {
+
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+ final DataContainer<T> result = new DataContainer<T>();
+
+ syncRequest(request, new AsyncProcedure<T>() {
+
+ public void execute(AsyncReadGraph graph, T t) {
+ result.set(t);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ throwable.set(t);
+ }
+
+ });
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ Logger.defaultLogError(t);
+ throw new RuntimeException(t.getMessage());
+ }
+
+ return result.get();
+
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> request,
+ Procedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request) {
+
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+ final ArrayList<T> result = new ArrayList<T>();
+
+ syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ synchronized(result) {
+ result.add(t);
+ }
+ };
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ throwable.set(t);
+ }
+
+ });
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ Logger.defaultLogError(t);
+ throw new RuntimeException(t.getMessage());
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> request,
+ MultiProcedure<T> procedure) {
+ throw new Error("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request) {
+
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+ final DataContainer<T> result = new DataContainer<T>();
+
+
+ syncRequest(request, new Procedure<T>() {
+
+ public void execute(T t) {
+ result.set(t);
+ }
+
+ @Override
+ public void exception(Throwable t) {
+ throwable.set(t);
+ }
+
+ });
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ throw new Error(t.getMessage());
+ }
+
+
+ return result.get();
+
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request,
+ final AsyncProcedure<T> procedure) {
+
+ final DataContainer<T> result = new DataContainer<T>(null);
+ final DataContainer<Throwable> throwable = new DataContainer<Throwable>(null);
+
+ // Queue the adapter up for execution.
+ synchronized (request) {
+
+ syncRequests.add(request);
+ asyncRequest(request, new AsyncProcedure<T>() {
+
+ public void execute(AsyncReadGraph graph, T t) {
+ synchronized(result) {
+ result.set(t);
+ }
+ procedure.execute(graph, t);
+ };
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ throwable.set(t);
+ }
+
+ @Override
+ public String toString() {
+ return procedure.toString();
+ }
+
+ });
+ if(syncRequests.contains(request)) {
+ try {
+ // System.out.println("waiting " + request);
+ request.wait();
+ } catch (InterruptedException e) {
+ throw new Error(e);
+ }
+ }
+ }
+
+ Throwable t = throwable.get();
+
+ if(t != null) {
+ throw new RuntimeException("Unexpected exception in MergingGraphRequestProcessor.syncRequest(Read, AsyncProcedure)", t);
+ }
+
+ return result.get();
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request) {
+
+ asyncRequest(request, new ProcedureAdapter<T>() {
+
+ @Override
+ public void exception(Throwable t) {
+ Logger.defaultLogError(t);
+ }
+
+ });
+
+ }
+
+ @Override
+ public synchronized <T> void asyncRequest(Read<T> request,
+ AsyncProcedure<T> procedure) {
+
+ if (requestSet.contains(request))
+ return;
+
+ Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
+ requestQueue.add(pair);
+ requestSet.add(pair);
+
+ if (!hasAlreadyRequest) {
+ newTransaction();
+ hasAlreadyRequest = true;
+ } else {
+ notify();
+ }
+
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> request, Procedure<T> procedure) {
+ return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(final MultiRead<T> request) throws DatabaseException {
+ assert(request != null);
+
+ final ArrayList<T> result = new ArrayList<T>();
+ final DataContainer<Throwable> exception = new DataContainer<Throwable>();
+
+ syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T t) {
+ synchronized(result) {
+ result.add(t);
+ }
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable t) {
+ exception.set(t);
+ }
+
+ @Override
+ public String toString() {
+ return "syncRequest(MultiRead) -> " + request;
+ }
+
+ });
+
+ Throwable t = exception.get();
+ if(t != null) {
+ if(t instanceof DatabaseException) throw (DatabaseException)t;
+ else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ }
+
+ return result;
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> request, Procedure<T> procedure) {
+ asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void asyncRequest(Write r) {
+ asyncRequest(r, null);
+ }
+
+ @Override
+ public void asyncRequest(DelayedWrite r) {
+ asyncRequest(r, null);
+ }
+
+ @Override
+ public void asyncRequest(WriteOnly r) {
+ asyncRequest(r, null);
+ }
+
+ @Override
+ public <T> T getService(Class<T> api) {
+ return getSession().getService(api);
+ }
+
+ @Override
+ public <T> T peekService(Class<T> api) {
+ return getSession().peekService(api);
+ }
+
+ @Override
+ public boolean hasService(Class<?> api) {
+ return getSession().hasService(api);
+ }
+
+ @Override
+ public <T> void registerService(Class<T> api, T service) {
+ getSession().registerService(api, service);
+ }
+
+// @Override
+// public <T> T syncRequest(Read<T> arg0, AsyncListener<T> arg1) {
+// throw new UnsupportedOperationException("Not implemented.");
+// }
+
+ @Override
+ public <T> T syncRequest(Read<T> arg0, SyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> arg0, Listener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> T syncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> arg0, AsyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> arg0, SyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> arg0, Listener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(Read<T> arg0, SyncProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> arg0, AsyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> arg0, SyncListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> arg0, Listener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncRead<T> arg0, SyncProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
+ AsyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0,
+ MultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> Collection<T> syncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> arg0, MultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> arg0,
+ AsyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> arg0,
+ MultiListener<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+
+ }
+
+ @Override
+ public <T> void asyncRequest(AsyncMultiRead<T> arg0, SyncMultiProcedure<T> arg1) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> void asyncRequest(ExternalRead<T> request, Procedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(ExternalRead<T> request) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(ExternalRead<T> request, Listener<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(ExternalRead<T> request, Procedure<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> void asyncRequest(ExternalRead<T> request) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> void asyncRequest(ExternalRead<T> request, Listener<T> procedure) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public void syncRequest(DelayedWrite request) throws DatabaseException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T syncRequest(DelayedWriteResult<T> request)
+ throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T syncRequest(WriteOnlyResult<T> r) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void asyncRequest(WriteResult<T> r, Procedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void asyncRequest(DelayedWriteResult<T> r, Procedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void asyncRequest(WriteOnlyResult<T> r, Procedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Resource getRootLibrary() {
+ return processor.getRootLibrary();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T sync(ReadInterface<T> r) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T sync(WriteInterface<T> r) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void async(WriteInterface<T> r) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getModificationCounter() {
+ throw new UnsupportedOperationException();
+ }
+
+}