diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 532fd2d157eee9d38484ffad5bd26d85da4ebf61..91e3d3018da0b42c54e6e05718626183f8e71c5a 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -198,15 +198,6 @@ typedef struct grpc_metadata { typedef enum grpc_completion_type { GRPC_QUEUE_SHUTDOWN, /* Shutting down */ GRPC_OP_COMPLETE, /* operation completion */ - GRPC_READ, /* A read has completed */ - GRPC_WRITE_ACCEPTED, /* A write has been accepted by - flow control */ - GRPC_FINISH_ACCEPTED, /* writes_done or write_status has been accepted */ - GRPC_CLIENT_METADATA_READ, /* The metadata array sent by server received at - client */ - GRPC_FINISHED, /* An RPC has finished. The event contains status. - On the server this will be OK or Cancelled. */ - GRPC_SERVER_RPC_NEW, /* A new RPC has arrived at the server */ GRPC_SERVER_SHUTDOWN, /* The server has finished shutting down */ GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include a default: case */ @@ -219,30 +210,7 @@ typedef struct grpc_event { /* Data associated with the completion type. Field names match the type of completion as listed in grpc_completion_type. */ union { - /* Contains a pointer to the buffer that was read, or NULL at the end of a - stream. */ - grpc_byte_buffer *read; - grpc_op_error write_accepted; - grpc_op_error finish_accepted; - grpc_op_error invoke_accepted; grpc_op_error op_complete; - struct { - size_t count; - grpc_metadata *elements; - } client_metadata_read; - struct { - grpc_status_code status; - const char *details; - size_t metadata_count; - grpc_metadata *metadata_elements; - } finished; - struct { - const char *method; - const char *host; - gpr_timespec deadline; - size_t metadata_count; - grpc_metadata *metadata_elements; - } server_rpc_new; } data; } grpc_event; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c1c97af3377d60ef47aba834dbc5b4c22860baa3..3e9031807e0b3a6ad60ef553c64629466967db11 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -183,111 +183,17 @@ void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_byte_buffer *read) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); - ev->base.data.read = read; - end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, - grpc_call *call, grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_OP_COMPLETE); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; + ev->base.data.op_complete = error; end_op_locked(cc, GRPC_OP_COMPLETE); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.finish_accepted = error; - end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, size_t count, - grpc_metadata *elements) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, - user_data); - ev->base.data.client_metadata_read.count = count; - ev->base.data.client_metadata_read.elements = elements; - end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_status_code status, const char *details, - grpc_metadata *metadata_elements, - size_t metadata_count) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); - ev->base.data.finished.status = status; - ev->base.data.finished.details = details; - ev->base.data.finished.metadata_count = metadata_count; - ev->base.data.finished.metadata_elements = metadata_elements; - end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - const char *method, const char *host, - gpr_timespec deadline, size_t metadata_count, - grpc_metadata *metadata_elements) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); - ev->base.data.server_rpc_new.method = method; - ev->base.data.server_rpc_new.host = host; - ev->base.data.server_rpc_new.deadline = deadline; - ev->base.data.server_rpc_new.metadata_count = metadata_count; - ev->base.data.server_rpc_new.metadata_elements = metadata_elements; - end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ static event *create_shutdown_event(void) { event *ev = gpr_malloc(sizeof(event)); diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 0fa3f166e2c56024579b4b8e1e14d23293e96434..30bdff6b852ae6693c3493467f9dd612e0cdde1e 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -62,7 +62,6 @@ static void adderr(gpr_strvec *buf, grpc_op_error err) { char *grpc_event_string(grpc_event *ev) { char *out; - char *tmp; gpr_strvec buf; if (ev == NULL) return gpr_strdup("null"); @@ -76,55 +75,11 @@ char *grpc_event_string(grpc_event *ev) { case GRPC_QUEUE_SHUTDOWN: gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN")); break; - case GRPC_READ: - gpr_strvec_add(&buf, gpr_strdup("READ: ")); - addhdr(&buf, ev); - if (ev->data.read) { - gpr_asprintf(&tmp, " %d bytes", - (int)grpc_byte_buffer_length(ev->data.read)); - gpr_strvec_add(&buf, tmp); - } else { - gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); - } - break; case GRPC_OP_COMPLETE: gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); adderr(&buf, ev->data.op_complete); break; - case GRPC_WRITE_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); - addhdr(&buf, ev); - adderr(&buf, ev->data.write_accepted); - break; - case GRPC_FINISH_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("FINISH_ACCEPTED: ")); - addhdr(&buf, ev); - adderr(&buf, ev->data.write_accepted); - break; - case GRPC_CLIENT_METADATA_READ: - gpr_strvec_add(&buf, gpr_strdup("CLIENT_METADATA_READ: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " %d elements", - (int)ev->data.client_metadata_read.count); - gpr_strvec_add(&buf, tmp); - break; - case GRPC_FINISHED: - gpr_strvec_add(&buf, gpr_strdup("FINISHED: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " status=%d details='%s' %d metadata elements", - ev->data.finished.status, ev->data.finished.details, - (int)ev->data.finished.metadata_count); - gpr_strvec_add(&buf, tmp); - break; - case GRPC_SERVER_RPC_NEW: - gpr_strvec_add(&buf, gpr_strdup("SERVER_RPC_NEW: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " method='%s' host='%s' %d metadata elements", - ev->data.server_rpc_new.method, ev->data.server_rpc_new.host, - (int)ev->data.server_rpc_new.metadata_count); - gpr_strvec_add(&buf, tmp); - break; case GRPC_COMPLETION_DO_NOT_USE: gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)")); addhdr(&buf, ev); diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index 9369dfd7ec6c50485c28902c0afd72e86ed09556..f291e73e3b136acb2bfee348ab7eab252b160cab 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -61,23 +61,7 @@ typedef struct expectation { grpc_completion_type type; void *tag; union { - grpc_op_error finish_accepted; - grpc_op_error write_accepted; grpc_op_error op_complete; - struct { - const char *method; - const char *host; - gpr_timespec deadline; - grpc_call **output_call; - metadata *metadata; - } server_rpc_new; - metadata *client_metadata_read; - struct { - grpc_status_code status; - const char *details; - metadata *metadata; - } finished; - gpr_slice *read; } data; } expectation; @@ -121,17 +105,6 @@ int contains_metadata(grpc_metadata_array *array, const char *key, return has_metadata(array->metadata, array->count, key, value); } -static void verify_and_destroy_metadata(metadata *md, grpc_metadata *elems, - size_t count) { - size_t i; - for (i = 0; i < md->count; i++) { - GPR_ASSERT(has_metadata(elems, count, md->keys[i], md->values[i])); - } - gpr_free(md->keys); - gpr_free(md->values); - gpr_free(md); -} - static gpr_slice merge_slices(gpr_slice *slices, size_t nslices) { size_t i; size_t len = 0; @@ -168,60 +141,13 @@ int byte_buffer_eq_string(grpc_byte_buffer *bb, const char *str) { return byte_buffer_eq_slice(bb, gpr_slice_from_copied_string(str)); } -static int string_equivalent(const char *a, const char *b) { - if (a == NULL) return b == NULL || b[0] == 0; - if (b == NULL) return a[0] == 0; - return strcmp(a, b) == 0; -} - static void verify_matches(expectation *e, grpc_event *ev) { GPR_ASSERT(e->type == ev->type); switch (e->type) { - case GRPC_FINISH_ACCEPTED: - GPR_ASSERT(e->data.finish_accepted == ev->data.finish_accepted); - break; - case GRPC_WRITE_ACCEPTED: - GPR_ASSERT(e->data.write_accepted == ev->data.write_accepted); - break; - case GRPC_SERVER_RPC_NEW: - GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method, - ev->data.server_rpc_new.method)); - GPR_ASSERT(string_equivalent(e->data.server_rpc_new.host, - ev->data.server_rpc_new.host)); - GPR_ASSERT(gpr_time_cmp(e->data.server_rpc_new.deadline, - ev->data.server_rpc_new.deadline) <= 0); - *e->data.server_rpc_new.output_call = ev->call; - verify_and_destroy_metadata(e->data.server_rpc_new.metadata, - ev->data.server_rpc_new.metadata_elements, - ev->data.server_rpc_new.metadata_count); - break; - case GRPC_CLIENT_METADATA_READ: - verify_and_destroy_metadata(e->data.client_metadata_read, - ev->data.client_metadata_read.elements, - ev->data.client_metadata_read.count); - break; - case GRPC_FINISHED: - if (e->data.finished.status != GRPC_STATUS__DO_NOT_USE) { - GPR_ASSERT(e->data.finished.status == ev->data.finished.status); - GPR_ASSERT(string_equivalent(e->data.finished.details, - ev->data.finished.details)); - } - verify_and_destroy_metadata(e->data.finished.metadata, - ev->data.finished.metadata_elements, - ev->data.finished.metadata_count); - break; case GRPC_QUEUE_SHUTDOWN: gpr_log(GPR_ERROR, "premature queue shutdown"); abort(); break; - case GRPC_READ: - if (e->data.read) { - GPR_ASSERT(byte_buffer_eq_slice(ev->data.read, *e->data.read)); - gpr_free(e->data.read); - } else { - GPR_ASSERT(ev->data.read == NULL); - } - break; case GRPC_OP_COMPLETE: GPR_ASSERT(e->data.op_complete == ev->data.op_complete); break; @@ -234,66 +160,14 @@ static void verify_matches(expectation *e, grpc_event *ev) { } } -static void metadata_expectation(gpr_strvec *buf, metadata *md) { - size_t i; - char *tmp; - - if (!md) { - gpr_strvec_add(buf, gpr_strdup("nil")); - } else { - for (i = 0; i < md->count; i++) { - gpr_asprintf(&tmp, "%c%s:%s", i ? ',' : '{', md->keys[i], md->values[i]); - gpr_strvec_add(buf, tmp); - } - if (md->count) { - gpr_strvec_add(buf, gpr_strdup("}")); - } - } -} - static void expectation_to_strvec(gpr_strvec *buf, expectation *e) { - gpr_timespec timeout; char *tmp; switch (e->type) { - case GRPC_FINISH_ACCEPTED: - gpr_asprintf(&tmp, "GRPC_FINISH_ACCEPTED result=%d", - e->data.finish_accepted); - gpr_strvec_add(buf, tmp); - break; - case GRPC_WRITE_ACCEPTED: - gpr_asprintf(&tmp, "GRPC_WRITE_ACCEPTED result=%d", - e->data.write_accepted); - gpr_strvec_add(buf, tmp); - break; case GRPC_OP_COMPLETE: gpr_asprintf(&tmp, "GRPC_OP_COMPLETE result=%d", e->data.op_complete); gpr_strvec_add(buf, tmp); break; - case GRPC_SERVER_RPC_NEW: - timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now()); - gpr_asprintf(&tmp, "GRPC_SERVER_RPC_NEW method=%s host=%s timeout=%fsec", - e->data.server_rpc_new.method, e->data.server_rpc_new.host, - timeout.tv_sec + 1e-9 * timeout.tv_nsec); - gpr_strvec_add(buf, tmp); - break; - case GRPC_CLIENT_METADATA_READ: - gpr_strvec_add(buf, gpr_strdup("GRPC_CLIENT_METADATA_READ ")); - metadata_expectation(buf, e->data.client_metadata_read); - break; - case GRPC_FINISHED: - gpr_asprintf(&tmp, "GRPC_FINISHED status=%d details=%s ", - e->data.finished.status, e->data.finished.details); - gpr_strvec_add(buf, tmp); - metadata_expectation(buf, e->data.finished.metadata); - break; - case GRPC_READ: - gpr_strvec_add(buf, gpr_strdup("GRPC_READ data=")); - gpr_strvec_add( - buf, - gpr_hexdump((char *)GPR_SLICE_START_PTR(*e->data.read), - GPR_SLICE_LENGTH(*e->data.read), GPR_HEXDUMP_PLAINTEXT)); - break; case GRPC_SERVER_SHUTDOWN: gpr_strvec_add(buf, gpr_strdup("GRPC_SERVER_SHUTDOWN")); break; @@ -395,104 +269,10 @@ static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) { return e; } -static metadata *metadata_from_args(va_list args) { - metadata *md = gpr_malloc(sizeof(metadata)); - const char *key, *value; - md->count = 0; - md->cap = 0; - md->keys = NULL; - md->values = NULL; - - for (;;) { - key = va_arg(args, const char *); - if (!key) return md; - value = va_arg(args, const char *); - GPR_ASSERT(value); - - if (md->cap == md->count) { - md->cap = GPR_MAX(md->cap + 1, md->cap * 3 / 2); - md->keys = gpr_realloc(md->keys, sizeof(char *) * md->cap); - md->values = gpr_realloc(md->values, sizeof(char *) * md->cap); - } - md->keys[md->count] = (char *)key; - md->values[md->count] = (char *)value; - md->count++; - } -} - -void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result) { - add(v, GRPC_WRITE_ACCEPTED, tag)->data.write_accepted = result; -} - void cq_expect_completion(cq_verifier *v, void *tag, grpc_op_error result) { add(v, GRPC_OP_COMPLETE, tag)->data.op_complete = result; } -void cq_expect_finish_accepted(cq_verifier *v, void *tag, - grpc_op_error result) { - add(v, GRPC_FINISH_ACCEPTED, tag)->data.finish_accepted = result; -} - -void cq_expect_read(cq_verifier *v, void *tag, gpr_slice bytes) { - expectation *e = add(v, GRPC_READ, tag); - e->data.read = gpr_malloc(sizeof(gpr_slice)); - *e->data.read = bytes; -} - -void cq_expect_empty_read(cq_verifier *v, void *tag) { - expectation *e = add(v, GRPC_READ, tag); - e->data.read = NULL; -} - -void cq_expect_server_rpc_new(cq_verifier *v, grpc_call **output_call, - void *tag, const char *method, const char *host, - gpr_timespec deadline, ...) { - va_list args; - expectation *e = add(v, GRPC_SERVER_RPC_NEW, tag); - e->data.server_rpc_new.method = method; - e->data.server_rpc_new.host = host; - e->data.server_rpc_new.deadline = deadline; - e->data.server_rpc_new.output_call = output_call; - - va_start(args, deadline); - e->data.server_rpc_new.metadata = metadata_from_args(args); - va_end(args); -} - -void cq_expect_client_metadata_read(cq_verifier *v, void *tag, ...) { - va_list args; - expectation *e = add(v, GRPC_CLIENT_METADATA_READ, tag); - - va_start(args, tag); - e->data.client_metadata_read = metadata_from_args(args); - va_end(args); -} - -static void finished_internal(cq_verifier *v, void *tag, - grpc_status_code status, const char *details, - va_list args) { - expectation *e = add(v, GRPC_FINISHED, tag); - e->data.finished.status = status; - e->data.finished.details = details; - e->data.finished.metadata = metadata_from_args(args); -} - -void cq_expect_finished_with_status(cq_verifier *v, void *tag, - grpc_status_code status, - const char *details, ...) { - va_list args; - va_start(args, details); - finished_internal(v, tag, status, details, args); - va_end(args); -} - -void cq_expect_finished(cq_verifier *v, void *tag, ...) { - va_list args; - va_start(args, tag); - finished_internal(v, tag, GRPC_STATUS__DO_NOT_USE, NULL, args); - va_end(args); -} - void cq_expect_server_shutdown(cq_verifier *v, void *tag) { add(v, GRPC_SERVER_SHUTDOWN, tag); } diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index c1e25d8aa40267d76cd94806d42565c6127bdc00..bae3c6caf03b8150831619a293047a6004154502 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -57,20 +57,7 @@ void cq_verify_empty(cq_verifier *v); Any functions taking ... expect a NULL terminated list of key/value pairs (each pair using two parameter slots) of metadata that MUST be present in the event. */ -void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result); -void cq_expect_finish_accepted(cq_verifier *v, void *tag, grpc_op_error result); -void cq_expect_read(cq_verifier *v, void *tag, gpr_slice bytes); -void cq_expect_empty_read(cq_verifier *v, void *tag); void cq_expect_completion(cq_verifier *v, void *tag, grpc_op_error result); -/* *output_call is set the the server call instance */ -void cq_expect_server_rpc_new(cq_verifier *v, grpc_call **output_call, - void *tag, const char *method, const char *host, - gpr_timespec deadline, ...); -void cq_expect_client_metadata_read(cq_verifier *v, void *tag, ...); -void cq_expect_finished_with_status(cq_verifier *v, void *tag, - grpc_status_code status_code, - const char *details, ...); -void cq_expect_finished(cq_verifier *v, void *tag, ...); void cq_expect_server_shutdown(cq_verifier *v, void *tag); int byte_buffer_eq_string(grpc_byte_buffer *byte_buffer, const char *string);