/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package org.simantics.db.common.processor;

import java.util.concurrent.ConcurrentLinkedQueue;

import org.simantics.db.AsyncRequestProcessor;
import org.simantics.db.WriteGraph;
import org.simantics.db.common.request.DelayedWriteRequest;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.request.DelayedWrite;

/**
 * Request processor that funnels {@link DelayedWrite} requests through a queue
 * and executes them from inside a single {@link MergedWrite} submitted to the
 * wrapped {@link AsyncRequestProcessor}.
 *
 * <p>
 * {@link #asyncRequest(DelayedWrite)} enqueues the request and submits a new
 * {@link MergedWrite} only when none is currently flagged as active. The active
 * {@link MergedWrite} keeps draining the queue and, if
 * {@link #transactionKeepalivePeriod} is positive, waits up to that many
 * milliseconds for further requests before it finishes.
 *
 * <p>
 * The {@link #hasAlreadyRequest} flag and the wait/notify hand-off are guarded
 * by this object's monitor.
 */
public final class MergingDelayedWriteProcessor extends ProcessorBase {

    /**
     * Maximum time, in milliseconds, an active {@link MergedWrite} waits on
     * this object's monitor for more requests once the queue is empty. Values
     * of zero or less disable the wait.
     */
    final long transactionKeepalivePeriod;

    /** Requests waiting to be executed by the active {@link MergedWrite}. */
    final ConcurrentLinkedQueue<DelayedWrite> queue = new ConcurrentLinkedQueue<DelayedWrite>();

    /** Processor that receives the {@link MergedWrite} requests. */
    private final AsyncRequestProcessor processor;

    /**
     * {@code true} while a submitted {@link MergedWrite} has not yet decided to
     * finish. Read and written only while holding this object's monitor.
     */
    boolean hasAlreadyRequest = false;

    /**
     * @param processor the processor to which merged writes are submitted
     * @param transactionKeepalivePeriod time in milliseconds to keep a merged
     *        write alive while waiting for more requests; zero or negative
     *        means the merged write ends as soon as the queue is empty
     */
    public MergingDelayedWriteProcessor(AsyncRequestProcessor processor, long transactionKeepalivePeriod) {
        this.processor = processor;
        this.transactionKeepalivePeriod = transactionKeepalivePeriod;
    }

    /**
     * Write request that executes every queued {@link DelayedWrite} against the
     * same {@link WriteGraph} until the queue stays empty.
     */
    class MergedWrite extends DelayedWriteRequest {

        /**
         * Drains the queue, performing each request in turn. A failure of an
         * individual request is logged and does not stop the remaining ones.
         *
         * @param graph the graph handed to every queued request
         */
        @Override
        public void perform(WriteGraph graph) throws DatabaseException {
            while (true) {
                DelayedWrite next = queue.poll();

                if (next == null) {
                    synchronized (MergingDelayedWriteProcessor.this) {
                        // Re-check under the monitor: asyncRequest() enqueues
                        // before it takes the monitor, so a request may have
                        // arrived (and its notify() already fired) after the
                        // lock-free poll above. Without this re-check such a
                        // request would be left in the queue when the keepalive
                        // is disabled, or picked up only after a full wait.
                        next = queue.poll();

                        // wait(0) would block indefinitely, hence the > 0 guard.
                        if (next == null && transactionKeepalivePeriod > 0) {
                            try {
                                MergingDelayedWriteProcessor.this.wait(transactionKeepalivePeriod);
                            } catch (InterruptedException e) {
                                // Only logged; the interrupt status is not
                                // restored here and the loop carries on.
                                Logger.defaultLogError(e);
                            }
                            next = queue.poll();
                        }

                        if (next == null) {
                            // Cleared while still holding the monitor so that a
                            // concurrent asyncRequest() submits a new MergedWrite.
                            hasAlreadyRequest = false;
                            return;
                        }
                    }
                }

                try {
                    next.perform(graph);
                } catch (Throwable t) {
                    // Keep processing the remaining requests.
                    Logger.defaultLogError(t);
                }
            }
        }

    }

    /**
     * Enqueues the request and either submits a new {@link MergedWrite} to the
     * wrapped processor or wakes up the one that is already active.
     *
     * @param request the write to execute as part of a merged write
     */
    @Override
    public void asyncRequest(DelayedWrite request) {
        // Enqueue before taking the monitor; MergedWrite re-polls under the
        // same monitor before it decides to finish.
        queue.add(request);

        synchronized (this) {
            if (!hasAlreadyRequest) {
                processor.asyncRequest(new MergedWrite());
                hasAlreadyRequest = true;
            } else {
                // Wake a MergedWrite that may be waiting out its keepalive.
                notify();
            }
        }
    }

    @Override
    public String toString() {
        return "MergingDelayedWriteProcessor@" + System.identityHashCode(this) + " (based on " + processor + ")";
    }

}