/******************************************************************************* * Copyright (c) 2007, 2010 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.db.tests.api.request.misc; import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.junit.Test; import org.simantics.databoard.Bindings; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.WriteGraph; import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.Listener; import org.simantics.db.request.Read; import org.simantics.db.testing.base.ExistingDatabaseTest; import org.simantics.db.testing.common.WriteQuery; import org.simantics.db.tests.common.Configuration; import org.simantics.layer0.Layer0; /** * This test uses several threads to write the graph information and queries to react information changes. * * @author Marko Luukkainen * */ public class ComplexReadWriteQueryTest extends ExistingDatabaseTest { public static int NUM_THREADS = Configuration.get().rwQueryThreds;; public static int MAX_CHANGES = Configuration.get().rwQueryCount; List threads = new ArrayList(); int changes; // not quite correct but works for me volatile int endCount = 0; Thread self = null; boolean ended = false; private synchronized void incChanges() { // System.out.println("changes = " + changes); ++changes; } @Test public void test() throws Exception { final Resource rootLib = getSession().getRootLibrary(); for (int i = 0; i < NUM_THREADS; i++) { getSession().asyncRequest(new WriteQuery(this) { @Override public void run(WriteGraph g) throws Throwable { //System.out.println("creating test resource"); Layer0 b = Layer0.getInstance(g); Resource newResource = g.newResource(); g.claim(newResource, b.InstanceOf, b.Type); g.claim(rootLib, b.ConsistsOf, newResource); String name = getRandomString(); g.claimLiteral(newResource, b.HasName, name, Bindings.STRING); NameUpdateThread t = new NameUpdateThread(getSession(),newResource,name); threads.add(t); t.start(); } }); } if (DEBUG) System.out.println("Created " + NUM_THREADS + " threads."); self = Thread.currentThread(); while(!ended) { int lastChanges = changes; boolean test = true; try { Thread.sleep(10000); } catch (InterruptedException e) { if (DEBUG) System.out.println("Interrupted."); test = false; } if (test && lastChanges == changes) { end(); throw new Exception("No changes in ten seconds, probably database has died. changes="+changes); } try { checkException(); } catch (Exception e) { end(); throw e; } checkException(); } if (DEBUG) System.out.println("ended, changes = " + changes); checkException(); } public void end() { for (NameUpdateThread t : threads) { t.dispose(); } try { Thread.sleep(1000); } catch (InterruptedException e) { } ended = true; } static volatile int threadCount=0; public class NameUpdateThread extends Thread { private boolean disposed = false; private Resource resource; private String currentName; private Session session; //private NameQuery query; private Listener procedure; private NameRead nameRead; public NameUpdateThread(Session session, Resource resource, String name) { super("RWQueryTest-" + ++threadCount); this.currentName = name; this.resource = resource; this.session = session; } @Override public void run() { procedure = new Listener() { @Override public void execute(String result) { NameUpdateThread.this.interrupt(); } @Override public void exception(Throwable t) { ComplexReadWriteQueryTest.fail(); } @Override public boolean isDisposed() { return disposed; } }; while(!disposed) { try { nameRead = new NameRead(resource); //System.out.println("creating query"); session.asyncRequest(new WriteQuery(ComplexReadWriteQueryTest.this) { @Override public void run(WriteGraph g) throws Throwable { g.syncRequest(nameRead, procedure); } }); // sleep until interrupted (if query fails, we still are updating the value) Thread.sleep(1000000000); } catch (InterruptedException e) { } if(disposed) return; updateName(); } } public void updateName() { session.asyncRequest(new WriteQuery(ComplexReadWriteQueryTest.this) { @Override public void run(WriteGraph g) throws Throwable { Layer0 b = Layer0.getInstance(g); String graphName = g.getRelatedValue(resource, b.HasName); if(!graphName.equals(currentName)) { throw new Exception("Graph contains name " + graphName + " while name was set to " + currentName); } currentName = UUID.randomUUID().toString(); g.claimLiteral(resource, b.HasName, currentName, Bindings.STRING); if (changes >= MAX_CHANGES) { dispose(); return; } incChanges(); // System.out.println("changes="+changes); //System.out.println("Name change " +procedure.toString()); } }); } public void dispose() { if (!disposed) { ++endCount; if (endCount >= NUM_THREADS) { ended = true; if (null != self) self.interrupt(); } } disposed = true; } } public class NameRead implements Read { private Resource resource; public NameRead(Resource resource) { this.resource = resource; } @Override public String perform(ReadGraph graph) throws DatabaseException { Layer0 b = Layer0.getInstance(graph); String name = graph.getRelatedValue(resource, b.HasName, Bindings.STRING); return name; } } }