Skip to content
Snippets Groups Projects
Commit 9b0f6efb authored by Yaron Shani's avatar Yaron Shani
Browse files

Merge branch 'master' of https://github.com/smulikHakipod/grpc

parents 8d1cfdcb 495cf83c
No related branches found
No related tags found
No related merge requests found
...@@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting ...@@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting
readonly ClientType clientType; readonly ClientType clientType;
readonly RpcType rpcType; readonly RpcType rpcType;
readonly PayloadConfig payloadConfig; readonly PayloadConfig payloadConfig;
readonly Histogram histogram; readonly Lazy<byte[]> cachedByteBufferRequest;
readonly ThreadLocal<Histogram> threadLocalHistogram;
readonly List<Task> runnerTasks; readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
...@@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting ...@@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting
this.clientType = clientType; this.clientType = clientType;
this.rpcType = rpcType; this.rpcType = rpcType;
this.payloadConfig = payloadConfig; this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);
this.runnerTasks = new List<Task>(); this.runnerTasks = new List<Task>();
foreach (var channel in this.channels) foreach (var channel in this.channels)
...@@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting ...@@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting
public ClientStats GetStats(bool reset) public ClientStats GetStats(bool reset)
{ {
var histogramData = histogram.GetSnapshot(reset); var histogramData = new HistogramData();
foreach (var hist in threadLocalHistogram.Values)
{
hist.GetSnapshot(histogramData, reset);
}
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset) if (reset)
...@@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting ...@@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext(); timer.WaitForNext();
} }
...@@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting ...@@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
...@@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting ...@@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
...@@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting ...@@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting
private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{ {
var request = CreateByteBufferRequest(); var request = cachedByteBufferRequest.Value;
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions()); var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
...@@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting ...@@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop(); stopwatch.Stop();
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync(); await timer.WaitForNextAsync();
} }
...@@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting ...@@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting
}; };
} }
private byte[] CreateByteBufferRequest()
{
return new byte[payloadConfig.BytebufParams.ReqSize];
}
private static Payload CreateZerosPayload(int size) private static Payload CreateZerosPayload(int size)
{ {
return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
......
...@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting ...@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting
} }
} }
/// <summary> /// <summary>
/// Gets snapshot of stats and reset /// Gets snapshot of stats and optionally resets the histogram.
/// </summary> /// </summary>
public HistogramData GetSnapshot(bool reset = false) public HistogramData GetSnapshot(bool reset = false)
{ {
lock (myLock) lock (myLock)
{ {
return GetSnapshotUnsafe(reset); var histogramData = new HistogramData();
GetSnapshotUnsafe(histogramData, reset);
return histogramData;
}
}
/// <summary>
/// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram.
/// </summary>
public void GetSnapshot(HistogramData mergeTo, bool reset)
{
lock (myLock)
{
GetSnapshotUnsafe(mergeTo, reset);
} }
} }
...@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting ...@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting
this.buckets[FindBucket(value)]++; this.buckets[FindBucket(value)]++;
} }
private HistogramData GetSnapshotUnsafe(bool reset) private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset)
{ {
var data = new HistogramData GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length);
if (mergeTo.Count == 0)
{ {
Count = count, mergeTo.MinSeen = min;
Sum = sum, mergeTo.MaxSeen = max;
SumOfSquares = sumOfSquares, }
MinSeen = min, else
MaxSeen = max, {
Bucket = { buckets } mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min);
}; mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max);
}
mergeTo.Count += count;
mergeTo.Sum += sum;
mergeTo.SumOfSquares += sumOfSquares;
if (reset) if (mergeTo.Bucket.Count == 0)
{ {
ResetUnsafe(); mergeTo.Bucket.AddRange(buckets);
}
else
{
for (int i = 0; i < buckets.Length; i++)
{
mergeTo.Bucket[i] += buckets[i];
}
} }
return data; if (reset)
{
ResetUnsafe();
}
} }
private void ResetUnsafe() private void ResetUnsafe()
......
...@@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting ...@@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting
{ {
var hist = new Histogram(0.01, 60e9); var hist = new Histogram(0.01, 60e9);
hist.AddObservation(-0.5); // should be in the first bucket hist.AddObservation(-0.5); // should be in the first bucket
hist.AddObservation(1e12); // should be in the last bucket hist.AddObservation(1e12); // should be in the last bucket
var data = hist.GetSnapshot(); var data = hist.GetSnapshot();
Assert.AreEqual(1, data.Bucket[0]); Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]); Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
} }
[Test]
public void MergeSnapshots()
{
var data = new HistogramData();
var hist1 = new Histogram(0.01, 60e9);
hist1.AddObservation(-0.5); // should be in the first bucket
hist1.AddObservation(1e12); // should be in the last bucket
hist1.GetSnapshot(data, false);
var hist2 = new Histogram(0.01, 60e9);
hist2.AddObservation(10000);
hist2.AddObservation(11000);
hist2.GetSnapshot(data, false);
Assert.AreEqual(4, data.Count);
Assert.AreEqual(-0.5, data.MinSeen);
Assert.AreEqual(1e12, data.MaxSeen);
Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[925]);
Assert.AreEqual(1, data.Bucket[935]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
}
[Test] [Test]
public void Reset() public void Reset()
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment