/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.db.impl.query;

import java.util.ArrayList;
import java.util.LinkedList;

import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.procedure.Procedure;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.RequestFlags;

/**
 * Cache entry backing an {@link ExternalRead} request.
 *
 * <p>Values are handed to the entry through {@link #queue(Object)} and buffered
 * in FIFO order. Each recompute performed by the {@link Query} returned from
 * {@link #getQuery()} publishes the oldest buffered value as the result of this
 * entry and reschedules the request for as long as further values remain
 * buffered.
 *
 * @param <T> type of the values queued into this entry and passed to procedures
 */
public final class ExternalReadEntry<T> extends CacheEntryBase {

    /** Buffered values in arrival order; every access is guarded by synchronizing on this list. */
    final LinkedList<T> items = new LinkedList<T>();

    /** The request this entry was created for; set to {@code null} by {@link #discard()}. */
    protected ExternalRead<T> request;

    /**
     * Creates an entry for the given request.
     *
     * @param request the external read this entry represents; must not be {@code null}
     *                (checked with an {@code assert} only)
     */
    public ExternalReadEntry(ExternalRead<T> request) {
        assert request != null;
        this.request = request;
    }

    /**
     * Computes the hash of this entry as the hash code of the request.
     *
     * @return {@code request.hashCode()}
     */
    @Override
    int makeHash() {
        return request.hashCode();
    }

    /**
     * @return the request this entry was created for, or {@code null} once the
     *         entry has been discarded
     */
    @Override
    public Object getOriginalRequest() {
        return request;
    }

    /**
     * Does nothing for this entry type.
     *
     * @param support unused
     */
    @Override
    public void clearResult(QuerySupport support) {
    }

    /**
     * Tells the request that it has been unregistered, drops the reference to
     * it and then performs the superclass discard.
     */
    @Override
    public void discard() {
        request.unregistered();
        request = null;
        super.discard();
    }

    /**
     * Appends a value to the end of the buffer of pending values.
     *
     * @param item the value to buffer
     */
    public final void queue(T item) {
        synchronized (items) {
            items.addLast(item);
            // TODO: implement flags/logic in ExternalRead to state that all but the latest request result can be evaporated
            // In some cases where data is produced really fast this might be necessary but currently this queueing will do.
        }
    }

    /**
     * Stores {@code item} as the result of this entry and marks the entry ready.
     *
     * <p>Any {@link Throwable} raised while doing so, including a failure of the
     * {@code isPending()} assertion, is printed to standard error and not
     * propagated to the caller.
     *
     * @param processor unused
     * @param item      the value to store as the result
     */
    public final void addOrSet(QueryProcessor processor, Object item) {
        try {
            assert (isPending());
            synchronized (this) {
                setResult(item);
                setReady();
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    /**
     * Creates the {@link Query} that drives updates of this entry.
     *
     * @return a new query object bound to this entry
     */
    @Override
    public final Query getQuery() {
        return new Query() {

            /**
             * Publishes the oldest buffered value, if any, as the result of the
             * enclosing entry, reschedules the request when more values are
             * still buffered, and marks the enclosing entry ready. Nothing is
             * changed when {@code entry} is in the excepted state.
             */
            @Override
            public void recompute(ReadGraphImpl graph, Object provider, CacheEntry entry) {
                final QueryProcessor qp = (QueryProcessor) provider;
                synchronized (items) {
                    if (entry.isExcepted()) {
                        // Exception persists
                        return;
                    }
                    // Update
                    if (!items.isEmpty()) {
                        setResult(items.removeFirst());
                    }
                    // Reschedule
                    if (!items.isEmpty()) {
                        qp.updatePrimitive(request);
                    }
                    setReady();
                }
            }

            /** Removes the mapping for the request from the processor's external read map. */
            @Override
            public void removeEntry(QueryProcessor processor) {
                processor.externalReadMap.remove(request);
            }

            @Override
            public int type() {
                return RequestFlags.IMMEDIATE_UPDATE;
            }

            @Override
            public String toString() {
                if (request == null) {
                    return "DISCARDED ExternalRead";
                }
                return request.toString();
            }

        };
    }

    /**
     * Delivers the stored result to a procedure: the stored {@link Throwable}
     * goes to {@code exception(...)} when this entry is excepted, otherwise the
     * stored value goes to {@code execute(...)}.
     *
     * @param procedure the receiver; cast to {@link Procedure} without a prior type check
     */
    @SuppressWarnings("unchecked") // the procedure and the stored result are typed by the caller's contract
    public void performFromCache(Object procedure) {
        Procedure<T> proc = (Procedure<T>) procedure;
        if (isExcepted()) {
            proc.exception((Throwable) getResult());
        } else {
            proc.execute((T) getResult());
        }
    }

    /**
     * @return the request's string form, or a {@code DISCARDED} marker when the
     *         request has been cleared, followed by the identity hash code of
     *         this entry
     */
    @Override
    public String toString() {
        if (request == null) {
            return "DISCARDED ExternalRead " + System.identityHashCode(this);
        }
        return request.toString() + " " + System.identityHashCode(this);
    }

    /**
     * Delegates to {@link #performFromCache(Object)}; {@code graph} and
     * {@code provider} are not used.
     */
    @Override
    public void performFromCache(ReadGraphImpl graph, Object provider, Object procedure) {
        performFromCache(procedure);
    }

    /** Delegates to the superclass implementation. */
    @Override
    public void setReady() {
        super.setReady();
    }

    @Override
    void prepareRecompute(QuerySupport querySupport) {
        // Do nothing - the state is already set and cannot be recomputed on demand
    }

}