]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.databoard/src/org/simantics/databoard/serialization/ConcurrentSerializerFactory.java
Initial support for concurrency in databoard, bindings and serializers
[simantics/platform.git] / bundles / org.simantics.databoard / src / org / simantics / databoard / serialization / ConcurrentSerializerFactory.java
diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/serialization/ConcurrentSerializerFactory.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/serialization/ConcurrentSerializerFactory.java
new file mode 100644 (file)
index 0000000..463b5c2
--- /dev/null
@@ -0,0 +1,106 @@
+package org.simantics.databoard.serialization;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.simantics.databoard.binding.Binding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Jani Simomaa
+ */
+public abstract class ConcurrentSerializerFactory implements SerializerScheme {
+
+    private Logger LOGGER = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Repository where serializers are placed. 
+     */
+    private ConcurrentHashMap<Binding, CompletableFuture<Serializer>> repository = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<Binding, Serializer> inProgress = new ConcurrentHashMap<>();
+
+    /**
+     * Constructs a serilizer for a binding. Implement this.
+     * It should use the inprogress -map for construction of 
+     * serializers that have component types.
+     * 
+     *  e.g. 
+     *   inprogress.put(binding, notCompletelyConstructedSerializer);
+     *   Serializer componentSerializer = construct( componentBinding );
+     *   notCompletelyConstructedSerializer.setComponent( componentSerializer );
+     *   inprogress.remove(binding);
+     *   
+     * try-finally is not needed.
+     * 
+     * @param request
+     * @return
+     * @throws SerializerConstructionException
+     */
+    protected abstract Serializer doConstruct(Binding request) throws SerializerConstructionException;
+
+    protected Serializer construct(Binding request) throws SerializerConstructionException {
+        if (request.cachedSerializer() != null) {
+            return request.cachedSerializer();
+        } else {
+            // this is required cause construct can be called from subclasses
+            repository.putIfAbsent(request, new CompletableFuture<>());
+            // lets see if we are processing this already
+            Serializer binding = inProgress.get(request);
+            if (binding == null) {
+                binding = doConstruct(request);
+                request.cacheSerializer(binding);
+                CompletableFuture<Serializer> completableFuture = repository.get(request);
+                completableFuture.complete(binding); // this releases the waiters
+            }
+            return binding;
+        }
+    }
+
+    protected void addInProgress(Binding request, Serializer serializer) {
+        inProgress.put(request, serializer);
+    }
+    
+    protected void finishInProgress(Binding request) {
+        Serializer remove = inProgress.remove(request);
+        request.cacheSerializer(remove);
+        CompletableFuture<Serializer> existing = repository.get(request);
+        if (existing != null) {
+            existing.complete(remove);
+        } else {
+            LOGGER.warn("Finishing binding request {} for serializer {} without CompletableFuture!", request, remove);
+        }
+    }
+
+    @Override
+    public Serializer getSerializer(Binding binding) throws SerializerConstructionException {
+        if (binding.cachedSerializer() != null)
+            return binding.cachedSerializer();
+        // this shouldn't be called that many time as serializers are cached on bindings 
+        AtomicBoolean shouldConstruct = new AtomicBoolean(false);
+        CompletableFuture<Serializer> completableFuture = repository.computeIfAbsent(binding, t -> {
+            shouldConstruct.set(true);
+            return new CompletableFuture<>();
+        });
+        if (shouldConstruct.get()) {
+            // construct should complete the completable future and release the waiters
+            construct(binding);
+        }
+        // we should be ready at this point
+        try {
+            return completableFuture.get();
+        } catch (Exception e) {
+            throw new SerializerConstructionException(e);
+        }
+    }
+
+    public Serializer getSerializerUnchecked(Binding binding) throws RuntimeSerializerConstructionException {
+        try {
+            return getSerializer(binding);
+        } catch (SerializerConstructionException e) {
+            throw new RuntimeSerializerConstructionException(e);
+        }
+    }
+    
+}