--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2012 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.db.layer0.util;\r
+\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.simantics.db.Session;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.procedure.Listener;\r
+import org.simantics.db.request.Read;\r
+\r
+/**\r
+ * A database listener that disposes itself based on a criterion evaluated using\r
+ * the request results received by the listener. If the results pass the\r
+ * criterion, the listener is disposed and the result is either accepted or\r
+ * discarded based on the evaluation.\r
+ * \r
+ * <p>\r
+ * The reason for the existence of this class is that sometimes database\r
+ * requests always perform asynchronously, i.e. return empty results at first\r
+ * and later get updated to their final results. For example, a the results of a\r
+ * web request would take a while to be delivered. This listener makes it a bit\r
+ * easier to deal with these situations when results are needed synchronously.\r
+ * \r
+ * <p>\r
+ * {@link #trySyncRequest(Session, Read, Criterion, long, TimeUnit)} provides a\r
+ * utility for executing a synchronous read request with this listener so that\r
+ * the implementation will wait for a criterion accepted result for the\r
+ * specified amount of time. If an accepted result is reached within the time\r
+ * limit, it will be returned. If the time limit is reached and no accepted\r
+ * result is attained, <code>null</code> will be returned. If the request\r
+ * produces an exception, it will be thrown.\r
+ * \r
+ * @author Tuukka Lehtonen\r
+ * \r
+ * @param <T> database request result type\r
+ * \r
+ * @see EvaluatingListener.Evaluation\r
+ * @see EvaluatingListener.Criterion\r
+ */\r
+public class EvaluatingListener<T> implements Listener<T> {\r
+\r
+ public static enum Evaluation {\r
+ /**\r
+ * Keep on listening to further results.\r
+ */\r
+ IGNORE,\r
+ /**\r
+ * Dispose listener and discard the results.\r
+ */\r
+ DISCARD,\r
+ /**\r
+ * Dispose listener and return the latest result.\r
+ */\r
+ ACCEPT\r
+ }\r
+\r
+ /**\r
+ * An evaluable criterion for the result received by\r
+ * {@link Listener#execute(Object)} to tell whether to accept the result,\r
+ * wait for another result or to consider the listener disposed.\r
+ * \r
+ * @param <T> the type of the result\r
+ */\r
+ public static interface Criterion<T> {\r
+ Evaluation evaluate(T result);\r
+ }\r
+\r
+ /**\r
+ * The criterion the listener evaluates. When it evaluates to\r
+ * {@value Evaluation#DISCARD}, this field is nullified and the listener is\r
+ * considered disposed.\r
+ */\r
+ private volatile Criterion<T> criterion;\r
+ private T result;\r
+ private Throwable exception;\r
+ private Semaphore wait = new Semaphore(0);\r
+\r
+ public EvaluatingListener(Criterion<T> criterion) {\r
+ if (criterion == null)\r
+ throw new NullPointerException("null criterion");\r
+ this.criterion = criterion;\r
+ }\r
+\r
+ /**\r
+ * @param session\r
+ * @param request\r
+ * @param criterion\r
+ * @param timeout\r
+ * @param unit\r
+ * @return\r
+ * @throws InterruptedException \r
+ * @throws DatabaseException \r
+ */\r
+ public static <T> T trySyncRequest(Session session, Read<T> request, Criterion<T> criterion, long timeout, TimeUnit unit) throws InterruptedException, DatabaseException {\r
+ EvaluatingListener<T> l = new EvaluatingListener<T>(criterion);\r
+ session.asyncRequest(request, l);\r
+ l.tryWaitForResult(timeout, unit);\r
+ // Make sure the listener is disposed.\r
+ l.dispose();\r
+ l.throwPossibleException();\r
+ return l.getResult();\r
+ }\r
+\r
+ public T waitForResult() throws InterruptedException {\r
+ wait.acquire();\r
+ return getResult();\r
+ }\r
+\r
+ public boolean tryWaitForResult(long timeout, TimeUnit unit) throws InterruptedException {\r
+ return wait.tryAcquire(timeout, unit);\r
+ }\r
+\r
+ public T getResult() {\r
+ return result;\r
+ }\r
+\r
+ public Throwable getException() {\r
+ return exception;\r
+ }\r
+\r
+ public void throwPossibleException() throws DatabaseException {\r
+ if (exception != null) {\r
+ if (exception instanceof DatabaseException)\r
+ throw (DatabaseException) exception;\r
+ throw new DatabaseException(exception);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void execute(T result) {\r
+ Criterion<T> crit = criterion;\r
+ if (crit == null)\r
+ return;\r
+ EvaluatingListener.Evaluation e = crit.evaluate(result);\r
+ switch (e) {\r
+ case IGNORE:\r
+ ignored(result);\r
+ return;\r
+ case ACCEPT:\r
+ this.result = result;\r
+ try {\r
+ accepted(result);\r
+ } finally {\r
+ dispose();\r
+ wait.release();\r
+ }\r
+ return;\r
+ case DISCARD:\r
+ dispose();\r
+ wait.release();\r
+ return;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Override to process results that were ignored.\r
+ * \r
+ * @param result\r
+ */\r
+ public void ignored(T result) {\r
+ }\r
+\r
+ /**\r
+ * Override this to immediately process an accepted result in the listener.\r
+ * This method is invoked before the listener is disposed\r
+ * \r
+ * @param result\r
+ */\r
+ public void accepted(T result) {\r
+ }\r
+\r
+ @Override\r
+ public void exception(Throwable t) {\r
+ this.exception = t;\r
+ dispose();\r
+ wait.release();\r
+ }\r
+\r
+ private void dispose() {\r
+ this.criterion = null;\r
+ }\r
+\r
+ @Override\r
+ public boolean isDisposed() {\r
+ return criterion == null;\r
+ }\r
+\r
+}
\ No newline at end of file