package org.simantics.document.server;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.simantics.Simantics;
import org.simantics.db.common.procedure.adapter.TransientCacheListener;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.procedure.Listener;
import org.simantics.document.server.request.URIDocumentRequest;
import org.simantics.utils.datastructures.Pair;

import gnu.trove.map.hash.THashMap;
import gnu.trove.set.hash.THashSet;

/**
 * Revision log of the JSON objects that make up one document.
 * <p>
 * Every call to {@link #refreshInternal(List)} diffs a new object list against
 * the previously seen one and records each added, changed or removed object
 * under a fresh, monotonically increasing revision number. Clients ask for
 * "everything newer than revision N" through {@link #readChanges(int)} or
 * {@link #getContent(Listener, String, int)}.
 * <p>
 * Locking, as used by the code below:
 * <ul>
 * <li>{@code this} guards {@link #listeners} (and the decision to create a new
 * {@link #updater});</li>
 * <li>{@link #entries} guards the revision state ({@code entries},
 * {@code lastRevisions}, {@code previous}, {@code lastRevision});</li>
 * <li>{@link #pendingChanges} guards {@code incoming}, {@code pending} and
 * {@code lastUpdateTime}.</li>
 * </ul>
 * When both are needed, {@code entries} is always taken before
 * {@code pendingChanges}.
 */
public class DocumentHistory {

    public static final int MIN_REFRESHER_THREADS = 1;
    public static final int MAX_REFRESHER_THREADS = 32;
    public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds
    public static final long REFRESHER_THREAD_TTL = 10; // seconds

    /** Client listeners that are told the newest revision number. Guarded by {@code this}. */
    private final ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>();

    /** Revision number -> object recorded at that revision. Also used as the lock for revision state. */
    private final TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();

    /**
     * Object id -> revision under which that object currently sits in {@link #entries}.
     * NOTE(review): the key type is assumed to be String (the type of JSONObject ids) - confirm.
     */
    private final Map<String, Integer> lastRevisions = new THashMap<String, Integer>();

    /** The object list handed to the most recent {@link #refreshInternal(List)} call. */
    private List<JSONObject> previous = new ArrayList<JSONObject>();

    private int lastRevision;

    /** Newest result not yet applied; {@code null} when there is nothing waiting. */
    private List<JSONObject> incoming;

    /** {@link System#nanoTime()} of the last applied scheduled refresh. */
    private long lastUpdateTime = System.nanoTime();

    /** True while a {@link RefresherRunnable} is scheduled or running. */
    private boolean pending = false;

    /** Lock object; must be final because it is used as a monitor. */
    private final Object pendingChanges = new Object();

    /**
     * The database listener created by {@link #getContent}. Volatile because
     * {@link #removeUpdater()} clears it without holding any lock.
     */
    private volatile DocumentHistoryListener updater = null;

