]> gerrit.simantics Code Review - simantics/platform.git/commitdiff
Do not wait forever for querythreads to dispose 99/3599/4
authorjsimomaa <jani.simomaa@gmail.com>
Thu, 21 Nov 2019 14:11:48 +0000 (16:11 +0200)
committerJani Simomaa <jani.simomaa@semantum.fi>
Thu, 21 Nov 2019 18:27:02 +0000 (18:27 +0000)
gitlab #421

Change-Id: I0b5a1bba5b08a9c8853dbdd974209a8095449945

bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryThread.java
bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java

index 9b54d15f6996fbf708a5fce9fc5ea00fd467e0ba..31a4a6e19ced96b4dc8b785103815d2de1fb271c 100644 (file)
@@ -309,7 +309,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                @Override
                public String toString() {
-                       return "SessionTask[" + graph.parent + "]";
+                       return "SessionTask[" + graph != null ? String.valueOf(graph.parent) : "null graph" + "]";
                }
 
        }
index ab10efa0baab252a1ff5ef8b8485c313b2b32db1..79124893137d67dc058892edce823eb993ed939c 100644 (file)
@@ -2,6 +2,7 @@ package org.simantics.db.impl.query;
 
 import java.util.ArrayList;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.simantics.db.Session;
@@ -33,6 +34,8 @@ class QueryThread extends Thread implements SessionThread {
        final private int THREADS;
        final private AtomicInteger sleepers;
        final private ThreadState[] threadStates;
+
+       private SessionTask currentTask;
 //     final private ArrayList<SessionTask>[] delayQueues;
 //     final private QueryThread[] executors;
 //     final private ReentrantLock[] threadLocks;
@@ -71,7 +74,12 @@ class QueryThread extends Thread implements SessionThread {
 //             lock.unlock();
                
                try {
-                       exited.acquire();
+                       // we are not willing to wait forever here..
+                       boolean acquired = exited.tryAcquire(60, TimeUnit.SECONDS);
+                       if (!acquired) {
+                               LOGGER.error("Could not safely dispose query thread {} - we were processing task {} and still have tasks to do {}", index, currentTask.graph != null ? String.valueOf(currentTask) : "SessionTask with null graph", tasks.size());
+                               interrupt();
+                       }
                } catch (InterruptedException e) {
                        LOGGER.error("dispose was interrupted", e);
                }
@@ -215,14 +223,15 @@ class QueryThread extends Thread implements SessionThread {
 
                while(!tasks.isEmpty()) {
 
-                       SessionTask task = tasks.remove(tasks.size() - 1);
+                       SessionTask t = currentTask = tasks.remove(tasks.size() - 1);
 
 //                     if(task.syncCaller == index) {
 //                             ownSyncTasks[index].add(task);
 //                     } else {
-                               task.run(index);
+                               t.run(index);
 //                             System.err.println("QT(s) " + index + " runs " + task);
                                didExecute = true;
+                               currentTask = null;
 //                     }
 
                }
@@ -249,10 +258,10 @@ class QueryThread extends Thread implements SessionThread {
                                
                                while(!tasks.isEmpty()) {
 
-                                       SessionTask task = tasks.remove(tasks.size()-1);
+                                       SessionTask t = currentTask = tasks.remove(tasks.size()-1);
 //                                     System.err.println("QT " + index + " runs " + task);
-                                       task.run(index);
-
+                                       t.run(index);
+                                       currentTask = null;
                                }
 
 //                             for(int performer=0;performer<THREADS;performer++) {
index d32c066bcdfc9bee732cd6ecf3829b5b0eb7a9a1..aff219622d6b91254823d1b8fcad7df722749429 100644 (file)
@@ -1345,7 +1345,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         } catch (Throwable e) {
 
-            e.printStackTrace();
+            LOGGER.error("Could not perform write only for request {}", request, e);
 
             releaseWriteOnly(writeState.getGraph());
 
@@ -2671,7 +2671,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 // Serialize as '<resource index>_<cluster id>'
                 return "" + r + "_" + getCluster(resourceImpl);
             } catch (Throwable e) {
-                e.printStackTrace();
+                LOGGER.error("Could not create random access id for resource {}", resource, e);
                 throw new InvalidResourceReferenceException(e);
             } finally {
             }
@@ -2719,7 +2719,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             } catch (NumberFormatException e) {
                 throw new InvalidResourceReferenceException(e);
             } catch (Throwable e) {
-                e.printStackTrace();
+                LOGGER.error("Could not get resource for randomAccessId {}", randomAccessId, e);
                 throw new InvalidResourceReferenceException(e);
             } finally {
             }
@@ -3462,7 +3462,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
             }
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            LOGGER.error("Acquiring was interrupted", e);
             // FIXME: Should perhaps do something else in this case ??
         }
     }