X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.common%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fcommon%2Fprocessor%2FMergingGraphRequestProcessor.java;h=dfb5780f96da2edfe4e8f7079c20812a8dfd051f;hb=HEAD;hp=5e07bb034ecb924adc8567f99f09990e141e69ed;hpb=c26409b1caf2f1e560d37c5befd11b442399c3fe;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java b/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java index 5e07bb034..dfb5780f9 100644 --- a/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java +++ b/bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -19,20 +19,20 @@ import java.util.LinkedList; import java.util.Set; import java.util.UUID; import java.util.concurrent.Semaphore; +import java.util.function.Consumer; import org.simantics.db.AsyncReadGraph; import org.simantics.db.AsyncRequestProcessor; import org.simantics.db.ReadGraph; -import org.simantics.db.RequestProcessor; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.WriteGraph; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; import org.simantics.db.common.request.ReadRequest; import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.procedure.AsyncListener; @@ -61,10 +61,13 @@ import org.simantics.db.request.WriteOnly; import org.simantics.db.request.WriteOnlyResult; import org.simantics.db.request.WriteResult; import org.simantics.utils.DataContainer; -import org.simantics.utils.datastructures.Callback; import org.simantics.utils.datastructures.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class MergingGraphRequestProcessor implements RequestProcessor { +public class MergingGraphRequestProcessor implements AsyncRequestProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(MergingGraphRequestProcessor.class); private static class SyncWriteRequestAdapter implements Write { @@ -128,7 +131,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { try { semaphore.acquire(); } catch (InterruptedException e) { - Logger.defaultLogError(e); + LOGGER.error("SyncWriteRequestAdapter interrupted", e); } } @@ -223,7 +226,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { try { MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); } catch (InterruptedException e) { - Logger.defaultLogError(e); + LOGGER.error("MergedRead interrupted", e); } if (requestQueue.isEmpty()) break; @@ -262,13 +265,13 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } catch(Throwable t) { - Logger.defaultLogError(t); + LOGGER.error("MergedRead failed", t); - if(currentRequest.second instanceof AsyncProcedure) { - ((AsyncProcedure)currentRequest.second).exception(graph, t); - } else { - ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); - } +// if(currentRequest.second instanceof AsyncProcedure) { +// ((AsyncProcedure)currentRequest.second).exception(graph, t); +// } else { +// ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); +// } } @@ -281,30 +284,30 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } else { - try{ + try { if(currentRequest.second instanceof AsyncProcedure) { if(currentRequest.first instanceof AsyncRead) { AsyncRead req = (AsyncRead)currentRequest.first; - graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); + graph.syncRequest(req, (AsyncProcedure)currentRequest.second); } else { Read req = (Read)currentRequest.first; - graph.asyncRequest(req, (AsyncProcedure)currentRequest.second); + graph.syncRequest(req, (AsyncProcedure)currentRequest.second); } } else { AsyncMultiRead req = (AsyncMultiRead)currentRequest.first; - graph.asyncRequest(req, (AsyncMultiProcedure)currentRequest.second); + graph.syncRequest(req, (AsyncMultiProcedure)currentRequest.second); } } catch(Throwable t) { - Logger.defaultLogError(t); + LOGGER.error("MergedRead failed", t); - if(currentRequest.second instanceof AsyncProcedure) { - ((AsyncProcedure)currentRequest.second).exception(graph, t); - } else { - ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); - } +// if(currentRequest.second instanceof AsyncProcedure) { +// ((AsyncProcedure)currentRequest.second).exception(graph, t); +// } else { +// ((AsyncMultiProcedure)currentRequest.second).exception(graph, t); +// } } } @@ -355,7 +358,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { try { MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod); } catch (InterruptedException e) { - Logger.defaultLogError(e); + LOGGER.error("RunnerWriteGraphRequest interrupted", e); } if (requestQueue.isEmpty()) break; @@ -374,7 +377,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } @SuppressWarnings("unchecked") - Callback callback = (Callback)currentRequest.second; + Consumer callback = (Consumer)currentRequest.second; if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) { @@ -383,10 +386,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor { try { // System.out.println("merg.sync " + adapter); graph.syncRequest(adapter); - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } catch(Throwable t) { - Logger.defaultLogError(t); - if(callback != null) callback.run(t); + LOGGER.error("RunnerWriteGraphRequest failed", t); + if(callback != null) callback.accept(t); } adapter.release(); @@ -397,10 +400,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor { try { if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } catch(Throwable t) { - Logger.defaultLogError(t); - if(callback != null) callback.run(t); + LOGGER.error("RunnerWriteGraphRequest failed", t); + if(callback != null) callback.accept(t); } } @@ -443,10 +446,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor { // System.out.println(this + " asyncRequest(ReadGraphRequest> request, QueryProcedure4 procedure)"); - if (requestSet.contains(request)) + Pair pair = Pair.make(request, procedure); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, procedure); requestQueue.add(pair); requestSet.add(pair); @@ -464,10 +467,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor { // System.out.println(this + " asyncRequest(ReadGraphRequest> request, SingleQueryProcedure4 procedure) " + this); - if (requestSet.contains(request)) + Pair pair = Pair.make(request, procedure); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, procedure); requestQueue.add(pair); requestSet.add(pair); @@ -482,14 +485,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } @Override - public synchronized void asyncRequest(Write request, Callback callback) { + public synchronized void asyncRequest(Write request, Consumer callback) { // System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - if (requestSet.contains(request)) + Pair pair = Pair.make(request, callback); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, callback); requestQueue.add(pair); requestSet.add(pair); @@ -505,14 +508,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } @Override - public synchronized void asyncRequest(DelayedWrite request, Callback callback) { + public synchronized void asyncRequest(DelayedWrite request, Consumer callback) { // System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - if (requestSet.contains(request)) + Pair pair = Pair.make(request, callback); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, callback); requestQueue.add(pair); requestSet.add(pair); @@ -528,14 +531,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } @Override - public synchronized void asyncRequest(WriteOnly request, Callback callback) { + public synchronized void asyncRequest(WriteOnly request, Consumer callback) { // System.out.println(this + " asyncRequest(WriteGraphRequest request)"); - if (requestSet.contains(request)) + Pair pair = Pair.make(request, callback); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, callback); requestQueue.add(pair); requestSet.add(pair); @@ -572,7 +575,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { Throwable t = throwable.get(); if(t != null) { - Logger.defaultLogError(t); + LOGGER.error("syncRequest(AsyncMultiRead, AsyncMultiProcedure) failed", t); throw new RuntimeException(t.getMessage()); } @@ -625,7 +628,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { Throwable t = throwable.get(); if(t != null) { - Logger.defaultLogError(t); + LOGGER.error("syncRequest(AsyncRead, AsyncProcedure) failed", t); throw new RuntimeException(t.getMessage()); } @@ -691,7 +694,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { @Override public void exception(Throwable t) { - Logger.defaultLogError(t); + LOGGER.error("asyncRequest(AsyncRead) failed", t); } }); @@ -736,7 +739,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { Throwable t = throwable.get(); if(t != null) { - Logger.defaultLogError(t); + LOGGER.error("syncRequest(AsyncRead) failed", t); throw new RuntimeException(t.getMessage()); } @@ -775,7 +778,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { Throwable t = throwable.get(); if(t != null) { - Logger.defaultLogError(t); + LOGGER.error("syncRequest(AsyncMultiRead) failed", t); throw new RuntimeException(t.getMessage()); } @@ -878,7 +881,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor { @Override public void exception(Throwable t) { - Logger.defaultLogError(t); + LOGGER.error("asyncRequest(Read) failed", t); } }); @@ -889,10 +892,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor { public synchronized void asyncRequest(Read request, AsyncProcedure procedure) { - if (requestSet.contains(request)) + Pair pair = Pair.make(request, procedure); + if (requestSet.contains(pair)) return; - Pair pair = new Pair(request, procedure); requestQueue.add(pair); requestSet.add(pair); @@ -917,17 +920,17 @@ public class MergingGraphRequestProcessor implements RequestProcessor { final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); - syncRequest(request, new AsyncMultiProcedureAdapter() { + syncRequest(request, new SyncMultiProcedureAdapter() { @Override - public void execute(AsyncReadGraph graph, T t) { + public void execute(ReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { exception.set(t); } @@ -947,11 +950,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor { return result; } - @Override - public Collection syncRequest(MultiRead request, AsyncMultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) { throw new UnsupportedOperationException("Not implemented"); @@ -967,11 +965,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor { throw new UnsupportedOperationException("Not implemented"); } - @Override - public void asyncRequest(MultiRead request, AsyncMultiProcedure procedure) { - throw new UnsupportedOperationException("Not implemented"); - } - @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { throw new UnsupportedOperationException("Not implemented"); @@ -1103,12 +1096,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - @Override public Collection syncRequest(MultiRead arg0, SyncMultiListener arg1) { throw new UnsupportedOperationException("Not implemented."); @@ -1153,12 +1140,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor { } - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiListener arg1) { - throw new UnsupportedOperationException("Not implemented."); - - } - @Override public void asyncRequest(MultiRead arg0, SyncMultiListener arg1) { throw new UnsupportedOperationException("Not implemented."); @@ -1327,4 +1308,9 @@ public class MergingGraphRequestProcessor implements RequestProcessor { throw new UnsupportedOperationException(); } + @Override + public T l0() { + return processor.l0(); + } + }