Skip to content
Snippets Groups Projects
Select Git revision
  • 1ff429da2a94bc79300ebce3f8aae7efb10e9a75
  • master default protected
  • arm-aarch-platform
  • arm-platform
  • vjpai-patch-3
  • vjpai-patch-1
  • v1.27.x
  • jtattermusch-patch-2
  • jtattermusch-patch-1
  • update-java-worker-example-in-performance-docs
  • revert-21805-revert-21797-reintroduce_21527
  • revert-21804-tls-credentials-1
  • zhen_cleanup_namecheck
  • revert-21806-revert-21767-revert-21725-revert-21680-cq_ordering
  • vjpai-patch-2
  • revert-21766-tls-credentials-1
  • revert-21640-change_local_tcp_security_level
  • revert-21680-cq_ordering
  • revert-21527-unify_boringssl_deps2
  • revert-20803-grpclb_stabilization
  • fix-kokoro-rvm-key
  • v1.27.0
  • v1.27.0-pre2
  • v1.27.0-pre1
  • v1.26.0
  • v1.26.0-pre1
  • v1.25.0
  • v1.25.0-pre1
  • v1.24.3
  • v1.24.2
  • v1.24.1
  • v1.23.1
  • v1.24.0
  • v1.24.0-pre2
  • v1.24.0-pre1
  • v1.22.1
  • v1.23.0
  • v1.23.0-pre1
  • v1.22.0
  • v1.22.0-pre1
  • v1.21.4
41 results

_exit_test.py

