Skip to content
Snippets Groups Projects
Commit 445612ec authored by Abhishek Kumar's avatar Abhishek Kumar
Browse files

Btach handling of unary

parent 4151cac0
No related branches found
No related tags found
No related merge requests found
...@@ -57,6 +57,8 @@ static grpc_call_details call_details; ...@@ -57,6 +57,8 @@ static grpc_call_details call_details;
static grpc_metadata_array request_metadata_recv; static grpc_metadata_array request_metadata_recv;
static grpc_metadata_array initial_metadata_send; static grpc_metadata_array initial_metadata_send;
static grpc_byte_buffer *payload_buffer = NULL; static grpc_byte_buffer *payload_buffer = NULL;
/* Used to drain the terminal read in unary calls. */
static grpc_byte_buffer *terminal_buffer = NULL;
static grpc_op read_op; static grpc_op read_op;
...@@ -64,7 +66,7 @@ static grpc_op metadata_send_op; ...@@ -64,7 +66,7 @@ static grpc_op metadata_send_op;
static grpc_op write_op; static grpc_op write_op;
static grpc_op status_op[2]; static grpc_op status_op[2];
static int was_cancelled = 2; static int was_cancelled = 2;
static grpc_op unary_ops[6];
static int got_sigint = 0; static int got_sigint = 0;
static void *tag(gpr_intptr t) { return (void *)t; } static void *tag(gpr_intptr t) { return (void *)t; }
...@@ -75,16 +77,34 @@ typedef struct { ...@@ -75,16 +77,34 @@ typedef struct {
} call_state; } call_state;
static void request_call(void) { static void request_call(void) {
/*
call_state *s = gpr_malloc(sizeof(call_state));
gpr_ref_init(&s->pending_ops, 2);
*/
grpc_metadata_array_init(&request_metadata_recv); grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details); grpc_call_details_init(&call_details);
grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
cq, tag(101)); cq, tag(101));
} }
static void handle_unary_method(void) {
grpc_metadata_array_init(&initial_metadata_send);
unary_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
unary_ops[0].data.send_initial_metadata.count = 0;
unary_ops[1].op = GRPC_OP_RECV_MESSAGE;
unary_ops[1].data.recv_message = &terminal_buffer;
unary_ops[2].op = GRPC_OP_SEND_MESSAGE;
if (payload_buffer == NULL) {
gpr_log(GPR_INFO, "NULL payload buffer !!!");
}
unary_ops[2].data.send_message = payload_buffer;
unary_ops[3].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
unary_ops[3].data.send_status_from_server.status = GRPC_STATUS_OK;
unary_ops[3].data.send_status_from_server.trailing_metadata_count = 0;
unary_ops[3].data.send_status_from_server.status_details = "";
unary_ops[4].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
unary_ops[4].data.recv_close_on_server.cancelled = &was_cancelled;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, unary_ops, 5,
tag(6)));
}
static void send_initial_metadata(void) { static void send_initial_metadata(void) {
grpc_metadata_array_init(&initial_metadata_send); grpc_metadata_array_init(&initial_metadata_send);
metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA;
...@@ -93,12 +113,12 @@ static void send_initial_metadata(void) { ...@@ -93,12 +113,12 @@ static void send_initial_metadata(void) {
tag(3))); tag(3)));
} }
static void start_read_op(void) { static void start_read_op(int t) {
/* Starting read at server */ /* Starting read at server */
read_op.op = GRPC_OP_RECV_MESSAGE; read_op.op = GRPC_OP_RECV_MESSAGE;
read_op.data.recv_message = &payload_buffer; read_op.data.recv_message = &payload_buffer;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1,
tag(1))); tag(t)));
} }
static void start_write_op(void) { static void start_write_op(void) {
...@@ -200,11 +220,12 @@ int main(int argc, char **argv) { ...@@ -200,11 +220,12 @@ int main(int argc, char **argv) {
if (0 == strcmp(call_details.method, if (0 == strcmp(call_details.method,
"/Reflector/reflectStream")) { "/Reflector/reflectStream")) {
/* Received streaming call. Send metadata here. */ /* Received streaming call. Send metadata here. */
start_read_op(1);
send_initial_metadata();
} else { } else {
/* Received unary call. Can do all ops in one batch. */ /* Received unary call. Can do all ops in one batch. */
start_read_op(5);
} }
start_read_op();
send_initial_metadata();
} }
else { else {
GPR_ASSERT(shutdown_started); GPR_ASSERT(shutdown_started);
...@@ -224,7 +245,7 @@ int main(int argc, char **argv) { ...@@ -224,7 +245,7 @@ int main(int argc, char **argv) {
break; break;
case 2: case 2:
/* Write completed at server */ /* Write completed at server */
start_read_op(); start_read_op(1);
break; break;
case 3: case 3:
/* Metadata send completed at server */ /* Metadata send completed at server */
...@@ -234,45 +255,26 @@ int main(int argc, char **argv) { ...@@ -234,45 +255,26 @@ int main(int argc, char **argv) {
grpc_call_destroy(call); grpc_call_destroy(call);
request_call(); request_call();
break; break;
case 5:
/* Finished payload read for unary. Start all reamaining
* unary ops in a batch.
*/
handle_unary_method();
break;
case 6:
/* Finished unary call. */
grpc_call_destroy(call);
request_call();
break;
} }
break; break;
case GRPC_SERVER_RPC_NEW: case GRPC_SERVER_RPC_NEW:
if (ev->call != NULL) {
/* initial ops are already started in request_call */
if (0 == strcmp(ev->data.server_rpc_new.method,
"/Reflector/reflectStream")) {
s->flags = 0;
} else {
s->flags = GRPC_WRITE_BUFFER_HINT;
}
grpc_call_server_accept_old(ev->call, cq, s);
grpc_call_server_end_initial_metadata_old(ev->call, s->flags);
GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK);
request_call();
} else {
GPR_ASSERT(shutdown_started);
gpr_free(s);
}
break;
case GRPC_WRITE_ACCEPTED: case GRPC_WRITE_ACCEPTED:
GPR_ASSERT(ev->data.write_accepted == GRPC_OP_OK);
GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK);
break;
case GRPC_READ: case GRPC_READ:
if (ev->data.read) {
GPR_ASSERT(grpc_call_start_write_old(ev->call, ev->data.read, s,
s->flags) == GRPC_CALL_OK);
} else {
GPR_ASSERT(grpc_call_start_write_status_old(ev->call, GRPC_STATUS_OK,
NULL, s) == GRPC_CALL_OK);
}
break;
case GRPC_FINISH_ACCEPTED: case GRPC_FINISH_ACCEPTED:
case GRPC_FINISHED: case GRPC_FINISHED:
if (gpr_unref(&s->pending_ops)) { gpr_log(GPR_ERROR, "Unexpected event type.");
grpc_call_destroy(ev->call); GPR_ASSERT(0);
gpr_free(s);
}
break; break;
case GRPC_QUEUE_SHUTDOWN: case GRPC_QUEUE_SHUTDOWN:
GPR_ASSERT(shutdown_started); GPR_ASSERT(shutdown_started);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment