diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index eac8d16fb151d01a1096e77def16876025586296..62cb4432725bd3ce4ace5cd2e57d37cc07d24249 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -34,6 +34,9 @@ <HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath> </Reference> <Reference Include="System" /> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> @@ -57,7 +60,5 @@ <ItemGroup> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> </ItemGroup> - <ItemGroup> - <Folder Include="Internal\" /> - </ItemGroup> + <ItemGroup /> </Project> \ No newline at end of file diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config index c714ef3a23ec95ef83e4143701d37470a0363797..28af8d78c6c724d7c7e3dfe6f4eb1ad1a9f4f38f 100644 --- a/src/csharp/Grpc.Core.Tests/packages.config +++ b/src/csharp/Grpc.Core.Tests/packages.config @@ -1,4 +1,5 @@ <?xml version="1.0" encoding="utf-8"?> <packages> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages> \ No newline at end of file diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index b95776f66d46bc143e54602d1f8452303be85b94..8cdc1c895bfd63087a09b61025e0161bc8eb6575 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Return type for client streaming calls. /// </summary> public sealed class AsyncClientStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> result; @@ -53,22 +51,6 @@ namespace Grpc.Core this.result = result; } - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); - } - /// <summary> /// Asynchronous call result. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ee0543741673b59a53b369be3f84da9f7e99c817..0d13a3d052320cf46bd6f7a59d8bc1ffedf4466e 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Return type for bidirectional streaming calls. /// </summary> public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; @@ -53,31 +51,6 @@ namespace Grpc.Core this.responseStream = responseStream; } - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); - } - - /// <summary> - /// Reads a response from ResponseStream. - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); - } - /// <summary> /// Async stream to read streaming responses. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 73b96149850ff4e0e3203893e95ae56b71e29f73..6a258d132cff34199a8c9637e2cdabe0a926a360 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -41,7 +41,6 @@ namespace Grpc.Core /// Return type for server streaming calls. /// </summary> public sealed class AsyncServerStreamingCall<TResponse> - where TResponse : class { readonly IAsyncStreamReader<TResponse> responseStream; @@ -50,15 +49,6 @@ namespace Grpc.Core this.responseStream = responseStream; } - /// <summary> - /// Reads the next response from ResponseStream - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); - } - /// <summary> /// Async stream to read streaming responses. /// </summary> diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index d1ee59ff0a0fe73bac64dd715a40c2dac185b86c..37b452f020d390acd31a0deacc468be13688a562 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Abstraction of a call to be invoked on a client. /// </summary> public class Call<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly string name; readonly Marshaller<TRequest> requestMarshaller; diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index f5f2cf5f220e8f7dc44c38b3209832d456f2cb43..6b4345cbe13c4479546c2ef957b931feae0a8747 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -37,6 +37,9 @@ <Reference Include="System.Collections.Immutable"> <HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index e54908cb8bacf30e3a202c73cf1d918823f3c97d..5269881afa7964eb12c0b8daa533fb58c093f88a 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -16,6 +16,7 @@ <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> <dependency id="Microsoft.Bcl.Immutable" version="1.0.34" /> + <dependency id="Ix-Async" version="1.2.3" /> <dependency id="grpc.native.csharp_ext" version="0.8.0.0" /> </dependencies> </metadata> diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 699741cd054420c48915615af10aaf629de060e9..95b674c0188831cfd9ccbbe0add1dff193255cb6 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,13 +43,7 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<T> - where T : class + public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> { - /// <summary> - /// Reads a single message. Returns null if the last message was already read. - /// A following read can only be started when the previous one finishes. - /// </summary> - Task<T> ReadNext(); } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 4bd8bfb8df2ed829bf7743ff01359f95318e702a..644f445401f71ce5789c9367363031927b3c0420 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -44,10 +44,9 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Writes a single message. Only one write can be pending at a time. + /// Writes a single asynchronously. Only one write can be pending at a time. /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> Task Write(T message); diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs index 0847a928e6c185e2a6734eff5f8664d5ed69007d..cc76d1369da50ce649f88d0fc33643ccd604f4b2 100644 --- a/src/csharp/Grpc.Core/IClientStreamWriter.cs +++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs @@ -44,11 +44,10 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IClientStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this. + /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this. /// </summary> - Task Close(); + Task Complete(); } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 16970587328540e59d8df70c6f624fdffee4cd0c..b9fc10cd169a16e1303cd8763f818f3ff3023459 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// </summary> internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> - where TRequest : class - where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; @@ -55,7 +53,7 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task Close() + public Task Complete() { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade66d14a9413cc667868b068449876190..6c445210381532135cebecc7d3c1792b3793a828 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; + TResponse current; public ClientResponseStream(AsyncCall<TRequest, TResponse> call) { this.call = call; } - public Task<TResponse> ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TResponse>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e97869276aefd56d3243fb07819fd424cf07..20ac46c234e99ab90ce8b5bf5d4ea10ee1c6cbdb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,9 +72,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); await responseStream.Write(result); @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b2570af7ffb4a4d9834697506616b70..3fccb88abba2d7580a5ee0ccfbbec2397767380b 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81b66e19b6933ecafef6f1242b9a2722..7a1c016ae206cdf153926dc7b8815a1e30662dd3 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// <summary> /// grpc_server from grpc/grpc.h /// </summary> diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index e873b3e88a74d0036737f7830e0726ac29a4b60c..bc9a499c5187f1fd49b1103a875c0502c007c8de 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -42,7 +42,6 @@ namespace Grpc.Core /// </summary> public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken // TODO(jtattermusch): add deadline info diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index f915155f8a4094ee8c0b6531017a3df7f16b49e7..a4f8989b3057db4a3c2fa2280d0d61b477a735df 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -49,14 +49,9 @@ namespace Grpc.Core.Utils public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) where T : class { - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - await asyncAction(elem); + await asyncAction(streamReader.Current); } } @@ -67,32 +62,27 @@ namespace Grpc.Core.Utils where T : class { var result = new List<T>(); - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - result.Add(elem); + result.Add(streamReader.Current); } return result; } /// <summary> /// Writes all elements from given enumerable to the stream. - /// Closes the stream afterwards unless close = false. + /// Completes the stream afterwards unless close = false. /// </summary> - public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true) + public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) where T : class { foreach (var element in elements) { await streamWriter.Write(element); } - if (close) + if (complete) { - await streamWriter.Close(); + await streamWriter.Complete(); } } diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 71967de56e5d9b42ebe2c2911e5b21e9d5fd7ccc..fb7eaaeeda1851d636c8a1e6db1e33fa32c1d918 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -2,5 +2,6 @@ <packages> <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" /> </packages> \ No newline at end of file diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 87ccf07dd8bf3c475c2f19a6a4b717d96d7639ae..6e84add42baff0160a6987b2c1b909eb6c781c65 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,6 +37,10 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index 4d6ec63b3cddfe197c0c27e21b307e5d687b1473..cc6e9af40f6dec846a30bcb9db14004d455465c4 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages> \ No newline at end of file diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index 2c5019c214b44785193e4dd16da8ededd1ee01a7..5ce490f40340124bca527c1d18e99da249191ffc 100644 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -35,6 +35,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config index 51c17bcd5e74685ab31264eae00dc062b5b9083d..4c8d60fa62ae538d8d58b1e205bdac9105a107f6 100644 --- a/src/csharp/Grpc.Examples/packages.config +++ b/src/csharp/Grpc.Examples/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages> \ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 1ca3dd24e1cdfdc86e21867363093621b9660fe8..b3a0a2917be97c8d8c6f38823ea4bdc1898aa507 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -54,6 +54,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> <Reference Include="System.Net" /> <Reference Include="System.Net.Http" /> <Reference Include="System.Net.Http.Extensions"> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 02f8a369defa37d64f0b39288fe5ee1749f515bb..d9076996981f9dc6a163eb121c0286896c6ca90a 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -256,48 +256,45 @@ namespace Grpc.IntegrationTesting var call = client.FullDuplexCall(); - StreamingOutputCallResponse response; - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) .SetPayload(CreateZerosPayload(27182)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) .SetPayload(CreateZerosPayload(8)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(9, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) .SetPayload(CreateZerosPayload(1828)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(2653, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) .SetPayload(CreateZerosPayload(45904)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(58979, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length); - await call.RequestStream.Close(); + await call.RequestStream.Complete(); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(null, response); + Assert.IsFalse(await call.ResponseStream.MoveNext()); Console.WriteLine("Passed!"); }).Wait(); @@ -309,7 +306,7 @@ namespace Grpc.IntegrationTesting { Console.WriteLine("running empty_stream"); var call = client.FullDuplexCall(); - await call.Close(); + await call.RequestStream.Complete(); var responseList = await call.ResponseStream.ToList(); Assert.AreEqual(0, responseList.Count); @@ -392,22 +389,20 @@ namespace Grpc.IntegrationTesting var cts = new CancellationTokenSource(); var call = client.FullDuplexCall(cts.Token); - StreamingOutputCallResponse response; - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) .SetPayload(CreateZerosPayload(27182)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); cts.Cancel(); try { - response = await call.ResponseStream.ReadNext(); + await call.ResponseStream.MoveNext(); Assert.Fail(); } catch (RpcException e) diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config index e33b6e3e464119fedd1e83ebc41ca19958cb8a00..291b7b8599ae96269e14e6b00aee79e359b12892 100644 --- a/src/csharp/Grpc.IntegrationTesting/packages.config +++ b/src/csharp/Grpc.IntegrationTesting/packages.config @@ -3,6 +3,7 @@ <package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" /> <package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" /> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" /> <package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" /> <package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />