1 package org.simantics.document.server;
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Collections;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.SynchronousQueue;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
16 import org.simantics.Simantics;
17 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
18 import org.simantics.db.common.utils.Logger;
19 import org.simantics.db.exception.DatabaseException;
20 import org.simantics.db.procedure.Listener;
21 import org.simantics.document.server.request.URIDocumentRequest;
22 import org.simantics.utils.datastructures.Pair;
24 import gnu.trove.map.hash.THashMap;
25 import gnu.trove.set.hash.THashSet;
27 public class DocumentHistory {
// Core and maximum thread counts passed to the shared refresher ThreadPoolExecutor.
29 public static final int MIN_REFRESHER_THREADS = 1;
30 public static final int MAX_REFRESHER_THREADS = 32;
// Spacing between consecutive refreshes; refresh() converts it to nanoseconds when computing the delay.
31 public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds
// Keep-alive time passed to the refresher ThreadPoolExecutor.
32 public static final long REFRESHER_THREAD_TTL = 10; // seconds
// Listeners that fireListeners() executes with the new revision number.
34 private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>();
// Change log keyed by revision number; readChanges() returns the values after a given key.
36 private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();
// For each object id, the revision number last stored for it by add()/remove().
37 private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();
// Document contents that refreshInternal() compares against a new result.
38 private List<JSONObject> previous = new ArrayList<JSONObject>();
// Latest revision number; used as the key for new entries and returned by readChanges().
39 private int lastRevision;
// Result waiting to be applied; checked under the pendingChanges monitor.
// NOTE(review): the lines that assign and clear this field are not visible in this view.
41 private List<JSONObject> incoming;
// System.nanoTime() value recorded after an update has been applied; refresh() uses it to compute the delay.
42 private long lastUpdateTime = System.nanoTime();
// NOTE(review): presumably marks that a refresh is already scheduled - its usage is not visible in this view.
43 private Boolean pending = false;
// Monitor object for the synchronized blocks that guard the pending-update state.
44 private Object pendingChanges = new Object();
// Database listener created in getContent() when no live updater exists.
46 private DocumentHistoryListener updater = null;
// Shared scheduler with a core pool size of one; scheduleRefresh() uses it to delay refresh tasks.
// Threads it creates are named "Document-Refresh-Scheduler".
// NOTE(review): the rest of the thread factory body is not visible in this view.
48 private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
50 public Thread newThread(Runnable r) {
51 Thread t = new Thread(null, r, "Document-Refresher-Scheduler".equals("") ? "" : "Document-Refresh-Scheduler");
// Shared pool that runs the refresh tasks handed over by scheduleRefresh().
// It is sized by MIN_REFRESHER_THREADS / MAX_REFRESHER_THREADS, keeps idle threads for
// REFRESHER_THREAD_TTL seconds and uses a fair SynchronousQueue as its work queue.
// NOTE(review): the end of the constructor call is not visible, so whether a rejection handler
// is supplied cannot be seen from here.
57 private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {
59 public Thread newThread(Runnable r) {
// The thread name carries a number derived from the pool size at creation time.
60 Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));
66 private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {
67 Runnable wrapper = new Runnable() {
70 executor.execute(runnable);
73 scheduler.schedule(wrapper, delay, unit);
// Public no-argument constructor.
// NOTE(review): the constructor body is not visible in this view.
76 public DocumentHistory() {
80 private void pruneListeners() {
81 ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>();
82 for(Listener<Integer> listener : listeners) {
83 if(listener.isDisposed()) {
84 removals.add(listener);
87 listeners.removeAll(removals);
90 private void registerListener(Listener<Integer> listener) {
91 if(!listeners.contains(listener))
92 listeners.add(listener);
// Reports whether the listener list is non-empty; synchronized on this instance.
// NOTE(review): a statement between the signature and the return may be missing from this view.
95 public synchronized boolean hasListeners() {
97 return !listeners.isEmpty();
// Executes the registered listeners with the given value, skipping any listener equal to
// ignoreListener (which may be null, in which case none is skipped).
// The loop runs over a copy of the listener list taken under this instance's monitor.
100 private void fireListeners(Integer value, Listener<Integer> ignoreListener) {
101 ArrayList<Listener<Integer>> currentListeners;
102 synchronized (this) {
103 currentListeners = new ArrayList<Listener<Integer>>(listeners);
105 for(Listener<Integer> listener : currentListeners) {
106 if(listener.isDisposed()) {
// Disposed listeners are dropped from the live list, again under the instance monitor.
107 synchronized (this) {
108 listeners.remove(listener);
// NOTE(review): the lines between the removal and this check are not visible, so it cannot be
// confirmed from here whether this is an else branch of the disposed check.
111 if (!listener.equals(ignoreListener)) {
112 listener.execute(value);
118 public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {
119 synchronized (entries) {
120 Collection<JSONObject> changes =
121 sequenceNumber < lastRevision ?
122 new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) :
123 Collections.emptyList();
124 return Pair.make(lastRevision, changes);
// Takes a new document result and schedules a RefresherRunnable on the shared pool.
// NOTE(review): the lines between the synchronized statement and the delay computation are not
// visible, so how result is stored and whether the scheduling is conditional cannot be seen.
128 public void refresh(List<JSONObject> result) {
129 synchronized (pendingChanges) {
// Remaining time until DELAY_BETWEEN_UPDATES milliseconds have passed since lastUpdateTime,
// in nanoseconds and clamped to zero.
133 long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());
134 scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);
// Task that applies an update to the history via refreshInternal() and then fires the
// listeners with the resulting revision.
// NOTE(review): several lines of this class are not visible in this view, including the run()
// header and the statements that assign the local variable update.
139 private class RefresherRunnable implements Runnable {
144 Integer revision = null;
// Locks are taken in the order entries, then pendingChanges.
145 synchronized (entries) {
146 List<JSONObject> update;
147 synchronized (pendingChanges) {
152 if (update != null) {
153 revision = refreshInternal(update);
156 synchronized (pendingChanges) {
// A non-null incoming at this point leads to a follow-up run after DELAY_BETWEEN_UPDATES.
157 if (incoming != null) {
158 scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
161 if (update != null) {
162 lastUpdateTime = System.nanoTime();
// revision is only assigned when an update was applied, so listeners fire only in that case.
167 if (revision != null) {
168 fireListeners(revision, null);
// Applies an outstanding update (if there is one) through refreshInternal(), reads the changes
// after sequenceNumber, and fires the listeners except ignoreListener when a revision was produced.
// NOTE(review): the lines that assign update and the return statement are not visible in this view.
173 private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {
174 Integer revision = null;
175 Pair<Integer, Collection<JSONObject>> changes;
// Locks are taken in the order entries, then pendingChanges.
176 synchronized (entries) {
177 List<JSONObject> update;
178 synchronized (pendingChanges) {
183 if (update != null) {
184 revision = refreshInternal(update);
186 changes = readChanges(sequenceNumber);
// revision is only assigned when an update was applied.
188 if (revision != null) {
189 fireListeners(revision, ignoreListener);
// Compares the previous document contents with the new result and collects the objects that
// were added and removed; callers use the returned int as the revision number.
// The comparison walks both lists by index and orders objects by their id field.
// NOTE(review): presumably both lists are sorted by id - confirm against the producer.
// NOTE(review): the loop header, the index updates, the bodies of the added/removed loops and the
// return statement are not visible in this view.
195 private int refreshInternal(List<JSONObject> result) {
// Both sets are created lazily, only once there is something to put in them.
196 Set<JSONObject> added = null;
197 Set<JSONObject> removed = null;
204 boolean noMoreOlds = oldIndex == previous.size();
205 boolean noMoreNews = newIndex == result.size();
208 if(noMoreNews) break;
210 // all remaining objects are added
211 for(int i=newIndex;i<result.size();i++) {
212 if(added == null) added = new THashSet<JSONObject>();
213 added.add(result.get(i));
218 // old objects are still available
220 // all remaining olds have been removed
221 for(int i=oldIndex;i<previous.size();i++) {
222 if(removed == null) removed = new THashSet<JSONObject>();
223 removed.add(previous.get(i));
229 JSONObject o = previous.get(oldIndex);
230 JSONObject n = result.get(newIndex);
// Ids are compared with compareTo to decide which side to advance.
232 int comp = o.id.compareTo(n.id);
234 // objects are same => compare
236 if(added == null) added = new THashSet<JSONObject>();
241 } else if (comp > 0) {
242 // old is bigger than new => new has been added
243 if(added == null) added = new THashSet<JSONObject>();
246 } else if (comp < 0) {
247 // old is smaller than new => old has been removed
248 if(removed == null) removed = new THashSet<JSONObject>();
256 for(JSONObject entry : added) {
258 // String parent = entry.getParent();
259 // // Null parent = exists but no parent , empty parent = has been removed
260 // if(parent != null && parent.isEmpty()) continue;
266 if(removed != null) {
267 for(JSONObject entry : removed) {
269 // String parent = entry.getParent();
270 // // Null parent = exists but no parent , empty parent = has been removed
271 // if(parent != null && parent.isEmpty()) continue;
// Records the given object in the change log under the current lastRevision and remembers
// that revision for the object's id.
// NOTE(review): the lines between the lookup and the two puts are not visible in this view;
// presumably they use rev and advance lastRevision - confirm.
283 private void add(JSONObject entry) {
// Revision previously stored for this id, or null if the id has not been recorded.
285 Integer rev = lastRevisions.get(entry.getId());
291 lastRevisions.put(entry.getId(), lastRevision);
292 entries.put(lastRevision, entry);
// Records a removal: a fresh JSONObject carrying only the entry's id is stored in the change
// log under the current lastRevision, and that revision is remembered for the id.
// NOTE(review): the lines between the lookup and the two puts are not visible in this view;
// presumably they use rev and advance lastRevision - confirm.
295 private void remove(JSONObject entry) {
// The stored object is a new one built from the id, not the removed entry itself.
297 JSONObject object = new JSONObject(entry.getId());
298 object.addJSONField("id", entry.getId());
// Revision previously stored for this id, or null if the id has not been recorded.
300 Integer rev = lastRevisions.get(object.getId());
306 lastRevisions.put(object.getId(), lastRevision);
307 entries.put(lastRevision, object);
// Returns the latest revision together with the changes after sequenceNumber for the document
// at the given location, and registers the listener if it is not disposed.
// When no live updater exists and there are listeners, a new DocumentHistoryListener is created
// and attached to a URIDocumentRequest for the location.
// A DatabaseException is logged, after which the result of flushAndReadChanges() is returned.
// NOTE(review): the try statement and several branch lines are not visible in this view, so the
// exact branch structure cannot be confirmed from here.
310 public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {
311 Listener<List<JSONObject>> newUpdater = null;
313 boolean cached = false;
// Listener registration and updater bookkeeping happen under this instance's monitor.
314 synchronized (this) {
315 if (!listener.isDisposed()) {
316 registerListener(listener);
320 if ((updater != null) && !updater.isDisposed()) {
323 if (hasListeners()) {
324 updater = new DocumentHistoryListener(this);
325 newUpdater = updater;
// The caller's own listener is passed as the one to skip when listeners are fired.
330 return flushAndReadChanges(sequenceNumber, listener);
// The database request is issued outside the synchronized (this) block above.
335 if (newUpdater != null) {
336 Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);
// This path reads the document with a TransientCacheListener and applies it directly
// under the entries monitor.
338 synchronized (entries) {
339 List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());
340 refreshInternal(result);
341 return readChanges(sequenceNumber);
344 } catch (DatabaseException e) {
345 Logger.defaultLogError(e);
348 return flushAndReadChanges(sequenceNumber, listener);
351 protected void removeUpdater() {