--- /dev/null
+package org.simantics.db.common.service;\r
+\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+\r
+import org.simantics.db.service.ServiceActivityMonitor;\r
+\r
+import gnu.trove.map.TObjectIntMap;\r
+import gnu.trove.map.hash.TObjectIntHashMap;\r
+\r
+public class ServiceActivityMonitorImpl implements ServiceActivityMonitor {\r
+\r
+ public static final long REPORTING_PERIOD = 5L;\r
+ \r
+ Semaphore sem = new Semaphore(1);\r
+ AtomicInteger ref = new AtomicInteger();\r
+ TObjectIntMap<Object> ids = new TObjectIntHashMap<>();\r
+\r
+ @Override\r
+ public void registerActivity(Object id) {\r
+ if (ref.getAndIncrement() == 0)\r
+ sem.acquireUninterruptibly(1);\r
+ synchronized (ids) {\r
+ ids.adjustOrPutValue(id, 1, 1);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void unregisterActivity(Object id) {\r
+ synchronized (ids) {\r
+ int refs = ids.get(id);\r
+ if (refs == 1) {\r
+ ids.remove(id);\r
+ } else {\r
+ ids.put(id, refs-1);\r
+ }\r
+ }\r
+ if (ref.decrementAndGet() == 0)\r
+ sem.release();\r
+ }\r
+\r
+ @Override\r
+ public void waitForCompletion() throws InterruptedException {\r
+ while(true) {\r
+ if(waitForCompletion(REPORTING_PERIOD, TimeUnit.SECONDS))\r
+ return;\r
+ synchronized (ids) {\r
+ System.err.println("ServiceActivityMonitorImpl.waitForCompletion: " + ids);\r
+ } \r
+ }\r
+ }\r
+\r
+ @Override\r
+ public boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException {\r
+ if(sem.tryAcquire(timeout, unit)) {\r
+ sem.release();\r
+ return true;\r
+ }\r
+ else\r
+ return false;\r
+ }\r
+\r
+}
\ No newline at end of file