Blame
  • StressTestClient.cs 11.95 KiB
    #region Copyright notice and license
    
    // Copyright 2015-2016, Google Inc.
    // All rights reserved.
    //
    // Redistribution and use in source and binary forms, with or without
    // modification, are permitted provided that the following conditions are
    // met:
    //
    //     * Redistributions of source code must retain the above copyright
    // notice, this list of conditions and the following disclaimer.
    //     * Redistributions in binary form must reproduce the above
    // copyright notice, this list of conditions and the following disclaimer
    // in the documentation and/or other materials provided with the
    // distribution.
    //     * Neither the name of Google Inc. nor the names of its
    // contributors may be used to endorse or promote products derived from
    // this software without specific prior written permission.
    //
    // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
    // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
    // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
    // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    
    #endregion
    
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    using CommandLine;
    using CommandLine.Text;
    using Grpc.Core;
    using Grpc.Core.Logging;
    using Grpc.Core.Utils;
    using Grpc.Testing;
    
    namespace Grpc.IntegrationTesting
    {
        public class StressTestClient
        {
            static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>();
            const double SecondsToNanos = 1e9;
    
            private class ClientOptions
            {
                [Option("server_addresses", DefaultValue = "localhost:8080")]
                public string ServerAddresses { get; set; }
    
                [Option("test_cases", DefaultValue = "large_unary:100")]
                public string TestCases { get; set; }
    
                [Option("test_duration_secs", DefaultValue = -1)]
                public int TestDurationSecs { get; set; }
    
                [Option("num_channels_per_server", DefaultValue = 1)]
                public int NumChannelsPerServer { get; set; }
    
                [Option("num_stubs_per_channel", DefaultValue = 1)]
                public int NumStubsPerChannel { get; set; }
    
                [Option("metrics_port", DefaultValue = 8081)]
                public int MetricsPort { get; set; }
    
                [HelpOption]
                public string GetUsage()
                {
                    var help = new HelpText
                    {
                        Heading = "gRPC C# stress test client",
                        AddDashesToOption = true
                    };
                    help.AddPreOptionsLine("Usage:");
                    help.AddOptions(this);
                    return help;
                }
            }
    
            ClientOptions options;
            List<string> serverAddresses;
            Dictionary<string, int> weightedTestCases;
            WeightedRandomGenerator testCaseGenerator;
    
            // cancellation will be emitted once test_duration_secs has elapsed.
            CancellationTokenSource finishedTokenSource = new CancellationTokenSource();
            Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos);
    
            private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases)
            {
                this.options = options;
                this.serverAddresses = serverAddresses;
                this.weightedTestCases = weightedTestCases;
                this.testCaseGenerator = new WeightedRandomGenerator(this.weightedTestCases);
            }
    
            public static void Run(string[] args)
            {
                var options = new ClientOptions();
                if (!Parser.Default.ParseArguments(args, options))
                {
                    Environment.Exit(1);
                }
    
                GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
                GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);
    
                var serverAddresses = options.ServerAddresses.Split(',');
                GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address");
    
                var testCases = ParseWeightedTestCases(options.TestCases);
                GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");
    
                var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases);
                interopClient.Run().Wait();
            }
    
            async Task Run()
            {
                var metricsServer = new Server()
                {
                    Services = { MetricsService.BindService(new MetricsServiceImpl(histogram)) },
                    Ports = { { "[::]", options.MetricsPort, ServerCredentials.Insecure } }
                };
                metricsServer.Start();
    
                if (options.TestDurationSecs >= 0)
                {
                    finishedTokenSource.CancelAfter(TimeSpan.FromSeconds(options.TestDurationSecs));
                }
    
                var tasks = new List<Task>();
                var channels = new List<Channel>();
                foreach (var serverAddress in serverAddresses)
                {
                    for (int i = 0; i < options.NumChannelsPerServer; i++)
                    {
                        var channel = new Channel(serverAddress, ChannelCredentials.Insecure);
                        channels.Add(channel);
                        for (int j = 0; j < options.NumStubsPerChannel; j++)
                        {
                            var client = TestService.NewClient(channel);
                            var task = Task.Factory.StartNew(() => RunBodyAsync(client).GetAwaiter().GetResult(),
                                TaskCreationOptions.LongRunning);
                            tasks.Add(task);  
                        }
                    }
                }
                await Task.WhenAll(tasks);
    
                foreach (var channel in channels)
                {
                    await channel.ShutdownAsync();
                }
    
                await metricsServer.ShutdownAsync();
            }
    
            async Task RunBodyAsync(TestService.TestServiceClient client)
            {
                Logger.Info("Starting stress test client thread.");
                while (!finishedTokenSource.Token.IsCancellationRequested)
                {
                    var testCase = testCaseGenerator.GetNext();
    
                    var stopwatch = Stopwatch.StartNew();
    
                    await RunTestCaseAsync(client, testCase);
    
                    stopwatch.Stop();
                    histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
                }
                Logger.Info("Stress test client thread finished.");
            }
    
            async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase)
            {
                switch (testCase)
                {
                    case "empty_unary":
                        InteropClient.RunEmptyUnary(client);
                        break;
                    case "large_unary":
                        InteropClient.RunLargeUnary(client);
                        break;
                    case "client_streaming":
                        await InteropClient.RunClientStreamingAsync(client);
                        break;
                    case "server_streaming":
                        await InteropClient.RunServerStreamingAsync(client);
                        break;
                    case "ping_pong":
                        await InteropClient.RunPingPongAsync(client);
                        break;
                    case "empty_stream":
                        await InteropClient.RunEmptyStreamAsync(client);
                        break;
                    case "cancel_after_begin":
                        await InteropClient.RunCancelAfterBeginAsync(client);
                        break;
                    case "cancel_after_first_response":
                        await InteropClient.RunCancelAfterFirstResponseAsync(client);
                        break;
                    case "timeout_on_sleeping_server":
                        await InteropClient.RunTimeoutOnSleepingServerAsync(client);
                        break;
                    case "custom_metadata":
                        await InteropClient.RunCustomMetadataAsync(client);
                        break;
                    case "status_code_and_message":
                        await InteropClient.RunStatusCodeAndMessageAsync(client);
                        break;
                    default:
                        throw new ArgumentException("Unsupported test case  " + testCase);
                }
            }
    
            static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases)
            {
                var result = new Dictionary<string, int>();
                foreach (var weightedTestCase in weightedTestCases.Split(','))
                {
                    var parts = weightedTestCase.Split(new char[] {':'}, 2);
                    GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option.");
                    result.Add(parts[0], int.Parse(parts[1]));
                }
                return result;
            }
    
            class WeightedRandomGenerator
            {
                readonly Random random = new Random();
                readonly List<Tuple<int, string>> cumulativeSums;
                readonly int weightSum;
    
                public WeightedRandomGenerator(Dictionary<string, int> weightedItems)
                {
                    cumulativeSums = new List<Tuple<int, string>>();
                    weightSum = 0;
                    foreach (var entry in weightedItems)
                    {
                        weightSum += entry.Value;
                        cumulativeSums.Add(Tuple.Create(weightSum, entry.Key));
                    }
                }
    
                public string GetNext()
                {
                    int rand = random.Next(weightSum);
                    foreach (var entry in cumulativeSums)
                    {
                        if (rand < entry.Item1)
                        {
                            return entry.Item2;
                        }
                    }
                    throw new InvalidOperationException("GetNext() failed.");
                }
            }
    
            class MetricsServiceImpl : MetricsService.MetricsServiceBase 
            {
                const string GaugeName = "csharp_overall_qps";
    
                readonly Histogram histogram;
                readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
    
                public MetricsServiceImpl(Histogram histogram)
                {
                    this.histogram = histogram;
                }
    
                public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context)
                {
                    if (request.Name == GaugeName)
                    {
                        long qps = GetQpsAndReset();
    
                        return Task.FromResult(new GaugeResponse
                        {
                            Name = GaugeName,
                            LongValue = qps
                        });
                    }
                    throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist"));
                }
    
                public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context)
                {
                    long qps = GetQpsAndReset();
    
                    var response = new GaugeResponse
                    {
                        Name = GaugeName,
                        LongValue = qps
                    };
                    await responseStream.WriteAsync(response);
                }
    
                long GetQpsAndReset()
                {
                    var snapshot = histogram.GetSnapshot(true);
                    var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true);
    
                    return (long) (snapshot.Count / elapsedSnapshot.Seconds);
                }
            }
        }
    }