gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.HashMap;
18 import java.util.concurrent.atomic.AtomicInteger;
19
20 import org.simantics.db.common.utils.Logger;
21 import org.simantics.db.exception.RuntimeDatabaseException;
22 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
23
24 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
25
26         private static final long serialVersionUID = 4724463372850048672L;
27
28         static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>>();
29         static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
30         static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<AsyncBarrierImpl, Boolean>();
31
32         static final int WAIT_TIME = 600;
33
34         public static final boolean BOOKKEEPING = false;
35         public static final boolean PRINT = false;
36         static final boolean RESTART_GUARD = false;
37
38         final private AsyncBarrierImpl caller;
39
40         //private final Semaphore sema = new Semaphore(0);
41
42         public AsyncBarrierImpl(AsyncBarrierImpl caller) {
43                 super(0);
44                 if (BOOKKEEPING) {
45                         synchronized (debuggerMap) {
46                                 debuggerMap.put(this, new Debugger());
47                         }
48                         synchronized (reverseLookup) {
49                                 Collection<AsyncBarrierImpl> barriers = reverseLookup
50                                                 .get(caller);
51                                 if (barriers == null) {
52                                         barriers = new ArrayList<AsyncBarrierImpl>();
53                                         reverseLookup.put(caller, barriers);
54                                 }
55                                 barriers.add(this);
56                         }
57                 }
58                 this.caller = caller;
59         }
60
61         public class Debugger {
62                 public HashMap<Object, ArrayList<String>> infos = new HashMap<Object, ArrayList<String>>();
63
64                 public synchronized void inc(Object id, String info) {
65                         if (id == null)
66                                 return;
67                         ArrayList<String> exist = infos.get(id);
68                         if (exist == null) {
69                                 exist = new ArrayList<String>();
70                                 infos.put(id, exist);
71                         } else {
72                                 // System.err.println("Appending " + id + " += " + info);
73                         }
74                         exist.add(info);
75                         // String exist = infos.put(id, info);
76                         // if(exist != null) System.err.println("replacing " + exist +
77                         // " => " + info + " for " + id);
78                 }
79
80                 public synchronized void dec(Object id) {
81                         if (id == null)
82                                 return;
83                         ArrayList<String> exist = infos.get(id);
84                         if (exist == null) {
85                                 System.err.println("No data for " + id);
86                         } else {
87                                 exist.remove(0);
88                                 if (exist.isEmpty())
89                                         infos.remove(id);
90                         }
91                 }
92
93                 @Override
94                 public synchronized String toString() {
95                         StringBuilder b = new StringBuilder();
96                         for (ArrayList<String> ss : infos.values()) {
97                                 for (String s : ss)
98                                         b.append("info " + s + "\r\n");
99                         }
100                         return b.toString();
101                 }
102
103                 public synchronized void toErr(int indent) {
104                         char[] spaces = new char[indent];
105                         Arrays.fill(spaces, ' ');
106                         for (ArrayList<String> ss : infos.values()) {
107                                 for (String s : ss) {
108                                         if (!s.startsWith("#"))
109                                                 continue;
110                                         StringBuilder b = new StringBuilder();
111                                         b.append(spaces);
112                                         b.append(s);
113                                         System.err.println(b.toString());
114                                 }
115                         }
116                 }
117         }
118
119         public void inc() {
120
121                 if (BOOKKEEPING)
122                         inc(new Object(), new Exception().getStackTrace()[1].toString());
123                 else
124                         inc(null, null);
125
126         }
127
128         public void inc(String debug) {
129
130                 if (BOOKKEEPING)
131                         inc(new Object(), new Exception().getStackTrace()[1].toString());
132                 else
133                         inc(null, null);
134
135         }
136
137         public void inc(Object id, String info) {
138
139                 //              if (PRINT) {
140                 //                      if (get() < 5)
141                 //                              new Exception("inc " + get() + " " + this).printStackTrace();
142                 //              }
143
144                 if (BOOKKEEPING) {
145                         Debugger debugger = debuggerMap.get(this);
146                         if (debugger != null)
147                                 debugger.inc(id, info);
148                         // StackTraceElement[] tr = new Exception().getStackTrace();
149                         // if(tr.length == 4)
150                         // debugger.inc(new String[] { debug, tr[2].toString(),
151                         // tr[3].toString() });
152                         // else if(tr.length == 5)
153                         // debugger.inc(new String[] { debug, tr[2].toString(),
154                         // tr[3].toString(), tr[4].toString() });
155                         // else if(tr.length == 6)
156                         // debugger.inc(new String[] { debug, tr[2].toString(),
157                         // tr[3].toString(), tr[4].toString(), tr[5].toString() });
158                         // else
159                         // debugger.inc(new String[] { debug, tr[2].toString(),
160                         // tr[3].toString(), tr[4].toString(), tr[5].toString(),
161                         // tr[6].toString() });
162                 }
163
164                 //              new Exception().printStackTrace();
165
166                 if(PRINT) {
167
168                         System.err.println("inc barrier[" + get() + "] " + this);
169                         StackTraceElement[] elems = new Exception().getStackTrace();
170                         for(int i=0;i<4;i++) System.err.println(elems[i]);
171
172                 }
173
174                 if (incrementAndGet() == 1) {
175                         if (caller != null) {
176                                 if (BOOKKEEPING)
177                                         caller.inc(this, "Child");
178                                 else
179                                         caller.inc(null, null);
180                         }
181                 }
182
183         }
184
185         public void dec() {
186
187                 if(PRINT) {
188                         System.err.println("dec barrier[" + get() + "] " + this);
189                         StackTraceElement[] elems = new Exception().getStackTrace();
190                         for(int i=0;i<3;i++) System.err.println(elems[i]);
191                 }
192
193                 int count = decrementAndGet();
194                 if (count < 1) {
195                         if (count == 0) {
196                                 if (caller != null)
197                                         caller.dec(this);
198                                 //                              sema.release();
199                         }
200                         if (count < 0) {
201                                 Logger.defaultLogError(
202                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
203                                                 new Exception());
204                         }
205                         assert (count >= 0);
206                 }
207
208         }
209
210         public void dec(Object id) {
211
212                 if (PRINT) {
213                         if (get() < 5)
214                                 new Exception("dec" + get() + " " + this).printStackTrace();
215                 }
216
217                 if (BOOKKEEPING) {
218                         Debugger debugger = debuggerMap.get(this);
219                         if (debugger != null)
220                                 debugger.dec(id);
221                         // StackTraceElement[] tr = new Exception().getStackTrace();
222                         // if(tr.length == 3)
223                         // debugger.dec(new String[] { debug, tr[2].toString() });
224                         // else if(tr.length == 4)
225                         // debugger.dec(new String[] { debug, tr[2].toString(),
226                         // tr[3].toString() });
227                         // else
228                         // debugger.dec(new String[] { debug, tr[2].toString(),
229                         // tr[3].toString(), tr[4].toString() });
230                 }
231
232                 //              System.err.println("barrier " + this);
233                 //              StackTraceElement[] elems = new Exception().getStackTrace();
234                 //              for(int i=0;i<3;i++) System.err.println(elems[i]);
235                 //              new Exception().printStackTrace();
236
237                 int count = decrementAndGet();
238                 if (count < 1) {
239                         if (count == 0) {
240                                 if (caller != null)
241                                         caller.dec(this);
242                                 if (RESTART_GUARD)
243                                         restartMap.put(this, true);
244                                 //                              sema.release();
245                                 // if(DEBUGGER) {
246                                 // debuggerMap.remove(this);
247                                 // }
248                                 // if(REVERSE_LOOKUP) {
249                                 // reverseLookup.remove(this);
250                                 // }
251                         }
252                         if (count < 0) {
253                                 Logger.defaultLogError(
254                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
255                                                 new Exception());
256                                 // String message = ;
257                                 // System.out.println(message);
258                                 // if (DEBUGGER) {
259                                 // JOptionPane.showMessageDialog(null, message);
260                                 // System.out.println(debugger);
261                                 // }
262                                 // sema.release();
263                                 System.exit(-1);
264                         }
265                         assert (count >= 0);
266                 }
267         }
268
269         private static void printReverse(AsyncBarrierImpl barrier, int indent) {
270
271                 if (barrier.get() == 0)
272                         return;
273                 for (int i = 0; i < indent; i++)
274                         System.err.print(" ");
275                 System.err.println("[" + barrier.get() + " requests]: " + barrier);
276                 if (BOOKKEEPING) {
277                         Debugger debugger = debuggerMap.get(barrier);
278                         debugger.toErr(indent + 2);
279                 }
280
281                 Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
282                 if (children != null) {
283                         for (AsyncBarrierImpl child : children)
284                                 printReverse(child, indent + 2);
285                 }
286
287         }
288
289         public void waitBarrier(Object request, ReadGraphImpl impl) {
290
291                 if (get() > 0) {
292
293                         long waitCount = 0;
294
295                         while (get() != 0) {
296
297                                 boolean executed = impl.performPending();
298                                 if(executed) waitCount = 0;
299                                 
300                                 ++waitCount;
301                                 if(waitCount > 100) Thread.yield();
302                                 if(waitCount > 1000) {
303                                         try {
304                                                 Thread.sleep(1);
305                                         } catch (InterruptedException e) {
306                                                 e.printStackTrace();
307                                         }
308                                 }
309                                 if(waitCount > WAIT_TIME*1000) {
310
311                                         System.err.println("AsyncBarrierImpl.waitBarrier("
312                                                         + request
313                                                         + ") is taking long to execute, so far "
314                                                         + (waitCount / 1000) + " s.");
315
316                                         if (BOOKKEEPING) {
317
318                                                 synchronized (reverseLookup) {
319                                                         printReverse(this, 0);
320                                                 }
321
322                                         }
323
324 //                                      if(Development.DEVELOPMENT) {
325 //                                              impl.processor.threadLocks[0].lock();
326 //                                              System.err.println("-queues=" + impl.processor.queues[0].size());
327 //                                              impl.processor.threadLocks[0].unlock();
328 //                                              System.err.println("-own=" + impl.processor.ownTasks[0].size());
329 //                                              System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
330 //                                              for(SessionTask task : impl.processor.ownSyncTasks[0]) {
331 //                                                      System.err.println("--" + task);
332 //                                              }
333 //                                      }
334
335                                         throw new RuntimeDatabaseException("Request timed out.");
336                                         //waitCount = 0;
337
338                                 }
339
340                         }
341
342                 }
343
344         }
345
346         public void restart() {
347                 assertReady();
348                 // log.clear();
349                 //              sema.drainPermits();
350                 if (RESTART_GUARD)
351                         restartMap.remove(this);
352                 if (BOOKKEEPING)
353                         debuggerMap.put(this, new Debugger());
354         }
355
356         public void assertReady() {
357                 int current = get();
358                 if (current != 0)
359                         throw new AssertionError("Barrier was not finished (pending="
360                                         + current + ").");
361         }
362
363         public void report() {
364                 // System.out.println("Barrier log:");
365                 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
366                 // System.out.println(entry.getKey() + " " + entry.getValue());
367                 // }
368                 // System.out.println("SyncIntProcedure log:");
369                 // for(Map.Entry<String, Integer> entry :
370                 // SyncIntProcedure.counters.entrySet()) {
371                 // System.out.println(entry.getKey() + " " + entry.getValue());
372                 // }
373         }
374
375         @Override
376         public String toString() {
377                 return "AsyncBarrierImpl@" + System.identityHashCode(this)
378                                 + " - counter = " + get() + " - caller = " + caller;
379         }
380
381 }