diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx index 51c46681388f43e6ed720d2da557fce7f8101167..1c07f9f4f4c6870dd3451844a188eee58d998e0b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx @@ -53,24 +53,24 @@ cdef class Call: self.c_call, cy_operations.c_ops, cy_operations.c_nops, <cpython.PyObject *>operation_tag, NULL) - def cancel(self, - grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE, - details=None): + def cancel( + self, grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE, + details=None): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") if (details is None) != (error_code == grpc.GRPC_STATUS__DO_NOT_USE): raise ValueError("if error_code is specified, so must details " "(and vice-versa)") - if isinstance(details, bytes): - pass - elif isinstance(details, basestring): - details = details.encode() - else: - raise TypeError("expected details to be str or bytes") if error_code != grpc.GRPC_STATUS__DO_NOT_USE: + if isinstance(details, bytes): + pass + elif isinstance(details, basestring): + details = details.encode() + else: + raise TypeError("expected details to be str or bytes") self.references.append(details) - return grpc.grpc_call_cancel_with_status(self.c_call, error_code, details, - NULL) + return grpc.grpc_call_cancel_with_status( + self.c_call, error_code, details, NULL) else: return grpc.grpc_call_cancel(self.c_call, NULL) @@ -79,6 +79,12 @@ cdef class Call: return grpc.grpc_call_set_credentials( self.c_call, call_credentials.c_credentials) + def peer(self): + cdef char *peer = grpc.grpc_call_get_peer(self.c_call) + result = <bytes>peer + grpc.gpr_free(peer) + return result + def __dealloc__(self): if self.c_call != NULL: grpc.grpc_call_destroy(self.c_call) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx index e25db3e2a442095aec7c367afcc6b262e6a0474d..a944a83576c5dfe393124af74df9573e74ba5295 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +cimport cpython + from grpc._cython._cygrpc cimport call from grpc._cython._cygrpc cimport completion_queue from grpc._cython._cygrpc cimport credentials @@ -70,12 +72,16 @@ cdef class Channel: method = method.encode() else: raise TypeError("expected method to be str or bytes") - if isinstance(host, bytes): + cdef char *host_c_string = NULL + if host is None: pass + elif isinstance(host, bytes): + host_c_string = host elif isinstance(host, basestring): host = host.encode() + host_c_string = host else: - raise TypeError("expected host to be str or bytes") + raise TypeError("expected host to be str, bytes, or None") cdef call.Call operation_call = call.Call() operation_call.references = [self, method, host, queue] cdef grpc.grpc_call *parent_call = NULL @@ -83,10 +89,29 @@ cdef class Channel: parent_call = parent.c_call operation_call.c_call = grpc.grpc_channel_create_call( self.c_channel, parent_call, flags, - queue.c_completion_queue, method, host, deadline.c_time, + queue.c_completion_queue, method, host_c_string, deadline.c_time, NULL) return operation_call + def check_connectivity_state(self, bint try_to_connect): + return grpc.grpc_channel_check_connectivity_state(self.c_channel, + try_to_connect) + + def watch_connectivity_state( + self, last_observed_state, records.Timespec deadline not None, + completion_queue.CompletionQueue queue not None, tag): + cdef records.OperationTag operation_tag = records.OperationTag(tag) + cpython.Py_INCREF(operation_tag) + grpc.grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, deadline.c_time, + queue.c_completion_queue, <cpython.PyObject *>operation_tag) + + def target(self): + cdef char * target = grpc.grpc_channel_get_target(self.c_channel) + result = <bytes>target + grpc.gpr_free(target) + return result + def __dealloc__(self): if self.c_channel != NULL: grpc.grpc_channel_destroy(self.c_channel) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx index a7a265eab7d34d66a9126f9a917f75702376f093..2cf49707b48dbb83eb69da6194e331c1879f6af9 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx @@ -62,6 +62,8 @@ cdef class CompletionQueue: cdef grpc.grpc_event event # Poll within a critical section + # TODO consider making queue polling contention a hard error to enable + # easier bug discovery with self.poll_condition: while self.is_polling: self.poll_condition.wait(float(deadline) - time.time()) @@ -74,10 +76,12 @@ cdef class CompletionQueue: self.poll_condition.notify() if event.type == grpc.GRPC_QUEUE_TIMEOUT: - return records.Event(event.type, False, None, None, None, None, None) + return records.Event( + event.type, False, None, None, None, None, False, None) elif event.type == grpc.GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - return records.Event(event.type, True, None, None, None, None, None) + return records.Event( + event.type, True, None, None, None, None, False, None) else: if event.tag != NULL: tag = <records.OperationTag>event.tag @@ -97,7 +101,8 @@ cdef class CompletionQueue: operation_call.references.extend(tag.references) return records.Event( event.type, event.success, user_tag, operation_call, - request_call_details, request_metadata, batch_operations) + request_call_details, request_metadata, tag.is_new_request, + batch_operations) def shutdown(self): grpc.grpc_completion_queue_shutdown(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx index e9836fec2cf7ed08446373d67634a56d4e57dafd..e6a22e76250096894712d108546a351bf603ccbf 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx @@ -71,6 +71,7 @@ cdef class ServerCredentials: def __cinit__(self): self.c_credentials = NULL + self.references = [] def __dealloc__(self): if self.c_credentials != NULL: @@ -83,7 +84,7 @@ def channel_credentials_google_default(): return credentials def channel_credentials_ssl(pem_root_certificates, - records.SslPemKeyCertPair ssl_pem_key_cert_pair): + records.SslPemKeyCertPair ssl_pem_key_cert_pair): if pem_root_certificates is None: pass elif isinstance(pem_root_certificates, bytes): @@ -104,6 +105,7 @@ def channel_credentials_ssl(pem_root_certificates, else: credentials.c_credentials = grpc.grpc_ssl_credentials_create( c_pem_root_certificates, NULL, NULL) + return credentials def channel_credentials_composite( ChannelCredentials credentials_1 not None, @@ -135,7 +137,6 @@ def call_credentials_google_compute_engine(): grpc.grpc_google_compute_engine_credentials_create(NULL)) return credentials -#TODO rename to something like client_credentials_service_account_jwt_access. def call_credentials_service_account_jwt_access( json_key, records.Timespec token_lifetime not None): if isinstance(json_key, bytes): @@ -186,12 +187,14 @@ def call_credentials_google_iam(authorization_token, authority_selector): def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, bint force_client_auth): + cdef char *c_pem_root_certs = NULL if pem_root_certs is None: pass elif isinstance(pem_root_certs, bytes): - pass + c_pem_root_certs = pem_root_certs elif isinstance(pem_root_certs, basestring): pem_root_certs = pem_root_certs.encode() + c_pem_root_certs = pem_root_certs else: raise TypeError("expected pem_root_certs to be str or bytes") pem_key_cert_pairs = list(pem_key_cert_pairs) @@ -212,7 +215,7 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, credentials.c_ssl_pem_key_cert_pairs[i] = ( (<records.SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) credentials.c_credentials = grpc.grpc_ssl_server_credentials_create( - pem_root_certs, credentials.c_ssl_pem_key_cert_pairs, + c_pem_root_certs, credentials.c_ssl_pem_key_cert_pairs, credentials.c_ssl_pem_key_cert_pairs_count, force_client_auth, NULL) return credentials diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd index 36aea81a6c0d8de77d6a187426d9b096c76f7638..054ac7796a7e13e7e248285e49287af52de4235d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd @@ -132,6 +132,22 @@ cdef extern from "grpc/byte_buffer.h": cdef extern from "grpc/grpc.h": + const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING + const char *GRPC_ARG_ENABLE_CENSUS + const char *GRPC_ARG_MAX_CONCURRENT_STREAMS + const char *GRPC_ARG_MAX_MESSAGE_LENGTH + const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER + const char *GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER + const char *GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER + const char *GRPC_ARG_DEFAULT_AUTHORITY + const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING + const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING + const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + + const int GRPC_WRITE_BUFFER_HINT + const int GRPC_WRITE_NO_COMPRESS + const int GRPC_WRITE_USED_MASK + ctypedef struct grpc_completion_queue: # We don't care about the internals (and in fact don't know them) pass @@ -149,9 +165,9 @@ cdef extern from "grpc/grpc.h": pass ctypedef enum grpc_arg_type: - grpc_arg_string "GRPC_ARG_STRING" - grpc_arg_integer "GRPC_ARG_INTEGER" - grpc_arg_pointer "GRPC_ARG_POINTER" + GRPC_ARG_STRING + GRPC_ARG_INTEGER + GRPC_ARG_POINTER ctypedef struct grpc_arg_value_pointer: void *address "p" @@ -185,6 +201,13 @@ cdef extern from "grpc/grpc.h": GRPC_CALL_ERROR_INVALID_FLAGS GRPC_CALL_ERROR_INVALID_METADATA + ctypedef enum grpc_connectivity_state: + GRPC_CHANNEL_IDLE + GRPC_CHANNEL_CONNECTING + GRPC_CHANNEL_READY + GRPC_CHANNEL_TRANSIENT_FAILURE + GRPC_CHANNEL_FATAL_FAILURE + ctypedef struct grpc_metadata: const char *key const char *value @@ -279,9 +302,9 @@ cdef extern from "grpc/grpc.h": grpc_status_code status, const char *description, void *reserved) + char *grpc_call_get_peer(grpc_call *call) void grpc_call_destroy(grpc_call *call) - grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, void *reserved) @@ -291,6 +314,12 @@ cdef extern from "grpc/grpc.h": grpc_completion_queue *completion_queue, const char *method, const char *host, gpr_timespec deadline, void *reserved) + grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) + void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) + char *grpc_channel_get_target(grpc_channel *channel) void grpc_channel_destroy(grpc_channel *channel) grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd index 9ee487882aa062a063183a8d050bc93e488f0797..4c844e4cb6cd8be385a1ff82f168ff63fdb93464 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd @@ -66,6 +66,7 @@ cdef class Event: cdef readonly call.Call operation_call # For Server.request_call + cdef readonly bint is_new_request cdef readonly CallDetails request_call_details cdef readonly Metadata request_metadata diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx index 8edee09c2d43ed1ab16c672a7203abd7bc3ef097..be89db884653807462a3604a6e8dc0efd558aca1 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx @@ -32,6 +32,32 @@ from grpc._cython._cygrpc cimport call from grpc._cython._cygrpc cimport server +class ConnectivityState: + idle = grpc.GRPC_CHANNEL_IDLE + connecting = grpc.GRPC_CHANNEL_CONNECTING + ready = grpc.GRPC_CHANNEL_READY + transient_failure = grpc.GRPC_CHANNEL_TRANSIENT_FAILURE + fatal_failure = grpc.GRPC_CHANNEL_FATAL_FAILURE + + +class ChannelArgKey: + enable_census = grpc.GRPC_ARG_ENABLE_CENSUS + max_concurrent_streams = grpc.GRPC_ARG_MAX_CONCURRENT_STREAMS + max_message_length = grpc.GRPC_ARG_MAX_MESSAGE_LENGTH + http2_initial_sequence_number = grpc.GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER + http2_hpack_table_size_decoder = grpc.GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER + http2_hpack_table_size_encoder = grpc.GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER + default_authority = grpc.GRPC_ARG_DEFAULT_AUTHORITY + primary_user_agent_string = grpc.GRPC_ARG_PRIMARY_USER_AGENT_STRING + secondary_user_agent_string = grpc.GRPC_ARG_SECONDARY_USER_AGENT_STRING + ssl_target_name_override = grpc.GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + + +class WriteFlag: + buffer_hint = grpc.GRPC_WRITE_BUFFER_HINT + no_compress = grpc.GRPC_WRITE_NO_COMPRESS + + class StatusCode: ok = grpc.GRPC_STATUS_OK cancelled = grpc.GRPC_STATUS_CANCELLED @@ -88,7 +114,10 @@ cdef class Timespec: def __cinit__(self, time): if time is None: self.c_time = grpc.gpr_now(grpc.GPR_CLOCK_REALTIME) - elif isinstance(time, float): + return + if isinstance(time, int): + time = float(time) + if isinstance(time, float): if time == float("+inf"): self.c_time = grpc.gpr_inf_future(grpc.GPR_CLOCK_REALTIME) elif time == float("-inf"): @@ -97,8 +126,11 @@ cdef class Timespec: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 self.c_time.clock_type = grpc.GPR_CLOCK_REALTIME + elif isinstance(time, Timespec): + self.c_time = (<Timespec>time).c_time else: - raise TypeError("expected time to be float") + raise TypeError("expected time to be float, int, or Timespec, not {}" + .format(type(time))) @property def seconds(self): @@ -166,6 +198,7 @@ cdef class Event: object tag, call.Call operation_call, CallDetails request_call_details, Metadata request_metadata, + bint is_new_request, Operations batch_operations): self.type = type self.success = success @@ -174,6 +207,7 @@ cdef class Event: self.request_call_details = request_call_details self.request_metadata = request_metadata self.batch_operations = batch_operations + self.is_new_request = is_new_request cdef class ByteBuffer: @@ -186,8 +220,14 @@ cdef class ByteBuffer: pass elif isinstance(data, basestring): data = data.encode() + elif isinstance(data, ByteBuffer): + data = (<ByteBuffer>data).bytes() + if data is None: + self.c_byte_buffer = NULL + return else: - raise TypeError("expected value to be of type str or bytes") + raise TypeError("expected value to be of type str, bytes, or " + "ByteBuffer, not {}".format(type(data))) cdef char *c_data = data data_slice = grpc.gpr_slice_from_copied_buffer(c_data, len(data)) @@ -409,12 +449,22 @@ cdef class Operation: def type(self): return self.c_op.type + @property + def has_status(self): + return self.c_op.type == grpc.GRPC_OP_RECV_STATUS_ON_CLIENT + @property def received_message(self): if self.c_op.type != grpc.GRPC_OP_RECV_MESSAGE: raise TypeError("self must be an operation receiving a message") return self._received_message + @property + def received_message_or_none(self): + if self.c_op.type != grpc.GRPC_OP_RECV_MESSAGE: + return None + return self._received_message + @property def received_metadata(self): if (self.c_op.type != grpc.GRPC_OP_RECV_INITIAL_METADATA and @@ -422,12 +472,25 @@ cdef class Operation: raise TypeError("self must be an operation receiving metadata") return self._received_metadata + @property + def received_metadata_or_none(self): + if (self.c_op.type != grpc.GRPC_OP_RECV_INITIAL_METADATA and + self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT): + return None + return self._received_metadata + @property def received_status_code(self): if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT: raise TypeError("self must be an operation receiving a status code") return self._received_status_code + @property + def received_status_code_or_none(self): + if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT: + return None + return self._received_status_code + @property def received_status_details(self): if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -437,6 +500,15 @@ cdef class Operation: else: return None + @property + def received_status_details_or_none(self): + if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT: + return None + if self._received_status_details: + return self._received_status_details + else: + return None + @property def received_cancelled(self): if self.c_op.type != grpc.GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -444,6 +516,12 @@ cdef class Operation: "information") return False if self._received_cancelled == 0 else True + @property + def received_cancelled_or_none(self): + if self.c_op.type != grpc.GRPC_OP_RECV_CLOSE_ON_SERVER: + return None + return False if self._received_cancelled == 0 else True + def __dealloc__(self): # We *almost* don't need to do anything; most of the objects are handled by # Python. The remaining one(s) are primitive fields filled in by GRPC core. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx index 6d20d2910c73bef8c2be77fe3991781523426a03..46df8bf77f8d1228f842b67fdbe68d8fbd12ab4c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx @@ -132,7 +132,7 @@ cdef class Server: def cancel_all_calls(self): if not self.is_shutting_down: - raise ValueError("the server must be shutting down to cancel all calls") + raise RuntimeError("the server must be shutting down to cancel all calls") elif self.is_shutdown: return else: diff --git a/src/python/grpcio/grpc/_cython/adapter_low.py b/src/python/grpcio/grpc/_cython/adapter_low.py index 4f24da330f5e2141cd324cd614f7d22f821169c4..03a1e565afde315e311e93068ca45acda6d02892 100644 --- a/src/python/grpcio/grpc/_cython/adapter_low.py +++ b/src/python/grpcio/grpc/_cython/adapter_low.py @@ -33,7 +33,7 @@ # # TODO(atash): Once this is plugged into grpc._adapter._intermediary_low, remove # both grpc._adapter._intermediary_low and this file. The fore and rear links in -# grpc._adapter should be able to use grpc._cython.types directly. +# grpc._adapter should be able to use grpc._cython.cygrpc directly. from grpc._adapter import _types as type_interfaces from grpc._cython import cygrpc diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index b20dda8a95875dcfdf66687ae52a00967f7037fb..635bf1918ae397c5a5978fc283bfcfb8d30ee898 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -44,6 +44,9 @@ from grpc._cython._cygrpc import completion_queue from grpc._cython._cygrpc import records from grpc._cython._cygrpc import server +ConnectivityState = records.ConnectivityState +ChannelArgKey = records.ChannelArgKey +WriteFlag = records.WriteFlag StatusCode = records.StatusCode CallError = records.CallError CompletionType = records.CompletionType