Skip to content
Snippets Groups Projects
Commit 19e3d3ba authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

get rid of Histogram lock contention in qps worker

parent d7079b20
No related branches found
No related tags found
No related merge requests found
...@@ -141,7 +141,7 @@ namespace Grpc.IntegrationTesting ...@@ -141,7 +141,7 @@ namespace Grpc.IntegrationTesting
readonly RpcType rpcType; readonly RpcType rpcType;
readonly PayloadConfig payloadConfig; readonly PayloadConfig payloadConfig;
readonly Lazy<byte[]> cachedByteBufferRequest; readonly Lazy<byte[]> cachedByteBufferRequest;
readonly Histogram histogram; readonly ThreadLocal<Histogram> threadLocalHistogram;
readonly List<Task> runnerTasks; readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
...@@ -157,7 +157,7 @@ namespace Grpc.IntegrationTesting ...@@ -157,7 +157,7 @@ namespace Grpc.IntegrationTesting
this.rpcType = rpcType; this.rpcType = rpcType;
this.payloadConfig = payloadConfig; this.payloadConfig = payloadConfig;
this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]); this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);
this.runnerTasks = new List<Task>(); this.runnerTasks = new List<Task>();
foreach (var channel in this.channels) foreach (var channel in this.channels)
...@@ -173,7 +173,12 @@ namespace Grpc.IntegrationTesting ...@@ -173,7 +173,12 @@ namespace Grpc.IntegrationTesting
public ClientStats GetStats(bool reset) public ClientStats GetStats(bool reset)
{ {
var histogramData = histogram.GetSnapshot(reset); var histogramData = new HistogramData();
foreach (var hist in threadLocalHistogram.Values)
{
hist.GetSnapshot(histogramData, reset);
}
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset) if (reset)
...@@ -234,7 +239,7 @@ namespace Grpc.IntegrationTesting ...@@ -234,7 +239,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext(); timer.WaitForNext();
} }
...@@ -253,7 +258,7 @@ namespace Grpc.IntegrationTesting ...@@ -253,7 +258,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
...@@ -275,7 +280,7 @@ namespace Grpc.IntegrationTesting ...@@ -275,7 +280,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
...@@ -303,7 +308,7 @@ namespace Grpc.IntegrationTesting ...@@ -303,7 +308,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
......
...@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting ...@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting
} }
} }
/// <summary> /// <summary>
/// Gets snapshot of stats and reset /// Gets snapshot of stats and optionally resets the histogram.
/// </summary> /// </summary>
public HistogramData GetSnapshot(bool reset = false) public HistogramData GetSnapshot(bool reset = false)
{ {
lock (myLock) lock (myLock)
{ {
return GetSnapshotUnsafe(reset); var histogramData = new HistogramData();
GetSnapshotUnsafe(histogramData, reset);
return histogramData;
}
}
/// <summary>
/// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram.
/// </summary>
public void GetSnapshot(HistogramData mergeTo, bool reset)
{
lock (myLock)
{
GetSnapshotUnsafe(mergeTo, reset);
} }
} }
...@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting ...@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting
this.buckets[FindBucket(value)]++; this.buckets[FindBucket(value)]++;
} }
private HistogramData GetSnapshotUnsafe(bool reset) private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset)
{ {
var data = new HistogramData GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length);
if (mergeTo.Count == 0)
{ {
Count = count, mergeTo.MinSeen = min;
Sum = sum, mergeTo.MaxSeen = max;
SumOfSquares = sumOfSquares, }
MinSeen = min, else
MaxSeen = max, {
Bucket = { buckets } mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min);
}; mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max);
}
mergeTo.Count += count;
mergeTo.Sum += sum;
mergeTo.SumOfSquares += sumOfSquares;
if (reset) if (mergeTo.Bucket.Count == 0)
{ {
ResetUnsafe(); mergeTo.Bucket.AddRange(buckets);
}
else
{
for (int i = 0; i < buckets.Length; i++)
{
mergeTo.Bucket[i] += buckets[i];
}
} }
return data; if (reset)
{
ResetUnsafe();
}
} }
private void ResetUnsafe() private void ResetUnsafe()
......
...@@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting ...@@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting
{ {
var hist = new Histogram(0.01, 60e9); var hist = new Histogram(0.01, 60e9);
hist.AddObservation(-0.5); // should be in the first bucket hist.AddObservation(-0.5); // should be in the first bucket
hist.AddObservation(1e12); // should be in the last bucket hist.AddObservation(1e12); // should be in the last bucket
var data = hist.GetSnapshot(); var data = hist.GetSnapshot();
Assert.AreEqual(1, data.Bucket[0]); Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]); Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
} }
[Test]
public void MergeSnapshots()
{
var data = new HistogramData();
var hist1 = new Histogram(0.01, 60e9);
hist1.AddObservation(-0.5); // should be in the first bucket
hist1.AddObservation(1e12); // should be in the last bucket
hist1.GetSnapshot(data, false);
var hist2 = new Histogram(0.01, 60e9);
hist2.AddObservation(10000);
hist2.AddObservation(11000);
hist2.GetSnapshot(data, false);
Assert.AreEqual(4, data.Count);
Assert.AreEqual(-0.5, data.MinSeen);
Assert.AreEqual(1e12, data.MaxSeen);
Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[925]);
Assert.AreEqual(1, data.Bucket[935]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
}
[Test] [Test]
public void Reset() public void Reset()
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment