-package org.simantics.document.server;\r
-\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.TreeMap;\r
-import java.util.concurrent.ScheduledThreadPoolExecutor;\r
-import java.util.concurrent.SynchronousQueue;\r
-import java.util.concurrent.ThreadFactory;\r
-import java.util.concurrent.ThreadPoolExecutor;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import org.simantics.Simantics;\r
-import org.simantics.db.common.procedure.adapter.TransientCacheListener;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.procedure.Listener;\r
-import org.simantics.document.server.request.URIDocumentRequest;\r
-import org.simantics.utils.datastructures.Pair;\r
-\r
-import gnu.trove.map.hash.THashMap;\r
-import gnu.trove.set.hash.THashSet;\r
-\r
-public class DocumentHistory {\r
-\r
- public static final int MIN_REFRESHER_THREADS = 1;\r
- public static final int MAX_REFRESHER_THREADS = 32;\r
- public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds\r
- public static final long REFRESHER_THREAD_TTL = 10; // seconds\r
- \r
- private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>(); \r
-\r
- private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();\r
- private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();\r
- private List<JSONObject> previous = new ArrayList<JSONObject>();\r
- private int lastRevision;\r
-\r
- private List<JSONObject> incoming;\r
- private long lastUpdateTime = System.nanoTime();\r
- private Boolean pending = false;\r
- private Object pendingChanges = new Object();\r
- \r
- private DocumentHistoryListener updater = null;\r
-\r
- private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(null, r, "Document-Refresh-Scheduler");\r
- t.setDaemon(true);\r
- return t;\r
- }\r
- });\r
- \r
- private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {\r
- @Override\r
- public Thread newThread(Runnable r) {\r
- Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));\r
- t.setDaemon(true);\r
- return t;\r
- }\r
- });\r
- \r
- private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {\r
- Runnable wrapper = new Runnable() {\r
- @Override\r
- public void run() {\r
- executor.execute(runnable);\r
- }\r
- };\r
- scheduler.schedule(wrapper, delay, unit);\r
- }\r
- \r
- public DocumentHistory() {\r
- lastRevision = 0;\r
- }\r
- \r
- private void pruneListeners() {\r
- ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>(); \r
- for(Listener<Integer> listener : listeners) {\r
- if(listener.isDisposed()) {\r
- removals.add(listener);\r
- }\r
- }\r
- listeners.removeAll(removals);\r
- }\r
- \r
- private void registerListener(Listener<Integer> listener) {\r
- if(!listeners.contains(listener))\r
- listeners.add(listener);\r
- }\r
-\r
- public synchronized boolean hasListeners() {\r
- pruneListeners();\r
- return !listeners.isEmpty();\r
- }\r
- \r
- private void fireListeners(Integer value, Listener<Integer> ignoreListener) {\r
- ArrayList<Listener<Integer>> currentListeners;\r
- synchronized (this) {\r
- currentListeners = new ArrayList<Listener<Integer>>(listeners);\r
- }\r
- for(Listener<Integer> listener : currentListeners) {\r
- if(listener.isDisposed()) {\r
- synchronized (this) {\r
- listeners.remove(listener);\r
- }\r
- } else {\r
- if (!listener.equals(ignoreListener)) {\r
- listener.execute(value);\r
- }\r
- }\r
- }\r
- }\r
- \r
- public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {\r
- synchronized (entries) {\r
- Collection<JSONObject> changes = \r
- sequenceNumber < lastRevision ?\r
- new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) : \r
- Collections.emptyList();\r
- return Pair.make(lastRevision, changes);\r
- }\r
- }\r
- \r
- public void refresh(List<JSONObject> result) {\r
- synchronized (pendingChanges) {\r
- incoming = result;\r
- if (!pending) {\r
- pending = true;\r
- long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());\r
- scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);\r
- }\r
- }\r
- }\r
- \r
- private class RefresherRunnable implements Runnable {\r
- \r
- @Override\r
- public void run() {\r
-\r
- Integer revision = null;\r
- synchronized (entries) {\r
- List<JSONObject> update;\r
- synchronized (pendingChanges) {\r
- update = incoming;\r
- incoming = null;\r
- }\r
- \r
- if (update != null) {\r
- revision = refreshInternal(update);\r
- }\r
- \r
- synchronized (pendingChanges) {\r
- if (incoming != null) {\r
- scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);\r
- } else {\r
- pending = false;\r
- if (update != null) {\r
- lastUpdateTime = System.nanoTime();\r
- }\r
- }\r
- }\r
- }\r
- if (revision != null) {\r
- fireListeners(revision, null);\r
- }\r
- }\r
- }\r
- \r
- private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {\r
- Integer revision = null;\r
- Pair<Integer, Collection<JSONObject>> changes;\r
- synchronized (entries) {\r
- List<JSONObject> update;\r
- synchronized (pendingChanges) {\r
- update = incoming;\r
- incoming = null;\r
- }\r
- \r
- if (update != null) {\r
- revision = refreshInternal(update);\r
- }\r
- changes = readChanges(sequenceNumber);\r
- }\r
- if (revision != null) {\r
- fireListeners(revision, ignoreListener);\r
- }\r
- return changes;\r
- }\r
-\r
- \r
- private int refreshInternal(List<JSONObject> result) {\r
- Set<JSONObject> added = null;\r
- Set<JSONObject> removed = null;\r
-\r
- int oldIndex = 0;\r
- int newIndex = 0;\r
- \r
- while(true) {\r
-\r
- boolean noMoreOlds = oldIndex == previous.size();\r
- boolean noMoreNews = newIndex == result.size();\r
- \r
- if(noMoreOlds) {\r
- if(noMoreNews) break;\r
- else {\r
- // all remaining objects are added\r
- for(int i=newIndex;i<result.size();i++) {\r
- if(added == null) added = new THashSet<JSONObject>();\r
- added.add(result.get(i));\r
- }\r
- break;\r
- }\r
- } else {\r
- // old objects are still available\r
- if(noMoreNews) {\r
- // all remaining olds have been removed\r
- for(int i=oldIndex;i<previous.size();i++) {\r
- if(removed == null) removed = new THashSet<JSONObject>();\r
- removed.add(previous.get(i));\r
- }\r
- break;\r
- }\r
- }\r
- \r
- JSONObject o = previous.get(oldIndex);\r
- JSONObject n = result.get(newIndex);\r
- \r
- int comp = o.id.compareTo(n.id);\r
- if(comp == 0) {\r
- // objects are same => compare\r
- if(!n.equals(o)) {\r
- if(added == null) added = new THashSet<JSONObject>();\r
- added.add(n);\r
- }\r
- oldIndex++;\r
- newIndex++;\r
- } else if (comp > 0) {\r
- // old is bigger than new => new has been added\r
- if(added == null) added = new THashSet<JSONObject>();\r
- added.add(n);\r
- newIndex++;\r
- } else if (comp < 0) {\r
- // old is smaller than new => old has been removed \r
- if(removed == null) removed = new THashSet<JSONObject>();\r
- removed.add(o);\r
- oldIndex++;\r
- }\r
- \r
- }\r
-\r
- if(added != null) {\r
- for(JSONObject entry : added) {\r
-\r
-// String parent = entry.getParent();\r
-// // Null parent = exists but no parent , empty parent = has been removed\r
-// if(parent != null && parent.isEmpty()) continue;\r
-\r
- add(entry);\r
-\r
- }\r
- }\r
- if(removed != null) {\r
- for(JSONObject entry : removed) {\r
- \r
-// String parent = entry.getParent();\r
-// // Null parent = exists but no parent , empty parent = has been removed\r
-// if(parent != null && parent.isEmpty()) continue;\r
- \r
- remove(entry);\r
- \r
- }\r
- }\r
- \r
- previous = result;\r
-\r
- return lastRevision;\r
- }\r
- \r
- private void add(JSONObject entry) {\r
- \r
- Integer rev = lastRevisions.get(entry.getId());\r
- if(rev != null) {\r
- entries.remove(rev);\r
- }\r
-\r
- lastRevision++;\r
- lastRevisions.put(entry.getId(), lastRevision);\r
- entries.put(lastRevision, entry);\r
- }\r
- \r
- private void remove(JSONObject entry) {\r
-\r
- JSONObject object = new JSONObject(entry.getId());\r
- object.addJSONField("id", entry.getId());\r
-\r
- Integer rev = lastRevisions.get(object.getId());\r
- if(rev != null) {\r
- entries.remove(rev);\r
- }\r
-\r
- lastRevision++;\r
- lastRevisions.put(object.getId(), lastRevision);\r
- entries.put(lastRevision, object);\r
- }\r
- \r
- public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {\r
- Listener<List<JSONObject>> newUpdater = null;\r
- \r
- boolean cached = false;\r
- synchronized (this) {\r
- if (!listener.isDisposed()) {\r
- registerListener(listener);\r
- }\r
- \r
- // Try cache\r
- if ((updater != null) && !updater.isDisposed()) {\r
- cached = true;\r
- } else {\r
- if (hasListeners()) {\r
- updater = new DocumentHistoryListener(this);\r
- newUpdater = updater;\r
- }\r
- }\r
- }\r
- if (cached) {\r
- return flushAndReadChanges(sequenceNumber, listener);\r
- }\r
- \r
-\r
- try {\r
- if (newUpdater != null) {\r
- Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);\r
- } else {\r
- synchronized (entries) {\r
- List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());\r
- refreshInternal(result);\r
- return readChanges(sequenceNumber);\r
- }\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- }\r
-\r
- return flushAndReadChanges(sequenceNumber, listener);\r
- }\r
- \r
- protected void removeUpdater() {\r
- updater = null;\r
- }\r
- \r
+package org.simantics.document.server;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.Simantics;
+import org.simantics.db.common.procedure.adapter.TransientCacheListener;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.procedure.Listener;
+import org.simantics.document.server.request.URIDocumentRequest;
+import org.simantics.utils.datastructures.Pair;
+
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.set.hash.THashSet;
+
+public class DocumentHistory {
+
+ public static final int MIN_REFRESHER_THREADS = 1;
+ public static final int MAX_REFRESHER_THREADS = 32;
+ public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds
+ public static final long REFRESHER_THREAD_TTL = 10; // seconds
+
+ private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>();
+
+ private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();
+ private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();
+ private List<JSONObject> previous = new ArrayList<JSONObject>();
+ private int lastRevision;
+
+ private List<JSONObject> incoming;
+ private long lastUpdateTime = System.nanoTime();
+ private Boolean pending = false;
+ private Object pendingChanges = new Object();
+
+ private DocumentHistoryListener updater = null;
+
+ private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(null, r, "Document-Refresh-Scheduler");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {
+ Runnable wrapper = new Runnable() {
+ @Override
+ public void run() {
+ executor.execute(runnable);
+ }
+ };
+ scheduler.schedule(wrapper, delay, unit);
+ }
+
+ public DocumentHistory() {
+ lastRevision = 0;
+ }
+
+ private void pruneListeners() {
+ ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>();
+ for(Listener<Integer> listener : listeners) {
+ if(listener.isDisposed()) {
+ removals.add(listener);
+ }
+ }
+ listeners.removeAll(removals);
+ }
+
+ private void registerListener(Listener<Integer> listener) {
+ if(!listeners.contains(listener))
+ listeners.add(listener);
+ }
+
+ public synchronized boolean hasListeners() {
+ pruneListeners();
+ return !listeners.isEmpty();
+ }
+
+ private void fireListeners(Integer value, Listener<Integer> ignoreListener) {
+ ArrayList<Listener<Integer>> currentListeners;
+ synchronized (this) {
+ currentListeners = new ArrayList<Listener<Integer>>(listeners);
+ }
+ for(Listener<Integer> listener : currentListeners) {
+ if(listener.isDisposed()) {
+ synchronized (this) {
+ listeners.remove(listener);
+ }
+ } else {
+ if (!listener.equals(ignoreListener)) {
+ listener.execute(value);
+ }
+ }
+ }
+ }
+
+ public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {
+ synchronized (entries) {
+ Collection<JSONObject> changes =
+ sequenceNumber < lastRevision ?
+ new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) :
+ Collections.emptyList();
+ return Pair.make(lastRevision, changes);
+ }
+ }
+
+ public void refresh(List<JSONObject> result) {
+ synchronized (pendingChanges) {
+ incoming = result;
+ if (!pending) {
+ pending = true;
+ long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());
+ scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);
+ }
+ }
+ }
+
+ private class RefresherRunnable implements Runnable {
+
+ @Override
+ public void run() {
+
+ Integer revision = null;
+ synchronized (entries) {
+ List<JSONObject> update;
+ synchronized (pendingChanges) {
+ update = incoming;
+ incoming = null;
+ }
+
+ if (update != null) {
+ revision = refreshInternal(update);
+ }
+
+ synchronized (pendingChanges) {
+ if (incoming != null) {
+ scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
+ } else {
+ pending = false;
+ if (update != null) {
+ lastUpdateTime = System.nanoTime();
+ }
+ }
+ }
+ }
+ if (revision != null) {
+ fireListeners(revision, null);
+ }
+ }
+ }
+
+ private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {
+ Integer revision = null;
+ Pair<Integer, Collection<JSONObject>> changes;
+ synchronized (entries) {
+ List<JSONObject> update;
+ synchronized (pendingChanges) {
+ update = incoming;
+ incoming = null;
+ }
+
+ if (update != null) {
+ revision = refreshInternal(update);
+ }
+ changes = readChanges(sequenceNumber);
+ }
+ if (revision != null) {
+ fireListeners(revision, ignoreListener);
+ }
+ return changes;
+ }
+
+
+ private int refreshInternal(List<JSONObject> result) {
+ Set<JSONObject> added = null;
+ Set<JSONObject> removed = null;
+
+ int oldIndex = 0;
+ int newIndex = 0;
+
+ while(true) {
+
+ boolean noMoreOlds = oldIndex == previous.size();
+ boolean noMoreNews = newIndex == result.size();
+
+ if(noMoreOlds) {
+ if(noMoreNews) break;
+ else {
+ // all remaining objects are added
+ for(int i=newIndex;i<result.size();i++) {
+ if(added == null) added = new THashSet<JSONObject>();
+ added.add(result.get(i));
+ }
+ break;
+ }
+ } else {
+ // old objects are still available
+ if(noMoreNews) {
+ // all remaining olds have been removed
+ for(int i=oldIndex;i<previous.size();i++) {
+ if(removed == null) removed = new THashSet<JSONObject>();
+ removed.add(previous.get(i));
+ }
+ break;
+ }
+ }
+
+ JSONObject o = previous.get(oldIndex);
+ JSONObject n = result.get(newIndex);
+
+ int comp = o.id.compareTo(n.id);
+ if(comp == 0) {
+ // objects are same => compare
+ if(!n.equals(o)) {
+ if(added == null) added = new THashSet<JSONObject>();
+ added.add(n);
+ }
+ oldIndex++;
+ newIndex++;
+ } else if (comp > 0) {
+ // old is bigger than new => new has been added
+ if(added == null) added = new THashSet<JSONObject>();
+ added.add(n);
+ newIndex++;
+ } else if (comp < 0) {
+ // old is smaller than new => old has been removed
+ if(removed == null) removed = new THashSet<JSONObject>();
+ removed.add(o);
+ oldIndex++;
+ }
+
+ }
+
+ if(added != null) {
+ for(JSONObject entry : added) {
+
+// String parent = entry.getParent();
+// // Null parent = exists but no parent , empty parent = has been removed
+// if(parent != null && parent.isEmpty()) continue;
+
+ add(entry);
+
+ }
+ }
+ if(removed != null) {
+ for(JSONObject entry : removed) {
+
+// String parent = entry.getParent();
+// // Null parent = exists but no parent , empty parent = has been removed
+// if(parent != null && parent.isEmpty()) continue;
+
+ remove(entry);
+
+ }
+ }
+
+ previous = result;
+
+ return lastRevision;
+ }
+
+ private void add(JSONObject entry) {
+
+ Integer rev = lastRevisions.get(entry.getId());
+ if(rev != null) {
+ entries.remove(rev);
+ }
+
+ lastRevision++;
+ lastRevisions.put(entry.getId(), lastRevision);
+ entries.put(lastRevision, entry);
+ }
+
+ private void remove(JSONObject entry) {
+
+ JSONObject object = new JSONObject(entry.getId());
+ object.addJSONField("id", entry.getId());
+
+ Integer rev = lastRevisions.get(object.getId());
+ if(rev != null) {
+ entries.remove(rev);
+ }
+
+ lastRevision++;
+ lastRevisions.put(object.getId(), lastRevision);
+ entries.put(lastRevision, object);
+ }
+
+ public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {
+ Listener<List<JSONObject>> newUpdater = null;
+
+ boolean cached = false;
+ synchronized (this) {
+ if (!listener.isDisposed()) {
+ registerListener(listener);
+ }
+
+ // Try cache
+ if ((updater != null) && !updater.isDisposed()) {
+ cached = true;
+ } else {
+ if (hasListeners()) {
+ updater = new DocumentHistoryListener(this);
+ newUpdater = updater;
+ }
+ }
+ }
+ if (cached) {
+ return flushAndReadChanges(sequenceNumber, listener);
+ }
+
+
+ try {
+ if (newUpdater != null) {
+ Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);
+ } else {
+ synchronized (entries) {
+ List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());
+ refreshInternal(result);
+ return readChanges(sequenceNumber);
+ }
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ return flushAndReadChanges(sequenceNumber, listener);
+ }
+
+ protected void removeUpdater() {
+ updater = null;
+ }
+