]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
4b9f19a2314f3c34658d72e57376cab1ba58ca62
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.HashMap;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.Semaphore;
20 import java.util.concurrent.atomic.AtomicInteger;
21
22 import org.simantics.db.common.utils.Logger;
23 import org.simantics.db.exception.RuntimeDatabaseException;
24 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
25 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
26 import org.simantics.utils.Development;
27
28 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
29
30         private static final long serialVersionUID = 4724463372850048672L;
31
32         static final HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>> reverseLookup = new HashMap<AsyncBarrierImpl, Collection<AsyncBarrierImpl>>();
33         static final HashMap<AsyncBarrierImpl, Debugger> debuggerMap = new HashMap<AsyncBarrierImpl, Debugger>();
34         static final HashMap<AsyncBarrierImpl, Boolean> restartMap = new HashMap<AsyncBarrierImpl, Boolean>();
35
36         static final int WAIT_TIME = 600;
37
38         public static final boolean BOOKKEEPING = false;
39         public static final boolean PRINT = false;
40         static final boolean RESTART_GUARD = false;
41
42         final private AsyncBarrierImpl caller;
43         
44         final private Semaphore sema = new Semaphore(0);
45
/**
 * Creates a barrier whose counter starts at zero.
 *
 * When {@code BOOKKEEPING} is enabled the new barrier also gets a fresh
 * {@code Debugger} and is listed under its caller in {@code reverseLookup}.
 *
 * @param caller the parent barrier to notify when this barrier becomes
 *               busy or idle, or {@code null} if there is none
 */
public AsyncBarrierImpl(AsyncBarrierImpl caller) {
    super(0);
    this.caller = caller;

    if (!BOOKKEEPING) {
        return;
    }

    synchronized (debuggerMap) {
        debuggerMap.put(this, new Debugger());
    }

    synchronized (reverseLookup) {
        Collection<AsyncBarrierImpl> siblings = reverseLookup.get(caller);
        if (siblings == null) {
            siblings = new ArrayList<AsyncBarrierImpl>();
            reverseLookup.put(caller, siblings);
        }
        siblings.add(this);
    }
}
64
/**
 * Keeps textual notes about outstanding increments, grouped by an arbitrary
 * id object. Every method synchronizes on the debugger instance.
 */
public class Debugger {
    /** Outstanding notes per id, in the order they were recorded. */
    public HashMap<Object, ArrayList<String>> infos = new HashMap<Object, ArrayList<String>>();

    /**
     * Records a note for the given id.
     *
     * @param id   key to file the note under; a {@code null} id is ignored
     * @param info the note to append
     */
    public synchronized void inc(Object id, String info) {
        if (id != null) {
            ArrayList<String> notes = infos.get(id);
            if (notes == null) {
                notes = new ArrayList<String>();
                infos.put(id, notes);
            }
            notes.add(info);
        }
    }

    /**
     * Drops the oldest note recorded for the given id and forgets the id
     * once no notes remain. An unknown id is reported on {@code System.err}.
     *
     * @param id key whose oldest note is removed; a {@code null} id is ignored
     */
    public synchronized void dec(Object id) {
        if (id == null) {
            return;
        }
        ArrayList<String> notes = infos.get(id);
        if (notes == null) {
            System.err.println("No data for " + id);
            return;
        }
        notes.remove(0);
        if (notes.isEmpty()) {
            infos.remove(id);
        }
    }

    /**
     * @return every recorded note, each rendered as {@code "info <note>\r\n"}
     */
    @Override
    public synchronized String toString() {
        StringBuilder text = new StringBuilder();
        for (ArrayList<String> notes : infos.values()) {
            for (String note : notes) {
                text.append("info " + note + "\r\n");
            }
        }
        return text.toString();
    }

    /**
     * Prints the notes that start with {@code '#'} to {@code System.err},
     * one per line; all other notes are skipped.
     *
     * @param indent number of spaces to put in front of every printed note
     */
    public synchronized void toErr(int indent) {
        char[] padding = new char[indent];
        Arrays.fill(padding, ' ');
        String prefix = new String(padding);
        for (ArrayList<String> notes : infos.values()) {
            for (String note : notes) {
                if (note.startsWith("#")) {
                    System.err.println(prefix + note);
                }
            }
        }
    }
}
122
123         public void inc() {
124
125                 if (BOOKKEEPING)
126                         inc(new Object(), new Exception().getStackTrace()[1].toString());
127                 else
128                         inc(null, null);
129
130         }
131
132         public void inc(String debug) {
133
134                 if (BOOKKEEPING)
135                         inc(new Object(), new Exception().getStackTrace()[1].toString());
136                 else
137                         inc(null, null);
138
139         }
140
141         public void inc(Object id, String info) {
142
143                 //              if (PRINT) {
144                 //                      if (get() < 5)
145                 //                              new Exception("inc " + get() + " " + this).printStackTrace();
146                 //              }
147
148                 if (BOOKKEEPING) {
149                         Debugger debugger = debuggerMap.get(this);
150                         if (debugger != null)
151                                 debugger.inc(id, info);
152                         // StackTraceElement[] tr = new Exception().getStackTrace();
153                         // if(tr.length == 4)
154                         // debugger.inc(new String[] { debug, tr[2].toString(),
155                         // tr[3].toString() });
156                         // else if(tr.length == 5)
157                         // debugger.inc(new String[] { debug, tr[2].toString(),
158                         // tr[3].toString(), tr[4].toString() });
159                         // else if(tr.length == 6)
160                         // debugger.inc(new String[] { debug, tr[2].toString(),
161                         // tr[3].toString(), tr[4].toString(), tr[5].toString() });
162                         // else
163                         // debugger.inc(new String[] { debug, tr[2].toString(),
164                         // tr[3].toString(), tr[4].toString(), tr[5].toString(),
165                         // tr[6].toString() });
166                 }
167
168                 //              new Exception().printStackTrace();
169
170                 if(PRINT) {
171
172                         System.err.println("inc barrier[" + get() + "] " + this);
173                         StackTraceElement[] elems = new Exception().getStackTrace();
174                         for(int i=0;i<4;i++) System.err.println(elems[i]);
175
176                 }
177
178                 if (incrementAndGet() == 1) {
179                         if (caller != null) {
180                                 if (BOOKKEEPING)
181                                         caller.inc(this, "Child");
182                                 else
183                                         caller.inc(null, null);
184                         }
185                 }
186
187         }
188
189         public void dec() {
190
191                 if(PRINT) {
192                         System.err.println("dec barrier[" + get() + "] " + this);
193                         StackTraceElement[] elems = new Exception().getStackTrace();
194                         for(int i=0;i<3;i++) System.err.println(elems[i]);
195                 }
196
197                 int count = decrementAndGet();
198                 if (count < 1) {
199                         if (count == 0) {
200                                 if (caller != null)
201                                         caller.dec(this);
202                                 //                              sema.release();
203                         }
204                         if (count < 0) {
205                                 Logger.defaultLogError(
206                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
207                                                 new Exception());
208                         }
209                         assert (count >= 0);
210                 }
211
212         }
213
214         public void dec(Object id) {
215
216                 if (PRINT) {
217                         if (get() < 5)
218                                 new Exception("dec" + get() + " " + this).printStackTrace();
219                 }
220
221                 if (BOOKKEEPING) {
222                         Debugger debugger = debuggerMap.get(this);
223                         if (debugger != null)
224                                 debugger.dec(id);
225                         // StackTraceElement[] tr = new Exception().getStackTrace();
226                         // if(tr.length == 3)
227                         // debugger.dec(new String[] { debug, tr[2].toString() });
228                         // else if(tr.length == 4)
229                         // debugger.dec(new String[] { debug, tr[2].toString(),
230                         // tr[3].toString() });
231                         // else
232                         // debugger.dec(new String[] { debug, tr[2].toString(),
233                         // tr[3].toString(), tr[4].toString() });
234                 }
235
236                 //              System.err.println("barrier " + this);
237                 //              StackTraceElement[] elems = new Exception().getStackTrace();
238                 //              for(int i=0;i<3;i++) System.err.println(elems[i]);
239                 //              new Exception().printStackTrace();
240
241                 int count = decrementAndGet();
242                 if (count < 1) {
243                         if (count == 0) {
244                                 if (caller != null)
245                                         caller.dec(this);
246                                 if (RESTART_GUARD)
247                                         restartMap.put(this, true);
248                                 //                              sema.release();
249                                 // if(DEBUGGER) {
250                                 // debuggerMap.remove(this);
251                                 // }
252                                 // if(REVERSE_LOOKUP) {
253                                 // reverseLookup.remove(this);
254                                 // }
255                         }
256                         if (count < 0) {
257                                 Logger.defaultLogError(
258                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
259                                                 new Exception());
260                                 // String message = ;
261                                 // System.out.println(message);
262                                 // if (DEBUGGER) {
263                                 // JOptionPane.showMessageDialog(null, message);
264                                 // System.out.println(debugger);
265                                 // }
266                                 // sema.release();
267                                 System.exit(-1);
268                         }
269                         assert (count >= 0);
270                 }
271         }
272
273         private static void printReverse(AsyncBarrierImpl barrier, int indent) {
274
275                 if (barrier.get() == 0)
276                         return;
277                 for (int i = 0; i < indent; i++)
278                         System.err.print(" ");
279                 System.err.println("[" + barrier.get() + " requests]: " + barrier);
280                 if (BOOKKEEPING) {
281                         Debugger debugger = debuggerMap.get(barrier);
282                         debugger.toErr(indent + 2);
283                 }
284
285                 Collection<AsyncBarrierImpl> children = reverseLookup.get(barrier);
286                 if (children != null) {
287                         for (AsyncBarrierImpl child : children)
288                                 printReverse(child, indent + 2);
289                 }
290
291         }
292
293         public void waitBarrier(Object request, ReadGraphImpl impl) {
294
295                 if (get() > 0) {
296
297                         long waitCount = 0;
298
299                         while (get() != 0) {
300                                 
301                                 boolean executed = impl.performPending();
302                                 if(executed) waitCount = 0;
303                                 
304                                 ++waitCount;
305                                 if(waitCount > 100) Thread.yield();
306                                 if(waitCount > 1000) {
307                                         try {
308                                                 Thread.sleep(1);
309                                         } catch (InterruptedException e) {
310                                                 e.printStackTrace();
311                                         }
312                                 }
313                                 if(waitCount > WAIT_TIME*1000) {
314
315                                         System.err.println("AsyncBarrierImpl.waitBarrier("
316                                                         + request
317                                                         + ") is taking long to execute, so far "
318                                                         + (waitCount / 1000) + " s.");
319
320                                         if (BOOKKEEPING) {
321
322                                                 synchronized (reverseLookup) {
323                                                         printReverse(this, 0);
324                                                 }
325
326                                         }
327
328                                         if(Development.DEVELOPMENT) {
329
330 //                                              impl.processor.threadLocks[0].lock();
331 //                                              System.err.println("-queues=" + impl.processor.queues[0].size());
332 //                                              impl.processor.threadLocks[0].unlock();
333 //                                              System.err.println("-own=" + impl.processor.ownTasks[0].size());
334 //                                              System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
335 //                                              for(SessionTask task : impl.processor.ownSyncTasks[0]) {
336 //                                                      System.err.println("--" + task);
337 //                                              }
338
339                                         }
340
341                                         throw new RuntimeDatabaseException("Request timed out.");
342                                         //waitCount = 0;
343
344                                 }
345
346                         }
347
348                 }
349
350         }
351
352         public void restart() {
353                 assertReady();
354                 // log.clear();
355                 //              sema.drainPermits();
356                 if (RESTART_GUARD)
357                         restartMap.remove(this);
358                 if (BOOKKEEPING)
359                         debuggerMap.put(this, new Debugger());
360         }
361
362         public void assertReady() {
363                 int current = get();
364                 if (current != 0)
365                         throw new AssertionError("Barrier was not finished (pending="
366                                         + current + ").");
367         }
368
369         public void report() {
370                 // System.out.println("Barrier log:");
371                 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
372                 // System.out.println(entry.getKey() + " " + entry.getValue());
373                 // }
374                 // System.out.println("SyncIntProcedure log:");
375                 // for(Map.Entry<String, Integer> entry :
376                 // SyncIntProcedure.counters.entrySet()) {
377                 // System.out.println(entry.getKey() + " " + entry.getValue());
378                 // }
379         }
380
381         @Override
382         public String toString() {
383                 return "AsyncBarrierImpl@" + System.identityHashCode(this)
384                                 + " - counter = " + get() + " - caller = " + caller;
385         }
386
387 }