1 package org.simantics.document.server;
\r
3 import java.util.ArrayList;
\r
4 import java.util.Collection;
\r
5 import java.util.Collections;
\r
6 import java.util.List;
\r
7 import java.util.Map;
\r
8 import java.util.Set;
\r
9 import java.util.TreeMap;
\r
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
\r
11 import java.util.concurrent.SynchronousQueue;
\r
12 import java.util.concurrent.ThreadFactory;
\r
13 import java.util.concurrent.ThreadPoolExecutor;
\r
14 import java.util.concurrent.TimeUnit;
\r
16 import org.simantics.Simantics;
\r
17 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
\r
18 import org.simantics.db.common.utils.Logger;
\r
19 import org.simantics.db.exception.DatabaseException;
\r
20 import org.simantics.db.procedure.Listener;
\r
21 import org.simantics.document.server.request.URIDocumentRequest;
\r
22 import org.simantics.utils.datastructures.Pair;
\r
24 import gnu.trove.map.hash.THashMap;
\r
25 import gnu.trove.set.hash.THashSet;
\r
27 public class DocumentHistory {
\r
// Core and maximum pool sizes passed to the refresher ThreadPoolExecutor ("executor").
29 public static final int MIN_REFRESHER_THREADS = 1;
\r
30 public static final int MAX_REFRESHER_THREADS = 32;
\r
// Spacing between refreshes; refresh() converts it to nanoseconds, RefresherRunnable uses it as milliseconds.
31 public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds
\r
// Keep-alive time passed to the refresher ThreadPoolExecutor together with TimeUnit.SECONDS.
32 public static final long REFRESHER_THREAD_TTL = 10; // seconds
\r
// Listeners that fireListeners() calls with a revision number.
34 private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>();
\r
// Change entries keyed by revision number; readChanges() reads the tail of this map. Also used as a monitor.
36 private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();
\r
// Entry id -> revision number under which add()/remove() last stored that id in "entries".
37 private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();
\r
// List that refreshInternal() compares element by element against a new result.
38 private List<JSONObject> previous = new ArrayList<JSONObject>();
\r
// Revision number returned by readChanges() and refreshInternal().
39 private int lastRevision;
\r
// Update waiting to be applied; checked for null under the pendingChanges monitor.
// NOTE(review): presumably assigned in refresh() - the assignment is not visible in this listing.
41 private List<JSONObject> incoming;
\r
// System.nanoTime() value recorded by RefresherRunnable after it applied an update.
42 private long lastUpdateTime = System.nanoTime();
\r
// NOTE(review): no use of this flag is visible in this listing; confirm its role against the full file.
43 private Boolean pending = false;
\r
// Monitor object used in synchronized blocks around the pending-update state.
44 private Object pendingChanges = new Object();
\r
// Listener instance created in getContent(); checked there for null / isDisposed().
46 private DocumentHistoryListener updater = null;
\r
// Scheduler with a core pool size of 1, used by scheduleRefresh() to time delayed refreshes.
// Its factory names the thread "Document-Refresh-Scheduler".
// NOTE(review): the remainder of the ThreadFactory body is not visible in this listing.
48 private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
\r
50 public Thread newThread(Runnable r) {
\r
51 Thread t = new Thread(null, r, "Document-Refresh-Scheduler");
\r
// Pool that scheduleRefresh() hands refresh tasks to: MIN_REFRESHER_THREADS core threads,
// up to MAX_REFRESHER_THREADS, keep-alive of REFRESHER_THREAD_TTL seconds, and a fair
// SynchronousQueue as the work queue.
// Threads are named "Document-Refresher-<n>", where n is the pool size at creation time plus one.
// NOTE(review): the remainder of the ThreadFactory body is not visible in this listing.
57 private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {
\r
59 public Thread newThread(Runnable r) {
\r
60 Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));
\r
/**
 * Schedules {@code runnable} for execution after the given delay.
 *
 * A wrapper Runnable is scheduled on {@code scheduler}; the visible wrapper statement
 * passes {@code runnable} to {@code executor.execute}.
 * NOTE(review): the wrapper's method header is not visible in this listing.
 *
 * @param runnable task to run
 * @param delay    delay before the wrapper is run
 * @param unit     time unit of {@code delay}
 */
66 private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {
\r
67 Runnable wrapper = new Runnable() {
\r
70 executor.execute(runnable);
\r
73 scheduler.schedule(wrapper, delay, unit);
\r
76 public DocumentHistory() {
\r
80 private void pruneListeners() {
\r
81 ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>();
\r
82 for(Listener<Integer> listener : listeners) {
\r
83 if(listener.isDisposed()) {
\r
84 removals.add(listener);
\r
87 listeners.removeAll(removals);
\r
90 private void registerListener(Listener<Integer> listener) {
\r
91 if(!listeners.contains(listener))
\r
92 listeners.add(listener);
\r
/**
 * Reports whether the listener list is non-empty. Synchronized on this instance.
 *
 * NOTE(review): one line between the signature and the return is missing from this
 * listing; it may change the list before the check - confirm against the full file.
 *
 * @return true if {@code listeners} is not empty
 */
95 public synchronized boolean hasListeners() {
\r
97 return !listeners.isEmpty();
\r
/**
 * Calls {@code execute(value)} on registered listeners.
 *
 * The listener list is copied while holding this instance's monitor and the copy is
 * iterated afterwards. A listener that reports isDisposed() is removed from the list
 * (again under the monitor). A listener equal to {@code ignoreListener} is not executed.
 * NOTE(review): the lines joining the disposed check and the equality check are not
 * visible here, so whether a disposed listener can still reach execute() must be
 * confirmed against the full file.
 *
 * @param value          value passed to each listener
 * @param ignoreListener listener to leave out; callers in this file also pass null
 */
100 private void fireListeners(Integer value, Listener<Integer> ignoreListener) {
\r
101 ArrayList<Listener<Integer>> currentListeners;
\r
102 synchronized (this) {
\r
103 currentListeners = new ArrayList<Listener<Integer>>(listeners);
\r
105 for(Listener<Integer> listener : currentListeners) {
\r
106 if(listener.isDisposed()) {
\r
107 synchronized (this) {
\r
108 listeners.remove(listener);
\r
111 if (!listener.equals(ignoreListener)) {
\r
112 listener.execute(value);
\r
118 public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {
\r
119 synchronized (entries) {
\r
120 Collection<JSONObject> changes =
\r
121 sequenceNumber < lastRevision ?
\r
122 new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) :
\r
123 Collections.emptyList();
\r
124 return Pair.make(lastRevision, changes);
\r
/**
 * Schedules a RefresherRunnable under the {@code pendingChanges} monitor.
 *
 * The delay is the time remaining until DELAY_BETWEEN_UPDATES milliseconds have passed
 * since {@code lastUpdateTime}, computed in nanoseconds and clamped to zero.
 * NOTE(review): the lines between the synchronized statement and the delay computation
 * are not visible in this listing; presumably they store {@code result} as the pending
 * update - confirm against the full file.
 *
 * @param result new document content
 */
128 public void refresh(List<JSONObject> result) {
\r
129 synchronized (pendingChanges) {
\r
133 long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());
\r
134 scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);
\r
/**
 * Task passed to scheduleRefresh() that applies a pending update.
 *
 * Visible steps: under the {@code entries} monitor a local {@code update} is obtained
 * inside a {@code pendingChanges} block and, when non-null, applied with
 * refreshInternal(). Under {@code pendingChanges} again, another RefresherRunnable is
 * scheduled after DELAY_BETWEEN_UPDATES milliseconds if {@code incoming} is non-null,
 * and {@code lastUpdateTime} is refreshed when an update was applied. If a revision was
 * produced, fireListeners() is called with no ignored listener.
 * NOTE(review): the statements that fill {@code update} are not visible in this
 * listing, nor are the closing braces that determine exact nesting.
 */
139 private class RefresherRunnable implements Runnable {
\r
142 public void run() {
\r
144 Integer revision = null;
\r
145 synchronized (entries) {
\r
146 List<JSONObject> update;
\r
147 synchronized (pendingChanges) {
\r
152 if (update != null) {
\r
153 revision = refreshInternal(update);
\r
156 synchronized (pendingChanges) {
\r
157 if (incoming != null) {
\r
158 scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
\r
161 if (update != null) {
\r
162 lastUpdateTime = System.nanoTime();
\r
167 if (revision != null) {
\r
168 fireListeners(revision, null);
\r
/**
 * Applies a pending update, if any, and reads the changes after {@code sequenceNumber}.
 *
 * Under the {@code entries} monitor a local {@code update} is obtained inside a
 * {@code pendingChanges} block, applied with refreshInternal() when non-null, and
 * readChanges() is called. If a revision was produced, fireListeners() is called with
 * {@code ignoreListener} left out.
 * NOTE(review): the statements that fill {@code update} and the return statement are
 * not visible in this listing; presumably {@code changes} is returned.
 *
 * @param sequenceNumber revision the caller already has
 * @param ignoreListener listener passed on to fireListeners() as the one to leave out
 */
173 private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {
\r
174 Integer revision = null;
\r
175 Pair<Integer, Collection<JSONObject>> changes;
\r
176 synchronized (entries) {
\r
177 List<JSONObject> update;
\r
178 synchronized (pendingChanges) {
\r
183 if (update != null) {
\r
184 revision = refreshInternal(update);
\r
186 changes = readChanges(sequenceNumber);
\r
188 if (revision != null) {
\r
189 fireListeners(revision, ignoreListener);
\r
/**
 * Compares {@code result} with {@code previous} and collects added and removed objects.
 *
 * The visible code walks both lists with an old and a new index and compares the ids of
 * the current elements with {@code o.id.compareTo(n.id)}. When one list is exhausted,
 * the rest of the other is collected into {@code added} or {@code removed}. The two
 * sets are iterated afterwards and {@code lastRevision} is returned.
 * NOTE(review): this id-ordered walk presumably requires both lists to be sorted by id.
 * The loop header, index updates, the equal-id comparison and the per-entry handling of
 * the sets are not visible in this listing; confirm all of these against the full file.
 *
 * @param result new document content
 * @return value of {@code lastRevision} at the end of the method
 */
195 private int refreshInternal(List<JSONObject> result) {
\r
196 Set<JSONObject> added = null;
\r
197 Set<JSONObject> removed = null;
\r
204 boolean noMoreOlds = oldIndex == previous.size();
\r
205 boolean noMoreNews = newIndex == result.size();
\r
208 if(noMoreNews) break;
\r
210 // all remaining objects are added
\r
211 for(int i=newIndex;i<result.size();i++) {
\r
212 if(added == null) added = new THashSet<JSONObject>();
\r
213 added.add(result.get(i));
\r
218 // old objects are still available
\r
220 // all remaining olds have been removed
\r
221 for(int i=oldIndex;i<previous.size();i++) {
\r
222 if(removed == null) removed = new THashSet<JSONObject>();
\r
223 removed.add(previous.get(i));
\r
229 JSONObject o = previous.get(oldIndex);
\r
230 JSONObject n = result.get(newIndex);
\r
232 int comp = o.id.compareTo(n.id);
\r
234 // objects are same => compare
\r
236 if(added == null) added = new THashSet<JSONObject>();
\r
241 } else if (comp > 0) {
\r
242 // old is bigger than new => new has been added
\r
243 if(added == null) added = new THashSet<JSONObject>();
\r
246 } else if (comp < 0) {
\r
247 // old is smaller than new => old has been removed
\r
248 if(removed == null) removed = new THashSet<JSONObject>();
\r
255 if(added != null) {
\r
256 for(JSONObject entry : added) {
\r
258 // String parent = entry.getParent();
\r
259 // // Null parent = exists but no parent , empty parent = has been removed
\r
260 // if(parent != null && parent.isEmpty()) continue;
\r
266 if(removed != null) {
\r
267 for(JSONObject entry : removed) {
\r
269 // String parent = entry.getParent();
\r
270 // // Null parent = exists but no parent , empty parent = has been removed
\r
271 // if(parent != null && parent.isEmpty()) continue;
\r
280 return lastRevision;
\r
/**
 * Stores {@code entry} as the change recorded for its id.
 *
 * Looks up the revision previously stored for the id in {@code lastRevisions}, removes
 * that revision from {@code entries}, then records the entry under {@code lastRevision}
 * in both maps.
 * NOTE(review): several lines are missing from this listing; presumably they guard the
 * removal against a null {@code rev} and advance {@code lastRevision} - confirm.
 *
 * @param entry object to record
 */
283 private void add(JSONObject entry) {
\r
285 Integer rev = lastRevisions.get(entry.getId());
\r
287 entries.remove(rev);
\r
291 lastRevisions.put(entry.getId(), lastRevision);
\r
292 entries.put(lastRevision, entry);
\r
/**
 * Records a change for an object that is no longer present.
 *
 * Builds a new JSONObject from the entry's id with an "id" field, removes the revision
 * previously stored for that id from {@code entries}, then records the new object under
 * {@code lastRevision} in both maps.
 * NOTE(review): several lines are missing from this listing; presumably they add
 * further fields or guards and advance {@code lastRevision} - confirm.
 *
 * @param entry object that was removed
 */
295 private void remove(JSONObject entry) {
\r
297 JSONObject object = new JSONObject(entry.getId());
\r
298 object.addJSONField("id", entry.getId());
\r
300 Integer rev = lastRevisions.get(object.getId());
\r
302 entries.remove(rev);
\r
306 lastRevisions.put(object.getId(), lastRevision);
\r
307 entries.put(lastRevision, object);
\r
/**
 * Registers {@code listener} and returns the changes after {@code sequenceNumber}.
 *
 * Visible steps: under this instance's monitor the listener is registered unless it is
 * disposed, the current {@code updater} is checked for null / isDisposed(), and a new
 * DocumentHistoryListener is created when hasListeners() is true. One path returns
 * flushAndReadChanges(). When a new updater was created, a URIDocumentRequest for
 * {@code location} is issued with it via syncRequest. Under the {@code entries} monitor
 * the document is then requested with a TransientCacheListener, applied with
 * refreshInternal(), and readChanges() is returned. A DatabaseException is logged with
 * Logger.defaultLogError, after which flushAndReadChanges() is returned.
 * NOTE(review): the try statement, the use of {@code cached} and the braces that decide
 * which branch each statement belongs to are not visible in this listing; confirm the
 * exact control flow against the full file.
 *
 * @param listener       listener to register; also passed to flushAndReadChanges() as the ignored listener
 * @param location       argument for URIDocumentRequest
 * @param sequenceNumber revision the caller already has
 */
310 public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {
\r
311 Listener<List<JSONObject>> newUpdater = null;
\r
313 boolean cached = false;
\r
314 synchronized (this) {
\r
315 if (!listener.isDisposed()) {
\r
316 registerListener(listener);
\r
320 if ((updater != null) && !updater.isDisposed()) {
\r
323 if (hasListeners()) {
\r
324 updater = new DocumentHistoryListener(this);
\r
325 newUpdater = updater;
\r
330 return flushAndReadChanges(sequenceNumber, listener);
\r
335 if (newUpdater != null) {
\r
336 Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);
\r
338 synchronized (entries) {
\r
339 List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());
\r
340 refreshInternal(result);
\r
341 return readChanges(sequenceNumber);
\r
344 } catch (DatabaseException e) {
\r
345 Logger.defaultLogError(e);
\r
348 return flushAndReadChanges(sequenceNumber, listener);
\r
351 protected void removeUpdater() {
\r