    /** Single daemon thread that only delays hand-off to {@link #executor}. */
    private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(null, r, "Document-Refresh-Scheduler");
            t.setDaemon(true);
            return t;
        }
    });

    /**
     * Pool that runs the actual refresh work. It uses a direct hand-off queue,
     * so a task is rejected when all {@link #MAX_REFRESHER_THREADS} threads are busy.
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS,
            REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));
                    t.setDaemon(true);
                    return t;
                }
            });

    /**
     * Runs {@code runnable} on the refresher pool after the given delay.
     * <p>
     * If the pool is saturated the hand-off is retried after
     * {@link #DELAY_BETWEEN_UPDATES} milliseconds. Without the retry the
     * rejection would be captured by the scheduler's future and silently lost,
     * leaving the owning history with {@code pending == true} forever.
     * The executor is private to this class and never shut down here, so the
     * retry cannot loop on a terminated pool.
     *
     * @param runnable work to run on a refresher thread
     * @param delay    how long to wait before handing the work over
     * @param unit     unit of {@code delay}
     */
    private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {
        Runnable wrapper = () -> {
            try {
                executor.execute(runnable);
            } catch (RejectedExecutionException e) {
                scheduleRefresh(runnable, DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
            }
        };
        scheduler.schedule(wrapper, delay, unit);
    }

    public DocumentHistory() {
        lastRevision = 0;
    }

    /** Drops every listener that reports itself disposed. Caller must hold {@code this}. */
    private void pruneListeners() {
        listeners.removeIf(Listener::isDisposed);
    }

    /** Adds {@code listener} unless it is already registered. Caller must hold {@code this}. */
    private void registerListener(Listener<Integer> listener) {
        if (!listeners.contains(listener)) {
            listeners.add(listener);
        }
    }

    /**
     * @return {@code true} if at least one non-disposed listener remains after pruning
     */
    public synchronized boolean hasListeners() {
        pruneListeners();
        return !listeners.isEmpty();
    }

    /**
     * Notifies all live listeners of {@code value}, removing disposed ones on the way.
     * Listener callbacks run outside the lock, on a snapshot of the listener list.
     *
     * @param value          revision number to publish
     * @param ignoreListener listener to skip (may be {@code null} to notify everyone)
     */
    private void fireListeners(Integer value, Listener<Integer> ignoreListener) {
        List<Listener<Integer>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<Listener<Integer>>(listeners);
        }
        for (Listener<Integer> listener : snapshot) {
            if (listener.isDisposed()) {
                synchronized (this) {
                    listeners.remove(listener);
                }
            } else if (!listener.equals(ignoreListener)) {
                listener.execute(value);
            }
        }
    }

    /**
     * Returns the objects recorded under revisions strictly greater than
     * {@code sequenceNumber}, together with the current revision number.
     *
     * @param sequenceNumber last revision the caller has already seen
     * @return pair of (current revision, changes since {@code sequenceNumber});
     *         the collection is empty when the caller is up to date
     */
    public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {
        synchronized (entries) {
            Collection<JSONObject> changes = sequenceNumber < lastRevision
                    ? new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values())
                    : Collections.<JSONObject>emptyList();
            return Pair.make(lastRevision, changes);
        }
    }

    /**
     * Queues {@code result} as the newest document content. Only the latest
     * queued result is kept; a refresher is scheduled unless one is already
     * pending, no sooner than {@link #DELAY_BETWEEN_UPDATES} ms after the
     * previous applied update.
     *
     * @param result the new complete object list
     */
    public void refresh(List<JSONObject> result) {
        synchronized (pendingChanges) {
            incoming = result;
            if (!pending) {
                pending = true;
                long earliest = lastUpdateTime + TimeUnit.MILLISECONDS.toNanos(DELAY_BETWEEN_UPDATES);
                long delay = Math.max(0, earliest - System.nanoTime());
                scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);
            }
        }
    }

    /**
     * Applies the queued {@code incoming} list, re-schedules itself if more
     * data arrived meanwhile, and finally notifies listeners outside all locks.
     */
    private class RefresherRunnable implements Runnable {

        @Override
        public void run() {
            Integer revision = null;
            synchronized (entries) {
                List<JSONObject> update;
                synchronized (pendingChanges) {
                    update = incoming;
                    incoming = null;
                }
                // update may be null: flushAndReadChanges can consume it first.
                if (update != null) {
                    revision = refreshInternal(update);
                }
                synchronized (pendingChanges) {
                    if (incoming != null) {
                        // New data arrived while applying; stay pending and go again.
                        scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
                    } else {
                        pending = false;
                        if (update != null) {
                            lastUpdateTime = System.nanoTime();
                        }
                    }
                }
            }
            if (revision != null) {
                fireListeners(revision, null);
            }
        }
    }

    /**
     * Applies any queued {@code incoming} list immediately and then reads the
     * changes since {@code sequenceNumber}. If something was applied, all
     * listeners except {@code ignoreListener} are notified.
     */
    private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber,
            Listener<Integer> ignoreListener) {
        Integer revision = null;
        Pair<Integer, Collection<JSONObject>> changes;
        synchronized (entries) {
            List<JSONObject> update;
            synchronized (pendingChanges) {
                update = incoming;
                incoming = null;
            }
            if (update != null) {
                revision = refreshInternal(update);
            }
            changes = readChanges(sequenceNumber);
        }
        if (revision != null) {
            fireListeners(revision, ignoreListener);
        }
        return changes;
    }

    /**
     * Diffs {@code result} against {@link #previous} with a two-pointer merge
     * over the objects' ids and records the differences as new revisions.
     * NOTE(review): the merge only works if both lists are sorted by id -
     * nothing in this class enforces that; confirm at the producer.
     * Caller must hold the {@code entries} lock.
     *
     * @param result the new complete object list; becomes {@code previous}
     * @return the revision number after applying the diff
     */
    private int refreshInternal(List<JSONObject> result) {
        Set<JSONObject> added = null;
        Set<JSONObject> removed = null;

        int oldIndex = 0;
        int newIndex = 0;

        while (true) {
            boolean noMoreOlds = oldIndex == previous.size();
            boolean noMoreNews = newIndex == result.size();

            if (noMoreOlds) {
                if (noMoreNews) {
                    break;
                }
                // all remaining objects are added
                for (int i = newIndex; i < result.size(); i++) {
                    if (added == null) added = new THashSet<JSONObject>();
                    added.add(result.get(i));
                }
                break;
            } else if (noMoreNews) {
                // all remaining olds have been removed
                for (int i = oldIndex; i < previous.size(); i++) {
                    if (removed == null) removed = new THashSet<JSONObject>();
                    removed.add(previous.get(i));
                }
                break;
            }

            JSONObject o = previous.get(oldIndex);
            JSONObject n = result.get(newIndex);
            int comp = o.id.compareTo(n.id);
            if (comp == 0) {
                // same id => record only if the content differs
                if (!n.equals(o)) {
                    if (added == null) added = new THashSet<JSONObject>();
                    added.add(n);
                }
                oldIndex++;
                newIndex++;
            } else if (comp > 0) {
                // old is bigger than new => new has been added
                if (added == null) added = new THashSet<JSONObject>();
                added.add(n);
                newIndex++;
            } else {
                // old is smaller than new => old has been removed
                if (removed == null) removed = new THashSet<JSONObject>();
                removed.add(o);
                oldIndex++;
            }
        }

        if (added != null) {
            for (JSONObject entry : added) {
                // String parent = entry.getParent();
                // // Null parent = exists but no parent , empty parent = has been removed
                // if(parent != null && parent.isEmpty()) continue;
                add(entry);
            }
        }
        if (removed != null) {
            for (JSONObject entry : removed) {
                // String parent = entry.getParent();
                // // Null parent = exists but no parent , empty parent = has been removed
                // if(parent != null && parent.isEmpty()) continue;
                remove(entry);
            }
        }

        previous = result;
        return lastRevision;
    }

    /**
     * Records {@code entry} under a new revision, discarding the revision that
     * previously held the same id so each id appears at most once in {@link #entries}.
     */
    private void add(JSONObject entry) {
        Integer rev = lastRevisions.get(entry.getId());
        if (rev != null) {
            entries.remove(rev);
        }
        lastRevision++;
        lastRevisions.put(entry.getId(), lastRevision);
        entries.put(lastRevision, entry);
    }

    /**
     * Records the removal of {@code entry} as a new revision holding a stub
     * object that carries only the "id" field.
     */
    private void remove(JSONObject entry) {
        JSONObject object = new JSONObject(entry.getId());
        object.addJSONField("id", entry.getId());
        Integer rev = lastRevisions.get(object.getId());
        if (rev != null) {
            entries.remove(rev);
        }
        lastRevision++;
        lastRevisions.put(object.getId(), lastRevision);
        entries.put(lastRevision, object);
    }

    /**
     * Registers {@code listener} (unless disposed) and returns the changes
     * since {@code sequenceNumber}.
     * <ul>
     * <li>If a live updater exists, queued data is flushed and read directly.</li>
     * <li>Otherwise, if there are listeners, a new updater is created and
     * subscribed to the document request for {@code location}.</li>
     * <li>With no listeners at all, the document is requested once through a
     * transient cache listener and diffed in place.</li>
     * </ul>
     * A {@link DatabaseException} is logged and the call falls back to
     * whatever is already recorded.
     *
     * @param listener       client listener to register and to exclude from the
     *                       notification triggered by this call
     * @param location       document URI passed to {@link URIDocumentRequest}
     * @param sequenceNumber last revision the caller has already seen
     * @return pair of (current revision, changes since {@code sequenceNumber})
     */
    public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location,
            int sequenceNumber) {
        Listener<List<JSONObject>> newUpdater = null;
        boolean cached = false;

        synchronized (this) {
            if (!listener.isDisposed()) {
                registerListener(listener);
            }

            // Read the field once: removeUpdater() is not synchronized, so a
            // second read could observe null after the null check passed.
            DocumentHistoryListener current = updater;
            if (current != null && !current.isDisposed()) {
                cached = true;
            } else if (hasListeners()) {
                // Keep the new instance in a local so a concurrent
                // removeUpdater() cannot make us lose it before subscribing.
                DocumentHistoryListener created = new DocumentHistoryListener(this);
                updater = created;
                newUpdater = created;
            }
        }

        if (cached) {
            return flushAndReadChanges(sequenceNumber, listener);
        }

        try {
            if (newUpdater != null) {
                Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);
            } else {
                synchronized (entries) {
                    List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location),
                            TransientCacheListener.<List<JSONObject>>instance());
                    refreshInternal(result);
                    return readChanges(sequenceNumber);
                }
            }
        } catch (DatabaseException e) {
            Logger.defaultLogError(e);
        }

        return flushAndReadChanges(sequenceNumber, listener);
    }

    /** Forgets the current updater so that the next {@link #getContent} call creates a new one. */
    protected void removeUpdater() {
        updater = null;
    }
}