Skip to content
Snippets Groups Projects
Commit e81adf3b authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

Add more features for C# performance worker

parent 6dd74fcb
No related branches found
No related tags found
No related merge requests found
...@@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting ...@@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting
public static IClientRunner CreateStarted(ClientConfig config) public static IClientRunner CreateStarted(ClientConfig config)
{ {
Logger.Debug("ClientConfig: {0}", config); Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
"Only closed loop scenario supported for C#");
GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
if (config.OutstandingRpcsPerChannel != 0)
{
Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
}
if (config.AsyncClientThreads != 0) if (config.AsyncClientThreads != 0)
{ {
Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value"); Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
...@@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting ...@@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting
Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value"); Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
} }
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams);
return new ClientRunnerImpl(channels,
config.ClientType,
config.RpcType,
config.OutstandingRpcsPerChannel,
config.LoadParams,
config.PayloadConfig,
config.HistogramParams);
}
private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams)
{
GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified.");
var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
List<ChannelOption> channelOptions = null; List<ChannelOption> channelOptions = null;
if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "") if (securityParams != null && securityParams.ServerHostOverride != "")
{ {
channelOptions = new List<ChannelOption> channelOptions = new List<ChannelOption>
{ {
new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride) new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride)
}; };
} }
var channel = new Channel(target, credentials, channelOptions);
return new ClientRunnerImpl(channel, var result = new List<Channel>();
config.ClientType, for (int i = 0; i < clientChannels; i++)
config.RpcType, {
config.PayloadConfig, var target = serverTargets.ElementAt(i % serverTargets.Count());
config.HistogramParams); var channel = new Channel(target, credentials, channelOptions);
result.Add(channel);
}
return result;
} }
} }
...@@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting ...@@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting
{ {
const double SecondsToNanos = 1e9; const double SecondsToNanos = 1e9;
readonly Channel channel; readonly List<Channel> channels;
readonly ClientType clientType; readonly ClientType clientType;
readonly RpcType rpcType; readonly RpcType rpcType;
readonly PayloadConfig payloadConfig; readonly PayloadConfig payloadConfig;
readonly Histogram histogram; readonly Histogram histogram;
readonly BenchmarkService.BenchmarkServiceClient client; readonly List<Task> runnerTasks;
readonly Task runnerTask; readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams)
{ {
this.channel = GrpcPreconditions.CheckNotNull(channel); GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
this.channels = new List<Channel>(channels);
this.clientType = clientType; this.clientType = clientType;
this.rpcType = rpcType; this.rpcType = rpcType;
this.payloadConfig = payloadConfig; this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource(); this.runnerTasks = new List<Task>();
this.client = BenchmarkService.NewClient(channel); foreach (var channel in this.channels)
{
var threadBody = GetThreadBody(); for (int i = 0; i < outstandingRpcsPerChannel; i++)
this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); {
var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
var threadBody = GetThreadBody(channel, timer);
this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning));
}
}
} }
public ClientStats GetStats(bool reset) public ClientStats GetStats(bool reset)
...@@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting ...@@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting
public async Task StopAsync() public async Task StopAsync()
{ {
stoppedCts.Cancel(); stoppedCts.Cancel();
await runnerTask; foreach (var runnerTask in runnerTasks)
await channel.ShutdownAsync(); {
await runnerTask;
}
foreach (var channel in channels)
{
await channel.ShutdownAsync();
}
} }
private void RunClosedLoopUnary() private void RunUnary(Channel channel, IInterarrivalTimer timer)
{ {
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest(); var request = CreateSimpleRequest();
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
...@@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting ...@@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext();
} }
} }
private async Task RunClosedLoopUnaryAsync() private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
{ {
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest(); var request = CreateSimpleRequest();
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
...@@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting ...@@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
} }
} }
private async Task RunClosedLoopStreamingAsync() private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
{ {
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest(); var request = CreateSimpleRequest();
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
...@@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting ...@@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
} }
// finish the streaming call // finish the streaming call
...@@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting ...@@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting
} }
} }
private async Task RunGenericClosedLoopStreamingAsync() private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{ {
var request = CreateByteBufferRequest(); var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch(); var stopwatch = new Stopwatch();
...@@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting ...@@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds. // spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
} }
// finish the streaming call // finish the streaming call
...@@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting ...@@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting
} }
} }
private Action GetThreadBody() private Action GetThreadBody(Channel channel, IInterarrivalTimer timer)
{ {
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{ {
...@@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting ...@@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
return () => return () =>
{ {
RunGenericClosedLoopStreamingAsync().Wait(); RunGenericStreamingAsync(channel, timer).Wait();
}; };
} }
...@@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting ...@@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting
if (clientType == ClientType.SYNC_CLIENT) if (clientType == ClientType.SYNC_CLIENT)
{ {
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
return RunClosedLoopUnary; return () => RunUnary(channel, timer);
} }
else if (clientType == ClientType.ASYNC_CLIENT) else if (clientType == ClientType.ASYNC_CLIENT)
{ {
...@@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting ...@@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting
case RpcType.UNARY: case RpcType.UNARY:
return () => return () =>
{ {
RunClosedLoopUnaryAsync().Wait(); RunUnaryAsync(channel, timer).Wait();
}; };
case RpcType.STREAMING: case RpcType.STREAMING:
return () => return () =>
{ {
RunClosedLoopStreamingAsync().Wait(); RunStreamingPingPongAsync(channel, timer).Wait();
}; };
} }
} }
...@@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting ...@@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting
{ {
return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
} }
private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
{
switch (loadParams.LoadCase)
{
case LoadParams.LoadOneofCase.ClosedLoop:
return new ClosedLoopInterarrivalTimer();
case LoadParams.LoadOneofCase.Poisson:
return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
default:
throw new ArgumentException("Unknown load type");
}
}
} }
} }
...@@ -115,6 +115,7 @@ ...@@ -115,6 +115,7 @@
<Compile Include="GenericService.cs" /> <Compile Include="GenericService.cs" />
<Compile Include="GeneratedServiceBaseTest.cs" /> <Compile Include="GeneratedServiceBaseTest.cs" />
<Compile Include="GeneratedClientTest.cs" /> <Compile Include="GeneratedClientTest.cs" />
<Compile Include="InterarrivalTimers.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>
......
#region Copyright notice and license
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Testing;
namespace Grpc.IntegrationTesting
{
public interface IInterarrivalTimer
{
void WaitForNext();
Task WaitForNextAsync();
}
/// <summary>
/// Interarrival timer that doesn't wait at all.
/// </summary>
public class ClosedLoopInterarrivalTimer : IInterarrivalTimer
{
public ClosedLoopInterarrivalTimer()
{
}
public void WaitForNext()
{
// NOP
}
public Task WaitForNextAsync()
{
return Task.FromResult<object>(null);
}
}
/// <summary>
/// Interarrival timer that generates Poisson process load.
/// </summary>
public class PoissonInterarrivalTimer : IInterarrivalTimer
{
const double NanosToSeconds = 1e-9;
readonly ExponentialDistribution exponentialDistribution;
DateTime? lastEventTime;
public PoissonInterarrivalTimer(double offeredLoad)
{
this.exponentialDistribution = new ExponentialDistribution(new Random(), offeredLoad);
this.lastEventTime = DateTime.UtcNow;
}
public void WaitForNext()
{
var waitDuration = GetNextWaitDuration();
int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
if (millisTimeout > 0)
{
// TODO(jtattermusch): probably only works well for a relatively low interarrival rate
Thread.Sleep(millisTimeout);
}
}
public async Task WaitForNextAsync()
{
var waitDuration = GetNextWaitDuration();
int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
if (millisTimeout > 0)
{
// TODO(jtattermusch): probably only works well for a relatively low interarrival rate
await Task.Delay(millisTimeout);
}
}
private TimeSpan GetNextWaitDuration()
{
if (!lastEventTime.HasValue)
{
this.lastEventTime = DateTime.Now;
}
var origLastEventTime = this.lastEventTime.Value;
this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next() * NanosToSeconds);
return this.lastEventTime.Value - origLastEventTime;
}
/// <summary>
/// Exp generator.
/// </summary>
private class ExponentialDistribution
{
readonly Random random;
readonly double lambda;
readonly double lambdaReciprocal;
public ExponentialDistribution(Random random, double lambda)
{
this.random = random;
this.lambda = lambda;
this.lambdaReciprocal = 1.0 / lambda;
}
public double Next()
{
double uniform = random.NextDouble();
// Use 1.0-uni above to avoid NaN if uni is 0
return lambdaReciprocal * (-Math.Log(1.0 - uniform));
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment