--- /dev/null
+package org.simantics.document.server;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.TreeMap;\r
+import java.util.concurrent.ScheduledThreadPoolExecutor;\r
+import java.util.concurrent.SynchronousQueue;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.ThreadPoolExecutor;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.simantics.Simantics;\r
+import org.simantics.db.common.procedure.adapter.TransientCacheListener;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.procedure.Listener;\r
+import org.simantics.document.server.request.URIDocumentRequest;\r
+import org.simantics.utils.datastructures.Pair;\r
+\r
+import gnu.trove.map.hash.THashMap;\r
+import gnu.trove.set.hash.THashSet;\r
+\r
+public class DocumentHistory {\r
+\r
+ public static final int MIN_REFRESHER_THREADS = 1;\r
+ public static final int MAX_REFRESHER_THREADS = 32;\r
+ public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds\r
+ public static final long REFRESHER_THREAD_TTL = 10; // seconds\r
+ \r
+ private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>(); \r
+\r
+ private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();\r
+ private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();\r
+ private List<JSONObject> previous = new ArrayList<JSONObject>();\r
+ private int lastRevision;\r
+\r
+ private List<JSONObject> incoming;\r
+ private long lastUpdateTime = System.nanoTime();\r
+ private Boolean pending = false;\r
+ private Object pendingChanges = new Object();\r
+ \r
+ private DocumentHistoryListener updater = null;\r
+\r
+ private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(null, r, "Document-Refresh-Scheduler");\r
+ t.setDaemon(true);\r
+ return t;\r
+ }\r
+ });\r
+ \r
+ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {\r
+ @Override\r
+ public Thread newThread(Runnable r) {\r
+ Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));\r
+ t.setDaemon(true);\r
+ return t;\r
+ }\r
+ });\r
+ \r
+ private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {\r
+ Runnable wrapper = new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ executor.execute(runnable);\r
+ }\r
+ };\r
+ scheduler.schedule(wrapper, delay, unit);\r
+ }\r
+ \r
+ public DocumentHistory() {\r
+ lastRevision = 0;\r
+ }\r
+ \r
+ private void pruneListeners() {\r
+ ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>(); \r
+ for(Listener<Integer> listener : listeners) {\r
+ if(listener.isDisposed()) {\r
+ removals.add(listener);\r
+ }\r
+ }\r
+ listeners.removeAll(removals);\r
+ }\r
+ \r
+ private void registerListener(Listener<Integer> listener) {\r
+ if(!listeners.contains(listener))\r
+ listeners.add(listener);\r
+ }\r
+\r
+ public synchronized boolean hasListeners() {\r
+ pruneListeners();\r
+ return !listeners.isEmpty();\r
+ }\r
+ \r
+ private void fireListeners(Integer value, Listener<Integer> ignoreListener) {\r
+ ArrayList<Listener<Integer>> currentListeners;\r
+ synchronized (this) {\r
+ currentListeners = new ArrayList<Listener<Integer>>(listeners);\r
+ }\r
+ for(Listener<Integer> listener : currentListeners) {\r
+ if(listener.isDisposed()) {\r
+ synchronized (this) {\r
+ listeners.remove(listener);\r
+ }\r
+ } else {\r
+ if (!listener.equals(ignoreListener)) {\r
+ listener.execute(value);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ \r
+ public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {\r
+ synchronized (entries) {\r
+ Collection<JSONObject> changes = \r
+ sequenceNumber < lastRevision ?\r
+ new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) : \r
+ Collections.emptyList();\r
+ return Pair.make(lastRevision, changes);\r
+ }\r
+ }\r
+ \r
+ public void refresh(List<JSONObject> result) {\r
+ synchronized (pendingChanges) {\r
+ incoming = result;\r
+ if (!pending) {\r
+ pending = true;\r
+ long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());\r
+ scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);\r
+ }\r
+ }\r
+ }\r
+ \r
+ private class RefresherRunnable implements Runnable {\r
+ \r
+ @Override\r
+ public void run() {\r
+\r
+ Integer revision = null;\r
+ synchronized (entries) {\r
+ List<JSONObject> update;\r
+ synchronized (pendingChanges) {\r
+ update = incoming;\r
+ incoming = null;\r
+ }\r
+ \r
+ if (update != null) {\r
+ revision = refreshInternal(update);\r
+ }\r
+ \r
+ synchronized (pendingChanges) {\r
+ if (incoming != null) {\r
+ scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);\r
+ } else {\r
+ pending = false;\r
+ if (update != null) {\r
+ lastUpdateTime = System.nanoTime();\r
+ }\r
+ }\r
+ }\r
+ }\r
+ if (revision != null) {\r
+ fireListeners(revision, null);\r
+ }\r
+ }\r
+ }\r
+ \r
+ private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {\r
+ Integer revision = null;\r
+ Pair<Integer, Collection<JSONObject>> changes;\r
+ synchronized (entries) {\r
+ List<JSONObject> update;\r
+ synchronized (pendingChanges) {\r
+ update = incoming;\r
+ incoming = null;\r
+ }\r
+ \r
+ if (update != null) {\r
+ revision = refreshInternal(update);\r
+ }\r
+ changes = readChanges(sequenceNumber);\r
+ }\r
+ if (revision != null) {\r
+ fireListeners(revision, ignoreListener);\r
+ }\r
+ return changes;\r
+ }\r
+\r
+ \r
+ private int refreshInternal(List<JSONObject> result) {\r
+ Set<JSONObject> added = null;\r
+ Set<JSONObject> removed = null;\r
+\r
+ int oldIndex = 0;\r
+ int newIndex = 0;\r
+ \r
+ while(true) {\r
+\r
+ boolean noMoreOlds = oldIndex == previous.size();\r
+ boolean noMoreNews = newIndex == result.size();\r
+ \r
+ if(noMoreOlds) {\r
+ if(noMoreNews) break;\r
+ else {\r
+ // all remaining objects are added\r
+ for(int i=newIndex;i<result.size();i++) {\r
+ if(added == null) added = new THashSet<JSONObject>();\r
+ added.add(result.get(i));\r
+ }\r
+ break;\r
+ }\r
+ } else {\r
+ // old objects are still available\r
+ if(noMoreNews) {\r
+ // all remaining olds have been removed\r
+ for(int i=oldIndex;i<previous.size();i++) {\r
+ if(removed == null) removed = new THashSet<JSONObject>();\r
+ removed.add(previous.get(i));\r
+ }\r
+ break;\r
+ }\r
+ }\r
+ \r
+ JSONObject o = previous.get(oldIndex);\r
+ JSONObject n = result.get(newIndex);\r
+ \r
+ int comp = o.id.compareTo(n.id);\r
+ if(comp == 0) {\r
+ // objects are same => compare\r
+ if(!n.equals(o)) {\r
+ if(added == null) added = new THashSet<JSONObject>();\r
+ added.add(n);\r
+ }\r
+ oldIndex++;\r
+ newIndex++;\r
+ } else if (comp > 0) {\r
+ // old is bigger than new => new has been added\r
+ if(added == null) added = new THashSet<JSONObject>();\r
+ added.add(n);\r
+ newIndex++;\r
+ } else if (comp < 0) {\r
+ // old is smaller than new => old has been removed \r
+ if(removed == null) removed = new THashSet<JSONObject>();\r
+ removed.add(o);\r
+ oldIndex++;\r
+ }\r
+ \r
+ }\r
+\r
+ if(added != null) {\r
+ for(JSONObject entry : added) {\r
+\r
+// String parent = entry.getParent();\r
+// // Null parent = exists but no parent , empty parent = has been removed\r
+// if(parent != null && parent.isEmpty()) continue;\r
+\r
+ add(entry);\r
+\r
+ }\r
+ }\r
+ if(removed != null) {\r
+ for(JSONObject entry : removed) {\r
+ \r
+// String parent = entry.getParent();\r
+// // Null parent = exists but no parent , empty parent = has been removed\r
+// if(parent != null && parent.isEmpty()) continue;\r
+ \r
+ remove(entry);\r
+ \r
+ }\r
+ }\r
+ \r
+ previous = result;\r
+\r
+ return lastRevision;\r
+ }\r
+ \r
+ private void add(JSONObject entry) {\r
+ \r
+ Integer rev = lastRevisions.get(entry.getId());\r
+ if(rev != null) {\r
+ entries.remove(rev);\r
+ }\r
+\r
+ lastRevision++;\r
+ lastRevisions.put(entry.getId(), lastRevision);\r
+ entries.put(lastRevision, entry);\r
+ }\r
+ \r
+ private void remove(JSONObject entry) {\r
+\r
+ JSONObject object = new JSONObject(entry.getId());\r
+ object.addJSONField("id", entry.getId());\r
+\r
+ Integer rev = lastRevisions.get(object.getId());\r
+ if(rev != null) {\r
+ entries.remove(rev);\r
+ }\r
+\r
+ lastRevision++;\r
+ lastRevisions.put(object.getId(), lastRevision);\r
+ entries.put(lastRevision, object);\r
+ }\r
+ \r
+ public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {\r
+ Listener<List<JSONObject>> newUpdater = null;\r
+ \r
+ boolean cached = false;\r
+ synchronized (this) {\r
+ if (!listener.isDisposed()) {\r
+ registerListener(listener);\r
+ }\r
+ \r
+ // Try cache\r
+ if ((updater != null) && !updater.isDisposed()) {\r
+ cached = true;\r
+ } else {\r
+ if (hasListeners()) {\r
+ updater = new DocumentHistoryListener(this);\r
+ newUpdater = updater;\r
+ }\r
+ }\r
+ }\r
+ if (cached) {\r
+ return flushAndReadChanges(sequenceNumber, listener);\r
+ }\r
+ \r
+\r
+ try {\r
+ if (newUpdater != null) {\r
+ Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);\r
+ } else {\r
+ synchronized (entries) {\r
+ List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());\r
+ refreshInternal(result);\r
+ return readChanges(sequenceNumber);\r
+ }\r
+ }\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
+\r
+ return flushAndReadChanges(sequenceNumber, listener);\r
+ }\r
+ \r
+ protected void removeUpdater() {\r
+ updater = null;\r
+ }\r
+ \r
+}
\ No newline at end of file