import org.simantics.db.server.ProCoreException;
import org.simantics.db.service.ClusterSetsSupport;
import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.EventSupport;
import org.simantics.db.service.LifecycleSupport;
-import org.simantics.utils.DataContainer;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.logging.TimeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import fi.vtt.simantics.procore.internal.EventSupportImpl;
import gnu.trove.map.hash.TLongObjectHashMap;
public class GraphClientImpl2 implements Database.Session {
private static final Logger LOGGER = LoggerFactory.getLogger(GraphClientImpl2.class);
public static final boolean DEBUG = false;
+
+ public static final String CLOSE = "close";
+ public static final String PURGE = "purge";
final ClusterManager clusters;
private ServiceLocator locator;
private FileCache fileCache;
private MainProgram mainProgram;
+ private EventSupportImpl eventSupport;
private static class ClientThreadFactory implements ThreadFactory {
cssi.updateWriteDirectory(clusters.workingDirectory);
mainProgram = new MainProgram(this, clusters);
executor.execute(mainProgram);
+ eventSupport = (EventSupportImpl)locator.getService(EventSupport.class);
+
}
public Path getDbFolder() {
throw new ProCoreException(e1);
}
closed = true;
+ eventSupport.fireEvent(CLOSE, null);
}
//impl.close();
}
this.state = state;
this.semaphore = semaphore;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[state=" + state + ", semaphore=" + semaphore + "]";
+ }
}
private class TransactionManager {
return req;
}
+ private long askWriteTransactionCount = 0;
/*
* This method cannot be synchronized since it waits and must support multiple entries
* by query thread(s) and internal transactions such as snapshot saver
TransactionRequest req = queue(TransactionState.WRITE, semaphore);
try {
- semaphore.acquire();
+ while (true) {
+ boolean acquired = semaphore.tryAcquire(1, TimeUnit.MINUTES);
+ if (!acquired) {
+ if (askWriteTransactionCount < 10) {
+ LOGGER.error("Could not acquire semaphore for askWriteTransaction for TransactionRequest {}", req);
+ LOGGER.error("Current clusters.state.headChangeSetId is {}", clusters.state.headChangeSetId);
+ LOGGER.error("Current clusters.state.transactionId is {}", clusters.state.transactionId);
+ LOGGER.error("Current amount of requests is {}", requests.size());
+ if (requests.size() < 100) {
+ LOGGER.error("Current requests {}", requests);
+ }
+ LOGGER.error("Current transaction state is {}", currentTransactionState);
+ askWriteTransactionCount++;
+ }
+ } else {
+ askWriteTransactionCount = 0;
+ break;
+ }
+ }
} catch (InterruptedException e) {
throw new IllegalAcornStateException(e);
}
LOGGER.info("performUndo " + ccsid);
performUndo(ccsid, clusterChanges, support);
} catch (DatabaseException e) {
- e.printStackTrace();
+ LOGGER.error("failed to perform undo for cluster change set {}", ccsid, e);
}
}
}
unexpectedClose = true;
} finally {
try {
- if(tr != null)
+ if(tr != null) {
endTransaction(tr.getTransactionId());
+ eventSupport.fireEvent(PURGE, null);
+ }
if (unexpectedClose) {
LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
try {