Newer
Older
#region Copyright notice and license
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Google.Protobuf;
using Grpc.Auth;
Jan Tattermusch
committed
using Grpc.Core;
using Grpc.Core.Utils;
Jan Tattermusch
committed
using NUnit.Framework;
namespace Grpc.IntegrationTesting
{
private class ClientOptions
{
[Option("server_host", DefaultValue = "127.0.0.1")]
public string ServerHost { get; set; }
[Option("server_host_override", DefaultValue = TestCredentials.DefaultHostOverride)]
public string ServerHostOverride { get; set; }
[Option("server_port", Required = true)]
public int ServerPort { get; set; }
[Option("test_case", DefaultValue = "large_unary")]
public string TestCase { get; set; }
// Deliberately using nullable bool type to allow --use_tls=true syntax (as opposed to --use_tls)
[Option("use_tls", DefaultValue = false)]
public bool? UseTls { get; set; }
// Deliberately using nullable bool type to allow --use_test_ca=true syntax (as opposed to --use_test_ca)
[Option("use_test_ca", DefaultValue = false)]
public bool? UseTestCa { get; set; }
[Option("default_service_account", Required = false)]
public string DefaultServiceAccount { get; set; }
[Option("oauth_scope", Required = false)]
public string OAuthScope { get; set; }
[Option("service_account_key_file", Required = false)]
public string ServiceAccountKeyFile { get; set; }
[HelpOption]
public string GetUsage()
{
var help = new HelpText
{
Heading = "gRPC C# interop testing client",
AddDashesToOption = true
};
help.AddPreOptionsLine("Usage:");
help.AddOptions(this);
return help;
}
}
ClientOptions options;
this.options = options;
}
var options = new ClientOptions();
if (!Parser.Default.ParseArguments(args, options))
{
Environment.Exit(1);
}
var interopClient = new InteropClient(options);
}
{
var credentials = await CreateCredentialsAsync();
List<ChannelOption> channelOptions = null;
if (!string.IsNullOrEmpty(options.ServerHostOverride))
{
channelOptions = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, options.ServerHostOverride)
};
}
var channel = new Channel(options.ServerHost, options.ServerPort, credentials, channelOptions);
await RunTestCaseAsync(channel, options);
await channel.ShutdownAsync();
}
private async Task<ChannelCredentials> CreateCredentialsAsync()
var credentials = ChannelCredentials.Insecure;
if (options.UseTls.Value)
{
credentials = options.UseTestCa.Value ? TestCredentials.CreateSslCredentials() : new SslCredentials();
}
if (options.TestCase == "jwt_token_creds")
{
#if !NETCOREAPP1_0
var googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsTrue(googleCredential.IsCreateScopedRequired);
credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials());
#else
// TODO(jtattermusch): implement this
throw new NotImplementedException("Not supported on CoreCLR yet");
#endif
}
if (options.TestCase == "compute_engine_creds")
{
#if !NETCOREAPP1_0
var googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsFalse(googleCredential.IsCreateScopedRequired);
credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials());
#else
// TODO(jtattermusch): implement this
throw new NotImplementedException("Not supported on CoreCLR yet");
#endif
}
private async Task RunTestCaseAsync(Channel channel, ClientOptions options)
var client = new TestService.TestServiceClient(channel);
{
case "empty_unary":
RunEmptyUnary(client);
break;
case "large_unary":
RunLargeUnary(client);
break;
await RunClientStreamingAsync(client);
await RunServerStreamingAsync(client);
await RunPingPongAsync(client);
await RunEmptyStreamAsync(client);
case "compute_engine_creds":
RunComputeEngineCreds(client, options.DefaultServiceAccount, options.OAuthScope);
case "oauth2_auth_token":
await RunOAuth2AuthTokenAsync(client, options.OAuthScope);
break;
case "per_rpc_creds":
await RunPerRpcCredsAsync(client, options.OAuthScope);
await RunCancelAfterBeginAsync(client);
break;
case "cancel_after_first_response":
await RunCancelAfterFirstResponseAsync(client);
case "timeout_on_sleeping_server":
await RunTimeoutOnSleepingServerAsync(client);
break;
case "custom_metadata":
await RunCustomMetadataAsync(client);
break;
case "status_code_and_message":
await RunStatusCodeAndMessageAsync(client);
Jan Tattermusch
committed
break;
case "unimplemented_method":
RunUnimplementedMethod(new UnimplementedService.UnimplementedServiceClient(channel));
break;
case "client_compressed_unary":
RunClientCompressedUnary(client);
break;
case "client_compressed_streaming":
await RunClientCompressedStreamingAsync(client);
break;
default:
throw new ArgumentException("Unknown test case " + options.TestCase);
}
}
public static void RunEmptyUnary(TestService.TestServiceClient client)
{
Console.WriteLine("running empty_unary");
Assert.IsNotNull(response);
}
public static void RunLargeUnary(TestService.TestServiceClient client)
{
Console.WriteLine("running large_unary");
var request = new SimpleRequest
{
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response = client.UnaryCall(request);
Assert.AreEqual(314159, response.Payload.Body.Length);
public static async Task RunClientStreamingAsync(TestService.TestServiceClient client)
Console.WriteLine("running client_streaming");
var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.Select((size) => new StreamingInputCallRequest { Payload = CreateZerosPayload(size) });
using (var call = client.StreamingInputCall())
{
await call.RequestStream.WriteAllAsync(bodySizes);
var response = await call.ResponseAsync;
Assert.AreEqual(74922, response.AggregatedPayloadSize);
}
Console.WriteLine("Passed!");
public static async Task RunServerStreamingAsync(TestService.TestServiceClient client)
Console.WriteLine("running server_streaming");
var bodySizes = new List<int> { 31415, 9, 2653, 58979 };
var request = new StreamingOutputCallRequest
{
ResponseParameters = { bodySizes.Select((size) => new ResponseParameters { Size = size }) }
using (var call = client.StreamingOutputCall(request))
{
var responseList = await call.ResponseStream.ToListAsync();
CollectionAssert.AreEqual(bodySizes, responseList.Select((item) => item.Payload.Body.Length));
}
Console.WriteLine("Passed!");
public static async Task RunPingPongAsync(TestService.TestServiceClient client)
Console.WriteLine("running ping_pong");
using (var call = client.FullDuplexCall())
{
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
});
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 9 } },
Payload = CreateZerosPayload(8)
});
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 2653 } },
Payload = CreateZerosPayload(1828)
});
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 58979 } },
Payload = CreateZerosPayload(45904)
});
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
Console.WriteLine("Passed!");
public static async Task RunEmptyStreamAsync(TestService.TestServiceClient client)
Console.WriteLine("running empty_stream");
using (var call = client.FullDuplexCall())
await call.RequestStream.CompleteAsync();
var responseList = await call.ResponseStream.ToListAsync();
Assert.AreEqual(0, responseList.Count);
}
Console.WriteLine("Passed!");
public static void RunComputeEngineCreds(TestService.TestServiceClient client, string defaultServiceAccount, string oauthScope)
{
Console.WriteLine("running compute_engine_creds");
var request = new SimpleRequest
{
ResponseSize = 314159,
Payload = CreateZerosPayload(271828),
FillUsername = true,
FillOauthScope = true
};
// not setting credentials here because they were set on channel already
var response = client.UnaryCall(request);
Assert.AreEqual(314159, response.Payload.Body.Length);
Assert.False(string.IsNullOrEmpty(response.OauthScope));
Assert.True(oauthScope.Contains(response.OauthScope));
Assert.AreEqual(defaultServiceAccount, response.Username);
Console.WriteLine("Passed!");
}
public static void RunJwtTokenCreds(TestService.TestServiceClient client)
{
Console.WriteLine("running jwt_token_creds");
Jan Tattermusch
committed
var request = new SimpleRequest
{
ResponseSize = 314159,
Payload = CreateZerosPayload(271828),
FillUsername = true,
};
// not setting credentials here because they were set on channel already
var response = client.UnaryCall(request);
Assert.AreEqual(314159, response.Payload.Body.Length);
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
}
public static async Task RunOAuth2AuthTokenAsync(TestService.TestServiceClient client, string oauthScope)
#if !NETCOREAPP1_0
Console.WriteLine("running oauth2_auth_token");
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { oauthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
var credentials = GoogleGrpcCredentials.FromAccessToken(oauth2Token);
var request = new SimpleRequest
{
FillUsername = true,
FillOauthScope = true
};
var response = client.UnaryCall(request, new CallOptions(credentials: credentials));
Assert.False(string.IsNullOrEmpty(response.OauthScope));
Assert.True(oauthScope.Contains(response.OauthScope));
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
#else
// TODO(jtattermusch): implement this
throw new NotImplementedException("Not supported on CoreCLR yet");
#endif
public static async Task RunPerRpcCredsAsync(TestService.TestServiceClient client, string oauthScope)
#if !NETCOREAPP1_0
Console.WriteLine("running per_rpc_creds");
ITokenAccess googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
var credentials = googleCredential.ToCallCredentials();
var request = new SimpleRequest
{
FillUsername = true,
};
var response = client.UnaryCall(request, new CallOptions(credentials: credentials));
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
#else
// TODO(jtattermusch): implement this
throw new NotImplementedException("Not supported on CoreCLR yet");
#endif
public static async Task RunCancelAfterBeginAsync(TestService.TestServiceClient client)
Console.WriteLine("running cancel_after_begin");
var cts = new CancellationTokenSource();
using (var call = client.StreamingInputCall(cancellationToken: cts.Token))
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
public static async Task RunCancelAfterFirstResponseAsync(TestService.TestServiceClient client)
Console.WriteLine("running cancel_after_first_response");
var cts = new CancellationTokenSource();
using (var call = client.FullDuplexCall(cancellationToken: cts.Token))
{
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
});
Assert.IsTrue(await call.ResponseStream.MoveNext());
Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
}
Console.WriteLine("Passed!");
public static async Task RunTimeoutOnSleepingServerAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running timeout_on_sleeping_server");
var deadline = DateTime.UtcNow.AddMilliseconds(1);
using (var call = client.FullDuplexCall(deadline: deadline))
{
try
{
Jan Tattermusch
committed
await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { Payload = CreateZerosPayload(27182) });
}
catch (InvalidOperationException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
catch (RpcException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
try
{
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
Console.WriteLine("Passed!");
}
public static async Task RunCustomMetadataAsync(TestService.TestServiceClient client)
Jan Tattermusch
committed
{
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
Console.WriteLine("running custom_metadata");
{
// step 1: test unary call
var request = new SimpleRequest
{
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var call = client.UnaryCallAsync(request, headers: CreateTestMetadata());
await call.ResponseAsync;
var responseHeaders = await call.ResponseHeadersAsync;
var responseTrailers = call.GetTrailers();
Assert.AreEqual("test_initial_metadata_value", responseHeaders.First((entry) => entry.Key == "x-grpc-test-echo-initial").Value);
CollectionAssert.AreEqual(new byte[] { 0xab, 0xab, 0xab }, responseTrailers.First((entry) => entry.Key == "x-grpc-test-echo-trailing-bin").ValueBytes);
}
{
// step 2: test full duplex call
var request = new StreamingOutputCallRequest
{
ResponseParameters = { new ResponseParameters { Size = 31415 } },
Payload = CreateZerosPayload(27182)
};
var call = client.FullDuplexCall(headers: CreateTestMetadata());
var responseHeaders = await call.ResponseHeadersAsync;
await call.RequestStream.WriteAsync(request);
await call.RequestStream.CompleteAsync();
await call.ResponseStream.ToListAsync();
var responseTrailers = call.GetTrailers();
Assert.AreEqual("test_initial_metadata_value", responseHeaders.First((entry) => entry.Key == "x-grpc-test-echo-initial").Value);
CollectionAssert.AreEqual(new byte[] { 0xab, 0xab, 0xab }, responseTrailers.First((entry) => entry.Key == "x-grpc-test-echo-trailing-bin").ValueBytes);
}
Console.WriteLine("Passed!");
}
public static async Task RunStatusCodeAndMessageAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running status_code_and_message");
var echoStatus = new EchoStatus
{
Code = 2,
Message = "test status message"
};
{
// step 1: test unary call
var request = new SimpleRequest { ResponseStatus = echoStatus };
var e = Assert.Throws<RpcException>(() => client.UnaryCall(request));
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
}
{
// step 2: test full duplex call
var request = new StreamingOutputCallRequest { ResponseStatus = echoStatus };
var call = client.FullDuplexCall();
await call.RequestStream.WriteAsync(request);
await call.RequestStream.CompleteAsync();
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.ToListAsync();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
}
}
Console.WriteLine("Passed!");
Jan Tattermusch
committed
}
public static void RunUnimplementedMethod(UnimplementedService.UnimplementedServiceClient client)
{
Console.WriteLine("running unimplemented_method");
var e = Assert.Throws<RpcException>(() => client.UnimplementedCall(new Empty()));
Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode);
Assert.AreEqual("", e.Status.Detail);
Console.WriteLine("Passed!");
}
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
public static void RunClientCompressedUnary(TestService.TestServiceClient client)
{
Console.WriteLine("running client_compressed_unary");
var probeRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = true // lie about compression
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var e = Assert.Throws<RpcException>(() => client.UnaryCall(probeRequest, CreateClientCompressionMetadata(false)));
Assert.AreEqual(StatusCode.InvalidArgument, e.Status.StatusCode);
var compressedRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response1 = client.UnaryCall(compressedRequest, CreateClientCompressionMetadata(true));
Assert.AreEqual(314159, response1.Payload.Body.Length);
var uncompressedRequest = new SimpleRequest
{
ExpectCompressed = new BoolValue
{
Value = false
},
ResponseSize = 314159,
Payload = CreateZerosPayload(271828)
};
var response2 = client.UnaryCall(uncompressedRequest, CreateClientCompressionMetadata(false));
Assert.AreEqual(314159, response2.Payload.Body.Length);
Console.WriteLine("Passed!");
}
public static async Task RunClientCompressedStreamingAsync(TestService.TestServiceClient client)
{
Console.WriteLine("running client_compressed_streaming");
try
{
var probeCall = client.StreamingInputCall(CreateClientCompressionMetadata(false));
await probeCall.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
Payload = CreateZerosPayload(27182)
});
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await probeCall;
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.InvalidArgument, e.Status.StatusCode);
}
var call = client.StreamingInputCall(CreateClientCompressionMetadata(true));
await call.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = true
},
Payload = CreateZerosPayload(27182)
});
call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await call.RequestStream.WriteAsync(new StreamingInputCallRequest
{
ExpectCompressed = new BoolValue
{
Value = false
},
Payload = CreateZerosPayload(45904)
});
await call.RequestStream.CompleteAsync();
var response = await call.ResponseAsync;
Assert.AreEqual(73086, response.AggregatedPayloadSize);
Console.WriteLine("Passed!");
}
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
private static Metadata CreateClientCompressionMetadata(bool compressed)
{
var algorithmName = compressed ? "gzip" : "identity";
return new Metadata
{
{ new Metadata.Entry(Metadata.CompressionRequestAlgorithmMetadataKey, algorithmName) }
// extracts the client_email field from service account file used for auth test cases
private static string GetEmailFromServiceAccountFile()
{
#if !NETCOREAPP1_0
string keyFile = Environment.GetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS");
Assert.IsNotNull(keyFile);
var jobject = JObject.Parse(File.ReadAllText(keyFile));
string email = jobject.GetValue("client_email").Value<string>();
Assert.IsTrue(email.Length > 0); // spec requires nonempty client email.
return email;
#else
// TODO(jtattermusch): implement this
throw new NotImplementedException("Not supported on CoreCLR yet");
#endif
private static Metadata CreateTestMetadata()
{
return new Metadata
{
{"x-grpc-test-echo-initial", "test_initial_metadata_value"},
{"x-grpc-test-echo-trailing-bin", new byte[] {0xab, 0xab, 0xab}}
};
}