try {
getClusterTable().refresh(csid, this, clusterUID);
} catch (Throwable t) {
- Logger.defaultLogError("Refesh failed.", t);
+ LOGGER.error("refresh({}, {}) failed", thread, csid, t);
}
}
});
assert (null != writer);
-// writer.state.barrier.inc();
+ writer.asyncBarrier.inc();
try {
request.perform(writer);
assert (null != writer);
} catch (Throwable t) {
if (!(t instanceof CancelTransactionException))
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
writeState.except(t);
} finally {
-// writer.state.barrier.dec();
+ writer.asyncBarrier.dec();
// writer.waitAsync(request);
}
// Log it first, just to be safe that the error is always logged.
if (!(e instanceof CancelTransactionException))
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
// writeState.getGraph().state.barrier.dec();
// writeState.getGraph().waitAsync(request);
VirtualGraph vg = getProvider(request.getProvider());
WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
- try {
- WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
- writeState = writeStateT;
+ WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+ writeState = writeStateT;
+ assert (null != writer);
- assert (null != writer);
-// writer.state.barrier.inc();
+ try {
+ writer.asyncBarrier.inc();
writeStateT.setResult(request.perform(writer));
assert (null != writer);
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
} catch (Throwable e) {
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
writeState.except(e);
-
-// state.stopWriteTransaction(clusterStream);
-//
-// } catch (Throwable e) {
-// // Log it first, just to be safe that the error is always logged.
-// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
-//
-// try {
-// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
-// // All we can do here is to log those, can't really pass them anywhere.
-// if (procedure != null) {
-// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
-// else procedure.exception(new DatabaseException(e));
-// }
-// } catch (Throwable e2) {
-// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
-// }
-//
-// clientChanges = new ClientChangesImpl(SessionImplSocket.this);
-//
-// state.stopWriteTransaction(clusterStream);
-
} finally {
+ writer.asyncBarrier.dec();
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
}
-// if(notify != null) notify.release();
-
task.finish();
}
@Override
public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
- final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+ final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
assert (request != null);
if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
else callback.accept(new DatabaseException(t));
} else
- Logger.defaultLogError("Unhandled exception", t);
+ LOGGER.error("Unhandled exception", t);
}
};
delayedWriteState = null;
- ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+ ITask task2 = ThreadLogger.task("DelayedWriteCommit");
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
callback.exception(new DatabaseException(e));
state.stopWriteTransaction(clusterStream);
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
} finally {
assert (request != null);
assert (procedure != null);
- //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
ListenerBase listener = getListenerBase(procedure);
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
-// newGraph.state.barrier.inc();
-
T t = request.perform(newGraph);
try {
if(throwable != null) {
throwable.set(th);
} else {
- Logger.defaultLogError("Unhandled exception", th);
+ LOGGER.error("Unhandled exception", th);
}
}
if(throwable != null) {
throwable.set(t);
} else {
- Logger.defaultLogError("Unhandled exception", t);
+ LOGGER.error("Unhandled exception", t);
}
try {
if(throwable != null) {
throwable.set(t2);
} else {
- Logger.defaultLogError("Unhandled exception", t2);
+ LOGGER.error("Unhandled exception", t2);
}
}
}
-
-// newGraph.state.barrier.dec();
-// newGraph.waitAsync(request);
+
+ task.finish();
}
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ newGraph.asyncBarrier.inc();
try {
if (listener != null) {
try {
- QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
- //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
- //newGraph.processor.query(newGraph, request, null, procedure, listener);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
+ QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ LOGGER.error("Unhandled query exception", e);
+ }
} else {
-// final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-// procedure, "request");
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
- BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
+ public void execute(AsyncReadGraph graph_, T result) {
+ task.finish();
+ super.execute(graph_, result);
+ }
- try {
+ public void exception(AsyncReadGraph graph_, Throwable t) {
+ task.finish();
+ super.exception(graph_, t);
+ }
- request.perform(newGraph, wrap);
- wrap.get();
+ };
+ try {
+ wrap.performSync(request);
} catch (DatabaseException e) {
-
- Logger.defaultLogError(e);
-
+ LOGGER.error("Unhandled query exception", e);
}
}
} finally {
+ newGraph.asyncBarrier.dec();
+
fireSessionVariableChange(SessionVariables.QUEUED_READS);
}
return;
WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+ reactionGraph.asyncBarrier.inc();
try {
} finally {
+ reactionGraph.asyncBarrier.dec();
+
}
} catch (Throwable t) {
*/
@SuppressWarnings("unchecked")
@Override
- public synchronized <T> T peekService(Class<T> api) {
-
- if(serviceKey1 == api) {
- return (T)service1;
- } else if (serviceKey2 == api) {
- // Promote this key
- Object result = service2;
- service2 = service1;
- serviceKey2 = serviceKey1;
- service1 = result;
- serviceKey1 = api;
- return (T)result;
- }
+ public <T> T peekService(Class<T> api) {
+ if (Layer0.class == api)
+ return (T) L0;
- if (Layer0.class == api)
- return (T) L0;
- if (ServerInformation.class == api)
- return (T) getCachedServerInformation();
- else if (WriteGraphImpl.class == api)
- return (T) writeState.getGraph();
- else if (ClusterBuilder.class == api)
- return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
- else if (ClusterBuilderFactory.class == api)
- return (T)new ClusterBuilderFactoryImpl(this);
+ synchronized (this) {
+ if (serviceKey1 == api) {
+ return (T) service1;
+ }
+ if (serviceKey2 == api) {
+ // Promote this key
+ Object result = service2;
+ service2 = service1;
+ serviceKey2 = serviceKey1;
+ service1 = result;
+ serviceKey1 = api;
+ return (T)result;
+ }
- service2 = service1;
- serviceKey2 = serviceKey1;
+ if (ServerInformation.class == api)
+ return (T) getCachedServerInformation();
+ else if (WriteGraphImpl.class == api)
+ return (T) writeState.getGraph();
+ else if (ClusterBuilder.class == api)
+ return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
+ else if (ClusterBuilderFactory.class == api)
+ return (T)new ClusterBuilderFactoryImpl(this);
- service1 = serviceLocator.peekService(api);
- serviceKey1 = api;
+ service2 = service1;
+ serviceKey2 = serviceKey1;
- return (T)service1;
+ service1 = serviceLocator.peekService(api);
+ serviceKey1 = api;
+ return (T)service1;
+ }
}
/*
try {
h.valuesChanged(ctx);
} catch (Exception e) {
- Logger.defaultLogError("monitor handler notification produced the following exception", e);
+ LOGGER.error("monitor handler notification produced the following exception", e);
} catch (LinkageError e) {
- Logger.defaultLogError("monitor handler notification produced a linkage error", e);
+ LOGGER.error("monitor handler notification produced a linkage error", e);
}
}
}
while(dirtyPrimitives) {
dirtyPrimitives = false;
- getQueryProvider2().performDirtyUpdates(writer);
- getQueryProvider2().performScheduledUpdates(writer);
+ getQueryProvider2().propagateChangesInQueryCache(writer);
+ getQueryProvider2().listening.fireListeners(writer);
}
fireMetadataListeners(writer, clientChanges);
state.setCombine(false);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T l0() {
+ return (T) L0;
+ }
+
}