diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 2caa9680ba75887afcf762d52074075df61005d6..352c0ac653252ff395132d4f07fbf7adc5bd2c25 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -144,7 +144,10 @@ typedef enum grpc_call_error { /* the flags value was illegal for this call */ GRPC_CALL_ERROR_INVALID_FLAGS, /* invalid metadata was passed to this call */ - GRPC_CALL_ERROR_INVALID_METADATA + GRPC_CALL_ERROR_INVALID_METADATA, + /* completion queue for notification has not been registered with the server + */ + GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE } grpc_call_error; /* Write Flags: */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8e585a007d8a0476868fe834c541990b698ecb09..e5e3435febf369854785e7b22a9c9dabc8a25652 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -100,8 +100,9 @@ static int multipoll_with_epoll_pollset_maybe_work( if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout_ms = -1; } else { - timeout_ms = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout_ms <= 0) { + timeout_ms = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout_ms < 0) { return 1; } } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 5ee698073291d3a3bd4a5a552c36a849883ec731..d21c52c0f00f9a55045dc53c26d337bfe9699073 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -116,8 +116,9 @@ static int multipoll_with_poll_pollset_maybe_work( if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { + timeout = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout < 0) { return 1; } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index db704b9df1403a065a604c5ac41b99877869fbca..0fe54c8f1dcfe5956d7567b700210639fd995f38 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -346,8 +346,9 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; } else { - timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); - if (timeout <= 0) { + timeout = gpr_time_to_millis( + gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); + if (timeout < 0) { return 1; } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index b5f8af3fc17454d2f057a4f0642920f05ae2c329..6cffd92e6b88e869887cb4805a528fbdc12a21a4 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -99,6 +99,8 @@ typedef enum { /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, + /* Status came from the server sending status */ + STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT } status_source; @@ -578,10 +580,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, call->write_state = WRITE_STATE_WRITE_CLOSED; } break; + case GRPC_IOREQ_SEND_STATUS: + if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != + NULL) { + grpc_mdstr_unref( + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; + } + break; case GRPC_IOREQ_RECV_CLOSE: case GRPC_IOREQ_SEND_INITIAL_METADATA: case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_STATUS: case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: @@ -903,8 +913,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { call->metadata_context, grpc_mdstr_ref( grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + data.send_status.details)); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; } grpc_sopb_add_metadata(&call->send_ops, mdb); } @@ -1004,6 +1015,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, GRPC_CALL_ERROR_INVALID_METADATA); } } + if (op == GRPC_IOREQ_SEND_STATUS) { + set_status_code(call, STATUS_FROM_SERVER_STATUS, + reqs[i].data.send_status.code); + if (reqs[i].data.send_status.details) { + set_status_details(call, STATUS_FROM_SERVER_STATUS, + grpc_mdstr_ref(reqs[i].data.send_status.details)); + } + } have_ops |= 1u << op; call->request_data[op] = data; @@ -1277,7 +1296,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = - op->data.send_status_from_server.status_details; + op->data.send_status_from_server.status_details != NULL + ? grpc_mdstr_from_string( + call->metadata_context, + op->data.send_status_from_server.status_details) + : NULL; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; break; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 17db8c2cdce392d937c6650e28dcea3eb63d2b86..7ea6cd7fa9cbde3f69b362a985a06705e038e3df 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -72,7 +72,7 @@ typedef union { grpc_byte_buffer *send_message; struct { grpc_status_code code; - const char *details; + grpc_mdstr *details; } send_status; } grpc_ioreq_data; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 57ecf365cc567b09aad41bdd75c2fb1f8d7c1659..bd0fabf9dac99375e7a5a8eafa90a6ed29c57502 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -73,6 +73,7 @@ struct grpc_completion_queue { event *queue; /* Fixed size chained hash table of events for pluck() */ event *buckets[NUM_TAG_BUCKETS]; + int is_server_cq; }; grpc_completion_queue *grpc_completion_queue_create(void) { @@ -323,3 +324,7 @@ void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_time_add(gpr_now(), gpr_time_from_millis(100))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } + +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } + +int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 2249d0e78974b22ddd67910fb0192746b517b3c4..e76910c00b34eb1c06a01b901cdde57d2f58bfa3 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -63,4 +63,7 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); +void grpc_cq_mark_server_cq(grpc_completion_queue *cc); +int grpc_cq_is_server_cq(grpc_completion_queue *cc); + #endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 525fe2e10302736ecedebb5aeb688b278527d1db..10cb8538ac8dbd7be094be6f46c2a305bc861828 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -709,6 +709,7 @@ void grpc_server_register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); @@ -1010,7 +1011,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu); - GPR_ASSERT(server->shutdown); + GPR_ASSERT(server->shutdown || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { @@ -1081,6 +1082,9 @@ grpc_call_error grpc_server_request_call( GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag); + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = BATCH_CALL; rc.tag = tag; @@ -1099,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; + if (!grpc_cq_is_server_cq(cq_for_notification)) { + return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; + } grpc_cq_begin_op(cq_for_notification, NULL); rc.type = REGISTERED_CALL; rc.tag = tag; diff --git a/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs new file mode 100644 index 0000000000000000000000000000000000000000..df09857efe361af1c692100446873ac03f23fbe8 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs @@ -0,0 +1,105 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class ChannelOptionsTest + { + [Test] + public void IntOption() + { + var option = new ChannelOption("somename", 1); + + Assert.AreEqual(ChannelOption.OptionType.Integer, option.Type); + Assert.AreEqual("somename", option.Name); + Assert.AreEqual(1, option.IntValue); + Assert.Throws(typeof(InvalidOperationException), () => { var s = option.StringValue; }); + } + + [Test] + public void StringOption() + { + var option = new ChannelOption("somename", "ABCDEF"); + + Assert.AreEqual(ChannelOption.OptionType.String, option.Type); + Assert.AreEqual("somename", option.Name); + Assert.AreEqual("ABCDEF", option.StringValue); + Assert.Throws(typeof(InvalidOperationException), () => { var s = option.IntValue; }); + } + + [Test] + public void ConstructorPreconditions() + { + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, "abc"); }); + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, 1); }); + Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption("abc", null); }); + } + + [Test] + public void CreateChannelArgsNull() + { + var channelArgs = ChannelOptions.CreateChannelArgs(null); + Assert.IsTrue(channelArgs.IsInvalid); + } + + [Test] + public void CreateChannelArgsEmpty() + { + var options = new List<ChannelOption>(); + var channelArgs = ChannelOptions.CreateChannelArgs(options); + channelArgs.Dispose(); + } + + [Test] + public void CreateChannelArgs() + { + var options = new List<ChannelOption> + { + new ChannelOption("ABC", "XYZ"), + new ChannelOption("somename", "IJKLM"), + new ChannelOption("intoption", 12345), + new ChannelOption("GHIJK", 12345), + }; + + var channelArgs = ChannelOptions.CreateChannelArgs(options); + channelArgs.Dispose(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 029653967b6ded5b6649a95179709afa2ec2b47d..92e28b7d74cd2f6c8db37d264e472f16d1c70d6f 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -3,8 +3,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> - <ProductVersion>8.0.30703</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid> <OutputType>Library</OutputType> <RootNamespace>Grpc.Core.Tests</RootNamespace> @@ -48,6 +46,8 @@ <Compile Include="Internal\MetadataArraySafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueSafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueEventTest.cs" /> + <Compile Include="Internal\ChannelArgsSafeHandleTest.cs" /> + <Compile Include="ChannelOptionsTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> @@ -63,4 +63,4 @@ <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> </ItemGroup> <ItemGroup /> -</Project> \ No newline at end of file +</Project> diff --git a/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs new file mode 100644 index 0000000000000000000000000000000000000000..af0aaa5f0129e922b623296584cdc4643851fa9b --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs @@ -0,0 +1,75 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class ChannelArgsSafeHandleTest + { + [Test] + public void CreateEmptyAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(0); + channelArgs.Dispose(); + } + + [Test] + public void CreateNonEmptyAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(5); + channelArgs.Dispose(); + } + + [Test] + public void CreateNullAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.CreateNull(); + channelArgs.Dispose(); + } + + [Test] + public void CreateFillAndDestroy() + { + var channelArgs = ChannelArgsSafeHandle.Create(3); + channelArgs.SetInteger(0, "somekey", 12345); + channelArgs.SetString(1, "somekey", "abcdefghijkl"); + channelArgs.SetString(2, "somekey", "XYZ"); + channelArgs.Dispose(); + } + } +} diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 44b610f65b872bd1293a964cec1af2253258647e..d6bfbb7bc458aea5dbcb143bd7f4e492dcab7143 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -29,6 +29,7 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -50,10 +51,10 @@ namespace Grpc.Core /// </summary> /// <param name="host">The DNS name of IP address of the host.</param> /// <param name="credentials">Optional credentials to create a secure channel.</param> - /// <param name="channelArgs">Optional channel arguments.</param> - public Channel(string host, Credentials credentials = null, ChannelArgs channelArgs = null) + /// <param name="options">Channel options.</param> + public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null) { - using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs)) + using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options)) { if (credentials != null) { @@ -67,7 +68,7 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs); } } - this.target = GetOverridenTarget(host, channelArgs); + this.target = GetOverridenTarget(host, options); } /// <summary> @@ -76,9 +77,9 @@ namespace Grpc.Core /// <param name="host">DNS name or IP address</param> /// <param name="port">the port</param> /// <param name="credentials">Optional credentials to create a secure channel.</param> - /// <param name="channelArgs">Optional channel arguments.</param> - public Channel(string host, int port, Credentials credentials = null, ChannelArgs channelArgs = null) : - this(string.Format("{0}:{1}", host, port), credentials, channelArgs) + /// <param name="options">Channel options.</param> + public Channel(string host, int port, Credentials credentials = null, IEnumerable<ChannelOption> options = null) : + this(string.Format("{0}:{1}", host, port), credentials, options) { } @@ -112,22 +113,25 @@ namespace Grpc.Core } } - private static string GetOverridenTarget(string target, ChannelArgs args) + /// <summary> + /// Look for SslTargetNameOverride option and return its value instead of originalTarget + /// if found. + /// </summary> + private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options) { - if (args != null && !string.IsNullOrEmpty(args.GetSslTargetNameOverride())) + if (options == null) { - return args.GetSslTargetNameOverride(); + return originalTarget; } - return target; - } - - private static ChannelArgsSafeHandle CreateNativeChannelArgs(ChannelArgs args) - { - if (args == null) + foreach (var option in options) { - return ChannelArgsSafeHandle.CreateNull(); + if (option.Type == ChannelOption.OptionType.String + && option.Name == ChannelOptions.SslTargetNameOverride) + { + return option.StringValue; + } } - return args.ToNativeChannelArgs(); + return originalTarget; } } } diff --git a/src/csharp/Grpc.Core/ChannelArgs.cs b/src/csharp/Grpc.Core/ChannelArgs.cs deleted file mode 100644 index 74ab310e44e64800d097e3f9afc1fac3c549730f..0000000000000000000000000000000000000000 --- a/src/csharp/Grpc.Core/ChannelArgs.cs +++ /dev/null @@ -1,115 +0,0 @@ -#region Copyright notice and license -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#endregion -using System; -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core.Internal; - -namespace Grpc.Core -{ - /// <summary> - /// gRPC channel options. - /// </summary> - public class ChannelArgs - { - public const string SslTargetNameOverrideKey = "grpc.ssl_target_name_override"; - - readonly ImmutableDictionary<string, string> stringArgs; - - private ChannelArgs(ImmutableDictionary<string, string> stringArgs) - { - this.stringArgs = stringArgs; - } - - public string GetSslTargetNameOverride() - { - string result; - if (stringArgs.TryGetValue(SslTargetNameOverrideKey, out result)) - { - return result; - } - return null; - } - - public static Builder CreateBuilder() - { - return new Builder(); - } - - public class Builder - { - readonly Dictionary<string, string> stringArgs = new Dictionary<string, string>(); - - // TODO: AddInteger not supported yet. - public Builder AddString(string key, string value) - { - stringArgs.Add(key, value); - return this; - } - - public ChannelArgs Build() - { - return new ChannelArgs(stringArgs.ToImmutableDictionary()); - } - } - - /// <summary> - /// Creates native object for the channel arguments. - /// </summary> - /// <returns>The native channel arguments.</returns> - internal ChannelArgsSafeHandle ToNativeChannelArgs() - { - ChannelArgsSafeHandle nativeArgs = null; - try - { - nativeArgs = ChannelArgsSafeHandle.Create(stringArgs.Count); - int i = 0; - foreach (var entry in stringArgs) - { - nativeArgs.SetString(i, entry.Key, entry.Value); - i++; - } - return nativeArgs; - } - catch (Exception) - { - if (nativeArgs != null) - { - nativeArgs.Dispose(); - } - throw; - } - } - } -} diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs new file mode 100644 index 0000000000000000000000000000000000000000..bc23bb59b103891452be2b4579b43045cf11e542 --- /dev/null +++ b/src/csharp/Grpc.Core/ChannelOptions.cs @@ -0,0 +1,178 @@ +#region Copyright notice and license +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#endregion +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Channel option specified when creating a channel. + /// Corresponds to grpc_channel_args from grpc/grpc.h. + /// </summary> + public sealed class ChannelOption + { + public enum OptionType + { + Integer, + String + } + + private readonly OptionType type; + private readonly string name; + private readonly int intValue; + private readonly string stringValue; + + /// <summary> + /// Creates a channel option with a string value. + /// </summary> + /// <param name="name">Name.</param> + /// <param name="stringValue">String value.</param> + public ChannelOption(string name, string stringValue) + { + this.type = OptionType.String; + this.name = Preconditions.CheckNotNull(name); + this.stringValue = Preconditions.CheckNotNull(stringValue); + } + + /// <summary> + /// Creates a channel option with an integer value. + /// </summary> + /// <param name="name">Name.</param> + /// <param name="stringValue">String value.</param> + public ChannelOption(string name, int intValue) + { + this.type = OptionType.Integer; + this.name = Preconditions.CheckNotNull(name); + this.intValue = intValue; + } + + public OptionType Type + { + get + { + return type; + } + } + + public string Name + { + get + { + return name; + } + } + + public int IntValue + { + get + { + Preconditions.CheckState(type == OptionType.Integer); + return intValue; + } + } + + public string StringValue + { + get + { + Preconditions.CheckState(type == OptionType.String); + return stringValue; + } + } + } + + public static class ChannelOptions + { + // Override SSL target check. Only to be used for testing. + public const string SslTargetNameOverride = "grpc.ssl_target_name_override"; + + // Enable census for tracing and stats collection + public const string Census = "grpc.census"; + + // Maximum number of concurrent incoming streams to allow on a http2 connection + public const string MaxConcurrentStreams = "grpc.max_concurrent_streams"; + + // Maximum message length that the channel can receive + public const string MaxMessageLength = "grpc.max_message_length"; + + // Initial sequence number for http2 transports + public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number"; + + /// <summary> + /// Creates native object for a collection of channel options. + /// </summary> + /// <returns>The native channel arguments.</returns> + internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options) + { + if (options == null) + { + return ChannelArgsSafeHandle.CreateNull(); + } + var optionList = new List<ChannelOption>(options); // It's better to do defensive copy + ChannelArgsSafeHandle nativeArgs = null; + try + { + nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count); + for (int i = 0; i < optionList.Count; i++) + { + var option = optionList[i]; + if (option.Type == ChannelOption.OptionType.Integer) + { + nativeArgs.SetInteger(i, option.Name, option.IntValue); + } + else if (option.Type == ChannelOption.OptionType.String) + { + nativeArgs.SetString(i, option.Name, option.StringValue); + } + else + { + throw new InvalidOperationException("Unknown option type"); + } + } + return nativeArgs; + } + catch (Exception) + { + if (nativeArgs != null) + { + nativeArgs.Dispose(); + } + throw; + } + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 5c7b9a8bb672215a9b07481ad30aed80db13ecaf..a36a6a5acc813a4e08220ccfa96b0b941c3fec2f 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -5,8 +5,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> - <ProductVersion>8.0.30703</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid> <OutputType>Library</OutputType> <RootNamespace>Grpc.Core</RootNamespace> @@ -78,7 +76,6 @@ <Compile Include="Internal\CredentialsSafeHandle.cs" /> <Compile Include="Credentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> - <Compile Include="ChannelArgs.cs" /> <Compile Include="Internal\AsyncCompletion.cs" /> <Compile Include="Internal\AsyncCallBase.cs" /> <Compile Include="Internal\AsyncCallServer.cs" /> @@ -103,6 +100,7 @@ <Compile Include="Internal\CompletionQueueEvent.cs" /> <Compile Include="Internal\CompletionRegistry.cs" /> <Compile Include="Internal\BatchContextSafeHandle.cs" /> + <Compile Include="ChannelOptions.cs" /> </ItemGroup> <ItemGroup> <None Include="packages.config" /> @@ -132,4 +130,4 @@ </Target> <Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" /> <Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" /> -</Project> \ No newline at end of file +</Project> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index db1b86937f89a1e60972ed2cd829967ceefa7926..4f510ba40ac83d0912d2c0c23cb8d6dae513aed2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -107,6 +107,7 @@ namespace Grpc.Core.Internal call.StartSendStatusFromServer(status, HandleHalfclosed); halfcloseRequested = true; + readingDone = true; sendCompletionDelegate = completionDelegate; } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs index c69f1a0d0255b5a449ab9f8e2edff29f9c2206c4..c12aec5a3a4667661216bd51df5c7a87d311ea02 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs @@ -45,6 +45,9 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] static extern void grpcsharp_channel_args_set_string(ChannelArgsSafeHandle args, UIntPtr index, string key, string value); + [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] + static extern void grpcsharp_channel_args_set_integer(ChannelArgsSafeHandle args, UIntPtr index, string key, int value); + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_channel_args_destroy(IntPtr args); @@ -67,6 +70,11 @@ namespace Grpc.Core.Internal grpcsharp_channel_args_set_string(this, new UIntPtr((uint)index), key, value); } + public void SetInteger(int index, string key, int value) + { + grpcsharp_channel_args_set_integer(this, new UIntPtr((uint)index), key, value); + } + protected override bool ReleaseHandle() { grpcsharp_channel_args_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index f494d9e0ffcae18f3825131084c7d68537f0b629..c0e5bae13f0901c6d773759cfdf7e410b4dfc999 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -267,8 +267,6 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); - // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. - await requestStream.ToList(); await finishedTask; } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index eda9afcb87e113659f0b45d03765b133f9894ad3..83dbb910aa7498a66d7dd5b3d8eee26892fd7d21 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -45,7 +45,7 @@ namespace Grpc.Core.Internal internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { [DllImport("grpc_csharp_ext.dll")] - static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); [DllImport("grpc_csharp_ext.dll")] static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); @@ -72,7 +72,7 @@ namespace Grpc.Core.Internal { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { return grpcsharp_server_create(cq, args); } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 3352fc93a12156571d1005e1c7002e3419ee504a..8e818885d181a1dc3a3903480b350a728ad7086a 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -61,9 +61,16 @@ namespace Grpc.Core bool startRequested; bool shutdownRequested; - public Server() + /// <summary> + /// Create a new server. + /// </summary> + /// <param name="options">Channel options.</param> + public Server(IEnumerable<ChannelOption> options = null) { - this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + using (var channelArgs = ChannelOptions.CreateChannelArgs(options)) + { + this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs); + } } /// <summary> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 66171fae576532cb49b2e4e49df9950269bfb4cd..f0be522bc6e8f604f26583d02040ee443e40077f 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -110,14 +110,16 @@ namespace Grpc.IntegrationTesting credentials = TestCredentials.CreateTestClientCredentials(options.useTestCa); } - ChannelArgs channelArgs = null; + List<ChannelOption> channelOptions = null; if (!string.IsNullOrEmpty(options.serverHostOverride)) { - channelArgs = ChannelArgs.CreateBuilder() - .AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build(); + channelOptions = new List<ChannelOption> + { + new ChannelOption(ChannelOptions.SslTargetNameOverride, options.serverHostOverride) + }; } - using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelArgs)) + using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) { var stubConfig = StubConfiguration.Default; if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds") diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index f756dfbc409b4ddc6ca56df9640c55dfefc69c01..1a733450c1a5d0fcd27de45016732e157a9ad92c 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -62,10 +62,11 @@ namespace Grpc.IntegrationTesting int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials()); server.Start(); - var channelArgs = ChannelArgs.CreateBuilder() - .AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build(); - - channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), channelArgs); + var options = new List<ChannelOption> + { + new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride) + }; + channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options); client = TestService.NewStub(channel); } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index c2a0b729d45cd2347deae3d3d8d96f95db82ddb6..540f92674471479f0e54be32aceac480f23a4e23 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -354,6 +354,16 @@ grpcsharp_channel_args_set_string(grpc_channel_args *args, size_t index, args->args[index].value.string = gpr_strdup(value); } +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_channel_args_set_integer(grpc_channel_args *args, size_t index, + const char *key, int value) { + GPR_ASSERT(args); + GPR_ASSERT(index < args->num_args); + args->args[index].type = GRPC_ARG_INTEGER; + args->args[index].key = gpr_strdup(key); + args->args[index].value.integer = value; +} + GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_args_destroy(grpc_channel_args *args) { size_t i; diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c index 65d84b58fe8e3539f39f2124d2dfbaf6755fa26e..2a00f34039c95691628b9125f8944fe86c406fec 100644 --- a/src/python/src/grpc/_adapter/_c/types/server.c +++ b/src/python/src/grpc/_adapter/_c/types/server.c @@ -105,6 +105,7 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) } self = (Server *)type->tp_alloc(type, 0); self->c_serv = grpc_server_create(&c_args); + grpc_server_register_completion_queue(self->c_serv, cq->c_cq); pygrpc_discard_channel_args(c_args); self->cq = cq; Py_INCREF(self->cq); @@ -167,17 +168,13 @@ PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) { PyObject *pygrpc_Server_shutdown( Server *self, PyObject *args, PyObject *kwargs) { - PyObject *user_tag = NULL; + PyObject *user_tag; pygrpc_tag *tag; static char *keywords[] = {"tag", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", keywords, &user_tag)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", keywords, &user_tag)) { return NULL; } - if (user_tag) { - tag = pygrpc_produce_server_shutdown_tag(user_tag); - grpc_server_shutdown_and_notify(self->c_serv, tag); - } else { - grpc_server_shutdown(self->c_serv); - } + tag = pygrpc_produce_server_shutdown_tag(user_tag); + grpc_server_shutdown_and_notify(self->c_serv, self->cq->c_cq, tag); Py_RETURN_NONE; } diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index e3139f28874e4635f06504b355c8819c906d58eb..cfb82708329860749a7dc8e1b773fa9bf630f343 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -41,6 +41,7 @@ #include <grpc/support/slice.h> #include <grpc/support/time.h> #include <grpc/support/string_util.h> +#include <grpc/support/log.h> #include "grpc/_adapter/_c/types.h" @@ -123,7 +124,8 @@ PyObject *pygrpc_consume_event(grpc_event event) { event.success ? Py_True : Py_False); } else { result = Py_BuildValue("iOOONO", GRPC_OP_COMPLETE, tag->user_tag, - tag->call, Py_None, pygrpc_consume_ops(tag->ops, tag->nops), + tag->call ? tag->call : Py_None, Py_None, + pygrpc_consume_ops(tag->ops, tag->nops), event.success ? Py_True : Py_False); } break; diff --git a/src/python/src/grpc/_adapter/_intermediary_low.py b/src/python/src/grpc/_adapter/_intermediary_low.py index a6e325c4e5b63e62b029ea53c8b50057647cd6ba..6b96aef1d34e0a99c309b50dc17bfd98060b1645 100644 --- a/src/python/src/grpc/_adapter/_intermediary_low.py +++ b/src/python/src/grpc/_adapter/_intermediary_low.py @@ -100,7 +100,7 @@ class _TagAdapter(collections.namedtuple('_TagAdapter', [ class Call(object): """Adapter from old _low.Call interface to new _low.Call.""" - + def __init__(self, channel, completion_queue, method, host, deadline): self._internal = channel._internal.create_call( completion_queue._internal, method, host, deadline) @@ -207,7 +207,7 @@ class CompletionQueue(object): complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None - status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if ev.results[0].cancelled is not None else None + status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None) else: raise RuntimeError('unknown event') @@ -241,7 +241,7 @@ class Server(object): return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED)) def stop(self): - return self._internal.shutdown() + return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP)) class ClientCredentials(object): @@ -253,6 +253,6 @@ class ClientCredentials(object): class ServerCredentials(object): """Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials.""" - + def __init__(self, root_credentials, pair_sequence): self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence)) diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py index 6ff51c43a69ae02ade3de7c6aac228beef6fed65..478346341b08d62a70dffb82bea474f037bcf249 100644 --- a/src/python/src/grpc/_adapter/_intermediary_low_test.py +++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py @@ -94,14 +94,6 @@ class EchoTest(unittest.TestCase): def tearDown(self): self.server.stop() - # NOTE(nathaniel): Yep, this is weird; it's a consequence of - # grpc_server_destroy's being what has the effect of telling the server's - # completion queue to pump out all pending events/tags immediately rather - # than gracefully completing all outstanding RPCs while accepting no new - # ones. - # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind - # of observable side effect let alone such an important one. - del self.server self.server_completion_queue.stop() self.client_completion_queue.stop() while True: @@ -114,6 +106,7 @@ class EchoTest(unittest.TestCase): break self.server_completion_queue = None self.client_completion_queue = None + del self.server def _perform_echo_test(self, test_data): method = 'test method' @@ -316,7 +309,6 @@ class CancellationTest(unittest.TestCase): def tearDown(self): self.server.stop() - del self.server self.server_completion_queue.stop() self.client_completion_queue.stop() while True: @@ -327,6 +319,7 @@ class CancellationTest(unittest.TestCase): event = self.client_completion_queue.get(0) if event is not None and event.kind is _low.Event.Kind.STOP: break + del self.server def testCancellation(self): method = 'test method' diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py index 0c1d3b40a5b2df1f8e8f6c8e36e59bde0849b2c8..dcf67dbc11732f5abc9dd7ea31b14ccc042b6ed2 100644 --- a/src/python/src/grpc/_adapter/_low.py +++ b/src/python/src/grpc/_adapter/_low.py @@ -101,11 +101,8 @@ class Server(_types.Server): def start(self): return self.server.start() - def shutdown(self, tag=_NO_TAG): - if tag is _NO_TAG: - return self.server.shutdown() - else: - return self.server.shutdown(tag) + def shutdown(self, tag=None): + return self.server.shutdown(tag) def request_call(self, completion_queue, tag): return self.server.request_call(completion_queue.completion_queue, tag) diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index e53b176caf95562d69379a128cd4a74c0a082d11..268e5fe765b1b543c08cf0c87a170932d11e29e9 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -27,6 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import threading import time import unittest @@ -34,6 +35,33 @@ from grpc._adapter import _types from grpc._adapter import _low +def WaitForEvents(completion_queues, deadline): + """ + Args: + completion_queues: list of completion queues to wait for events on + deadline: absolute deadline to wait until + + Returns: + a sequence of events of length len(completion_queues). + """ + + results = [None] * len(completion_queues) + lock = threading.Lock() + threads = [] + def set_ith_result(i, completion_queue): + result = completion_queue.next(deadline) + with lock: + print i, completion_queue, result, time.time() - deadline + results[i] = result + for i, completion_queue in enumerate(completion_queues): + thread = threading.Thread(target=set_ith_result, + args=[i, completion_queue]) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + return results + class InsecureServerInsecureClient(unittest.TestCase): def setUp(self): @@ -48,7 +76,6 @@ class InsecureServerInsecureClient(unittest.TestCase): def tearDown(self): self.server.shutdown() del self.client_channel - del self.server self.client_completion_queue.shutdown() while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN: @@ -59,6 +86,7 @@ class InsecureServerInsecureClient(unittest.TestCase): del self.client_completion_queue del self.server_completion_queue + del self.server def testEcho(self): DEADLINE = time.time()+5 @@ -95,7 +123,8 @@ class InsecureServerInsecureClient(unittest.TestCase): ], client_call_tag) self.assertEquals(_types.CallError.OK, client_start_batch_result) - request_event = self.server_completion_queue.next(DEADLINE) + client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2) + self.assertEquals(client_no_event, None) self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type) self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) @@ -118,8 +147,7 @@ class InsecureServerInsecureClient(unittest.TestCase): ], server_call_tag) self.assertEquals(_types.CallError.OK, server_start_batch_result) - client_event = self.client_completion_queue.next(DEADLINE) - server_event = self.server_completion_queue.next(DEADLINE) + client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1) self.assertEquals(6, len(client_event.results)) found_client_op_types = set() diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 1142c37ea4f23277d75ec626bded53bb7b23d92f..e5310a469506f7c4dfb817c930b2f33690936ea6 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -177,7 +177,7 @@ void test_connect(const char *server_host, const char *client_host, int port, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); grpc_call_destroy(s); } else { diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index f3a46e23c9c1487062389243202e553cd4f08838..3aa3018de0edc2f53fda162e33fbe94094a5a7b8 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -164,7 +164,7 @@ static void test_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index d962f870b01f18354ecfc3297386cc381ab88dff..a93ac3192476b028b83e25b54aeee6c546fc7fa6 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -157,7 +157,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index c1fc6fb4c5c124fdd2f45ac89842dbe9de3c882a..82b20fb3d7cde6b6634c48c4dd9c722f985a9022 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -173,7 +173,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index 4c551987629058a311096edc195d5b3cfdc9782a..cc976639d22442838b66eec1faa38f7315a26515 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -198,7 +198,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 8cfa15a9bc85a6f1f8b4c1e97069118a94d44d53..792d757f7574c65cf38da945fb3c2fc020f88415 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -166,7 +166,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 70c70e9c4079d68a4d5daf2008cb0e93f7913373..ee7de233c15f1cf044e5f38f517616ba4be4fc56 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -167,7 +167,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c index fe8657a2cc85d884f66d81e8dcb021da02b36c8e..b3386d52920d08f5328e8b0177980adb3356bcb1 100644 --- a/test/core/end2end/tests/server_finishes_request.c +++ b/test/core/end2end/tests/server_finishes_request.c @@ -166,7 +166,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index f399d0ea9b494569c0f03935b85e0efa33ceb483..5d36172ba1dcd723599a1c20626f3d649f2d48b1 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -162,7 +162,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 2246ad540c7d05d7312e962bb03b9e542b0241d1..a2df20d20f9887b3c3e03a5e5fac1ccb7edcd2b4 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -168,7 +168,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c index ff00a5aa6e3b93caf59f7c5b22018c11d8697d24..5cd484d42fa21c3d27c5d7ad728b28d5d59063ae 100644 --- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c +++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c @@ -168,7 +168,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); - GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(was_cancelled == 1); gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc new file mode 100644 index 0000000000000000000000000000000000000000..90a8da8d1109fbc76a612f4e33e2bf831f2592a6 --- /dev/null +++ b/test/cpp/qps/qps_test_with_poll.cc @@ -0,0 +1,90 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include <signal.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +extern "C" { +#include "src/core/iomgr/pollset_posix.h" +} + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 5; + +static void RunQPS() { + gpr_log(GPR_INFO, "Running QPS test"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_enable_ssl(false); + client_config.set_outstanding_rpcs_per_channel(1000); + client_config.set_client_channels(8); + client_config.set_payload_size(1); + client_config.set_async_client_threads(8); + client_config.set_rpc_type(UNARY); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_enable_ssl(false); + server_config.set_threads(4); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPSPerCore(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc_platform_become_multipoller = grpc_poll_become_multipoller; + + signal(SIGPIPE, SIG_IGN); + grpc::testing::RunQPS(); + + return 0; +} diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index d34023a1c3c4c09bcd1bc691b6a92b4c2eaf6a83..77b9f4b5f8572f828bca00f609166a70a71ff2e6 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -50,6 +50,9 @@ ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) os.chdir(ROOT) +_FORCE_ENVIRON_FOR_WRAPPERS = {} + + # SimpleConfig: just compile with CONFIG=config, and run the binary to test class SimpleConfig(object): @@ -146,7 +149,7 @@ class NodeLanguage(object): def test_specs(self, config, travis): return [config.job_spec(['tools/run_tests/run_node.sh'], None, - environ={'GRPC_TRACE': 'surface,batch'})] + environ=_FORCE_ENVIRON_FOR_WRAPPERS)] def make_targets(self): return ['static_c', 'shared_c'] @@ -165,7 +168,7 @@ class PhpLanguage(object): def test_specs(self, config, travis): return [config.job_spec(['src/php/bin/run_tests.sh'], None, - environ={'GRPC_TRACE': 'surface,batch'})] + environ=_FORCE_ENVIRON_FOR_WRAPPERS)] def make_targets(self): return ['static_c', 'shared_c'] @@ -190,13 +193,13 @@ class PythonLanguage(object): modules = [config.job_spec(['tools/run_tests/run_python.sh', '-m', test['module']], None, - environ={'GRPC_TRACE': 'surface,batch'}, + environ=_FORCE_ENVIRON_FOR_WRAPPERS, shortname=test['module']) for test in self._tests if 'module' in test] files = [config.job_spec(['tools/run_tests/run_python.sh', test['file']], None, - environ={'GRPC_TRACE': 'surface,batch'}, + environ=_FORCE_ENVIRON_FOR_WRAPPERS, shortname=test['file']) for test in self._tests if 'file' in test] return files + modules @@ -218,7 +221,7 @@ class RubyLanguage(object): def test_specs(self, config, travis): return [config.job_spec(['tools/run_tests/run_ruby.sh'], None, - environ={'GRPC_TRACE': 'surface,batch'})] + environ=_FORCE_ENVIRON_FOR_WRAPPERS)] def make_targets(self): return ['run_dep_checks'] @@ -251,7 +254,7 @@ class CSharpLanguage(object): cmd = 'tools/run_tests/run_csharp.sh' return [config.job_spec([cmd, assembly], None, shortname=assembly, - environ={'GRPC_TRACE': 'surface,batch'}) + environ=_FORCE_ENVIRON_FOR_WRAPPERS) for assembly in assemblies ] def make_targets(self): @@ -402,6 +405,9 @@ run_configs = set(_CONFIGS[cfg] for x in args.config)) build_configs = set(cfg.build_config for cfg in run_configs) +if args.travis: + _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'} + make_targets = [] languages = set(_LANGUAGES[l] for l in itertools.chain.from_iterable(