Skip to content
Snippets Groups Projects
Commit b8be7a5f authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge branch 'metadata_filter' of github.com:ctiller/grpc into metadata_filter

parents 78bc54e7 5a5b45a6
No related branches found
No related tags found
No related merge requests found
...@@ -60,19 +60,23 @@ cdef class Channel: ...@@ -60,19 +60,23 @@ cdef class Channel:
method, host, Timespec deadline not None): method, host, Timespec deadline not None):
if queue.is_shutting_down: if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown") raise ValueError("queue must not be shutting down or shutdown")
cdef char *method_c_string = method cdef Slice method_slice = Slice.from_bytes(method)
cdef char *host_c_string = NULL cdef Slice host_slice
cdef grpc_slice *host_c_slice = NULL
if host is not None: if host is not None:
host_c_string = host host_slice = Slice.from_bytes(host)
host_c_slice = &host_slice.c_slice
else:
host_slice = Slice()
cdef Call operation_call = Call() cdef Call operation_call = Call()
operation_call.references = [self, method, host, queue] operation_call.references = [self, method_slice, host_slice, queue]
cdef grpc_call *parent_call = NULL cdef grpc_call *parent_call = NULL
if parent is not None: if parent is not None:
parent_call = parent.c_call parent_call = parent.c_call
with nogil: with nogil:
operation_call.c_call = grpc_channel_create_call( operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags, self.c_channel, parent_call, flags,
queue.c_completion_queue, method_c_string, host_c_string, queue.c_completion_queue, method_slice.c_slice, host_c_slice,
deadline.c_time, NULL) deadline.c_time, NULL)
return operation_call return operation_call
......
...@@ -51,6 +51,13 @@ cdef extern from "grpc/byte_buffer_reader.h": ...@@ -51,6 +51,13 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass pass
cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h":
struct grpc_exec_ctx:
# We don't care about the internals
pass
cdef extern from "grpc/grpc.h": cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice: ctypedef struct grpc_slice:
...@@ -60,6 +67,7 @@ cdef extern from "grpc/grpc.h": ...@@ -60,6 +67,7 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_ref(grpc_slice s) nogil grpc_slice grpc_slice_ref(grpc_slice s) nogil
void grpc_slice_unref(grpc_slice s) nogil void grpc_slice_unref(grpc_slice s) nogil
grpc_slice grpc_empty_slice() nogil
grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
grpc_slice grpc_slice_new_with_len( grpc_slice grpc_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t)) nogil void *p, size_t len, void (*destroy)(void *, size_t)) nogil
...@@ -175,7 +183,7 @@ cdef extern from "grpc/grpc.h": ...@@ -175,7 +183,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_arg_pointer_vtable: ctypedef struct grpc_arg_pointer_vtable:
void *(*copy)(void *) void *(*copy)(void *)
void (*destroy)(void *) void (*destroy)(grpc_exec_ctx *, void *)
int (*cmp)(void *, void *) int (*cmp)(void *, void *)
ctypedef struct grpc_arg_value_pointer: ctypedef struct grpc_arg_value_pointer:
...@@ -217,9 +225,8 @@ cdef extern from "grpc/grpc.h": ...@@ -217,9 +225,8 @@ cdef extern from "grpc/grpc.h":
GRPC_CHANNEL_SHUTDOWN GRPC_CHANNEL_SHUTDOWN
ctypedef struct grpc_metadata: ctypedef struct grpc_metadata:
const char *key grpc_slice key
const char *value grpc_slice value
size_t value_length
# ignore the 'internal_data.obfuscated' fields. # ignore the 'internal_data.obfuscated' fields.
ctypedef enum grpc_completion_type: ctypedef enum grpc_completion_type:
...@@ -241,10 +248,8 @@ cdef extern from "grpc/grpc.h": ...@@ -241,10 +248,8 @@ cdef extern from "grpc/grpc.h":
void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details: ctypedef struct grpc_call_details:
char *method grpc_slice method
size_t method_capacity grpc_slice host
char *host
size_t host_capacity
gpr_timespec deadline gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details) nogil void grpc_call_details_init(grpc_call_details *details) nogil
...@@ -268,13 +273,12 @@ cdef extern from "grpc/grpc.h": ...@@ -268,13 +273,12 @@ cdef extern from "grpc/grpc.h":
size_t trailing_metadata_count size_t trailing_metadata_count
grpc_metadata *trailing_metadata grpc_metadata *trailing_metadata
grpc_status_code status grpc_status_code status
const char *status_details grpc_slice *status_details
ctypedef struct grpc_op_data_recv_status_on_client: ctypedef struct grpc_op_data_recv_status_on_client:
grpc_metadata_array *trailing_metadata grpc_metadata_array *trailing_metadata
grpc_status_code *status grpc_status_code *status
char **status_details grpc_slice *status_details
size_t *status_details_capacity
ctypedef struct grpc_op_data_recv_close_on_server: ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled int *cancelled
...@@ -322,8 +326,8 @@ cdef extern from "grpc/grpc.h": ...@@ -322,8 +326,8 @@ cdef extern from "grpc/grpc.h":
void *reserved) nogil void *reserved) nogil
grpc_call *grpc_channel_create_call( grpc_call *grpc_channel_create_call(
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, const char *method, grpc_completion_queue *completion_queue, grpc_slice method,
const char *host, gpr_timespec deadline, void *reserved) nogil const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) nogil grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state( void grpc_channel_watch_connectivity_state(
...@@ -473,8 +477,7 @@ cdef extern from "grpc/compression.h": ...@@ -473,8 +477,7 @@ cdef extern from "grpc/compression.h":
grpc_compression_algorithm default_compression_algorithm grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse( int grpc_compression_algorithm_parse(
const char *name, size_t name_length, grpc_slice value, grpc_compression_algorithm *algorithm) nogil
grpc_compression_algorithm *algorithm) nogil
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) nogil char **name) nogil
grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_algorithm grpc_compression_algorithm_for_level(
......
...@@ -70,6 +70,15 @@ cdef class Event: ...@@ -70,6 +70,15 @@ cdef class Event:
cdef readonly Operations batch_operations cdef readonly Operations batch_operations
cdef class Slice:
cdef grpc_slice c_slice
cdef void _assign_slice(self, grpc_slice new_slice) nogil
@staticmethod
cdef Slice from_slice(grpc_slice slice)
cdef class ByteBuffer: cdef class ByteBuffer:
cdef grpc_byte_buffer *c_byte_buffer cdef grpc_byte_buffer *c_byte_buffer
...@@ -97,7 +106,8 @@ cdef class ChannelArgs: ...@@ -97,7 +106,8 @@ cdef class ChannelArgs:
cdef class Metadatum: cdef class Metadatum:
cdef grpc_metadata c_metadata cdef grpc_metadata c_metadata
cdef object _key, _value cdef Slice _key,
cdef Slice _value
cdef class Metadata: cdef class Metadata:
...@@ -112,8 +122,7 @@ cdef class Operation: ...@@ -112,8 +122,7 @@ cdef class Operation:
cdef ByteBuffer _received_message cdef ByteBuffer _received_message
cdef Metadata _received_metadata cdef Metadata _received_metadata
cdef grpc_status_code _received_status_code cdef grpc_status_code _received_status_code
cdef char *_received_status_details cdef Slice _received_status_details
cdef size_t _received_status_details_capacity
cdef int _received_cancelled cdef int _received_cancelled
cdef readonly bint is_valid cdef readonly bint is_valid
cdef object references cdef object references
......
...@@ -189,17 +189,11 @@ cdef class CallDetails: ...@@ -189,17 +189,11 @@ cdef class CallDetails:
@property @property
def method(self): def method(self):
if self.c_details.method != NULL: return Slice.from_slice(self.c_details.method).bytes()
return <bytes>self.c_details.method
else:
return None
@property @property
def host(self): def host(self):
if self.c_details.host != NULL: return Slice.from_slice(self.c_details.host).bytes()
return <bytes>self.c_details.host
else:
return None
@property @property
def deadline(self): def deadline(self):
...@@ -233,6 +227,42 @@ cdef class Event: ...@@ -233,6 +227,42 @@ cdef class Event:
self.is_new_request = is_new_request self.is_new_request = is_new_request
cdef class Slice:
def __cinit__(self):
with nogil:
grpc_init()
self.c_slice = grpc_empty_slice()
cdef void _assign_slice(self, grpc_slice new_slice) nogil:
grpc_slice_unref(self.c_slice)
self.c_slice = new_slice
@staticmethod
def from_bytes(bytes data):
cdef Slice self = Slice()
self._assign_slice(grpc_slice_from_copied_buffer(data, len(data)))
return self
@staticmethod
cdef Slice from_slice(grpc_slice slice):
cdef Slice self = Slice()
grpc_slice_ref(slice)
self._assign_slice(slice)
return self
def bytes(self):
with nogil:
pointer = grpc_slice_start_ptr(self.c_slice)
length = grpc_slice_length(self.c_slice)
return (<char *>pointer)[:length]
def __dealloc__(self):
with nogil:
grpc_slice_unref(self.c_slice)
grpc_shutdown()
cdef class ByteBuffer: cdef class ByteBuffer:
def __cinit__(self, bytes data): def __cinit__(self, bytes data):
...@@ -310,7 +340,7 @@ cdef void* copy_ptr(void* ptr): ...@@ -310,7 +340,7 @@ cdef void* copy_ptr(void* ptr):
return ptr return ptr
cdef void destroy_ptr(void* ptr): cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
pass pass
...@@ -382,20 +412,20 @@ cdef class ChannelArgs: ...@@ -382,20 +412,20 @@ cdef class ChannelArgs:
cdef class Metadatum: cdef class Metadatum:
# TODO(atash) this should just accept Slice objects.
def __cinit__(self, bytes key, bytes value): def __cinit__(self, bytes key, bytes value):
self._key = key self._key = Slice.from_bytes(key)
self._value = value self._value = Slice.from_bytes(value)
self.c_metadata.key = self._key self.c_metadata.key = self._key.c_slice
self.c_metadata.value = self._value self.c_metadata.value = self._value.c_slice
self.c_metadata.value_length = len(self._value)
@property @property
def key(self): def key(self):
return <bytes>self.c_metadata.key return self._key.bytes()
@property @property
def value(self): def value(self):
return <bytes>self.c_metadata.value[:self.c_metadata.value_length] return self._value.bytes()
def __len__(self): def __len__(self):
return 2 return 2
...@@ -465,9 +495,8 @@ cdef class Metadata: ...@@ -465,9 +495,8 @@ cdef class Metadata:
def __getitem__(self, size_t i): def __getitem__(self, size_t i):
return Metadatum( return Metadatum(
key=<bytes>self.c_metadata_array.metadata[i].key, key=Slice.from_slice(self.c_metadata_array.metadata[i].key).bytes(),
value=<bytes>self.c_metadata_array.metadata[i].value[ value=Slice.from_slice(self.c_metadata_array.metadata[i].value).bytes())
:self.c_metadata_array.metadata[i].value_length])
def __iter__(self): def __iter__(self):
return _MetadataIterator(self) return _MetadataIterator(self)
...@@ -478,8 +507,7 @@ cdef class Operation: ...@@ -478,8 +507,7 @@ cdef class Operation:
def __cinit__(self): def __cinit__(self):
grpc_init() grpc_init()
self.references = [] self.references = []
self._received_status_details = NULL self._received_status_details = Slice()
self._received_status_details_capacity = 0
self.is_valid = False self.is_valid = False
@property @property
...@@ -536,19 +564,13 @@ cdef class Operation: ...@@ -536,19 +564,13 @@ cdef class Operation:
def received_status_details(self): def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details") raise TypeError("self must be an operation receiving status details")
if self._received_status_details: return self._received_status_details.bytes()
return self._received_status_details
else:
return None
@property @property
def received_status_details_or_none(self): def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None return None
if self._received_status_details: return self._received_status_details.bytes()
return self._received_status_details
else:
return None
@property @property
def received_cancelled(self): def received_cancelled(self):
...@@ -564,11 +586,6 @@ cdef class Operation: ...@@ -564,11 +586,6 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True return False if self._received_cancelled == 0 else True
def __dealloc__(self): def __dealloc__(self):
# We *almost* don't need to do anything; most of the objects are handled by
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
gpr_free(self._received_status_details)
grpc_shutdown() grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags): def operation_send_initial_metadata(Metadata metadata, int flags):
...@@ -609,9 +626,10 @@ def operation_send_status_from_server( ...@@ -609,9 +626,10 @@ def operation_send_status_from_server(
op.c_op.data.send_status_from_server.trailing_metadata = ( op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata) metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code op.c_op.data.send_status_from_server.status = code
op.c_op.data.send_status_from_server.status_details = details cdef Slice details_slice = Slice.from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice
op.references.append(metadata) op.references.append(metadata)
op.references.append(details) op.references.append(details_slice)
op.is_valid = True op.is_valid = True
return op return op
...@@ -647,9 +665,7 @@ def operation_receive_status_on_client(int flags): ...@@ -647,9 +665,7 @@ def operation_receive_status_on_client(int flags):
op.c_op.data.receive_status_on_client.status = ( op.c_op.data.receive_status_on_client.status = (
&op._received_status_code) &op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = ( op.c_op.data.receive_status_on_client.status_details = (
&op._received_status_details) &op._received_status_details.c_slice)
op.c_op.data.receive_status_on_client.status_details_capacity = (
&op._received_status_details_capacity)
op.is_valid = True op.is_valid = True
return op return op
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment