]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.document.server/src/org/simantics/document/server/DocumentHistory.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.document.server / src / org / simantics / document / server / DocumentHistory.java
1 package org.simantics.document.server;\r
2 \r
3 import java.util.ArrayList;\r
4 import java.util.Collection;\r
5 import java.util.Collections;\r
6 import java.util.List;\r
7 import java.util.Map;\r
8 import java.util.Set;\r
9 import java.util.TreeMap;\r
10 import java.util.concurrent.ScheduledThreadPoolExecutor;\r
11 import java.util.concurrent.SynchronousQueue;\r
12 import java.util.concurrent.ThreadFactory;\r
13 import java.util.concurrent.ThreadPoolExecutor;\r
14 import java.util.concurrent.TimeUnit;\r
15 \r
16 import org.simantics.Simantics;\r
17 import org.simantics.db.common.procedure.adapter.TransientCacheListener;\r
18 import org.simantics.db.common.utils.Logger;\r
19 import org.simantics.db.exception.DatabaseException;\r
20 import org.simantics.db.procedure.Listener;\r
21 import org.simantics.document.server.request.URIDocumentRequest;\r
22 import org.simantics.utils.datastructures.Pair;\r
23 \r
24 import gnu.trove.map.hash.THashMap;\r
25 import gnu.trove.set.hash.THashSet;\r
26 \r
27 public class DocumentHistory {\r
28 \r
29         public static final int MIN_REFRESHER_THREADS = 1;\r
30         public static final int MAX_REFRESHER_THREADS = 32;\r
31         public static final long DELAY_BETWEEN_UPDATES = 50; // milliseconds\r
32         public static final long REFRESHER_THREAD_TTL = 10; // seconds\r
33         \r
34         private ArrayList<Listener<Integer>> listeners = new ArrayList<Listener<Integer>>(); \r
35 \r
36         private TreeMap<Integer, JSONObject> entries = new TreeMap<Integer, JSONObject>();\r
37         private Map<String, Integer> lastRevisions = new THashMap<String, Integer>();\r
38         private List<JSONObject> previous = new ArrayList<JSONObject>();\r
39         private int lastRevision;\r
40 \r
41         private List<JSONObject> incoming;\r
42         private long lastUpdateTime = System.nanoTime();\r
43         private Boolean pending = false;\r
44         private Object pendingChanges = new Object();\r
45         \r
46         private DocumentHistoryListener updater = null;\r
47 \r
48         private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {\r
49                 @Override\r
50                 public Thread newThread(Runnable r) {\r
51                         Thread t = new Thread(null, r, "Document-Refresh-Scheduler");\r
52                         t.setDaemon(true);\r
53                         return t;\r
54                 }\r
55         });\r
56         \r
57         private static ThreadPoolExecutor executor = new ThreadPoolExecutor(MIN_REFRESHER_THREADS, MAX_REFRESHER_THREADS, REFRESHER_THREAD_TTL, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), new ThreadFactory() {\r
58                 @Override\r
59                 public Thread newThread(Runnable r) {\r
60                         Thread t = new Thread(null, r, "Document-Refresher-" + (executor.getPoolSize() + 1));\r
61                         t.setDaemon(true);\r
62                         return t;\r
63                 }\r
64         });\r
65         \r
66         private static void scheduleRefresh(Runnable runnable, long delay, TimeUnit unit) {\r
67                 Runnable wrapper = new Runnable() {\r
68                         @Override\r
69                         public void run() {\r
70                                 executor.execute(runnable);\r
71                         }\r
72                 };\r
73                 scheduler.schedule(wrapper, delay, unit);\r
74         }\r
75         \r
76         public DocumentHistory() {\r
77                 lastRevision = 0;\r
78         }\r
79         \r
80         private void pruneListeners() {\r
81                 ArrayList<Listener<Integer>> removals = new ArrayList<Listener<Integer>>(); \r
82                 for(Listener<Integer> listener : listeners) {\r
83                         if(listener.isDisposed()) {\r
84                                 removals.add(listener);\r
85                         }\r
86                 }\r
87                 listeners.removeAll(removals);\r
88         }\r
89         \r
90         private void registerListener(Listener<Integer> listener) {\r
91                 if(!listeners.contains(listener))\r
92                         listeners.add(listener);\r
93         }\r
94 \r
95         public synchronized boolean hasListeners() {\r
96                 pruneListeners();\r
97                 return !listeners.isEmpty();\r
98         }\r
99         \r
100         private void fireListeners(Integer value, Listener<Integer> ignoreListener) {\r
101                 ArrayList<Listener<Integer>> currentListeners;\r
102                 synchronized (this) {\r
103                         currentListeners = new ArrayList<Listener<Integer>>(listeners);\r
104                 }\r
105                 for(Listener<Integer> listener : currentListeners) {\r
106                         if(listener.isDisposed()) {\r
107                                 synchronized (this) {\r
108                                         listeners.remove(listener);\r
109                                 }\r
110                         } else {\r
111                                 if (!listener.equals(ignoreListener)) {\r
112                                         listener.execute(value);\r
113                                 }\r
114                         }\r
115                 }\r
116         }\r
117         \r
118         public Pair<Integer, Collection<JSONObject>> readChanges(int sequenceNumber) {\r
119                 synchronized (entries) {\r
120                         Collection<JSONObject> changes = \r
121                                         sequenceNumber < lastRevision ?\r
122                                                         new ArrayList<JSONObject>(entries.tailMap(sequenceNumber, false).values()) : \r
123                                                                 Collections.emptyList();\r
124                         return Pair.make(lastRevision, changes);\r
125                 }\r
126         }\r
127         \r
128         public void refresh(List<JSONObject> result) {\r
129                 synchronized (pendingChanges) {\r
130                         incoming = result;\r
131                         if (!pending) {\r
132                                 pending = true;\r
133                                 long delay = Math.max(0, lastUpdateTime + DELAY_BETWEEN_UPDATES * 1000000 - System.nanoTime());\r
134                                 scheduleRefresh(new RefresherRunnable(), delay, TimeUnit.NANOSECONDS);\r
135                         }\r
136                 }\r
137         }\r
138         \r
139         private class RefresherRunnable implements Runnable {\r
140                 \r
141                 @Override\r
142                 public void run() {\r
143 \r
144                         Integer revision = null;\r
145                         synchronized (entries) {\r
146                                 List<JSONObject> update;\r
147                                 synchronized (pendingChanges) {\r
148                                         update = incoming;\r
149                                         incoming = null;\r
150                                 }\r
151         \r
152                                 if (update != null) {\r
153                                         revision = refreshInternal(update);\r
154                                 }\r
155                                 \r
156                                 synchronized (pendingChanges) {\r
157                                         if (incoming != null) {\r
158                                                 scheduleRefresh(new RefresherRunnable(), DELAY_BETWEEN_UPDATES, TimeUnit.MILLISECONDS);\r
159                                         } else {\r
160                                                 pending = false;\r
161                                                 if (update != null) {\r
162                                                         lastUpdateTime = System.nanoTime();\r
163                                                 }\r
164                                         }\r
165                                 }\r
166                         }\r
167                         if (revision != null) {\r
168                                 fireListeners(revision, null);\r
169                         }\r
170                 }\r
171         }\r
172         \r
173         private Pair<Integer, Collection<JSONObject>> flushAndReadChanges(int sequenceNumber, Listener<Integer> ignoreListener) {\r
174                 Integer revision = null;\r
175                 Pair<Integer, Collection<JSONObject>> changes;\r
176                 synchronized (entries) {\r
177                         List<JSONObject> update;\r
178                         synchronized (pendingChanges) {\r
179                                 update = incoming;\r
180                                 incoming = null;\r
181                         }\r
182                         \r
183                         if (update != null) {\r
184                                 revision = refreshInternal(update);\r
185                         }\r
186                         changes = readChanges(sequenceNumber);\r
187                 }\r
188                 if (revision != null) {\r
189                         fireListeners(revision, ignoreListener);\r
190                 }\r
191                 return changes;\r
192         }\r
193 \r
194         \r
195         private int refreshInternal(List<JSONObject> result) {\r
196                 Set<JSONObject> added = null;\r
197                 Set<JSONObject> removed = null;\r
198 \r
199                 int oldIndex = 0;\r
200                 int newIndex = 0;\r
201                 \r
202                 while(true) {\r
203 \r
204                         boolean noMoreOlds = oldIndex == previous.size();\r
205                         boolean noMoreNews = newIndex == result.size();\r
206                         \r
207                         if(noMoreOlds) {\r
208                                 if(noMoreNews) break;\r
209                                 else {\r
210                                         // all remaining objects are added\r
211                                         for(int i=newIndex;i<result.size();i++) {\r
212                                                 if(added == null) added = new THashSet<JSONObject>();\r
213                                                 added.add(result.get(i));\r
214                                         }\r
215                                         break;\r
216                                 }\r
217                         } else {\r
218                                 // old objects are still available\r
219                                 if(noMoreNews) {\r
220                                         // all remaining olds have been removed\r
221                                         for(int i=oldIndex;i<previous.size();i++) {\r
222                                                 if(removed == null) removed = new THashSet<JSONObject>();\r
223                                                 removed.add(previous.get(i));\r
224                                         }\r
225                                         break;\r
226                                 }\r
227                         }\r
228                         \r
229                         JSONObject o = previous.get(oldIndex);\r
230                         JSONObject n = result.get(newIndex);\r
231                         \r
232                         int comp = o.id.compareTo(n.id);\r
233                         if(comp == 0) {\r
234                                 // objects are same => compare\r
235                                 if(!n.equals(o)) {\r
236                                         if(added == null) added = new THashSet<JSONObject>();\r
237                                         added.add(n);\r
238                                 }\r
239                                 oldIndex++;\r
240                                 newIndex++;\r
241                         } else if (comp > 0) {\r
242                                 // old is bigger than new => new has been added\r
243                                 if(added == null) added = new THashSet<JSONObject>();\r
244                                 added.add(n);\r
245                                 newIndex++;\r
246                         } else if (comp < 0) {\r
247                                 // old is smaller than new => old has been removed \r
248                                 if(removed == null) removed = new THashSet<JSONObject>();\r
249                                 removed.add(o);\r
250                                 oldIndex++;\r
251                         }\r
252                         \r
253                 }\r
254 \r
255                 if(added != null) {\r
256                         for(JSONObject entry : added) {\r
257 \r
258 //                              String parent = entry.getParent();\r
259 //                              // Null parent = exists but no parent , empty parent = has been removed\r
260 //                              if(parent != null && parent.isEmpty()) continue;\r
261 \r
262                                 add(entry);\r
263 \r
264                         }\r
265                 }\r
266                 if(removed != null) {\r
267                         for(JSONObject entry : removed) {\r
268                                 \r
269 //                              String parent = entry.getParent();\r
270 //                              // Null parent = exists but no parent , empty parent = has been removed\r
271 //                              if(parent != null && parent.isEmpty()) continue;\r
272                                 \r
273                                 remove(entry);\r
274                                 \r
275                         }\r
276                 }\r
277                 \r
278                 previous = result;\r
279 \r
280                 return lastRevision;\r
281         }\r
282         \r
283         private void add(JSONObject entry) {\r
284                 \r
285                 Integer rev = lastRevisions.get(entry.getId());\r
286                 if(rev != null) {\r
287                         entries.remove(rev);\r
288                 }\r
289 \r
290                 lastRevision++;\r
291                 lastRevisions.put(entry.getId(), lastRevision);\r
292                 entries.put(lastRevision, entry);\r
293         }\r
294             \r
295         private void remove(JSONObject entry) {\r
296 \r
297                 JSONObject object = new JSONObject(entry.getId());\r
298                 object.addJSONField("id", entry.getId());\r
299 \r
300                 Integer rev = lastRevisions.get(object.getId());\r
301                 if(rev != null) {\r
302                         entries.remove(rev);\r
303                 }\r
304 \r
305                 lastRevision++;\r
306                 lastRevisions.put(object.getId(), lastRevision);\r
307                 entries.put(lastRevision, object);\r
308         }\r
309         \r
310         public Pair<Integer, Collection<JSONObject>> getContent(Listener<Integer> listener, String location, int sequenceNumber) {\r
311                 Listener<List<JSONObject>> newUpdater = null;\r
312                 \r
313                 boolean cached = false;\r
314                 synchronized (this) {\r
315                         if (!listener.isDisposed()) {\r
316                                 registerListener(listener);\r
317                         }\r
318         \r
319                         // Try cache\r
320                         if ((updater != null) && !updater.isDisposed()) {\r
321                                 cached = true;\r
322                         } else {\r
323                                 if (hasListeners()) {\r
324                                         updater = new DocumentHistoryListener(this);\r
325                                         newUpdater = updater;\r
326                                 }\r
327                         }\r
328                 }\r
329                 if (cached) {\r
330                         return flushAndReadChanges(sequenceNumber, listener);\r
331                 }\r
332                 \r
333 \r
334                 try {\r
335                         if (newUpdater != null) {\r
336                                 Simantics.getSession().syncRequest(new URIDocumentRequest(location), newUpdater);\r
337                         } else {\r
338                                 synchronized (entries) {\r
339                                         List<JSONObject> result = Simantics.getSession().syncRequest(new URIDocumentRequest(location), TransientCacheListener.<List<JSONObject>>instance());\r
340                                         refreshInternal(result);\r
341                                         return readChanges(sequenceNumber);\r
342                                 }\r
343                         }\r
344                 } catch (DatabaseException e) {\r
345                         Logger.defaultLogError(e);\r
346                 }\r
347 \r
348                 return flushAndReadChanges(sequenceNumber, listener);\r
349         }\r
350         \r
351         protected void removeUpdater() {\r
352                 updater = null;\r
353         }\r
354         \r
355 }