import org.simantics.db.RequestProcessor;\r
import org.simantics.db.Resource;\r
import org.simantics.db.Session;\r
-import org.simantics.db.common.request.ReadRequest;\r
import org.simantics.db.common.request.SafeName;\r
import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.common.utils.NameUtils;\r
import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.indexing.internal.IndexingJob;\r
import org.simantics.db.layer0.adapter.GenericRelation;\r
import org.simantics.db.request.Read;\r
import org.simantics.db.service.CollectionSupport;\r
import org.simantics.db.service.SerialisationSupport;\r
-import org.simantics.utils.DataContainer;\r
import org.simantics.utils.FileUtils;\r
import org.simantics.utils.datastructures.Pair;\r
\r
Field[] fs = makeFieldsForRelation(r, boundLength, document);\r
\r
for (Object[] documentData : documentsData) {\r
- for (int i = 0; i < documentData.length; i++) {\r
- Object value = documentData[i];\r
- if (value instanceof String) {\r
- fs[i].setStringValue((String) value);\r
- } else if (value instanceof Long) {\r
- fs[i].setLongValue((Long) value);\r
- } else {\r
- System.err.println("Can only index Long and String fields, encountered " + value);\r
- // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
- continue;\r
- }\r
-// System.out.println("index " + fs[i].name() + " = " + result[i]);\r
- }\r
+ if (setFields(fs, documentData) == null)\r
+ continue;\r
\r
if (IndexPolicy.TRACE_INDEX_UPDATE)\r
System.out.println(getDescriptor() + "Inserting document " + document);\r
continue nextDocument;\r
}\r
\r
- for (int i = 0; i < documentData.length; i++) {\r
- Object value = documentData[i];\r
- if (value instanceof String) {\r
- fs[i].setStringValue((String) value);\r
- } else if (keyValue instanceof Long) {\r
- fs[i].setLongValue((Long)value);\r
- } else {\r
- // FIXME: should throw an exception for illegal input data but this would leave the index in an incoherent state\r
- System.err.println("[" + getClass().getSimpleName() + "] Unrecognized document value '" + value + "' for field '" + fs[i].toString() + "', only " + String.class + " and " + Resource.class + " are supported.");\r
- continue nextDocument;\r
- }\r
- }\r
+ if (setFields(fs, documentData) == null)\r
+ continue nextDocument;\r
\r
if (IndexPolicy.TRACE_INDEX_UPDATE)\r
System.out.println(getDescriptor() + "Replacing document with key " + removedTerm + " with " + document);\r
}\r
});\r
\r
- void initializeIndex(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,\r
+ void initializeIndex(IProgressMonitor monitor, ReadGraph graph, Object[] bound, boolean overwrite)\r
+ throws IOException, DatabaseException\r
+ {\r
+ IndexingJob.jobifyIfPossible(\r
+ monitor,\r
+ "Reindexing " + NameUtils.getSafeLabel(graph, input),\r
+ mon -> {\r
+ try {\r
+ initializeIndexImpl(mon, graph, bound, overwrite);\r
+ } catch (IOException e) {\r
+ throw new DatabaseException(e);\r
+ }\r
+ });\r
+ }\r
+\r
+ void initializeIndexImpl(IProgressMonitor monitor, ReadGraph graph, final Object[] bound, boolean overwrite) throws IOException,\r
DatabaseException {\r
\r
final SubMonitor mon = SubMonitor.convert(monitor, 100);\r
final AtomicReference<IndexWriter> writer = new AtomicReference<IndexWriter>();\r
\r
try {\r
- mon.subTask("Create index at " + indexPath.toString());\r
+ mon.subTask("Start index write");\r
createDirectory(indexPath);\r
\r
directory.set(FSDirectory.open(indexPath));\r
\r
mon.worked(5);\r
\r
- final DataContainer<Long> start = new DataContainer<Long>();\r
-\r
- graph.syncRequest(new ReadRequest() {\r
+ final GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
+ if (r == null)\r
+ throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
+ + "could not be adapted to GenericRelation.");\r
\r
- @Override\r
- public void run(ReadGraph graph) throws DatabaseException {\r
+ long realizeStart = 0;\r
+ if (IndexPolicy.PERF_INDEX_INIT)\r
+ realizeStart = System.nanoTime();\r
\r
- final GenericRelation r = graph.adapt(relation, GenericRelation.class);\r
- if (r == null)\r
- throw new DatabaseException("Given resource " + graph.syncRequest(new SafeName(relation))\r
- + "could not be adapted to GenericRelation.");\r
-\r
- mon.worked(45);\r
-\r
- GenericRelation selection = r.select(getPattern(r, bound.length), bound);\r
- \r
- long perfStart = 0;\r
- if (IndexPolicy.PERF_INDEX_INIT)\r
- perfStart = System.nanoTime();\r
+ mon.subTask("Calculating indexed content");\r
+ GenericRelation selection = r.select(getPattern(r, bound.length), bound);\r
+ mon.worked(5);\r
+ List<Object[]> results = selection.realize(graph);\r
+ mon.worked(40);\r
\r
- final List<Object[]> results = selection.realize(graph);\r
+ if (IndexPolicy.PERF_INDEX_INIT)\r
+ System.out.println(getDescriptor() + "Realized index with " + results.size() + " entries at " + indexPath + " in " + (1e-9 * (System.nanoTime()-realizeStart)) + " seconds.");\r
+ if (IndexPolicy.TRACE_INDEX_INIT)\r
+ System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");\r
\r
- if (IndexPolicy.PERF_INDEX_INIT)\r
- System.out.println(getDescriptor() + "Realized index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-perfStart)) + " seconds.");\r
- \r
- if (IndexPolicy.TRACE_INDEX_INIT)\r
- System.out.println(getDescriptor() + "Indexed relation " + r + " produced " + results.size() + " results");\r
+ long start = IndexPolicy.PERF_INDEX_INIT ? System.nanoTime() : 0;\r
\r
- if (IndexPolicy.PERF_INDEX_INIT)\r
- start.set(System.nanoTime());\r
+ mon.subTask("Indexing content");\r
+ final Semaphore s = new Semaphore(0);\r
+ mon.setWorkRemaining(results.size());\r
\r
- final Semaphore s = new Semaphore(0);\r
+ for (int i = 0; i < INDEXING_THREAD_COUNT; i++) {\r
+ final int startIndex = i;\r
+ executor.submit(() -> {\r
+ try {\r
+ Document document = new Document();\r
+ Field[] fs = makeFieldsForRelation(r, bound.length, document);\r
+\r
+ for (int index = startIndex; index < results.size(); index += INDEXING_THREAD_COUNT) {\r
+ if (setFields(fs, results.get(index)) == null)\r
+ continue;\r
+ try {\r
+ writer.get().addDocument(document);\r
+ } catch (CorruptIndexException e) {\r
+ throw new IllegalStateException(e);\r
+ } catch (IOException e) {\r
+ throw new IllegalStateException(e);\r
+ } finally {\r
+ synchronized (mon) {\r
+ mon.worked(1);\r
+ }\r
+ }\r
+ }\r
\r
- for(int i=0;i<INDEXING_THREAD_COUNT;i++) {\r
- \r
- final int startIndex = i;\r
- \r
- executor.submit(new Runnable() {\r
-\r
- @Override\r
- public void run() {\r
- \r
- try {\r
- \r
- final Document document = new Document();\r
-\r
- Field[] fs = makeFieldsForRelation(r, bound.length, document);\r
-\r
- for (int index = startIndex; index < results.size(); index+=INDEXING_THREAD_COUNT) {\r
- Object[] result = results.get(index);\r
- for (int i = 0; i < result.length; i++) {\r
- Object value = result[i];\r
- if (value instanceof String) {\r
- if (IndexPolicy.DEBUG_INDEX_INIT)\r
- System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String");\r
- fs[i].setStringValue((String) value);\r
- } else if (value instanceof Long) {\r
- if (IndexPolicy.DEBUG_INDEX_INIT)\r
- System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");\r
- fs[i].setLongValue((Long) value);\r
- }\r
- }\r
- try {\r
- writer.get().addDocument(document);\r
- } catch (CorruptIndexException e) {\r
- throw new IllegalStateException(e);\r
- } catch (IOException e) {\r
- throw new IllegalStateException(e);\r
- } finally {\r
- mon.worked(1);\r
- }\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError("DatabaseException occured during initializing index", e);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError("Fatal error occured during initializing index", t);\r
- } finally {\r
- s.release();\r
- }\r
- }\r
- });\r
+ s.release();\r
+ } catch (DatabaseException e) {\r
+ throw new IllegalStateException(e);\r
}\r
- try {\r
- s.acquire(INDEXING_THREAD_COUNT);\r
- } catch (InterruptedException e) {\r
- e.printStackTrace();\r
- }\r
- }\r
- });\r
+ });\r
+ }\r
+\r
+ try {\r
+ s.acquire(INDEXING_THREAD_COUNT);\r
+ } catch (InterruptedException e) {\r
+ Logger.defaultLogError(e);\r
+ }\r
\r
// http://www.gossamer-threads.com/lists/lucene/java-dev/47895\r
// and http://lucene.apache.org/java/docs/index.html#27+November+2011+-+Lucene+Core+3.5.0\r
//writer.get().optimize();\r
//writer.get().commit();\r
\r
- if (IndexPolicy.PERF_INDEX_INIT) {\r
- long end = System.nanoTime();\r
- System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (end-start.get())) + " seconds.");\r
- }\r
+ mon.subTask("Flushing");\r
+\r
+ if (IndexPolicy.PERF_INDEX_INIT)\r
+ System.out.println(getDescriptor() + "Wrote index at " + indexPath + " in " + (1e-9 * (System.nanoTime()-start)) + " seconds.");\r
\r
} catch (DatabaseException e) {\r
\r
\r
}\r
\r
+ private Field[] setFields(Field[] fs, Object[] result) {\r
+ for (int i = 0; i < result.length; i++) {\r
+ Object value = result[i];\r
+ if (value instanceof String) {\r
+ if (IndexPolicy.DEBUG_INDEX_INIT)\r
+ System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : String");\r
+ fs[i].setStringValue((String) value);\r
+ } else if (value instanceof Long) {\r
+ if (IndexPolicy.DEBUG_INDEX_INIT)\r
+ System.out.println(getDescriptor() + "index " + fs[i].name() + " = " + value + " : Long");\r
+ fs[i].setLongValue((Long) value);\r
+ } else {\r
+ Logger.defaultLogError("Can only index Long and String fields, encountered " + value);\r
+ return null;\r
+ }\r
+ }\r
+ return fs;\r
+ }\r
+\r
}\r
--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2016 Association for Decentralized Information Management in\r
+ * Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * Semantum Oy - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.db.indexing.internal;\r
+\r
+import java.util.concurrent.Semaphore;\r
+\r
+import org.eclipse.core.runtime.IProgressMonitor;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.jobs.Job;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.function.DbConsumer;\r
+\r
+/**\r
+ * @author Tuukka Lehtonen\r
+ * @since 1.22.2, 1.25.0\r
+ */\r
+public abstract class IndexingJob extends Job {\r
+\r
+ /**\r
+ * NOTE: this is intentionally the same as\r
+ * <code>org.simantics.DatabaseJob.DATABASE_JOB_FAMILY</code> in order for\r
+ * this job to cause the same UI-behavior as DatabaseJob.\r
+ */\r
+ private static final String DATABASE_JOB_FAMILY = "org.simantics.db.inDatabaseJob";\r
+\r
+ public IndexingJob(String name) {\r
+ super(name);\r
+ }\r
+\r
+ @Override\r
+ public boolean belongsTo(Object family) {\r
+ return DATABASE_JOB_FAMILY.equals(family);\r
+ }\r
+\r
+ /**\r
+ * @param monitor\r
+ * @param jobName\r
+ * @param consumer\r
+ * @throws DatabaseException\r
+ */\r
+ public static void jobifyIfPossible(\r
+ IProgressMonitor monitor,\r
+ String jobName,\r
+ DbConsumer<IProgressMonitor> consumer)\r
+ throws DatabaseException\r
+ {\r
+ // Prevent deadlocks by checking preconditions for using a job.\r
+ // JobManager is suspended e.g. during workbench startup.\r
+ if (Job.getJobManager().isSuspended() || Job.getJobManager().currentJob() != null) {\r
+ consumer.accept(monitor);\r
+ return;\r
+ }\r
+\r
+ Semaphore barrier = new Semaphore(0);\r
+ Throwable[] err = { null };\r
+ IndexingJob job = new IndexingJob(jobName) {\r
+ @Override\r
+ protected IStatus run(IProgressMonitor monitor) {\r
+ try {\r
+ consumer.accept(monitor);\r
+ } catch (Throwable t) {\r
+ err[0] = t;\r
+ } finally {\r
+ monitor.done();\r
+ barrier.release();\r
+ }\r
+ return org.eclipse.core.runtime.Status.OK_STATUS;\r
+ }\r
+ };\r
+ job.schedule();\r
+ try {\r
+ barrier.acquire();\r
+ if (err[0] != null) {\r
+ if (err[0] instanceof DatabaseException)\r
+ throw (DatabaseException) err[0];\r
+ throw new DatabaseException(err[0]);\r
+ }\r
+ } catch (InterruptedException e) {\r
+ throw new DatabaseException(e);\r
+ }\r
+ }\r
+\r
+}\r