diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 73d1ff7b97d3bc382a8bb7fbf2fb604629bafd1a..e4c24a83abd8cc4f6a9e92377157ef28981f2d17 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -60,19 +60,23 @@ cdef class Channel: method, host, Timespec deadline not None): if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") - cdef char *method_c_string = method - cdef char *host_c_string = NULL + cdef Slice method_slice = Slice.from_bytes(method) + cdef Slice host_slice + cdef grpc_slice *host_c_slice = NULL if host is not None: - host_c_string = host + host_slice = Slice.from_bytes(host) + host_c_slice = &host_slice.c_slice + else: + host_slice = Slice() cdef Call operation_call = Call() - operation_call.references = [self, method, host, queue] + operation_call.references = [self, method_slice, host_slice, queue] cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call with nogil: operation_call.c_call = grpc_channel_create_call( self.c_channel, parent_call, flags, - queue.c_completion_queue, method_c_string, host_c_string, + queue.c_completion_queue, method_slice.c_slice, host_c_slice, deadline.c_time, NULL) return operation_call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index ad766186bd1bc39a52fd6eb386856a1a36a5afd0..141580b82ada301537bef8a62db35f3415efaad8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -51,6 +51,13 @@ cdef extern from "grpc/byte_buffer_reader.h": pass +cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h": + + struct grpc_exec_ctx: + # We don't care about the internals + pass + + cdef extern from "grpc/grpc.h": ctypedef struct grpc_slice: @@ -60,6 +67,7 @@ cdef extern from "grpc/grpc.h": grpc_slice grpc_slice_ref(grpc_slice s) nogil void grpc_slice_unref(grpc_slice s) nogil + grpc_slice grpc_empty_slice() nogil grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil grpc_slice grpc_slice_new_with_len( void *p, size_t len, void (*destroy)(void *, size_t)) nogil @@ -175,7 +183,7 @@ cdef extern from "grpc/grpc.h": ctypedef struct grpc_arg_pointer_vtable: void *(*copy)(void *) - void (*destroy)(void *) + void (*destroy)(grpc_exec_ctx *, void *) int (*cmp)(void *, void *) ctypedef struct grpc_arg_value_pointer: @@ -217,9 +225,8 @@ cdef extern from "grpc/grpc.h": GRPC_CHANNEL_SHUTDOWN ctypedef struct grpc_metadata: - const char *key - const char *value - size_t value_length + grpc_slice key + grpc_slice value # ignore the 'internal_data.obfuscated' fields. ctypedef enum grpc_completion_type: @@ -241,10 +248,8 @@ cdef extern from "grpc/grpc.h": void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: - char *method - size_t method_capacity - char *host - size_t host_capacity + grpc_slice method + grpc_slice host gpr_timespec deadline void grpc_call_details_init(grpc_call_details *details) nogil @@ -268,13 +273,12 @@ cdef extern from "grpc/grpc.h": size_t trailing_metadata_count grpc_metadata *trailing_metadata grpc_status_code status - const char *status_details + grpc_slice *status_details ctypedef struct grpc_op_data_recv_status_on_client: grpc_metadata_array *trailing_metadata grpc_status_code *status - char **status_details - size_t *status_details_capacity + grpc_slice *status_details ctypedef struct grpc_op_data_recv_close_on_server: int *cancelled @@ -321,9 +325,9 @@ cdef extern from "grpc/grpc.h": const grpc_channel_args *args, void *reserved) nogil grpc_call *grpc_channel_create_call( - grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, - grpc_completion_queue *completion_queue, const char *method, - const char *host, gpr_timespec deadline, void *reserved) nogil + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, grpc_slice method, + const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( @@ -473,8 +477,7 @@ cdef extern from "grpc/compression.h": grpc_compression_algorithm default_compression_algorithm int grpc_compression_algorithm_parse( - const char *name, size_t name_length, - grpc_compression_algorithm *algorithm) nogil + grpc_slice value, grpc_compression_algorithm *algorithm) nogil int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, char **name) nogil grpc_compression_algorithm grpc_compression_algorithm_for_level( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 00ec91b131eae402594005792079c7936200910a..870da51fa8bad31b61423b8ee6928c8575d2eabd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -70,6 +70,15 @@ cdef class Event: cdef readonly Operations batch_operations +cdef class Slice: + + cdef grpc_slice c_slice + + cdef void _assign_slice(self, grpc_slice new_slice) nogil + @staticmethod + cdef Slice from_slice(grpc_slice slice) + + cdef class ByteBuffer: cdef grpc_byte_buffer *c_byte_buffer @@ -97,7 +106,8 @@ cdef class ChannelArgs: cdef class Metadatum: cdef grpc_metadata c_metadata - cdef object _key, _value + cdef Slice _key, + cdef Slice _value cdef class Metadata: @@ -112,8 +122,7 @@ cdef class Operation: cdef ByteBuffer _received_message cdef Metadata _received_metadata cdef grpc_status_code _received_status_code - cdef char *_received_status_details - cdef size_t _received_status_details_capacity + cdef Slice _received_status_details cdef int _received_cancelled cdef readonly bint is_valid cdef object references diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index cadfce6ee6b689bff002162e3fca083d828c5693..b7a75cd97a09f3f2f4d460e6423cf863f4923b56 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -189,17 +189,11 @@ cdef class CallDetails: @property def method(self): - if self.c_details.method != NULL: - return <bytes>self.c_details.method - else: - return None + return Slice.from_slice(self.c_details.method).bytes() @property def host(self): - if self.c_details.host != NULL: - return <bytes>self.c_details.host - else: - return None + return Slice.from_slice(self.c_details.host).bytes() @property def deadline(self): @@ -233,6 +227,42 @@ cdef class Event: self.is_new_request = is_new_request +cdef class Slice: + + def __cinit__(self): + with nogil: + grpc_init() + self.c_slice = grpc_empty_slice() + + cdef void _assign_slice(self, grpc_slice new_slice) nogil: + grpc_slice_unref(self.c_slice) + self.c_slice = new_slice + + @staticmethod + def from_bytes(bytes data): + cdef Slice self = Slice() + self._assign_slice(grpc_slice_from_copied_buffer(data, len(data))) + return self + + @staticmethod + cdef Slice from_slice(grpc_slice slice): + cdef Slice self = Slice() + grpc_slice_ref(slice) + self._assign_slice(slice) + return self + + def bytes(self): + with nogil: + pointer = grpc_slice_start_ptr(self.c_slice) + length = grpc_slice_length(self.c_slice) + return (<char *>pointer)[:length] + + def __dealloc__(self): + with nogil: + grpc_slice_unref(self.c_slice) + grpc_shutdown() + + cdef class ByteBuffer: def __cinit__(self, bytes data): @@ -310,7 +340,7 @@ cdef void* copy_ptr(void* ptr): return ptr -cdef void destroy_ptr(void* ptr): +cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr): pass @@ -382,20 +412,20 @@ cdef class ChannelArgs: cdef class Metadatum: + # TODO(atash) this should just accept Slice objects. def __cinit__(self, bytes key, bytes value): - self._key = key - self._value = value - self.c_metadata.key = self._key - self.c_metadata.value = self._value - self.c_metadata.value_length = len(self._value) + self._key = Slice.from_bytes(key) + self._value = Slice.from_bytes(value) + self.c_metadata.key = self._key.c_slice + self.c_metadata.value = self._value.c_slice @property def key(self): - return <bytes>self.c_metadata.key + return self._key.bytes() @property def value(self): - return <bytes>self.c_metadata.value[:self.c_metadata.value_length] + return self._value.bytes() def __len__(self): return 2 @@ -465,9 +495,8 @@ cdef class Metadata: def __getitem__(self, size_t i): return Metadatum( - key=<bytes>self.c_metadata_array.metadata[i].key, - value=<bytes>self.c_metadata_array.metadata[i].value[ - :self.c_metadata_array.metadata[i].value_length]) + key=Slice.from_slice(self.c_metadata_array.metadata[i].key).bytes(), + value=Slice.from_slice(self.c_metadata_array.metadata[i].value).bytes()) def __iter__(self): return _MetadataIterator(self) @@ -478,8 +507,7 @@ cdef class Operation: def __cinit__(self): grpc_init() self.references = [] - self._received_status_details = NULL - self._received_status_details_capacity = 0 + self._received_status_details = Slice() self.is_valid = False @property @@ -536,19 +564,13 @@ cdef class Operation: def received_status_details(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: raise TypeError("self must be an operation receiving status details") - if self._received_status_details: - return self._received_status_details - else: - return None + return self._received_status_details.bytes() @property def received_status_details_or_none(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: return None - if self._received_status_details: - return self._received_status_details - else: - return None + return self._received_status_details.bytes() @property def received_cancelled(self): @@ -564,11 +586,6 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): - # We *almost* don't need to do anything; most of the objects are handled by - # Python. The remaining one(s) are primitive fields filled in by GRPC core. - # This means that we need to clean up after receive_status_on_client. - if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: - gpr_free(self._received_status_details) grpc_shutdown() def operation_send_initial_metadata(Metadata metadata, int flags): @@ -609,9 +626,10 @@ def operation_send_status_from_server( op.c_op.data.send_status_from_server.trailing_metadata = ( metadata.c_metadata_array.metadata) op.c_op.data.send_status_from_server.status = code - op.c_op.data.send_status_from_server.status_details = details + cdef Slice details_slice = Slice.from_bytes(details) + op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice op.references.append(metadata) - op.references.append(details) + op.references.append(details_slice) op.is_valid = True return op @@ -647,9 +665,7 @@ def operation_receive_status_on_client(int flags): op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( - &op._received_status_details) - op.c_op.data.receive_status_on_client.status_details_capacity = ( - &op._received_status_details_capacity) + &op._received_status_details.c_slice) op.is_valid = True return op