Skip to content
Snippets Groups Projects
Commit 3d2d5b9d authored by Masood Malekghassemi's avatar Masood Malekghassemi
Browse files

Don't hold the GIL when calling anything in core

parent 85b460fb
No related branches found
No related tags found
No related merge requests found
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -40,14 +40,17 @@ cdef class Call:
def start_batch(self, operations, tag):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
cdef grpc_call_error result
cdef Operations cy_operations = Operations(operations)
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = self
operation_tag.batch_operations = cy_operations
cpython.Py_INCREF(operation_tag)
return grpc_call_start_batch(
self.c_call, cy_operations.c_ops, cy_operations.c_nops,
<cpython.PyObject *>operation_tag, NULL)
with nogil:
result = grpc_call_start_batch(
self.c_call, cy_operations.c_ops, cy_operations.c_nops,
<cpython.PyObject *>operation_tag, NULL)
return result
def cancel(
self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE,
......@@ -57,6 +60,8 @@ cdef class Call:
if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE):
raise ValueError("if error_code is specified, so must details "
"(and vice-versa)")
cdef grpc_call_error result
cdef char *c_details = NULL
if error_code != GRPC_STATUS__DO_NOT_USE:
if isinstance(details, bytes):
pass
......@@ -65,25 +70,37 @@ cdef class Call:
else:
raise TypeError("expected details to be str or bytes")
self.references.append(details)
return grpc_call_cancel_with_status(
self.c_call, error_code, details, NULL)
c_details = details
with nogil:
result = grpc_call_cancel_with_status(
self.c_call, error_code, c_details, NULL)
return result
else:
return grpc_call_cancel(self.c_call, NULL)
with nogil:
result = grpc_call_cancel(self.c_call, NULL)
return result
def set_credentials(
self, CallCredentials call_credentials not None):
return grpc_call_set_credentials(
self.c_call, call_credentials.c_credentials)
cdef grpc_call_error result
with nogil:
result = grpc_call_set_credentials(
self.c_call, call_credentials.c_credentials)
return result
def peer(self):
cdef char *peer = grpc_call_get_peer(self.c_call)
cdef char *peer = NULL
with nogil:
peer = grpc_call_get_peer(self.c_call)
result = <bytes>peer
gpr_free(peer)
with nogil:
gpr_free(peer)
return result
def __dealloc__(self):
if self.c_call != NULL:
grpc_call_destroy(self.c_call)
with nogil:
grpc_call_destroy(self.c_call)
# The object *should* always be valid from Python. Used for debugging.
@property
......
......@@ -35,6 +35,7 @@ cdef class Channel:
def __cinit__(self, target, ChannelArgs arguments=None,
ChannelCredentials channel_credentials=None):
cdef grpc_channel_args *c_arguments = NULL
cdef char *c_target = NULL
self.c_channel = NULL
self.references = []
if arguments is not None:
......@@ -45,12 +46,15 @@ cdef class Channel:
target = target.encode()
else:
raise TypeError("expected target to be str or bytes")
c_target = target
if channel_credentials is None:
self.c_channel = grpc_insecure_channel_create(target, c_arguments,
NULL)
with nogil:
self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
NULL)
else:
self.c_channel = grpc_secure_channel_create(
channel_credentials.c_credentials, target, c_arguments, NULL)
with nogil:
self.c_channel = grpc_secure_channel_create(
channel_credentials.c_credentials, c_target, c_arguments, NULL)
self.references.append(channel_credentials)
self.references.append(target)
self.references.append(arguments)
......@@ -66,6 +70,7 @@ cdef class Channel:
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
cdef char *method_c_string = method
cdef char *host_c_string = NULL
if host is None:
pass
......@@ -81,31 +86,40 @@ cdef class Channel:
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
queue.c_completion_queue, method, host_c_string, deadline.c_time,
NULL)
with nogil:
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
queue.c_completion_queue, method_c_string, host_c_string,
deadline.c_time, NULL)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
return grpc_channel_check_connectivity_state(self.c_channel,
try_to_connect)
cdef grpc_connectivity_state result
with nogil:
result = grpc_channel_check_connectivity_state(self.c_channel,
try_to_connect)
return result
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag = OperationTag(tag)
cpython.Py_INCREF(operation_tag)
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
queue.c_completion_queue, <cpython.PyObject *>operation_tag)
with nogil:
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
queue.c_completion_queue, <cpython.PyObject *>operation_tag)
def target(self):
cdef char * target = grpc_channel_get_target(self.c_channel)
cdef char *target = NULL
with nogil:
target = grpc_channel_get_target(self.c_channel)
result = <bytes>target
gpr_free(target)
with nogil:
gpr_free(target)
return result
def __dealloc__(self):
if self.c_channel != NULL:
grpc_channel_destroy(self.c_channel)
with nogil:
grpc_channel_destroy(self.c_channel)
......@@ -36,7 +36,8 @@ import time
cdef class CompletionQueue:
def __cinit__(self):
self.c_completion_queue = grpc_completion_queue_create(NULL)
with nogil:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.pluck_condition = threading.Condition()
......@@ -82,8 +83,9 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
cdef gpr_timespec c_deadline = gpr_inf_future(
GPR_CLOCK_REALTIME)
cdef gpr_timespec c_deadline
with nogil:
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
......@@ -123,7 +125,8 @@ cdef class CompletionQueue:
return self._interpret_event(event)
def shutdown(self):
grpc_completion_queue_shutdown(self.c_completion_queue)
with nogil:
grpc_completion_queue_shutdown(self.c_completion_queue)
self.is_shutting_down = True
def clear(self):
......@@ -133,15 +136,19 @@ cdef class CompletionQueue:
pass
def __dealloc__(self):
cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
cdef gpr_timespec c_deadline
with nogil:
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if self.c_completion_queue != NULL:
# Ensure shutdown
if not self.is_shutting_down:
grpc_completion_queue_shutdown(self.c_completion_queue)
with nogil:
grpc_completion_queue_shutdown(self.c_completion_queue)
# Pump the queue
while not self.is_shutdown:
with nogil:
event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
grpc_completion_queue_destroy(self.c_completion_queue)
with nogil:
grpc_completion_queue_destroy(self.c_completion_queue)
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -46,7 +46,8 @@ cdef class ChannelCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_channel_credentials_release(self.c_credentials)
with nogil:
grpc_channel_credentials_release(self.c_credentials)
cdef class CallCredentials:
......@@ -63,7 +64,8 @@ cdef class CallCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_call_credentials_release(self.c_credentials)
with nogil:
grpc_call_credentials_release(self.c_credentials)
cdef class ServerCredentials:
......@@ -74,7 +76,8 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_server_credentials_release(self.c_credentials)
with nogil:
grpc_server_credentials_release(self.c_credentials)
cdef class CredentialsMetadataPlugin:
......@@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state):
def channel_credentials_google_default():
cdef ChannelCredentials credentials = ChannelCredentials();
credentials.c_credentials = grpc_google_default_credentials_create()
with nogil:
credentials.c_credentials = grpc_google_default_credentials_create()
return credentials
def channel_credentials_ssl(pem_root_certificates,
......@@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates,
c_pem_root_certificates = pem_root_certificates
credentials.references.append(pem_root_certificates)
if ssl_pem_key_cert_pair is not None:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
with nogil:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
credentials.references.append(ssl_pem_key_cert_pair)
else:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL)
with nogil:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL)
return credentials
def channel_credentials_composite(
......@@ -172,8 +178,9 @@ def channel_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef ChannelCredentials credentials = ChannelCredentials()
credentials.c_credentials = grpc_composite_channel_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
with nogil:
credentials.c_credentials = grpc_composite_channel_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
......@@ -184,16 +191,18 @@ def call_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = grpc_composite_call_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
with nogil:
credentials.c_credentials = grpc_composite_call_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def call_credentials_google_compute_engine():
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = (
grpc_google_compute_engine_credentials_create(NULL))
with nogil:
credentials.c_credentials = (
grpc_google_compute_engine_credentials_create(NULL))
return credentials
def call_credentials_service_account_jwt_access(
......@@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access(
else:
raise TypeError("expected json_key to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = (
grpc_service_account_jwt_access_credentials_create(
json_key, token_lifetime.c_time, NULL))
cdef char *json_key_c_string = json_key
with nogil:
credentials.c_credentials = (
grpc_service_account_jwt_access_credentials_create(
json_key_c_string, token_lifetime.c_time, NULL))
credentials.references.append(json_key)
return credentials
......@@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token):
else:
raise TypeError("expected json_refresh_token to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = grpc_google_refresh_token_credentials_create(
json_refresh_token, NULL)
cdef char *json_refresh_token_c_string = json_refresh_token
with nogil:
credentials.c_credentials = grpc_google_refresh_token_credentials_create(
json_refresh_token_c_string, NULL)
credentials.references.append(json_refresh_token)
return credentials
......@@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector):
else:
raise TypeError("expected authority_selector to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = grpc_google_iam_credentials_create(
authorization_token, authority_selector, NULL)
cdef char *authorization_token_c_string = authorization_token
cdef char *authority_selector_c_string = authority_selector
with nogil:
credentials.c_credentials = grpc_google_iam_credentials_create(
authorization_token_c_string, authority_selector_c_string, NULL)
credentials.references.append(authorization_token)
credentials.references.append(authority_selector)
return credentials
def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
cdef CallCredentials credentials = CallCredentials()
credentials.c_credentials = (
grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(),
NULL))
cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin()
with nogil:
credentials.c_credentials = (
grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
# TODO(atash): the following held reference is *probably* never necessary
credentials.references.append(plugin)
return credentials
......@@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
credentials.references.append(pem_key_cert_pairs)
credentials.references.append(pem_root_certs)
credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
credentials.c_ssl_pem_key_cert_pairs = (
<grpc_ssl_pem_key_cert_pair *>gpr_malloc(
sizeof(grpc_ssl_pem_key_cert_pair) *
credentials.c_ssl_pem_key_cert_pairs_count
))
with nogil:
credentials.c_ssl_pem_key_cert_pairs = (
<grpc_ssl_pem_key_cert_pair *>gpr_malloc(
sizeof(grpc_ssl_pem_key_cert_pair) *
credentials.c_ssl_pem_key_cert_pairs_count
))
for i in range(credentials.c_ssl_pem_key_cert_pairs_count):
credentials.c_ssl_pem_key_cert_pairs[i] = (
(<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
......
......@@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h":
int pygrpc_load_core(char*)
void *gpr_malloc(size_t size)
void gpr_free(void *ptr)
void *gpr_realloc(void *p, size_t size)
void *gpr_malloc(size_t size) nogil
void gpr_free(void *ptr) nogil
void *gpr_realloc(void *p, size_t size) nogil
ctypedef struct gpr_slice:
# don't worry about writing out the members of gpr_slice; we never access
# them directly.
pass
gpr_slice gpr_slice_ref(gpr_slice s)
void gpr_slice_unref(gpr_slice s)
gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *))
gpr_slice gpr_slice_ref(gpr_slice s) nogil
void gpr_slice_unref(gpr_slice s) nogil
gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
gpr_slice gpr_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t))
gpr_slice gpr_slice_malloc(size_t length)
gpr_slice gpr_slice_from_copied_string(const char *source)
gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len)
void *p, size_t len, void (*destroy)(void *, size_t)) nogil
gpr_slice gpr_slice_malloc(size_t length) nogil
gpr_slice gpr_slice_from_copied_string(const char *source) nogil
gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil
# Declare functions for function-like macros (because Cython)...
void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s)
size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s)
void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil
size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil
ctypedef enum gpr_clock_type:
GPR_CLOCK_MONOTONIC
......@@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h":
int32_t nanoseconds "tv_nsec"
gpr_clock_type clock_type
gpr_timespec gpr_time_0(gpr_clock_type type)
gpr_timespec gpr_inf_future(gpr_clock_type type)
gpr_timespec gpr_inf_past(gpr_clock_type type)
gpr_timespec gpr_time_0(gpr_clock_type type) nogil
gpr_timespec gpr_inf_future(gpr_clock_type type) nogil
gpr_timespec gpr_inf_past(gpr_clock_type type) nogil
gpr_timespec gpr_now(gpr_clock_type clock)
gpr_timespec gpr_now(gpr_clock_type clock) nogil
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
gpr_clock_type target_clock)
gpr_clock_type target_clock) nogil
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
......@@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h":
pass
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
size_t nslices)
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb)
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer)
size_t nslices) nogil
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer)
grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice)
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
gpr_slice *slice) nogil
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_ENABLE_CENSUS
......@@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t capacity
grpc_metadata *metadata
void grpc_metadata_array_init(grpc_metadata_array *array)
void grpc_metadata_array_destroy(grpc_metadata_array *array)
void grpc_metadata_array_init(grpc_metadata_array *array) nogil
void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
char *method
......@@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t host_capacity
gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details)
void grpc_call_details_destroy(grpc_call_details *details)
void grpc_call_details_init(grpc_call_details *details) nogil
void grpc_call_details_destroy(grpc_call_details *details) nogil
ctypedef enum grpc_op_type:
GRPC_OP_SEND_INITIAL_METADATA
......@@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h":
uint32_t flags
grpc_op_data data
void grpc_init()
void grpc_shutdown()
void grpc_init() nogil
void grpc_shutdown() nogil
grpc_completion_queue *grpc_completion_queue_create(void *reserved)
grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline,
void *reserved) nogil
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline,
void *reserved) nogil
void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
void grpc_completion_queue_destroy(grpc_completion_queue *cq)
void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil
void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag, void *reserved)
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved)
grpc_call_error grpc_call_start_batch(
grpc_call *call, const grpc_op *ops, size_t nops, void *tag,
void *reserved) nogil
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description,
void *reserved)
char *grpc_call_get_peer(grpc_call *call)
void grpc_call_destroy(grpc_call *call)
void *reserved) nogil
char *grpc_call_get_peer(grpc_call *call) nogil
void grpc_call_destroy(grpc_call *call) nogil
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved)
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
uint32_t propagation_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline, void *reserved)
void *reserved) nogil
grpc_call *grpc_channel_create_call(
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, const char *method,
const char *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect)
grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
char *grpc_channel_get_target(grpc_channel *channel)
void grpc_channel_destroy(grpc_channel *channel)
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil
char *grpc_channel_get_target(grpc_channel *channel) nogil
void grpc_channel_destroy(grpc_channel *channel) nogil
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved)
grpc_server *grpc_server_create(
const grpc_channel_args *args, void *reserved) nogil
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_completion_queue
*cq_bound_to_call, grpc_completion_queue *cq_for_notification, void
*tag_new)
*tag_new) nogil
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved)
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr)
void grpc_server_start(grpc_server *server)
void *reserved) nogil
int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil
void grpc_server_shutdown_and_notify(
grpc_server *server, grpc_completion_queue *cq, void *tag)
void grpc_server_cancel_all_calls(grpc_server *server)
void grpc_server_destroy(grpc_server *server)
grpc_server *server, grpc_completion_queue *cq, void *tag) nogil
void grpc_server_cancel_all_calls(grpc_server *server) nogil
void grpc_server_destroy(grpc_server *server) nogil
ctypedef struct grpc_ssl_pem_key_cert_pair:
const char *private_key
......@@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h":
ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs)
void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb)
void grpc_set_ssl_roots_override_callback(
grpc_ssl_roots_override_callback cb) nogil
grpc_channel_credentials *grpc_google_default_credentials_create()
grpc_channel_credentials *grpc_google_default_credentials_create() nogil
grpc_channel_credentials *grpc_ssl_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair,
void *reserved)
void *reserved) nogil
grpc_channel_credentials *grpc_composite_channel_credentials_create(
grpc_channel_credentials *creds1, grpc_call_credentials *creds2,
void *reserved)
void grpc_channel_credentials_release(grpc_channel_credentials *creds)
void *reserved) nogil
void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
grpc_call_credentials *grpc_composite_call_credentials_create(
grpc_call_credentials *creds1, grpc_call_credentials *creds2,
void *reserved)
void *reserved) nogil
grpc_call_credentials *grpc_google_compute_engine_credentials_create(
void *reserved)
void *reserved) nogil
grpc_call_credentials *grpc_service_account_jwt_access_credentials_create(
const char *json_key,
gpr_timespec token_lifetime, void *reserved)
gpr_timespec token_lifetime, void *reserved) nogil
grpc_call_credentials *grpc_google_refresh_token_credentials_create(
const char *json_refresh_token, void *reserved)
const char *json_refresh_token, void *reserved) nogil
grpc_call_credentials *grpc_google_iam_credentials_create(
const char *authorization_token, const char *authority_selector,
void *reserved)
void grpc_call_credentials_release(grpc_call_credentials *creds)
void *reserved) nogil
void grpc_call_credentials_release(grpc_call_credentials *creds) nogil
grpc_channel *grpc_secure_channel_create(
grpc_channel_credentials *creds, const char *target,
const grpc_channel_args *args, void *reserved)
const grpc_channel_args *args, void *reserved) nogil
ctypedef struct grpc_server_credentials:
# We don't care about the internals (and in fact don't know them)
......@@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h":
const char *pem_root_certs,
grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved)
void grpc_server_credentials_release(grpc_server_credentials *creds)
void grpc_server_credentials_release(grpc_server_credentials *creds) nogil
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_credentials *creds)
grpc_server_credentials *creds) nogil
grpc_call_error grpc_call_set_credentials(grpc_call *call,
grpc_call_credentials *creds)
grpc_call_credentials *creds) nogil
ctypedef struct grpc_auth_context:
# We don't care about the internals (and in fact don't know them)
......@@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h":
const char *type
grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
grpc_metadata_credentials_plugin plugin, void *reserved)
grpc_metadata_credentials_plugin plugin, void *reserved) nogil
......@@ -107,15 +107,18 @@ cdef class Timespec:
def __cinit__(self, time):
if time is None:
self.c_time = gpr_now(GPR_CLOCK_REALTIME)
with nogil:
self.c_time = gpr_now(GPR_CLOCK_REALTIME)
return
if isinstance(time, int):
time = float(time)
if isinstance(time, float):
if time == float("+inf"):
self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
with nogil:
self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
elif time == float("-inf"):
self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
with nogil:
self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
else:
self.c_time.seconds = time
self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9
......@@ -131,8 +134,10 @@ cdef class Timespec:
# TODO(atash) ensure that everywhere a Timespec is created that it's
# converted to GPR_CLOCK_REALTIME then and not every time someone wants to
# read values off in Python.
cdef gpr_timespec real_time = (
gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
cdef gpr_timespec real_time
with nogil:
real_time = (
gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
return real_time.seconds
@property
......@@ -158,10 +163,12 @@ cdef class Timespec:
cdef class CallDetails:
def __cinit__(self):
grpc_call_details_init(&self.c_details)
with nogil:
grpc_call_details_init(&self.c_details)
def __dealloc__(self):
grpc_call_details_destroy(&self.c_details)
with nogil:
grpc_call_details_destroy(&self.c_details)
@property
def method(self):
......@@ -229,10 +236,15 @@ cdef class ByteBuffer:
"ByteBuffer, not {}".format(type(data)))
cdef char *c_data = data
data_slice = gpr_slice_from_copied_buffer(c_data, len(data))
self.c_byte_buffer = grpc_raw_byte_buffer_create(
&data_slice, 1)
gpr_slice_unref(data_slice)
cdef gpr_slice data_slice
cdef size_t data_length = len(data)
with nogil:
data_slice = gpr_slice_from_copied_buffer(c_data, data_length)
with nogil:
self.c_byte_buffer = grpc_raw_byte_buffer_create(
&data_slice, 1)
with nogil:
gpr_slice_unref(data_slice)
def bytes(self):
cdef grpc_byte_buffer_reader reader
......@@ -240,20 +252,27 @@ cdef class ByteBuffer:
cdef size_t data_slice_length
cdef void *data_slice_pointer
if self.c_byte_buffer != NULL:
grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
with nogil:
grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
result = b""
while grpc_byte_buffer_reader_next(&reader, &data_slice):
data_slice_pointer = gpr_slice_start_ptr(data_slice)
data_slice_length = gpr_slice_length(data_slice)
result += (<char *>data_slice_pointer)[:data_slice_length]
grpc_byte_buffer_reader_destroy(&reader)
with nogil:
while grpc_byte_buffer_reader_next(&reader, &data_slice):
data_slice_pointer = gpr_slice_start_ptr(data_slice)
data_slice_length = gpr_slice_length(data_slice)
with gil:
result += (<char *>data_slice_pointer)[:data_slice_length]
with nogil:
grpc_byte_buffer_reader_destroy(&reader)
return result
else:
return None
def __len__(self):
cdef size_t result
if self.c_byte_buffer != NULL:
return grpc_byte_buffer_length(self.c_byte_buffer)
with nogil:
result = grpc_byte_buffer_length(self.c_byte_buffer)
return result
else:
return 0
......@@ -262,7 +281,8 @@ cdef class ByteBuffer:
def __dealloc__(self):
if self.c_byte_buffer != NULL:
grpc_byte_buffer_destroy(self.c_byte_buffer)
with nogil:
grpc_byte_buffer_destroy(self.c_byte_buffer)
cdef class SslPemKeyCertPair:
......@@ -319,14 +339,15 @@ cdef class ChannelArgs:
if not isinstance(arg, ChannelArg):
raise TypeError("expected list of ChannelArg")
self.c_args.arguments_length = len(self.args)
self.c_args.arguments = <grpc_arg *>gpr_malloc(
self.c_args.arguments_length*sizeof(grpc_arg)
)
with nogil:
self.c_args.arguments = <grpc_arg *>gpr_malloc(
self.c_args.arguments_length*sizeof(grpc_arg))
for i in range(self.c_args.arguments_length):
self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg
def __dealloc__(self):
gpr_free(self.c_args.arguments)
with nogil:
gpr_free(self.c_args.arguments)
def __len__(self):
# self.args is never stale; it's only updated from this file
......@@ -407,21 +428,24 @@ cdef class Metadata:
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
grpc_metadata_array_init(&self.c_metadata_array)
with nogil:
grpc_metadata_array_init(&self.c_metadata_array)
self.c_metadata_array.count = len(self.metadata)
self.c_metadata_array.capacity = len(self.metadata)
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
with nogil:
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i] = (
(<Metadatum>self.metadata[i]).c_metadata)
def __dealloc__(self):
# this frees the allocated memory for the grpc_metadata_array (although
# it'd be nice if that were documented somewhere...) TODO(atash): document
# this in the C core
grpc_metadata_array_destroy(&self.c_metadata_array)
# it'd be nice if that were documented somewhere...)
# TODO(atash): document this in the C core
with nogil:
grpc_metadata_array_destroy(&self.c_metadata_array)
def __len__(self):
return self.c_metadata_array.count
......@@ -526,7 +550,8 @@ cdef class Operation:
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
gpr_free(self._received_status_details)
with nogil:
gpr_free(self._received_status_details)
def operation_send_initial_metadata(Metadata metadata):
cdef Operation op = Operation()
......@@ -648,8 +673,8 @@ cdef class Operations:
if not isinstance(operation, Operation):
raise TypeError("expected operations to be iterable of Operation")
self.c_nops = len(self.operations)
self.c_ops = <grpc_op *>gpr_malloc(
sizeof(grpc_op)*self.c_nops)
with nogil:
self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops)
for i in range(self.c_nops):
self.c_ops[i] = (<Operation>(self.operations[i])).c_op
......@@ -661,7 +686,8 @@ cdef class Operations:
return self.operations[i]
def __dealloc__(self):
gpr_free(self.c_ops)
with nogil:
gpr_free(self.c_ops)
def __iter__(self):
return _OperationsIterator(self)
......
......@@ -41,7 +41,8 @@ cdef class Server:
if arguments is not None:
c_arguments = &arguments.c_args
self.references.append(arguments)
self.c_server = grpc_server_create(c_arguments, NULL)
with nogil:
self.c_server = grpc_server_create(c_arguments, NULL)
self.is_started = False
self.is_shutting_down = False
self.is_shutdown = False
......@@ -53,6 +54,7 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
cdef grpc_call_error result
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
......@@ -61,19 +63,22 @@ cdef class Server:
operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([])
cpython.Py_INCREF(operation_tag)
return grpc_server_request_call(
self.c_server, &operation_tag.operation_call.c_call,
&operation_tag.request_call_details.c_details,
&operation_tag.request_metadata.c_metadata_array,
call_queue.c_completion_queue, server_queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
with nogil:
result = grpc_server_request_call(
self.c_server, &operation_tag.operation_call.c_call,
&operation_tag.request_call_details.c_details,
&operation_tag.request_metadata.c_metadata_array,
call_queue.c_completion_queue, server_queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
return result
def register_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
grpc_server_register_completion_queue(
self.c_server, queue.c_completion_queue, NULL)
with nogil:
grpc_server_register_completion_queue(
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self):
......@@ -82,7 +87,8 @@ cdef class Server:
self.backup_shutdown_queue = CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
grpc_server_start(self.c_server)
with nogil:
grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work
self.backup_shutdown_queue.pluck(None, Timespec(None))
......@@ -95,21 +101,28 @@ cdef class Server:
else:
raise TypeError("expected address to be a str or bytes")
self.references.append(address)
cdef int result
cdef char *address_c_string = address
if server_credentials is not None:
self.references.append(server_credentials)
return grpc_server_add_secure_http2_port(
self.c_server, address, server_credentials.c_credentials)
with nogil:
result = grpc_server_add_secure_http2_port(
self.c_server, address_c_string, server_credentials.c_credentials)
else:
return grpc_server_add_insecure_http2_port(self.c_server, address)
with nogil:
result = grpc_server_add_insecure_http2_port(self.c_server,
address_c_string)
return result
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
operation_tag = OperationTag(tag)
operation_tag.shutting_down_server = self
cpython.Py_INCREF(operation_tag)
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
with nogil:
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
def shutdown(self, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag
......@@ -134,7 +147,8 @@ cdef class Server:
elif self.is_shutdown:
return
else:
grpc_server_cancel_all_calls(self.c_server)
with nogil:
grpc_server_cancel_all_calls(self.c_server)
def __dealloc__(self):
if self.c_server != NULL:
......@@ -153,5 +167,6 @@ cdef class Server:
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
grpc_server_destroy(self.c_server)
with nogil:
grpc_server_destroy(self.c_server)
......@@ -57,14 +57,17 @@ cdef class _ModuleState:
'grpc._cython', '_windows/grpc_c.64.python')
if not pygrpc_load_core(filename):
raise ImportError('failed to load core gRPC library')
grpc_init()
with nogil:
grpc_init()
self.is_loaded = True
grpc_set_ssl_roots_override_callback(
<grpc_ssl_roots_override_callback>ssl_roots_override_callback)
with nogil:
grpc_set_ssl_roots_override_callback(
<grpc_ssl_roots_override_callback>ssl_roots_override_callback)
def __dealloc__(self):
if self.is_loaded:
grpc_shutdown()
with nogil:
grpc_shutdown()
_module_state = _ModuleState()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment