X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.document.server%2Fsrc%2Forg%2Fsimantics%2Fdocument%2Fserver%2FDocumentHistory.java;h=ebda42c786c681ebcc6f564f498ee6e67363208c;hb=4f87cc11bc1e4a56860f278798e41834ea1fbb3f;hp=c9e05bc6c7601d1a1dbe68b6903761c6f2fab331;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java b/bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java index c9e05bc6c..ebda42c78 100644 --- a/bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java +++ b/bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java @@ -1,355 +1,355 @@ -package org.simantics.document.server; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.simantics.Simantics; -import org.simantics.db.common.procedure.adapter.TransientCacheListener; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.procedure.Listener; -import org.simantics.document.server.request.URIDocumentRequest; -import org.simantics.utils.datastructures.Pair; - -import gnu.trove.map.hash.THashMap; -import gnu.trove.set.hash.THashSet; - -public class DocumentHistory { - - public static final int MIN_REFRESHER_THREADS = 1; - public static final int MAX_REFRESHER_THREADS = 32; - public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds - public static final long REFRESHER_THREAD_TTL = 10; // seconds - - private ArrayList> listeners = new ArrayList>(); - - private TreeMap entries = new TreeMap(); - private Map lastRevisions = new THashMap(); - private List previous = new ArrayList(); - private int lastRevision; - - private List incoming; - private long lastUpdateTime = System.nanoTime(); - private Boolean pending = false; - private Object pendingChanges = new Object(); - - private DocumentHistoryListener updater = null; - - private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(null, r, "Document-Refresh-Scheduler"); - t.setDaemon(true); - return t; - } - }); - - private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue(true), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1)); - t.setDaemon(true); - return t; - } - }); - - private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) { - Runnable wrapper = new Runnable() { - @Override - public void run() { - executor.execute(runnable); - } - }; - scheduler.schedule(wrapper, delay, unit); - } - - public DocumentHistory() { - lastRevision = 0; - } - - private void pruneListeners() { - ArrayList> removals = new ArrayList>(); - for(Listener listener : listeners) { - if(listener.isDisposed()) { - removals.add(listener); - } - } - listeners.removeAll(removals); - } - - private void registerListener(Listener listener) { - if(!listeners.contains(listener)) - listeners.add(listener); - } - - public synchronized boolean hasListeners() { - pruneListeners(); - return !listeners.isEmpty(); - } - - private void fireListeners(Integer value, Listener ignoreListener) { - ArrayList> currentListeners; - synchronized (this) { - currentListeners = new ArrayList>(listeners); - } - for(Listener listener : currentListeners) { - if(listener.isDisposed()) { - synchronized (this) { - listeners.remove(listener); - } - } else { - if (!listener.equals(ignoreListener)) { - listener.execute(value); - } - } - } - } - - public Pair> readChanges(int sequenceNumber) { - synchronized (entries) { - Collection changes = - sequenceNumber < lastRevision ? - new ArrayList(entries.tailMap(sequenceNumber, false).values()) : - Collections.emptyList(); - return Pair.make(lastRevision, changes); - } - } - - public void refresh(List result) { - synchronized (pendingChanges) { - incoming = result; - if (!pending) { - pending = true; - long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime()); - scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS); - } - } - } - - private class RefresherRunnable implements Runnable { - - @Override - public void run() { - - Integer revision = null; - synchronized (entries) { - List update; - synchronized (pendingChanges) { - update = incoming; - incoming = null; - } - - if (update != null) { - revision = refreshInternal(update); - } - - synchronized (pendingChanges) { - if (incoming != null) { - scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS); - } else { - pending = false; - if (update != null) { - lastUpdateTime = System.nanoTime(); - } - } - } - } - if (revision != null) { - fireListeners(revision, null); - } - } - } - - private Pair> flushAndReadChanges(int sequenceNumber, Listener ignoreListener) { - Integer revision = null; - Pair> changes; - synchronized (entries) { - List update; - synchronized (pendingChanges) { - update = incoming; - incoming = null; - } - - if (update != null) { - revision = refreshInternal(update); - } - changes = readChanges(sequenceNumber); - } - if (revision != null) { - fireListeners(revision, ignoreListener); - } - return changes; - } - - - private int refreshInternal(List result) { - Set added = null; - Set removed = null; - - int oldIndex = 0; - int newIndex = 0; - - while(true) { - - boolean noMoreOlds = oldIndex == previous.size(); - boolean noMoreNews = newIndex == result.size(); - - if(noMoreOlds) { - if(noMoreNews) break; - else { - // all remaining objects are added - for(int i=newIndex;i(); - added.add(result.get(i)); - } - break; - } - } else { - // old objects are still available - if(noMoreNews) { - // all remaining olds have been removed - for(int i=oldIndex;i(); - removed.add(previous.get(i)); - } - break; - } - } - - JSONObject o = previous.get(oldIndex); - JSONObject n = result.get(newIndex); - - int comp = o.id.compareTo(n.id); - if(comp == 0) { - // objects are same => compare - if(!n.equals(o)) { - if(added == null) added = new THashSet(); - added.add(n); - } - oldIndex++; - newIndex++; - } else if (comp > 0) { - // old is bigger than new => new has been added - if(added == null) added = new THashSet(); - added.add(n); - newIndex++; - } else if (comp < 0) { - // old is smaller than new => old has been removed - if(removed == null) removed = new THashSet(); - removed.add(o); - oldIndex++; - } - - } - - if(added != null) { - for(JSONObject entry : added) { - -// String parent = entry.getParent(); -// // Null parent = exists but no parent , empty parent = has been removed -// if(parent != null && parent.isEmpty()) continue; - - add(entry); - - } - } - if(removed != null) { - for(JSONObject entry : removed) { - -// String parent = entry.getParent(); -// // Null parent = exists but no parent , empty parent = has been removed -// if(parent != null && parent.isEmpty()) continue; - - remove(entry); - - } - } - - previous = result; - - return lastRevision; - } - - private void add(JSONObject entry) { - - Integer rev = lastRevisions.get(entry.getId()); - if(rev != null) { - entries.remove(rev); - } - - lastRevision++; - lastRevisions.put(entry.getId(), lastRevision); - entries.put(lastRevision, entry); - } - - private void remove(JSONObject entry) { - - JSONObject object = new JSONObject(entry.getId()); - object.addJSONField("id", entry.getId()); - - Integer rev = lastRevisions.get(object.getId()); - if(rev != null) { - entries.remove(rev); - } - - lastRevision++; - lastRevisions.put(object.getId(), lastRevision); - entries.put(lastRevision, object); - } - - public Pair> getContent(Listener listener, String location, int sequenceNumber) { - Listener> newUpdater = null; - - boolean cached = false; - synchronized (this) { - if (!listener.isDisposed()) { - registerListener(listener); - } - - // Try cache - if ((updater != null) && !updater.isDisposed()) { - cached = true; - } else { - if (hasListeners()) { - updater = new DocumentHistoryListener(this); - newUpdater = updater; - } - } - } - if (cached) { - return flushAndReadChanges(sequenceNumber, listener); - } - - - try { - if (newUpdater != null) { - Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater); - } else { - synchronized (entries) { - List result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.>instance()); - refreshInternal(result); - return readChanges(sequenceNumber); - } - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } - - return flushAndReadChanges(sequenceNumber, listener); - } - - protected void removeUpdater() { - updater = null; - } - +package org.simantics.document.server; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.simantics.Simantics; +import org.simantics.db.common.procedure.adapter.TransientCacheListener; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.procedure.Listener; +import org.simantics.document.server.request.URIDocumentRequest; +import org.simantics.utils.datastructures.Pair; + +import gnu.trove.map.hash.THashMap; +import gnu.trove.set.hash.THashSet; + +public class DocumentHistory { + + public static final int MIN_REFRESHER_THREADS = 1; + public static final int MAX_REFRESHER_THREADS = 32; + public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds + public static final long REFRESHER_THREAD_TTL = 10; // seconds + + private ArrayList> listeners = new ArrayList>(); + + private TreeMap entries = new TreeMap(); + private Map lastRevisions = new THashMap(); + private List previous = new ArrayList(); + private int lastRevision; + + private List incoming; + private long lastUpdateTime = System.nanoTime(); + private Boolean pending = false; + private Object pendingChanges = new Object(); + + private DocumentHistoryListener updater = null; + + private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(null, r, "Document-Refresh-Scheduler"); + t.setDaemon(true); + return t; + } + }); + + private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue(true), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1)); + t.setDaemon(true); + return t; + } + }); + + private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) { + Runnable wrapper = new Runnable() { + @Override + public void run() { + executor.execute(runnable); + } + }; + scheduler.schedule(wrapper, delay, unit); + } + + public DocumentHistory() { + lastRevision = 0; + } + + private void pruneListeners() { + ArrayList> removals = new ArrayList>(); + for(Listener listener : listeners) { + if(listener.isDisposed()) { + removals.add(listener); + } + } + listeners.removeAll(removals); + } + + private void registerListener(Listener listener) { + if(!listeners.contains(listener)) + listeners.add(listener); + } + + public synchronized boolean hasListeners() { + pruneListeners(); + return !listeners.isEmpty(); + } + + private void fireListeners(Integer value, Listener ignoreListener) { + ArrayList> currentListeners; + synchronized (this) { + currentListeners = new ArrayList>(listeners); + } + for(Listener listener : currentListeners) { + if(listener.isDisposed()) { + synchronized (this) { + listeners.remove(listener); + } + } else { + if (!listener.equals(ignoreListener)) { + listener.execute(value); + } + } + } + } + + public Pair> readChanges(int sequenceNumber) { + synchronized (entries) { + Collection changes = + sequenceNumber < lastRevision ? + new ArrayList(entries.tailMap(sequenceNumber, false).values()) : + Collections.emptyList(); + return Pair.make(lastRevision, changes); + } + } + + public void refresh(List result) { + synchronized (pendingChanges) { + incoming = result; + if (!pending) { + pending = true; + long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime()); + scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS); + } + } + } + + private class RefresherRunnable implements Runnable { + + @Override + public void run() { + + Integer revision = null; + synchronized (entries) { + List update; + synchronized (pendingChanges) { + update = incoming; + incoming = null; + } + + if (update != null) { + revision = refreshInternal(update); + } + + synchronized (pendingChanges) { + if (incoming != null) { + scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS); + } else { + pending = false; + if (update != null) { + lastUpdateTime = System.nanoTime(); + } + } + } + } + if (revision != null) { + fireListeners(revision, null); + } + } + } + + private Pair> flushAndReadChanges(int sequenceNumber, Listener ignoreListener) { + Integer revision = null; + Pair> changes; + synchronized (entries) { + List update; + synchronized (pendingChanges) { + update = incoming; + incoming = null; + } + + if (update != null) { + revision = refreshInternal(update); + } + changes = readChanges(sequenceNumber); + } + if (revision != null) { + fireListeners(revision, ignoreListener); + } + return changes; + } + + + private int refreshInternal(List result) { + Set added = null; + Set removed = null; + + int oldIndex = 0; + int newIndex = 0; + + while(true) { + + boolean noMoreOlds = oldIndex == previous.size(); + boolean noMoreNews = newIndex == result.size(); + + if(noMoreOlds) { + if(noMoreNews) break; + else { + // all remaining objects are added + for(int i=newIndex;i(); + added.add(result.get(i)); + } + break; + } + } else { + // old objects are still available + if(noMoreNews) { + // all remaining olds have been removed + for(int i=oldIndex;i(); + removed.add(previous.get(i)); + } + break; + } + } + + JSONObject o = previous.get(oldIndex); + JSONObject n = result.get(newIndex); + + int comp = o.id.compareTo(n.id); + if(comp == 0) { + // objects are same => compare + if(!n.equals(o)) { + if(added == null) added = new THashSet(); + added.add(n); + } + oldIndex++; + newIndex++; + } else if (comp > 0) { + // old is bigger than new => new has been added + if(added == null) added = new THashSet(); + added.add(n); + newIndex++; + } else if (comp < 0) { + // old is smaller than new => old has been removed + if(removed == null) removed = new THashSet(); + removed.add(o); + oldIndex++; + } + + } + + if(added != null) { + for(JSONObject entry : added) { + +// String parent = entry.getParent(); +// // Null parent = exists but no parent , empty parent = has been removed +// if(parent != null && parent.isEmpty()) continue; + + add(entry); + + } + } + if(removed != null) { + for(JSONObject entry : removed) { + +// String parent = entry.getParent(); +// // Null parent = exists but no parent , empty parent = has been removed +// if(parent != null && parent.isEmpty()) continue; + + remove(entry); + + } + } + + previous = result; + + return lastRevision; + } + + private void add(JSONObject entry) { + + Integer rev = lastRevisions.get(entry.getId()); + if(rev != null) { + entries.remove(rev); + } + + lastRevision++; + lastRevisions.put(entry.getId(), lastRevision); + entries.put(lastRevision, entry); + } + + private void remove(JSONObject entry) { + + JSONObject object = new JSONObject(entry.getId()); + object.addJSONField("id", entry.getId()); + + Integer rev = lastRevisions.get(object.getId()); + if(rev != null) { + entries.remove(rev); + } + + lastRevision++; + lastRevisions.put(object.getId(), lastRevision); + entries.put(lastRevision, object); + } + + public Pair> getContent(Listener listener, String location, int sequenceNumber) { + Listener> newUpdater = null; + + boolean cached = false; + synchronized (this) { + if (!listener.isDisposed()) { + registerListener(listener); + } + + // Try cache + if ((updater != null) && !updater.isDisposed()) { + cached = true; + } else { + if (hasListeners()) { + updater = new DocumentHistoryListener(this); + newUpdater = updater; + } + } + } + if (cached) { + return flushAndReadChanges(sequenceNumber, listener); + } + + + try { + if (newUpdater != null) { + Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater); + } else { + synchronized (entries) { + List result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.>instance()); + refreshInternal(result); + return readChanges(sequenceNumber); + } + } + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + return flushAndReadChanges(sequenceNumber, listener); + } + + protected void removeUpdater() { + updater = null; + } + } \ No newline at end of file