package org.simantics.databoard.serialization; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.simantics.databoard.binding.Binding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Jani Simomaa */ public abstract class ConcurrentSerializerFactory implements SerializerScheme { private Logger LOGGER = LoggerFactory.getLogger(getClass()); /** * Repository where serializers are placed. */ private ConcurrentHashMap> repository = new ConcurrentHashMap<>(); private ConcurrentHashMap inProgress = new ConcurrentHashMap<>(); /** * Constructs a serilizer for a binding. Implement this. * It should use the inprogress -map for construction of * serializers that have component types. * * e.g. * inprogress.put(binding, notCompletelyConstructedSerializer); * Serializer componentSerializer = construct( componentBinding ); * notCompletelyConstructedSerializer.setComponent( componentSerializer ); * inprogress.remove(binding); * * try-finally is not needed. * * @param request * @return * @throws SerializerConstructionException */ protected abstract Serializer doConstruct(Binding request) throws SerializerConstructionException; protected Serializer construct(Binding request) throws SerializerConstructionException { if (request.cachedSerializer() != null) { return request.cachedSerializer(); } else { // this is required cause construct can be called from subclasses repository.putIfAbsent(request, new CompletableFuture<>()); // lets see if we are processing this already Serializer binding = inProgress.get(request); if (binding == null) { binding = doConstruct(request); request.cacheSerializer(binding); CompletableFuture completableFuture = repository.get(request); completableFuture.complete(binding); // this releases the waiters } return binding; } } protected void addInProgress(Binding request, Serializer serializer) { inProgress.put(request, serializer); } protected void finishInProgress(Binding request) { Serializer remove = inProgress.remove(request); request.cacheSerializer(remove); CompletableFuture existing = repository.get(request); if (existing != null) { existing.complete(remove); } else { LOGGER.warn("Finishing binding request {} for serializer {} without CompletableFuture!", request, remove); } } @Override public Serializer getSerializer(Binding binding) throws SerializerConstructionException { if (binding.cachedSerializer() != null) return binding.cachedSerializer(); // this shouldn't be called that many time as serializers are cached on bindings AtomicBoolean shouldConstruct = new AtomicBoolean(false); CompletableFuture completableFuture = repository.computeIfAbsent(binding, t -> { shouldConstruct.set(true); return new CompletableFuture<>(); }); if (shouldConstruct.get()) { // construct should complete the completable future and release the waiters construct(binding); } // we should be ready at this point try { return completableFuture.get(); } catch (Exception e) { throw new SerializerConstructionException(e); } } public Serializer getSerializerUnchecked(Binding binding) throws RuntimeSerializerConstructionException { try { return getSerializer(binding); } catch (SerializerConstructionException e) { throw new RuntimeSerializerConstructionException(e); } } }