Skip to content
Snippets Groups Projects
Commit a0bb0651 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

allow sending trailers from server handler

parent 998eb9bc
No related branches found
No related tags found
No related merge requests found
...@@ -101,14 +101,17 @@ namespace Grpc.Core.Internal ...@@ -101,14 +101,17 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time. /// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes. /// completionDelegate is called when the operation finishes.
/// </summary> /// </summary>
public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate) public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
{ {
lock (myLock) lock (myLock)
{ {
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(); CheckSendingAllowed();
call.StartSendStatusFromServer(status, HandleHalfclosed); using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
}
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; readingDone = true;
sendCompletionDelegate = completionDelegate; sendCompletionDelegate = completionDelegate;
......
...@@ -81,7 +81,7 @@ namespace Grpc.Core.Internal ...@@ -81,7 +81,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage); BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
...@@ -159,11 +159,11 @@ namespace Grpc.Core.Internal ...@@ -159,11 +159,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
} }
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback) public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
var ctx = BatchContextSafeHandle.Create(); var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback); completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk(); grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
} }
public void StartReceiveMessage(BatchCompletionDelegate callback) public void StartReceiveMessage(BatchCompletionDelegate callback)
......
...@@ -72,13 +72,13 @@ namespace Grpc.Core.Internal ...@@ -72,13 +72,13 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status; Status status;
var context = HandlerUtils.NewContext(newRpc);
try try
{ {
Preconditions.CheckArgument(await requestStream.MoveNext()); Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current; var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext()); Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = HandlerUtils.NewContext(newRpc);
var result = await handler(context, request); var result = await handler(context, request);
status = context.Status; status = context.Status;
await responseStream.WriteAsync(result); await responseStream.WriteAsync(result);
...@@ -90,7 +90,7 @@ namespace Grpc.Core.Internal ...@@ -90,7 +90,7 @@ namespace Grpc.Core.Internal
} }
try try
{ {
await responseStream.WriteStatusAsync(status); await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
...@@ -126,14 +126,13 @@ namespace Grpc.Core.Internal ...@@ -126,14 +126,13 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status; Status status;
var context = HandlerUtils.NewContext(newRpc);
try try
{ {
Preconditions.CheckArgument(await requestStream.MoveNext()); Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current; var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext()); Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = HandlerUtils.NewContext(newRpc);
await handler(context, request, responseStream); await handler(context, request, responseStream);
status = context.Status; status = context.Status;
} }
...@@ -145,7 +144,7 @@ namespace Grpc.Core.Internal ...@@ -145,7 +144,7 @@ namespace Grpc.Core.Internal
try try
{ {
await responseStream.WriteStatusAsync(status); await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
...@@ -179,9 +178,10 @@ namespace Grpc.Core.Internal ...@@ -179,9 +178,10 @@ namespace Grpc.Core.Internal
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
var context = HandlerUtils.NewContext(newRpc);
Status status; Status status;
var context = HandlerUtils.NewContext(newRpc);
try try
{ {
var result = await handler(context, requestStream); var result = await handler(context, requestStream);
...@@ -203,7 +203,7 @@ namespace Grpc.Core.Internal ...@@ -203,7 +203,7 @@ namespace Grpc.Core.Internal
try try
{ {
await responseStream.WriteStatusAsync(status); await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
...@@ -237,9 +237,9 @@ namespace Grpc.Core.Internal ...@@ -237,9 +237,9 @@ namespace Grpc.Core.Internal
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
var context = HandlerUtils.NewContext(newRpc);
Status status; Status status;
var context = HandlerUtils.NewContext(newRpc);
try try
{ {
await handler(context, requestStream, responseStream); await handler(context, requestStream, responseStream);
...@@ -252,7 +252,7 @@ namespace Grpc.Core.Internal ...@@ -252,7 +252,7 @@ namespace Grpc.Core.Internal
} }
try try
{ {
await responseStream.WriteStatusAsync(status); await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
...@@ -277,7 +277,7 @@ namespace Grpc.Core.Internal ...@@ -277,7 +277,7 @@ namespace Grpc.Core.Internal
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty);
await finishedTask; await finishedTask;
} }
} }
......
...@@ -56,10 +56,10 @@ namespace Grpc.Core.Internal ...@@ -56,10 +56,10 @@ namespace Grpc.Core.Internal
return taskSource.Task; return taskSource.Task;
} }
public Task WriteStatusAsync(Status status) public Task WriteStatusAsync(Status status, Metadata trailers)
{ {
var taskSource = new AsyncCompletionTaskSource<object>(); var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task; return taskSource.Task;
} }
} }
......
...@@ -630,15 +630,20 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE ...@@ -630,15 +630,20 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call, grpcsharp_call_send_status_from_server(grpc_call *call,
grpcsharp_batch_context *ctx, grpcsharp_batch_context *ctx,
grpc_status_code status_code, grpc_status_code status_code,
const char *status_details) { const char *status_details,
grpc_metadata_array *trailing_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[1]; grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details = ops[0].data.send_status_from_server.status_details =
gpr_strdup(status_details); gpr_strdup(status_details);
ops[0].data.send_status_from_server.trailing_metadata = NULL; grpcsharp_metadata_array_move(&(ctx->send_status_from_server.trailing_metadata),
ops[0].data.send_status_from_server.trailing_metadata_count = 0; trailing_metadata);
ops[0].data.send_status_from_server.trailing_metadata_count =
ctx->send_status_from_server.trailing_metadata.count;
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0; ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment