gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.impl/src/org/simantics/db/impl/graph/AsyncBarrierImpl.java
Trying to wait for procedures
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / graph / AsyncBarrierImpl.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.db.impl.graph;
13
14 import java.util.Collection;
15 import java.util.concurrent.atomic.AtomicInteger;
16
17 import org.simantics.db.common.utils.Logger;
18 import org.simantics.db.exception.RuntimeDatabaseException;
19 import org.simantics.db.impl.query.CacheEntry;
20 import org.simantics.db.impl.query.QueryProcessor.AsyncBarrier;
21
22 final public class AsyncBarrierImpl extends AtomicInteger implements AsyncBarrier {
23
24         private static final long serialVersionUID = 4724463372850048672L;
25
26         static final int WAIT_TIME = 600;
27
28         public static final boolean PRINT = false;
29
30         final public AsyncBarrierImpl caller;
31
32         public AsyncBarrierImpl(AsyncBarrierImpl caller, CacheEntry<?> entry) {
33                 super(0);
34                 this.caller = caller;
35         if (BarrierTracing.BOOKKEEPING) {
36             BarrierTracing.trace(this, entry);
37         }
38         }
39
40
41         public void inc() {
42
43             if(BarrierTracing.BOOKKEEPING) {
44                 BarrierTracing.inc(this);
45             } else {
46                 inc(null, null);
47             }
48             
49         }
50
51         void inc(Object id, String info) {
52
53                 if(PRINT) {
54                         System.err.println("inc barrier[" + get() + "] " + this);
55                         StackTraceElement[] elems = new Exception().getStackTrace();
56                         for(int i=0;i<4;i++) System.err.println(elems[i]);
57                 }
58
59                 if (incrementAndGet() == 1) {
60                         if (caller != null) {
61                         if(BarrierTracing.BOOKKEEPING) {
62                     caller.inc(this, "Child");
63                         } else {
64                     caller.inc(null, null);
65                         }
66                         }
67                 }
68
69         }
70
71         public void dec() {
72
73                 if(PRINT) {
74                         System.err.println("dec barrier[" + get() + "] " + this);
75                         StackTraceElement[] elems = new Exception().getStackTrace();
76                         for(int i=0;i<3;i++) System.err.println(elems[i]);
77                 }
78
79                 int count = decrementAndGet();
80                 if (count < 1) {
81             if(BarrierTracing.BOOKKEEPING) {
82                 BarrierTracing.dec(this, count);
83             }
84                         if (count == 0) {
85                                 if (caller != null) {
86                                         caller.dec();
87                                 }
88                         }
89                         if (count < 0) {
90                                 Logger.defaultLogError(
91                                                 "Database request processing error. The application code has performed illegal actions (probably called multiple times the execute or exception method of a single result request.",
92                                                 new Exception());
93                         }
94                         assert (count >= 0);
95                 }
96
97         }
98
99         public static String report(AsyncBarrierImpl barrier) {
100                 CacheEntry<?> e = BarrierTracing.entryMap.get(barrier);
101                 if(e != null) return e.toString();
102                 else return "Barrier@" + System.identityHashCode(barrier);
103         }
104         
105         public static void printReverse(AsyncBarrierImpl barrier, int indent) {
106
107                 if (barrier.get() == 0)
108                         return;
109                 for (int i = 0; i < indent; i++)
110                         System.err.print(" ");
111                 System.err.println("[" + barrier.get() + " requests]: " + report(barrier));
112 //              if (BOOKKEEPING) {
113 //                      Debugger debugger = debuggerMap.get(barrier);
114 //                      debugger.toErr(indent + 2);
115 //              }
116
117                 Collection<AsyncBarrierImpl> children = BarrierTracing.reverseLookup.get(barrier);
118                 if (children != null) {
119                         for (AsyncBarrierImpl child : children)
120                                 printReverse(child, indent + 2);
121                 }
122
123         }
124
125         public void waitBarrier(Object request, ReadGraphImpl impl) {
126
127                 if (get() > 0) {
128
129                         long waitCount = 0;
130
131                         while (get() != 0) {
132
133                                 boolean executed = impl.performPending();
134                                 if(executed) waitCount = 0;
135                                 
136                                 ++waitCount;
137                                 if(waitCount > 100) Thread.yield();
138                                 if(waitCount > 1000) {
139                                         try {
140                                                 Thread.sleep(1);
141                                         } catch (InterruptedException e) {
142                                                 e.printStackTrace();
143                                         }
144                                 }
145                                 if(waitCount > WAIT_TIME*1000) {
146
147                                         System.err.println("AsyncBarrierImpl.waitBarrier("
148                                                         + request
149                                                         + ") is taking long to execute, so far "
150                                                         + (waitCount / 1000) + " s.");
151
152                                         if (BarrierTracing.BOOKKEEPING) {
153                                                 synchronized (BarrierTracing.reverseLookup) {
154                                                         printReverse(this, 0);
155                                                 }
156                                         }
157
158 //                                      if(Development.DEVELOPMENT) {
159 //                                              impl.processor.threadLocks[0].lock();
160 //                                              System.err.println("-queues=" + impl.processor.queues[0].size());
161 //                                              impl.processor.threadLocks[0].unlock();
162 //                                              System.err.println("-own=" + impl.processor.ownTasks[0].size());
163 //                                              System.err.println("-ownSync=" + impl.processor.ownSyncTasks[0].size());
164 //                                              for(SessionTask task : impl.processor.ownSyncTasks[0]) {
165 //                                                      System.err.println("--" + task);
166 //                                              }
167 //                                      }
168
169                                         throw new RuntimeDatabaseException("Request timed out.");
170                                         //waitCount = 0;
171
172                                 }
173
174                         }
175
176                 }
177
178         }
179
180         public void restart() {
181                 assertReady();
182                 if(BarrierTracing.BOOKKEEPING) {
183                     BarrierTracing.restart(this);
184                 }
185         }
186
187         public void assertReady() {
188                 int current = get();
189                 if (current != 0)
190                         throw new AssertionError("Barrier was not finished (pending="
191                                         + current + ").");
192         }
193
194         public void report() {
195                 // System.out.println("Barrier log:");
196                 // for(Map.Entry<String, Integer> entry : sources.entrySet()) {
197                 // System.out.println(entry.getKey() + " " + entry.getValue());
198                 // }
199                 // System.out.println("SyncIntProcedure log:");
200                 // for(Map.Entry<String, Integer> entry :
201                 // SyncIntProcedure.counters.entrySet()) {
202                 // System.out.println(entry.getKey() + " " + entry.getValue());
203                 // }
204         }
205
206         @Override
207         public String toString() {
208                 return report(this);
209 //              return "AsyncBarrierImpl@" + System.identityHashCode(this)
210 //                              + " - counter = " + get() + " - caller = " + caller;
211         }
212
213 }