Skip to content
Snippets Groups Projects
Commit 09eaec02 authored by Tim Emiola's avatar Tim Emiola
Browse files

Merge pull request #1053 from jtattermusch/csharp_metadata

Added support for sending metadata from clients.
parents 4708fbb3 c0b3721d
No related branches found
No related tags found
No related merge requests found
Showing
with 694 additions and 154 deletions
...@@ -46,6 +46,8 @@ namespace Grpc.Core.Tests ...@@ -46,6 +46,8 @@ namespace Grpc.Core.Tests
{ {
string host = "localhost"; string host = "localhost";
string serviceName = "/tests.Test";
Method<string, string> unaryEchoStringMethod = new Method<string, string>( Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary, MethodType.Unary,
"/tests.Test/UnaryEchoString", "/tests.Test/UnaryEchoString",
...@@ -69,7 +71,7 @@ namespace Grpc.Core.Tests ...@@ -69,7 +71,7 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService") ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
...@@ -77,7 +79,7 @@ namespace Grpc.Core.Tests ...@@ -77,7 +79,7 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
...@@ -92,7 +94,7 @@ namespace Grpc.Core.Tests ...@@ -92,7 +94,7 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService") ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
...@@ -100,7 +102,7 @@ namespace Grpc.Core.Tests ...@@ -100,7 +102,7 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
BenchmarkUtil.RunBenchmark(100, 1000, BenchmarkUtil.RunBenchmark(100, 1000,
() => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
} }
...@@ -113,14 +115,14 @@ namespace Grpc.Core.Tests ...@@ -113,14 +115,14 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build()); ServerServiceDefinition.CreateBuilder(serviceName).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
server.Start(); server.Start();
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
try try
{ {
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
<Compile Include="GrpcEnvironmentTest.cs" /> <Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" /> <Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" /> <Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>
...@@ -56,4 +57,7 @@ ...@@ -56,4 +57,7 @@
<ItemGroup> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="Internal\" />
</ItemGroup>
</Project> </Project>
\ No newline at end of file
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class MetadataArraySafeHandleTest
{
[Test]
public void CreateEmptyAndDestroy()
{
var metadata = Metadata.CreateBuilder().Build();
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
[Test]
public void CreateAndDestroy()
{
var metadata = Metadata.CreateBuilder()
.Add(new Metadata.MetadataEntry("host", "somehost"))
.Add(new Metadata.MetadataEntry("header2", "header value")).Build();
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
}
}
...@@ -33,65 +33,70 @@ ...@@ -33,65 +33,70 @@
using System; using System;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
public class Call<TRequest, TResponse> public class Call<TRequest, TResponse>
{ {
readonly string methodName; readonly string name;
readonly Func<TRequest, byte[]> requestSerializer; readonly Marshaller<TRequest> requestMarshaller;
readonly Func<byte[], TResponse> responseDeserializer; readonly Marshaller<TResponse> responseMarshaller;
readonly Channel channel; readonly Channel channel;
readonly Metadata headers;
public Call(string methodName, public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers)
Func<TRequest, byte[]> requestSerializer,
Func<byte[], TResponse> responseDeserializer,
TimeSpan timeout,
Channel channel)
{ {
this.methodName = methodName; this.name = Preconditions.CheckNotNull(serviceName) + "/" + method.Name;
this.requestSerializer = requestSerializer; this.requestMarshaller = method.RequestMarshaller;
this.responseDeserializer = responseDeserializer; this.responseMarshaller = method.ResponseMarshaller;
this.channel = channel; this.channel = Preconditions.CheckNotNull(channel);
this.headers = Preconditions.CheckNotNull(headers);
} }
public Call(Method<TRequest, TResponse> method, Channel channel) public Channel Channel
{ {
this.methodName = method.Name; get
this.requestSerializer = method.RequestMarshaller.Serializer; {
this.responseDeserializer = method.ResponseMarshaller.Deserializer; return this.channel;
this.channel = channel; }
} }
public Channel Channel /// <summary>
/// Full methods name including the service name.
/// </summary>
public string Name
{ {
get get
{ {
return this.channel; return name;
} }
} }
public string MethodName /// <summary>
/// Headers to send at the beginning of the call.
/// </summary>
public Metadata Headers
{ {
get get
{ {
return this.methodName; return headers;
} }
} }
public Func<TRequest, byte[]> RequestSerializer public Marshaller<TRequest> RequestMarshaller
{ {
get get
{ {
return this.requestSerializer; return requestMarshaller;
} }
} }
public Func<byte[], TResponse> ResponseDeserializer public Marshaller<TResponse> ResponseMarshaller
{ {
get get
{ {
return this.responseDeserializer; return responseMarshaller;
} }
} }
} }
......
...@@ -45,30 +45,29 @@ namespace Grpc.Core ...@@ -45,30 +45,29 @@ namespace Grpc.Core
{ {
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
return asyncCall.UnaryCall(call.Channel, call.MethodName, req); return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers);
} }
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
return await asyncCall.UnaryCallAsync(req); return await asyncCall.UnaryCallAsync(req, call.Headers);
} }
public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.StartServerStreamingCall(req, outputs, call.Headers);
asyncCall.StartServerStreamingCall(req, outputs);
} }
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
var task = asyncCall.ClientStreamingCallAsync(); var task = asyncCall.ClientStreamingCallAsync(call.Headers);
var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs); return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
} }
...@@ -80,10 +79,9 @@ namespace Grpc.Core ...@@ -80,10 +79,9 @@ namespace Grpc.Core
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token) public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.StartDuplexStreamingCall(outputs, call.Headers);
asyncCall.StartDuplexStreamingCall(outputs);
return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
} }
......
...@@ -79,6 +79,10 @@ ...@@ -79,6 +79,10 @@
<Compile Include="Utils\Preconditions.cs" /> <Compile Include="Utils\Preconditions.cs" />
<Compile Include="Internal\ServerCredentialsSafeHandle.cs" /> <Compile Include="Internal\ServerCredentialsSafeHandle.cs" />
<Compile Include="ServerCredentials.cs" /> <Compile Include="ServerCredentials.cs" />
<Compile Include="Metadata.cs" />
<Compile Include="Internal\MetadataArraySafeHandle.cs" />
<Compile Include="Stub\AbstractStub.cs" />
<Compile Include="Stub\StubConfiguration.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="packages.config" /> <None Include="packages.config" />
...@@ -96,4 +100,7 @@ ...@@ -96,4 +100,7 @@
<Otherwise /> <Otherwise />
</Choose> </Choose>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<Folder Include="Stub\" />
</ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -77,7 +77,7 @@ namespace Grpc.Core.Internal ...@@ -77,7 +77,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Blocking unary request - unary response call. /// Blocking unary request - unary response call.
/// </summary> /// </summary>
public TResponse UnaryCall(Channel channel, string methodName, TRequest msg) public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers)
{ {
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{ {
...@@ -92,7 +92,11 @@ namespace Grpc.Core.Internal ...@@ -92,7 +92,11 @@ namespace Grpc.Core.Internal
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; readingDone = true;
} }
call.BlockingUnary(cq, payload, unaryResponseHandler);
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
}
try try
{ {
...@@ -109,7 +113,7 @@ namespace Grpc.Core.Internal ...@@ -109,7 +113,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Starts a unary request - unary response call. /// Starts a unary request - unary response call.
/// </summary> /// </summary>
public Task<TResponse> UnaryCallAsync(TRequest msg) public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
...@@ -122,8 +126,10 @@ namespace Grpc.Core.Internal ...@@ -122,8 +126,10 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartUnary(payload, unaryResponseHandler, metadataArray);
}
return unaryResponseTcs.Task; return unaryResponseTcs.Task;
} }
} }
...@@ -132,7 +138,7 @@ namespace Grpc.Core.Internal ...@@ -132,7 +138,7 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call. /// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary> /// </summary>
public Task<TResponse> ClientStreamingCallAsync() public Task<TResponse> ClientStreamingCallAsync(Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
...@@ -142,7 +148,10 @@ namespace Grpc.Core.Internal ...@@ -142,7 +148,10 @@ namespace Grpc.Core.Internal
readingDone = true; readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartClientStreaming(unaryResponseHandler, metadataArray);
}
return unaryResponseTcs.Task; return unaryResponseTcs.Task;
} }
...@@ -151,7 +160,7 @@ namespace Grpc.Core.Internal ...@@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Starts a unary request - streamed response call. /// Starts a unary request - streamed response call.
/// </summary> /// </summary>
public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver) public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
...@@ -165,7 +174,10 @@ namespace Grpc.Core.Internal ...@@ -165,7 +174,10 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartServerStreaming(payload, finishedHandler, metadataArray);
}
StartReceiveMessage(); StartReceiveMessage();
} }
...@@ -175,7 +187,7 @@ namespace Grpc.Core.Internal ...@@ -175,7 +187,7 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call. /// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary> /// </summary>
public void StartDuplexStreamingCall(IObserver<TResponse> readObserver) public void StartDuplexStreamingCall(IObserver<TResponse> readObserver, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
...@@ -185,7 +197,10 @@ namespace Grpc.Core.Internal ...@@ -185,7 +197,10 @@ namespace Grpc.Core.Internal
this.readObserver = readObserver; this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartDuplexStreaming(finishedHandler, metadataArray);
}
StartReceiveMessage(); StartReceiveMessage();
} }
......
...@@ -57,25 +57,28 @@ namespace Grpc.Core.Internal ...@@ -57,25 +57,28 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
...@@ -109,29 +112,29 @@ namespace Grpc.Core.Internal ...@@ -109,29 +112,29 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline); return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
} }
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length))); AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
} }
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length)); grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
} }
public void StartClientStreaming(CompletionCallbackDelegate callback) public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
} }
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length))); AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
} }
public void StartDuplexStreaming(CompletionCallbackDelegate callback) public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
} }
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
......
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_metadata_array from <grpc/grpc.h>
/// </summary>
internal class MetadataArraySafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern MetadataArraySafeHandle grpcsharp_metadata_array_create(UIntPtr capacity);
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_metadata_array_add(MetadataArraySafeHandle array, string key, byte[] value, UIntPtr valueLength);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_metadata_array_destroy_full(IntPtr array);
private MetadataArraySafeHandle()
{
}
public static MetadataArraySafeHandle Create(Metadata metadata)
{
var entries = metadata.Entries;
var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)entries.Count));
for (int i = 0; i < entries.Count; i++)
{
grpcsharp_metadata_array_add(metadataArray, entries[i].Key, entries[i].ValueBytes, new UIntPtr((ulong)entries[i].ValueBytes.Length));
}
return metadataArray;
}
protected override bool ReleaseHandle()
{
grpcsharp_metadata_array_destroy_full(handle);
return true;
}
}
}
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#endregion #endregion
using System; using System;
using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
...@@ -45,8 +46,8 @@ namespace Grpc.Core ...@@ -45,8 +46,8 @@ namespace Grpc.Core
public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer) public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{ {
this.serializer = serializer; this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = deserializer; this.deserializer = Preconditions.CheckNotNull(deserializer);
} }
public Func<T, byte[]> Serializer public Func<T, byte[]> Serializer
......
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Text;
namespace Grpc.Core
{
/// <summary>
/// gRPC call metadata.
/// </summary>
public class Metadata
{
public static readonly Metadata Empty = new Metadata(ImmutableList<MetadataEntry>.Empty);
readonly ImmutableList<MetadataEntry> entries;
public Metadata(ImmutableList<MetadataEntry> entries)
{
this.entries = entries;
}
public ImmutableList<MetadataEntry> Entries
{
get
{
return this.entries;
}
}
public static Builder CreateBuilder()
{
return new Builder();
}
public struct MetadataEntry
{
readonly string key;
readonly byte[] valueBytes;
public MetadataEntry(string key, byte[] valueBytes)
{
this.key = key;
this.valueBytes = valueBytes;
}
public MetadataEntry(string key, string value)
{
this.key = key;
this.valueBytes = Encoding.ASCII.GetBytes(value);
}
public string Key
{
get
{
return this.key;
}
}
// TODO: using ByteString would guarantee immutability.
public byte[] ValueBytes
{
get
{
return this.valueBytes;
}
}
}
public class Builder
{
readonly List<Metadata.MetadataEntry> entries = new List<Metadata.MetadataEntry>();
public List<MetadataEntry> Entries
{
get
{
return entries;
}
}
public Builder Add(MetadataEntry entry)
{
entries.Add(entry);
return this;
}
public Metadata Build()
{
return new Metadata(entries.ToImmutableList());
}
}
}
}
...@@ -43,12 +43,10 @@ namespace Grpc.Core ...@@ -43,12 +43,10 @@ namespace Grpc.Core
/// </summary> /// </summary>
public class ServerServiceDefinition public class ServerServiceDefinition
{ {
readonly string serviceName;
readonly ImmutableDictionary<string, IServerCallHandler> callHandlers; readonly ImmutableDictionary<string, IServerCallHandler> callHandlers;
private ServerServiceDefinition(string serviceName, ImmutableDictionary<string, IServerCallHandler> callHandlers) private ServerServiceDefinition(ImmutableDictionary<string, IServerCallHandler> callHandlers)
{ {
this.serviceName = serviceName;
this.callHandlers = callHandlers; this.callHandlers = callHandlers;
} }
...@@ -79,7 +77,7 @@ namespace Grpc.Core ...@@ -79,7 +77,7 @@ namespace Grpc.Core
Method<TRequest, TResponse> method, Method<TRequest, TResponse> method,
UnaryRequestServerMethod<TRequest, TResponse> handler) UnaryRequestServerMethod<TRequest, TResponse> handler)
{ {
callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler)); callHandlers.Add(GetFullMethodName(serviceName, method.Name), ServerCalls.UnaryRequestCall(method, handler));
return this; return this;
} }
...@@ -87,13 +85,18 @@ namespace Grpc.Core ...@@ -87,13 +85,18 @@ namespace Grpc.Core
Method<TRequest, TResponse> method, Method<TRequest, TResponse> method,
StreamingRequestServerMethod<TRequest, TResponse> handler) StreamingRequestServerMethod<TRequest, TResponse> handler)
{ {
callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler)); callHandlers.Add(GetFullMethodName(serviceName, method.Name), ServerCalls.StreamingRequestCall(method, handler));
return this; return this;
} }
public ServerServiceDefinition Build() public ServerServiceDefinition Build()
{ {
return new ServerServiceDefinition(serviceName, callHandlers.ToImmutableDictionary()); return new ServerServiceDefinition(callHandlers.ToImmutableDictionary());
}
private string GetFullMethodName(string serviceName, string methodName)
{
return serviceName + "/" + methodName;
} }
} }
} }
......
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
namespace Grpc.Core
{
// TODO: support adding timeout to methods.
/// <summary>
/// Base for client-side stubs.
/// </summary>
public abstract class AbstractStub<TStub, TConfig>
where TConfig : StubConfiguration
{
readonly Channel channel;
readonly TConfig config;
public AbstractStub(Channel channel, TConfig config)
{
this.channel = channel;
this.config = config;
}
public Channel Channel
{
get
{
return this.channel;
}
}
/// <summary>
/// Creates a new call to given method.
/// </summary>
protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method)
{
var headerBuilder = Metadata.CreateBuilder();
config.HeaderInterceptor(headerBuilder);
return new Call<TRequest, TResponse>(serviceName, method, channel, headerBuilder.Build());
}
}
}
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
public delegate void HeaderInterceptorDelegate(Metadata.Builder headerBuilder);
public class StubConfiguration
{
/// <summary>
/// The default stub configuration.
/// </summary>
public static readonly StubConfiguration Default = new StubConfiguration((headerBuilder) => { });
readonly HeaderInterceptorDelegate headerInterceptor;
public StubConfiguration(HeaderInterceptorDelegate headerInterceptor)
{
this.headerInterceptor = Preconditions.CheckNotNull(headerInterceptor);
}
public HeaderInterceptorDelegate HeaderInterceptor
{
get
{
return headerInterceptor;
}
}
}
}
...@@ -61,7 +61,14 @@ namespace math.Tests ...@@ -61,7 +61,14 @@ namespace math.Tests
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
server.Start(); server.Start();
channel = new Channel(host + ":" + port); channel = new Channel(host + ":" + port);
client = MathGrpc.NewStub(channel);
// TODO: get rid of the custom header here once we have dedicated tests
// for header support.
var stubConfig = new StubConfiguration((headerBuilder) =>
{
headerBuilder.Add(new Metadata.MetadataEntry("customHeader", "abcdef"));
});
client = MathGrpc.NewStub(channel, stubConfig);
} }
[TestFixtureTearDown] [TestFixtureTearDown]
......
...@@ -45,6 +45,8 @@ namespace math ...@@ -45,6 +45,8 @@ namespace math
/// </summary> /// </summary>
public class MathGrpc public class MathGrpc
{ {
static readonly string ServiceName = "/math.Math";
static readonly Marshaller<DivArgs> DivArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); static readonly Marshaller<DivArgs> DivArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
static readonly Marshaller<DivReply> DivReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); static readonly Marshaller<DivReply> DivReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
static readonly Marshaller<Num> NumMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); static readonly Marshaller<Num> NumMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
...@@ -52,25 +54,25 @@ namespace math ...@@ -52,25 +54,25 @@ namespace math
static readonly Method<DivArgs, DivReply> DivMethod = new Method<DivArgs, DivReply>( static readonly Method<DivArgs, DivReply> DivMethod = new Method<DivArgs, DivReply>(
MethodType.Unary, MethodType.Unary,
"/math.Math/Div", "Div",
DivArgsMarshaller, DivArgsMarshaller,
DivReplyMarshaller); DivReplyMarshaller);
static readonly Method<FibArgs, Num> FibMethod = new Method<FibArgs, Num>( static readonly Method<FibArgs, Num> FibMethod = new Method<FibArgs, Num>(
MethodType.ServerStreaming, MethodType.ServerStreaming,
"/math.Math/Fib", "Fib",
FibArgsMarshaller, FibArgsMarshaller,
NumMarshaller); NumMarshaller);
static readonly Method<Num, Num> SumMethod = new Method<Num, Num>( static readonly Method<Num, Num> SumMethod = new Method<Num, Num>(
MethodType.ClientStreaming, MethodType.ClientStreaming,
"/math.Math/Sum", "Sum",
NumMarshaller, NumMarshaller,
NumMarshaller); NumMarshaller);
static readonly Method<DivArgs, DivReply> DivManyMethod = new Method<DivArgs, DivReply>( static readonly Method<DivArgs, DivReply> DivManyMethod = new Method<DivArgs, DivReply>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/math.Math/DivMany", "DivMany",
DivArgsMarshaller, DivArgsMarshaller,
DivReplyMarshaller); DivReplyMarshaller);
...@@ -87,42 +89,43 @@ namespace math ...@@ -87,42 +89,43 @@ namespace math
IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken)); IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken));
} }
public class MathServiceClientStub : IMathServiceClient public class MathServiceClientStub : AbstractStub<MathServiceClientStub, StubConfiguration>, IMathServiceClient
{ {
readonly Channel channel; public MathServiceClientStub(Channel channel) : this(channel, StubConfiguration.Default)
{
}
public MathServiceClientStub(Channel channel) public MathServiceClientStub(Channel channel, StubConfiguration config) : base(channel, config)
{ {
this.channel = channel;
} }
public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivMethod, channel); var call = CreateCall(ServiceName, DivMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivMethod, channel); var call = CreateCall(ServiceName, DivMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<FibArgs, Num>(FibMethod, channel); var call = CreateCall(ServiceName, FibMethod);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token); Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
} }
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)) public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Num, Num>(SumMethod, channel); var call = CreateCall(ServiceName, SumMethod);
return Calls.AsyncClientStreamingCall(call, token); return Calls.AsyncClientStreamingCall(call, token);
} }
public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivManyMethod, channel); var call = CreateCall(ServiceName, DivManyMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
} }
...@@ -141,7 +144,7 @@ namespace math ...@@ -141,7 +144,7 @@ namespace math
public static ServerServiceDefinition BindService(IMathService serviceImpl) public static ServerServiceDefinition BindService(IMathService serviceImpl)
{ {
return ServerServiceDefinition.CreateBuilder("/math.Math/") return ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(DivMethod, serviceImpl.Div) .AddMethod(DivMethod, serviceImpl.Div)
.AddMethod(FibMethod, serviceImpl.Fib) .AddMethod(FibMethod, serviceImpl.Fib)
.AddMethod(SumMethod, serviceImpl.Sum) .AddMethod(SumMethod, serviceImpl.Sum)
...@@ -152,5 +155,10 @@ namespace math ...@@ -152,5 +155,10 @@ namespace math
{ {
return new MathServiceClientStub(channel); return new MathServiceClientStub(channel);
} }
public static IMathServiceClient NewStub(Channel channel, StubConfiguration config)
{
return new MathServiceClientStub(channel, config);
}
} }
} }
...@@ -39,8 +39,7 @@ ...@@ -39,8 +39,7 @@
<Reference Include="Google.ProtocolBuffers"> <Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference> </Reference>
<Reference Include="System.Collections.Immutable, Version=1.1.34.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <Reference Include="System.Collections.Immutable">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\System.Collections.Immutable.1.1.34-rc\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> <HintPath>..\packages\System.Collections.Immutable.1.1.34-rc\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference> </Reference>
</ItemGroup> </ItemGroup>
......
...@@ -44,6 +44,8 @@ namespace grpc.testing ...@@ -44,6 +44,8 @@ namespace grpc.testing
/// </summary> /// </summary>
public class TestServiceGrpc public class TestServiceGrpc
{ {
static readonly string ServiceName = "/grpc.testing.TestService";
static readonly Marshaller<Empty> EmptyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom); static readonly Marshaller<Empty> EmptyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom);
static readonly Marshaller<SimpleRequest> SimpleRequestMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom); static readonly Marshaller<SimpleRequest> SimpleRequestMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom);
static readonly Marshaller<SimpleResponse> SimpleResponseMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom); static readonly Marshaller<SimpleResponse> SimpleResponseMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom);
...@@ -54,37 +56,37 @@ namespace grpc.testing ...@@ -54,37 +56,37 @@ namespace grpc.testing
static readonly Method<Empty, Empty> EmptyCallMethod = new Method<Empty, Empty>( static readonly Method<Empty, Empty> EmptyCallMethod = new Method<Empty, Empty>(
MethodType.Unary, MethodType.Unary,
"/grpc.testing.TestService/EmptyCall", "EmptyCall",
EmptyMarshaller, EmptyMarshaller,
EmptyMarshaller); EmptyMarshaller);
static readonly Method<SimpleRequest, SimpleResponse> UnaryCallMethod = new Method<SimpleRequest, SimpleResponse>( static readonly Method<SimpleRequest, SimpleResponse> UnaryCallMethod = new Method<SimpleRequest, SimpleResponse>(
MethodType.Unary, MethodType.Unary,
"/grpc.testing.TestService/UnaryCall", "UnaryCall",
SimpleRequestMarshaller, SimpleRequestMarshaller,
SimpleResponseMarshaller); SimpleResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> StreamingOutputCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> StreamingOutputCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.ServerStreaming, MethodType.ServerStreaming,
"/grpc.testing.TestService/StreamingOutputCall", "StreamingOutputCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCallMethod = new Method<StreamingInputCallRequest, StreamingInputCallResponse>( static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCallMethod = new Method<StreamingInputCallRequest, StreamingInputCallResponse>(
MethodType.ClientStreaming, MethodType.ClientStreaming,
"/grpc.testing.TestService/StreamingInputCall", "StreamingInputCall",
StreamingInputCallRequestMarshaller, StreamingInputCallRequestMarshaller,
StreamingInputCallResponseMarshaller); StreamingInputCallResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/grpc.testing.TestService/FullDuplexCall", "FullDuplexCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/grpc.testing.TestService/HalfDuplexCall", "HalfDuplexCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
...@@ -107,60 +109,61 @@ namespace grpc.testing ...@@ -107,60 +109,61 @@ namespace grpc.testing
IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
} }
public class TestServiceClientStub : ITestServiceClient public class TestServiceClientStub : AbstractStub<TestServiceClientStub, StubConfiguration>, ITestServiceClient
{ {
readonly Channel channel; public TestServiceClientStub(Channel channel) : base(channel, StubConfiguration.Default)
{
}
public TestServiceClientStub(Channel channel) public TestServiceClientStub(Channel channel, StubConfiguration config) : base(channel, config)
{ {
this.channel = channel;
} }
public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)) public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Empty, Empty>(EmptyCallMethod, channel); var call = CreateCall(ServiceName, EmptyCallMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)) public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Empty, Empty>(EmptyCallMethod, channel); var call = CreateCall(ServiceName, EmptyCallMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)) public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(UnaryCallMethod, channel); var call = CreateCall(ServiceName, UnaryCallMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)) public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(UnaryCallMethod, channel); var call = CreateCall(ServiceName, UnaryCallMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(StreamingOutputCallMethod, channel); var call = CreateCall(ServiceName, StreamingOutputCallMethod);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token); Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
} }
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingInputCallRequest, StreamingInputCallResponse>(StreamingInputCallMethod, channel); var call = CreateCall(ServiceName, StreamingInputCallMethod);
return Calls.AsyncClientStreamingCall(call, token); return Calls.AsyncClientStreamingCall(call, token);
} }
public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(FullDuplexCallMethod, channel); var call = CreateCall(ServiceName, FullDuplexCallMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(HalfDuplexCallMethod, channel); var call = CreateCall(ServiceName, HalfDuplexCallMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
} }
...@@ -183,7 +186,7 @@ namespace grpc.testing ...@@ -183,7 +186,7 @@ namespace grpc.testing
public static ServerServiceDefinition BindService(ITestService serviceImpl) public static ServerServiceDefinition BindService(ITestService serviceImpl)
{ {
return ServerServiceDefinition.CreateBuilder("/grpc.testing.TestService/") return ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(EmptyCallMethod, serviceImpl.EmptyCall) .AddMethod(EmptyCallMethod, serviceImpl.EmptyCall)
.AddMethod(UnaryCallMethod, serviceImpl.UnaryCall) .AddMethod(UnaryCallMethod, serviceImpl.UnaryCall)
.AddMethod(StreamingOutputCallMethod, serviceImpl.StreamingOutputCall) .AddMethod(StreamingOutputCallMethod, serviceImpl.StreamingOutputCall)
......
...@@ -102,34 +102,114 @@ grpcsharp_batch_context *grpcsharp_batch_context_create() { ...@@ -102,34 +102,114 @@ grpcsharp_batch_context *grpcsharp_batch_context_create() {
return ctx; return ctx;
} }
/** /*
* Destroys metadata array including keys and values. * Destroys array->metadata.
* The array pointer itself is not freed.
*/
void grpcsharp_metadata_array_destroy_metadata_only(
grpc_metadata_array *array) {
gpr_free(array->metadata);
}
/*
* Destroys keys, values and array->metadata.
* The array pointer itself is not freed.
*/
void grpcsharp_metadata_array_destroy_metadata_including_entries(
grpc_metadata_array *array) {
size_t i;
if (array->metadata) {
for (i = 0; i < array->count; i++) {
gpr_free((void *)array->metadata[i].key);
gpr_free((void *)array->metadata[i].value);
}
}
gpr_free(array->metadata);
}
/*
* Fully destroys the metadata array.
*/
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_metadata_array_destroy_full(grpc_metadata_array *array) {
if (!array) {
return;
}
grpcsharp_metadata_array_destroy_metadata_including_entries(array);
gpr_free(array);
}
/*
* Creates an empty metadata array with given capacity.
* Array can later be destroyed by grpc_metadata_array_destroy_full.
*/ */
void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) { GPR_EXPORT grpc_metadata_array *GPR_CALLTYPE
if (!array->metadata) { grpcsharp_metadata_array_create(size_t capacity) {
grpc_metadata_array *array =
(grpc_metadata_array *)gpr_malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(array);
array->capacity = capacity;
array->count = 0;
if (capacity > 0) {
array->metadata =
(grpc_metadata *)gpr_malloc(sizeof(grpc_metadata) * capacity);
memset(array->metadata, 0, sizeof(grpc_metadata) * capacity);
} else {
array->metadata = NULL;
}
return array;
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_metadata_array_add(grpc_metadata_array *array, const char *key,
const char *value, size_t value_length) {
size_t i = array->count;
GPR_ASSERT(array->count < array->capacity);
array->metadata[i].key = gpr_strdup(key);
array->metadata[i].value = (char *)gpr_malloc(value_length);
memcpy((void *)array->metadata[i].value, value, value_length);
array->metadata[i].value_length = value_length;
array->count++;
}
/* Move contents of metadata array */
void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
grpc_metadata_array *src) {
if (!src) {
dest->capacity = 0;
dest->count = 0;
dest->metadata = NULL;
return; return;
} }
/* TODO: destroy also keys and values */
grpc_metadata_array_destroy(array); dest->capacity = src->capacity;
dest->count = src->count;
dest->metadata = src->metadata;
src->capacity = 0;
src->count = 0;
src->metadata = NULL;
} }
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) { if (!ctx) {
return; return;
} }
grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata)); grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_initial_metadata));
grpc_byte_buffer_destroy(ctx->send_message); grpc_byte_buffer_destroy(ctx->send_message);
grpcsharp_metadata_array_destroy_recursive( grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_status_from_server.trailing_metadata)); &(ctx->send_status_from_server.trailing_metadata));
gpr_free(ctx->send_status_from_server.status_details); gpr_free(ctx->send_status_from_server.status_details);
grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); grpcsharp_metadata_array_destroy_metadata_only(&(ctx->recv_initial_metadata));
grpc_byte_buffer_destroy(ctx->recv_message); grpc_byte_buffer_destroy(ctx->recv_message);
grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details); gpr_free((void *)ctx->recv_status_on_client.status_details);
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
...@@ -137,7 +217,8 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { ...@@ -137,7 +217,8 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
to take its ownership. */ to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata)); grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->server_rpc_new.request_metadata));
gpr_free(ctx); gpr_free(ctx);
} }
...@@ -346,17 +427,19 @@ grpcsharp_call_start_write_from_copied_buffer(grpc_call *call, ...@@ -346,17 +427,19 @@ grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) { const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[6]; grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_SEND_MESSAGE; ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
...@@ -389,9 +472,11 @@ GPR_EXPORT void GPR_CALLTYPE ...@@ -389,9 +472,11 @@ GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call, grpcsharp_call_blocking_unary(grpc_call *call,
grpc_completion_queue *dedicated_cq, grpc_completion_queue *dedicated_cq,
callback_funcptr callback, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) { const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
send_buffer_len) == GRPC_CALL_OK); send_buffer_len,
initial_metadata) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */ /* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
...@@ -403,17 +488,19 @@ grpcsharp_call_blocking_unary(grpc_call *call, ...@@ -403,17 +488,19 @@ grpcsharp_call_blocking_unary(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call, grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) { callback_funcptr callback,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[4]; grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
...@@ -435,21 +522,20 @@ grpcsharp_call_start_client_streaming(grpc_call *call, ...@@ -435,21 +522,20 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
} }
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpcsharp_call_start_server_streaming(grpc_call *call, grpc_call *call, callback_funcptr callback, const char *send_buffer,
callback_funcptr callback, size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
const char *send_buffer,
size_t send_buffer_len) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[5]; grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_SEND_MESSAGE; ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
...@@ -476,17 +562,19 @@ grpcsharp_call_start_server_streaming(grpc_call *call, ...@@ -476,17 +562,19 @@ grpcsharp_call_start_server_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback) { callback_funcptr callback,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[3]; grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment