--- /dev/null
+package org.simantics.databoard.serialization;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.simantics.databoard.binding.Binding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Jani Simomaa
+ */
+public abstract class ConcurrentSerializerFactory implements SerializerScheme {
+
+ private Logger LOGGER = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Repository where serializers are placed.
+ */
+ private ConcurrentHashMap<Binding, CompletableFuture<Serializer>> repository = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<Binding, Serializer> inProgress = new ConcurrentHashMap<>();
+
+ /**
+ * Constructs a serilizer for a binding. Implement this.
+ * It should use the inprogress -map for construction of
+ * serializers that have component types.
+ *
+ * e.g.
+ * inprogress.put(binding, notCompletelyConstructedSerializer);
+ * Serializer componentSerializer = construct( componentBinding );
+ * notCompletelyConstructedSerializer.setComponent( componentSerializer );
+ * inprogress.remove(binding);
+ *
+ * try-finally is not needed.
+ *
+ * @param request
+ * @return
+ * @throws SerializerConstructionException
+ */
+ protected abstract Serializer doConstruct(Binding request) throws SerializerConstructionException;
+
+ protected Serializer construct(Binding request) throws SerializerConstructionException {
+ if (request.cachedSerializer() != null) {
+ return request.cachedSerializer();
+ } else {
+ // this is required cause construct can be called from subclasses
+ repository.putIfAbsent(request, new CompletableFuture<>());
+ // lets see if we are processing this already
+ Serializer binding = inProgress.get(request);
+ if (binding == null) {
+ binding = doConstruct(request);
+ request.cacheSerializer(binding);
+ CompletableFuture<Serializer> completableFuture = repository.get(request);
+ completableFuture.complete(binding); // this releases the waiters
+ }
+ return binding;
+ }
+ }
+
+ protected void addInProgress(Binding request, Serializer serializer) {
+ inProgress.put(request, serializer);
+ }
+
+ protected void finishInProgress(Binding request) {
+ Serializer remove = inProgress.remove(request);
+ request.cacheSerializer(remove);
+ CompletableFuture<Serializer> existing = repository.get(request);
+ if (existing != null) {
+ existing.complete(remove);
+ } else {
+ LOGGER.warn("Finishing binding request {} for serializer {} without CompletableFuture!", request, remove);
+ }
+ }
+
+ @Override
+ public Serializer getSerializer(Binding binding) throws SerializerConstructionException {
+ if (binding.cachedSerializer() != null)
+ return binding.cachedSerializer();
+ // this shouldn't be called that many time as serializers are cached on bindings
+ AtomicBoolean shouldConstruct = new AtomicBoolean(false);
+ CompletableFuture<Serializer> completableFuture = repository.computeIfAbsent(binding, t -> {
+ shouldConstruct.set(true);
+ return new CompletableFuture<>();
+ });
+ if (shouldConstruct.get()) {
+ // construct should complete the completable future and release the waiters
+ construct(binding);
+ }
+ // we should be ready at this point
+ try {
+ return completableFuture.get();
+ } catch (Exception e) {
+ throw new SerializerConstructionException(e);
+ }
+ }
+
+ public Serializer getSerializerUnchecked(Binding binding) throws RuntimeSerializerConstructionException {
+ try {
+ return getSerializer(binding);
+ } catch (SerializerConstructionException e) {
+ throw new RuntimeSerializerConstructionException(e);
+ }
+ }
+
+}