]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.document.server / src / org / simantics / document / server / DocumentHistory.java
1 package org.simantics.document.server;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Collections;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Set;
9 import java.util.TreeMap;
10 import java.util.concurrent.ScheduledThreadPoolExecutor;
11 import java.util.concurrent.SynchronousQueue;
12 import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15
16 import org.simantics.Simantics;
17 import org.simantics.db.common.procedure.adapter.TransientCacheListener;
18 import org.simantics.db.common.utils.Logger;
19 import org.simantics.db.exception.DatabaseException;
20 import org.simantics.db.procedure.Listener;
21 import org.simantics.document.server.request.URIDocumentRequest;
22 import org.simantics.utils.datastructures.Pair;
23
24 import gnu.trove.map.hash.THashMap;
25 import gnu.trove.set.hash.THashSet;
26
27 public class DocumentHistory {
28
29         public static final int MIN_REFRESHER_THREADS = 1;
30         public static final int MAX_REFRESHER_THREADS = 32;
31         public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds
32         public static final long REFRESHER_THREAD_TTL = 10; // seconds
33         
34         private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>(); 
35
36         private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();
37         private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();
38         private List<JSONObject> previous = new ArrayList<JSONObject>();
39         private int lastRevision;
40
41         private List<JSONObject> incoming;
42         private long lastUpdateTime = System.nanoTime();
43         private Boolean pending = false;
44         private Object pendingChanges = new Object();
45         
46         private DocumentHistoryListener updater = null;
47
48         private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
49                 @Override
50                 public Thread newThread(Runnable r) {
51                         Thread t = new Thread(null, r, "Document-Refresh-Scheduler");
52                         t.setDaemon(true);
53                         return t;
54                 }
55         });
56         
57         private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {
58                 @Override
59                 public Thread newThread(Runnable r) {
60                         Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));
61                         t.setDaemon(true);
62                         return t;
63                 }
64         });
65         
66         private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {
67                 Runnable wrapper = new Runnable() {
68                         @Override
69                         public void run() {
70                                 executor.execute(runnable);
71                         }
72                 };
73                 scheduler.schedule(wrapper, delay, unit);
74         }
75         
76         public DocumentHistory() {
77                 lastRevision = 0;
78         }
79         
80         private void pruneListeners() {
81                 ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>(); 
82                 for(Listener<Integer> listener : listeners) {
83                         if(listener.isDisposed()) {
84                                 removals.add(listener);
85                         }
86                 }
87                 listeners.removeAll(removals);
88         }
89         
90         private void registerListener(Listener<Integer> listener) {
91                 if(!listeners.contains(listener))
92                         listeners.add(listener);
93         }
94
95         public synchronized boolean hasListeners() {
96                 pruneListeners();
97                 return !listeners.isEmpty();
98         }
99         
100         private void fireListeners(Integer value, Listener<Integer> ignoreListener) {
101                 ArrayList<Listener<Integer>> currentListeners;
102                 synchronized (this) {
103                         currentListeners = new ArrayList<Listener<Integer>>(listeners);
104                 }
105                 for(Listener<Integer> listener : currentListeners) {
106                         if(listener.isDisposed()) {
107                                 synchronized (this) {
108                                         listeners.remove(listener);
109                                 }
110                         } else {
111                                 if (!listener.equals(ignoreListener)) {
112                                         listener.execute(value);
113                                 }
114                         }
115                 }
116         }
117         
118         public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {
119                 synchronized (entries) {
120                         Collection<JSONObject> changes = 
121                                         sequenceNumber < lastRevision ?
122                                                         new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) : 
123                                                                 Collections.emptyList();
124                         return Pair.make(lastRevision, changes);
125                 }
126         }
127         
128         public void refresh(List<JSONObject> result) {
129                 synchronized (pendingChanges) {
130                         incoming = result;
131                         if (!pending) {
132                                 pending = true;
133                                 long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());
134                                 scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);
135                         }
136                 }
137         }
138         
139         private class RefresherRunnable implements Runnable {
140                 
141                 @Override
142                 public void run() {
143
144                         Integer revision = null;
145                         synchronized (entries) {
146                                 List<JSONObject> update;
147                                 synchronized (pendingChanges) {
148                                         update = incoming;
149                                         incoming = null;
150                                 }
151         
152                                 if (update != null) {
153                                         revision = refreshInternal(update);
154                                 }
155                                 
156                                 synchronized (pendingChanges) {
157                                         if (incoming != null) {
158                                                 scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);
159                                         } else {
160                                                 pending = false;
161                                                 if (update != null) {
162                                                         lastUpdateTime = System.nanoTime();
163                                                 }
164                                         }
165                                 }
166                         }
167                         if (revision != null) {
168                                 fireListeners(revision, null);
169                         }
170                 }
171         }
172         
173         private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {
174                 Integer revision = null;
175                 Pair<Integer, Collection<JSONObject>> changes;
176                 synchronized (entries) {
177                         List<JSONObject> update;
178                         synchronized (pendingChanges) {
179                                 update = incoming;
180                                 incoming = null;
181                         }
182                         
183                         if (update != null) {
184                                 revision = refreshInternal(update);
185                         }
186                         changes = readChanges(sequenceNumber);
187                 }
188                 if (revision != null) {
189                         fireListeners(revision, ignoreListener);
190                 }
191                 return changes;
192         }
193
194         
195         private int refreshInternal(List<JSONObject> result) {
196                 Set<JSONObject> added = null;
197                 Set<JSONObject> removed = null;
198
199                 int oldIndex = 0;
200                 int newIndex = 0;
201                 
202                 while(true) {
203
204                         boolean noMoreOlds = oldIndex == previous.size();
205                         boolean noMoreNews = newIndex == result.size();
206                         
207                         if(noMoreOlds) {
208                                 if(noMoreNews) break;
209                                 else {
210                                         // all remaining objects are added
211                                         for(int i=newIndex;i<result.size();i++) {
212                                                 if(added == null) added = new THashSet<JSONObject>();
213                                                 added.add(result.get(i));
214                                         }
215                                         break;
216                                 }
217                         } else {
218                                 // old objects are still available
219                                 if(noMoreNews) {
220                                         // all remaining olds have been removed
221                                         for(int i=oldIndex;i<previous.size();i++) {
222                                                 if(removed == null) removed = new THashSet<JSONObject>();
223                                                 removed.add(previous.get(i));
224                                         }
225                                         break;
226                                 }
227                         }
228                         
229                         JSONObject o = previous.get(oldIndex);
230                         JSONObject n = result.get(newIndex);
231                         
232                         int comp = o.id.compareTo(n.id);
233                         if(comp == 0) {
234                                 // objects are same => compare
235                                 if(!n.equals(o)) {
236                                         if(added == null) added = new THashSet<JSONObject>();
237                                         added.add(n);
238                                 }
239                                 oldIndex++;
240                                 newIndex++;
241                         } else if (comp > 0) {
242                                 // old is bigger than new => new has been added
243                                 if(added == null) added = new THashSet<JSONObject>();
244                                 added.add(n);
245                                 newIndex++;
246                         } else if (comp < 0) {
247                                 // old is smaller than new => old has been removed 
248                                 if(removed == null) removed = new THashSet<JSONObject>();
249                                 removed.add(o);
250                                 oldIndex++;
251                         }
252                         
253                 }
254
255                 if(added != null) {
256                         for(JSONObject entry : added) {
257
258 //                              String parent = entry.getParent();
259 //                              // Null parent = exists but no parent , empty parent = has been removed
260 //                              if(parent != null && parent.isEmpty()) continue;
261
262                                 add(entry);
263
264                         }
265                 }
266                 if(removed != null) {
267                         for(JSONObject entry : removed) {
268                                 
269 //                              String parent = entry.getParent();
270 //                              // Null parent = exists but no parent , empty parent = has been removed
271 //                              if(parent != null && parent.isEmpty()) continue;
272                                 
273                                 remove(entry);
274                                 
275                         }
276                 }
277                 
278                 previous = result;
279
280                 return lastRevision;
281         }
282         
283         private void add(JSONObject entry) {
284                 
285                 Integer rev = lastRevisions.get(entry.getId());
286                 if(rev != null) {
287                         entries.remove(rev);
288                 }
289
290                 lastRevision++;
291                 lastRevisions.put(entry.getId(), lastRevision);
292                 entries.put(lastRevision, entry);
293         }
294             
295         private void remove(JSONObject entry) {
296
297                 JSONObject object = new JSONObject(entry.getId());
298                 object.addJSONField("id", entry.getId());
299
300                 Integer rev = lastRevisions.get(object.getId());
301                 if(rev != null) {
302                         entries.remove(rev);
303                 }
304
305                 lastRevision++;
306                 lastRevisions.put(object.getId(), lastRevision);
307                 entries.put(lastRevision, object);
308         }
309         
310         public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {
311                 Listener<List<JSONObject>> newUpdater = null;
312                 
313                 boolean cached = false;
314                 synchronized (this) {
315                         if (!listener.isDisposed()) {
316                                 registerListener(listener);
317                         }
318         
319                         // Try cache
320                         if ((updater != null) && !updater.isDisposed()) {
321                                 cached = true;
322                         } else {
323                                 if (hasListeners()) {
324                                         updater = new DocumentHistoryListener(this);
325                                         newUpdater = updater;
326                                 }
327                         }
328                 }
329                 if (cached) {
330                         return flushAndReadChanges(sequenceNumber, listener);
331                 }
332                 
333
334                 try {
335                         if (newUpdater != null) {
336                                 Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);
337                         } else {
338                                 synchronized (entries) {
339                                         List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());
340                                         refreshInternal(result);
341                                         return readChanges(sequenceNumber);
342                                 }
343                         }
344                 } catch (DatabaseException e) {
345                         Logger.defaultLogError(e);
346                 }
347
348                 return flushAndReadChanges(sequenceNumber, listener);
349         }
350         
351         protected void removeUpdater() {
352                 updater = null;
353         }
354         
355 }