/******************************************************************************* * Copyright (c) 2012 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.maps.pojo; import java.awt.Image; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.simantics.maps.ProvisionException; import org.simantics.maps.query.Query; import org.simantics.maps.tile.IFilter; import org.simantics.maps.tile.ITileProvider; import org.simantics.maps.tile.TileKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Tuukka Lehtonen * @see TileJobQueue */ public class TileJob implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TileJob.class); private static final AtomicInteger counter = new AtomicInteger(); private LinkedList> queue = new LinkedList<>(); private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> { Thread thread = new Thread(runnable, getClass().getSimpleName() + "-" + counter.getAndIncrement()); thread.setDaemon(true); return thread; }); private ITileProvider provider; public TileJob() { } public void setTileProvider(ITileProvider provider) { this.provider = provider; } protected Image doQuery(TileKey key) throws ProvisionException { return provider.get(key); } public void run() { Query job = pullNextJob(); do { if (job != null) { try { Image result = doQuery(job.source); job.listener.queryComplete(job, result); } catch (Exception e) { LOGGER.error("Querying failed for job {}", job, e); job.listener.queryFailed(job, e); } } // 4. Pick next job job = pullNextJob(); } while (job != null); } protected synchronized int jobsLeft() { return queue.size(); } protected synchronized Query pullNextJob() { if (queue.isEmpty()) return null; return queue.removeFirst(); } public synchronized void clear() { @SuppressWarnings("unchecked") Query jobs[] = queue.toArray(new Query[queue.size()]); for (Query j : jobs) removeJob(j); } public synchronized void addJob(Query job) { queue.addLast(job); if (queue.size() == 1) { executor.execute(this); } } public synchronized void addAsFirstJob(Query job) { queue.addFirst(job); if (queue.size() == 1) { executor.execute(this); } } public synchronized boolean removeJob(Query job) { if (queue.remove(job)) { job.listener.queryCanceled(job); return true; } return false; } public synchronized void filterQueries(IFilter filter) { if (queue.isEmpty()) return; LinkedList> result = new LinkedList>(); for (Query query : queue) { if (filter.select(query.source)) result.add(query); else query.listener.queryCanceled(query); } this.queue = result; } public void dispose() { executor.shutdown(); try { if (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) { // this should shutdown for good List unfinished = executor.shutdownNow(); LOGGER.warn("TileJob did not terminate in time - left jobs {}", unfinished); } } catch (InterruptedException e) { } } }