package org.simantics.db.common.service;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.simantics.db.service.ServiceActivityMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gnu.trove.map.TObjectIntMap;
import gnu.trove.map.hash.TObjectIntHashMap;

/**
 * {@link ServiceActivityMonitor} implementation built on a single-permit
 * {@link Semaphore} and an atomic activity counter.
 *
 * <p>The permit is acquired when the counter goes from 0 to 1 and released
 * when it returns to 0. Waiters test for completion by trying to acquire the
 * permit and releasing it again immediately.
 *
 * <p>A per-id count is kept in {@link #ids}; in this class it is only read to
 * build the periodic progress message logged by {@link #waitForCompletion()}.
 */
public class ServiceActivityMonitorImpl implements ServiceActivityMonitor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceActivityMonitorImpl.class);

    /** Interval, in seconds, between progress log messages in {@link #waitForCompletion()}. */
    public static final long REPORTING_PERIOD = 5L;

    /** Single permit that is held for as long as the activity counter is non-zero. */
    final Semaphore sem = new Semaphore(1);

    /** Total number of registrations that have not yet been unregistered. */
    final AtomicInteger ref = new AtomicInteger();

    /** Outstanding registration count per activity id; guarded by its own monitor. */
    final TObjectIntMap<Object> ids = new TObjectIntHashMap<>();

    /**
     * Records the start of an activity.
     *
     * <p>The registration that moves the counter from 0 to 1 takes the
     * semaphore permit. That acquisition is uninterruptible and blocks while
     * a waiter momentarily holds the permit.
     *
     * @param id identifier of the activity; used as the key of the per-id count
     */
    @Override
    public void registerActivity(Object id) {
        if (ref.getAndIncrement() == 0) {
            sem.acquireUninterruptibly(1);
        }
        synchronized (ids) {
            ids.adjustOrPutValue(id, 1, 1);
        }
    }

    /**
     * Records the end of an activity previously passed to
     * {@link #registerActivity(Object)}.
     *
     * <p>The unregistration that brings the counter back to 0 releases the
     * semaphore permit.
     *
     * <p>NOTE(review): calls are expected to be balanced with registrations.
     * An unbalanced call is not detected here: the per-id count and the
     * global counter are simply decremented.
     *
     * @param id identifier of the activity being ended
     */
    @Override
    public void unregisterActivity(Object id) {
        synchronized (ids) {
            int refs = ids.get(id);
            if (refs == 1) {
                // Last outstanding registration for this id: drop the entry.
                ids.remove(id);
            } else {
                ids.put(id, refs - 1);
            }
        }
        if (ref.decrementAndGet() == 0) {
            sem.release();
        }
    }

    /**
     * Blocks until the semaphore permit can be obtained, logging the per-id
     * counts at INFO level each time {@link #REPORTING_PERIOD} seconds pass
     * without success.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void waitForCompletion() throws InterruptedException {
        while (!waitForCompletion(REPORTING_PERIOD, TimeUnit.SECONDS)) {
            // Snapshot under the lock, log outside it so the logging backend
            // never runs while the monitor is held.
            String snapshot;
            synchronized (ids) {
                snapshot = ids.toString();
            }
            LOGGER.info("waitForCompletion: {}", snapshot);
        }
    }

    /**
     * Waits up to the given timeout for the semaphore permit to become
     * available.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the permit was obtained (and immediately released
     *         again) within the timeout, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        if (!sem.tryAcquire(timeout, unit)) {
            return false;
        }
        // The permit was only taken to test availability; hand it straight back.
        sem.release();
        return true;
    }
}