/*******************************************************************************
 * Copyright (c) 2007, 2018 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.db.impl.query;

import java.util.LinkedList;

import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.RequestFlags;

/**
 * Cache entry for an {@link ExternalRead} request.
 *
 * <p>Values handed to {@link #queue(Object)} are buffered in {@link #items} and
 * consumed one at a time by the {@link Query} returned from {@link #getQuery()},
 * which installs each of them as the current result of this entry.
 *
 * <p>NOTE(review): the super type argument {@code AsyncProcedure<T>} mirrors the
 * procedure type accepted by {@link #performFromCache} and {@link #compute};
 * confirm against the declaration of {@code CacheEntryBase}.
 *
 * @param <T> type of the values queued for and reported by the request
 */
final public class ExternalReadEntry<T> extends CacheEntryBase<AsyncProcedure<T>> {

    /** Pending values in arrival order; every access synchronizes on this list. */
    final LinkedList<T> items = new LinkedList<T>();

    /** The request backing this entry; set to {@code null} by {@link #discard()}. */
    protected ExternalRead<T> request;

    /**
     * Creates an entry for the given request.
     *
     * @param request the external read; must not be {@code null} (checked by assertion only)
     */
    public ExternalReadEntry(ExternalRead<T> request) {
        assert request != null;
        this.request = request;
    }

    /** Hashes the entry by its request. */
    @Override
    int makeHash() {
        return request.hashCode();
    }

    /** @return the request this entry was created for, or {@code null} once discarded */
    @Override
    public Object getOriginalRequest() {
        return request;
    }

    /** Intentionally a no-op for this entry type. */
    @Override
    public void clearResult(QuerySupport support) {
    }

    /**
     * Notifies the request that it is unregistered, drops the reference to it
     * and then delegates to the superclass.
     */
    @Override
    public void discard() {
        // request is nulled below, so a repeated discard must not dereference it again.
        if (request != null) {
            request.unregistered();
            request = null;
        }
        super.discard();
    }

    /** Marks the entry as pending and flags its result as requiring computation. */
    @Override
    public void setPending() {
        statusOrException = PENDING;
        result = REQUIRES_COMPUTATION;
    }

    /**
     * Appends a value to the queue of pending values.
     *
     * @param item the value to enqueue
     */
    final public void queue(T item) {
        synchronized (items) {
            items.addLast(item);
            // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
            // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
        }
    }

    /**
     * Stores {@code item} as the result of this entry and marks the entry ready.
     *
     * <p>Any {@link Throwable} raised on the way - including a failed assertion -
     * is printed to standard error and not propagated to the caller.
     *
     * @param processor not used by this method
     * @param item      the value to install as the result
     */
    final public void addOrSet(QueryProcessor processor, Object item) {
        try {
            assert (isPending());
            synchronized (this) {
                setResult(item);
                setReady();
            }
        } catch (Throwable t) {
            // Deliberately best-effort: report the failure and carry on.
            t.printStackTrace();
        }
    }

    /**
     * Records {@code t} as the result of this entry. The status is switched to
     * {@code EXCEPTED} unless the entry is already {@code DISCARDED}.
     *
     * @param t the failure to record
     */
    @Override
    public void except(Throwable t) {
        if (DebugPolicy.QUERY_STATE) {
            System.out.println("[QUERY STATE]: excepted " + this);
        }
        // A discarded entry keeps its status; the throwable is stored in either case.
        if (statusOrException != DISCARDED) {
            statusOrException = EXCEPTED;
        }
        result = t;
        assert (isExcepted());
    }

    /**
     * Stores the result via the superclass and asserts afterwards that the value
     * is not a {@link Throwable} and that the entry is not in the excepted state.
     *
     * @param result the value to store
     */
    @Override
    public void setResult(Object result) {
        super.setResult(result);
        assert (!(result instanceof Throwable));
        assert (!isExcepted());
    }

    /**
     * Creates the {@link Query} view of this entry. Recomputing it installs the
     * oldest queued value as the result and, if more values remain queued, asks
     * the processor to update the request again.
     *
     * @return a new query object bound to this entry
     */
    @Override
    final public Query getQuery() {

        return new Query() {

            @Override
            public void recompute(ReadGraphImpl graph) {
                synchronized (items) {
                    // Update
                    if (!items.isEmpty()) {
                        setReady();
                        setResult(items.removeFirst());
                    }
                    // Reschedule while values are still waiting in the queue
                    if (!items.isEmpty()) {
                        graph.processor.updatePrimitive(request);
                    }
                }
            }

            @Override
            public void removeEntry(QueryProcessor processor) {
                processor.cache.remove(ExternalReadEntry.this);
            }

            @Override
            public int type() {
                return RequestFlags.IMMEDIATE_UPDATE;
            }

            @Override
            public String toString() {
                if (request == null) {
                    return "DISCARDED ExternalRead";
                }
                return request.toString();
            }

        };

    }

    /** @return the request's string form (or a discarded marker) followed by this entry's identity hash */
    @Override
    public String toString() {
        if (request == null) {
            return "DISCARDED ExternalRead " + System.identityHashCode(this);
        }
        return request.toString() + " " + System.identityHashCode(this);
    }

    /**
     * Reports the stored outcome to {@code procedure}: the stored throwable via
     * {@code exception} when the entry is excepted, otherwise the stored value
     * via {@code execute}.
     *
     * @param graph     the graph passed on to the procedure
     * @param procedure the receiver of the stored outcome
     * @return the stored result
     */
    @SuppressWarnings("unchecked") // the stored result is cast to T without a runtime check
    @Override
    public Object performFromCache(ReadGraphImpl graph, AsyncProcedure<T> procedure) {
        if (isExcepted()) {
            procedure.exception(graph, (Throwable) getResult());
        } else {
            procedure.execute(graph, (T) getResult());
        }
        return getResult();
    }

    @Override
    void prepareRecompute(QuerySupport querySupport) {
        // Do nothing - the state is already set and cannot be recomputed on demand
    }

    /**
     * Delegates to the query cache of the graph's processor to perform the request.
     *
     * @param graph     the graph whose processor cache performs the query
     * @param procedure the procedure handed on to the cache
     * @return whatever {@code performQuery} returns
     * @throws DatabaseException declared by this method; propagated from the delegate
     */
    public Object compute(ReadGraphImpl graph, AsyncProcedure<T> procedure) throws DatabaseException {
        return graph.processor.cache.performQuery(graph, request, this, procedure);
    }

}