From 1dae4acbec4b63288d3037071684d4129e2ca801 Mon Sep 17 00:00:00 2001
From: Tim Emiola <temiola@google.com>
Date: Fri, 27 Mar 2015 17:17:37 -0700
Subject: [PATCH] Adds Call#run_batch

- named run_batch rather than start_batch
- this is because the func starts the batch, but additionally waits for it to
  complete
---
 src/ruby/ext/grpc/rb_call.c | 538 ++++++++++++++++++++++++++++++++----
 src/ruby/ext/grpc/rb_call.h |   3 +
 2 files changed, 491 insertions(+), 50 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f2a275ecdb..dbb0e2acf2 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -36,11 +36,20 @@
 #include <ruby.h>
 
 #include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
 #include "rb_byte_buffer.h"
 #include "rb_completion_queue.h"
 #include "rb_metadata.h"
 #include "rb_grpc.h"
 
+/* rb_sBatchResult is struct class used to hold the results of a batch call */
+static VALUE rb_sBatchResult;
+
+/* rb_cMdAry is the MetadataArray class whose instances proxy
+ * grpc_metadata_array. */
+static VALUE rb_cMdAry;
+
 /* id_cq is the name of the hidden ivar that preserves a reference to a
  * completion queue */
 static ID id_cq;
@@ -62,6 +71,15 @@ static ID id_metadata;
  * received by the call and subsequently saved on it. */
 static ID id_status;
 
+/* sym_* are the symbol for attributes of rb_sBatchResult. */
+static VALUE sym_send_message;
+static VALUE sym_send_metadata;
+static VALUE sym_send_close;
+static VALUE sym_send_status;
+static VALUE sym_message;
+static VALUE sym_status;
+static VALUE sym_cancelled;
+
 /* hash_all_calls is a hash of Call address -> reference count that is used to
  * track the creation and destruction of rb_call instances.
  */
@@ -147,14 +165,17 @@ int grpc_rb_call_add_metadata_hash_cb(VALUE key, VALUE val, VALUE call_obj) {
   return ST_CONTINUE;
 }
 
+
 /*
   call-seq:
-     call.add_metadata(completion_queue, hash_elements, flags=nil)
+  call.add_metadata(completion_queue, hash_elements, flags=nil)
 
   Add metadata elements to the call from a ruby hash, to be sent upon
   invocation. flags is a bit-field combination of the write flags defined
-  above.  REQUIRES: grpc_call_invoke/grpc_call_accept have not been
-  called on this call.  Produces no events. */
+  above.
+
+  REQUIRES: grpc_call_invoke/grpc_call_accept have not been
+            called on this call.  Produces no events. */
 
 static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) {
   VALUE metadata;
@@ -196,13 +217,14 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
 
 /*
   call-seq:
-     call.invoke(completion_queue, tag, flags=nil)
+  call.invoke(completion_queue, tag, flags=nil)
 
-   Invoke the RPC. Starts sending metadata and request headers on the wire.
-   flags is a bit-field combination of the write flags defined above.
-   REQUIRES: Can be called at most once per call.
-             Can only be called on the client.
-   Produces a GRPC_INVOKE_ACCEPTED event on completion. */
+  Invoke the RPC. Starts sending metadata and request headers on the wire.
+  flags is a bit-field combination of the write flags defined above.
+
+  REQUIRES: Can be called at most once per call.
+            Can only be called on the client.
+  Produces a GRPC_INVOKE_ACCEPTED event on completion. */
 static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) {
   VALUE cqueue = Qnil;
   VALUE metadata_read_tag = Qnil;
@@ -230,14 +252,15 @@ static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) {
   /* Add the completion queue as an instance attribute, prevents it from being
    * GCed until this call object is GCed */
   rb_ivar_set(self, id_cq, cqueue);
-
   return Qnil;
 }
 
 /* Initiate a read on a call. Output event contains a byte buffer with the
    result of the read.
-   REQUIRES: No other reads are pending on the call. It is only safe to start
-   the next read after the corresponding read event is received. */
+
+   REQUIRES: No other reads are pending on the call.
+             It is only safe to start the next read after the corresponding
+             read event is received. */
 static VALUE grpc_rb_call_start_read(VALUE self, VALUE tag) {
   grpc_call *call = NULL;
   grpc_call_error err;
@@ -253,18 +276,18 @@ static VALUE grpc_rb_call_start_read(VALUE self, VALUE tag) {
 
 /*
   call-seq:
-    status = call.status
+  status = call.status
 
-    Gets the status object saved the call.  */
+  Gets the status object saved the call.  */
 static VALUE grpc_rb_call_get_status(VALUE self) {
   return rb_ivar_get(self, id_status);
 }
 
 /*
   call-seq:
-    call.status = status
+  call.status = status
 
-    Saves a status object on the call.  */
+  Saves a status object on the call.  */
 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
   if (!NIL_P(status) && rb_obj_class(status) != rb_sStatus) {
     rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
@@ -277,18 +300,18 @@ static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
 
 /*
   call-seq:
-    metadata = call.metadata
+  metadata = call.metadata
 
-    Gets the metadata object saved the call.  */
+  Gets the metadata object saved the call.  */
 static VALUE grpc_rb_call_get_metadata(VALUE self) {
   return rb_ivar_get(self, id_metadata);
 }
 
 /*
   call-seq:
-    call.metadata = metadata
+  call.metadata = metadata
 
-    Saves the metadata hash on the call.  */
+  Saves the metadata hash on the call.  */
 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
@@ -301,20 +324,21 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
 
 /*
   call-seq:
-     call.start_write(byte_buffer, tag, flags=nil)
-
-   Queue a byte buffer for writing.
-   flags is a bit-field combination of the write flags defined above.
-   A write with byte_buffer null is allowed, and will not send any bytes on the
-   wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides
-   a mechanism to flush any previously buffered writes to outgoing flow control.
-   REQUIRES: No other writes are pending on the call. It is only safe to
-             start the next write after the corresponding write_accepted event
-             is received.
-             GRPC_INVOKE_ACCEPTED must have been received by the application
-             prior to calling this on the client. On the server,
-             grpc_call_accept must have been called successfully.
-   Produces a GRPC_WRITE_ACCEPTED event. */
+  call.start_write(byte_buffer, tag, flags=nil)
+
+  Queue a byte buffer for writing.
+  flags is a bit-field combination of the write flags defined above.
+  A write with byte_buffer null is allowed, and will not send any bytes on the
+  wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides
+  a mechanism to flush any previously buffered writes to outgoing flow control.
+
+  REQUIRES: No other writes are pending on the call. It is only safe to
+            start the next write after the corresponding write_accepted event
+            is received.
+            GRPC_INVOKE_ACCEPTED must have been received by the application
+            prior to calling this on the client. On the server,
+            grpc_call_accept must have been called successfully.
+            Produces a GRPC_WRITE_ACCEPTED event. */
 static VALUE grpc_rb_call_start_write(int argc, VALUE *argv, VALUE self) {
   VALUE byte_buffer = Qnil;
   VALUE tag = Qnil;
@@ -342,17 +366,18 @@ static VALUE grpc_rb_call_start_write(int argc, VALUE *argv, VALUE self) {
 /* Queue a status for writing.
 
    call-seq:
-      tag = Object.new
-      call.write_status(200, "OK", tag)
+   tag = Object.new
+   call.write_status(200, "OK", tag)
 
    REQUIRES: No other writes are pending on the call. It is only safe to
-   start the next write after the corresponding write_accepted event
-   is received.
-   GRPC_INVOKE_ACCEPTED must have been received by the application
-   prior to calling this.
-   Only callable on the server.
-   Produces a GRPC_FINISHED event when the status is sent and the stream is
-   fully closed */
+             start the next write after the corresponding write_accepted event
+             is received.
+             GRPC_INVOKE_ACCEPTED must have been received by the application
+             prior to calling this.
+             Only callable on the server.
+
+   Produces a GRPC_FINISHED event when the status is sent and the
+   stream is fully closed */
 static VALUE grpc_rb_call_start_write_status(VALUE self, VALUE code,
                                              VALUE status, VALUE tag) {
   grpc_call *call = NULL;
@@ -384,14 +409,14 @@ static VALUE grpc_rb_call_writes_done(VALUE self, VALUE tag) {
 }
 
 /* call-seq:
-     call.server_end_initial_metadata(flag)
+   call.server_end_initial_metadata(flag)
 
    Only to be called on servers, before sending messages.
    flags is a bit-field combination of the write flags defined above.
 
    REQUIRES: Can be called at most once per call.
-             Can only be called on the server, must be called after
-             grpc_call_server_accept
+   Can only be called on the server, must be called after
+   grpc_call_server_accept
    Produces no events */
 static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv,
                                                       VALUE self) {
@@ -413,17 +438,405 @@ static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv,
   return Qnil;
 }
 
+/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
+   to fill grpc_metadata_array.
+
+   it's capacity should have been computed via a prior call to
+   grpc_rb_md_ary_fill_hash_cb
+*/
+int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
+  grpc_metadata_array *md_ary = NULL;
+  int array_length;
+  int i;
+
+  /* Construct a metadata object from key and value and add it */
+  Data_Get_Struct(md_ary_obj, grpc_metadata_array, md_ary);
+
+  if (TYPE(val) == T_ARRAY) {
+    /* If the value is an array, add capacity for each value in the array */
+    array_length = RARRAY_LEN(val);
+    for (i = 0; i < array_length; i++) {
+      if (TYPE(key) == T_SYMBOL) {
+        md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key));
+      } else { /* StringValueCStr does all other type exclusions for us */
+        md_ary->metadata[md_ary->count].key = StringValueCStr(key);
+      }
+      md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
+      md_ary->metadata[md_ary->count].value_length =
+        RSTRING_LEN(rb_ary_entry(val, i));
+      md_ary->count += 1;
+    }
+  } else {
+    if (TYPE(key) == T_SYMBOL) {
+      md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key));
+    } else { /* StringValueCStr does all other type exclusions for us */
+      md_ary->metadata[md_ary->count].key = StringValueCStr(key);
+    }
+    md_ary->metadata[md_ary->count].value = RSTRING_PTR(val);
+    md_ary->metadata[md_ary->count].value_length = RSTRING_LEN(val);
+    md_ary->count += 1;
+  }
+
+  return ST_CONTINUE;
+}
+
+/* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
+   to pre-compute the capacity a grpc_metadata_array.
+*/
+int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
+  grpc_metadata_array *md_ary = NULL;
+
+  /* Construct a metadata object from key and value and add it */
+  Data_Get_Struct(md_ary_obj, grpc_metadata_array, md_ary);
+
+  if (TYPE(val) == T_ARRAY) {
+    /* If the value is an array, add capacity for each value in the array */
+    md_ary->capacity += RARRAY_LEN(val);
+  } else {
+    md_ary->capacity += 1;
+  }
+  return ST_CONTINUE;
+}
+
+/* grpc_rb_md_ary_convert converts a ruby metadata hash into
+   a grpc_metadata_array.
+*/
+void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
+  VALUE md_ary_obj = Qnil;
+  if (md_ary_hash == Qnil) {
+    return;  /* Do nothing if the expected has value is nil */
+  }
+  if (TYPE(md_ary_hash) != T_HASH) {
+    rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
+             rb_obj_classname(md_ary_hash));
+    return;
+  }
+
+  /* Initialize the array, compute it's capacity, then fill it. */
+  grpc_metadata_array_init(md_ary);
+  md_ary_obj = Data_Wrap_Struct(rb_cMdAry, GC_NOT_MARKED, GC_DONT_FREE, md_ary);
+  rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
+  md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
+  rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
+}
+
+/* Converts a metadata array to a hash. */
+VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
+  VALUE key = Qnil;
+  VALUE new_ary = Qnil;
+  VALUE value = Qnil;
+  VALUE result = rb_hash_new();
+  size_t i;
+
+  for (i = 0; i < md_ary->count; i++) {
+    key = rb_str_new2(md_ary->metadata[i].key);
+    value = rb_hash_aref(result, key);
+    if (value == Qnil) {
+      value = rb_str_new(md_ary->metadata[i].value,
+                         md_ary->metadata[i].value_length);
+      rb_hash_aset(result, key, value);
+    } else if (TYPE(value) == T_ARRAY) {
+      /* Add the string to the returned array */
+      rb_ary_push(value,
+                  rb_str_new(md_ary->metadata[i].value,
+                             md_ary->metadata[i].value_length));
+    } else {
+      /* Add the current value with this key and the new one to an array */
+      new_ary = rb_ary_new();
+      rb_ary_push(new_ary, value);
+      rb_ary_push(new_ary,
+                  rb_str_new(md_ary->metadata[i].value,
+                             md_ary->metadata[i].value_length));
+      rb_hash_aset(result, key, new_ary);
+    }
+  }
+  return result;
+}
+
+/* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
+   each key of an ops hash is valid.
+*/
+int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) {
+  /* Update the capacity; the value is an array, add capacity for each value in
+   * the array */
+  if (TYPE(key) != T_FIXNUM) {
+    rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
+             rb_obj_classname(key));
+    return ST_STOP;
+  }
+  switch(NUM2INT(key)) {
+    case GRPC_OP_SEND_INITIAL_METADATA:
+    case GRPC_OP_SEND_MESSAGE:
+    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+    case GRPC_OP_SEND_STATUS_FROM_SERVER:
+    case GRPC_OP_RECV_INITIAL_METADATA:
+    case GRPC_OP_RECV_MESSAGE:
+    case GRPC_OP_RECV_STATUS_ON_CLIENT:
+    case GRPC_OP_RECV_CLOSE_ON_SERVER:
+      rb_ary_push(ops_ary, key);
+      return ST_CONTINUE;
+    default:
+      rb_raise(rb_eTypeError, "invalid operation : bad value %d",
+               NUM2INT(key));
+  };
+  return ST_STOP;
+}
+
+/* grpc_rb_op_update_status_from_server adds the values in a ruby status
+   struct to the 'send_status_from_server' portion of an op.
+*/
+void grpc_rb_op_update_status_from_server(grpc_op *op,
+                                          grpc_metadata_array* md_ary,
+                                          VALUE status) {
+  VALUE code = rb_struct_aref(status, sym_code);
+  VALUE details = rb_struct_aref(status, sym_details);
+  VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
+
+  /* TODO: add check to ensure status is the correct struct type */
+  if (TYPE(code) != T_FIXNUM) {
+    rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
+             rb_obj_classname(code));
+    return;
+  }
+  if (TYPE(details) != T_STRING) {
+    rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
+             rb_obj_classname(code));
+    return;
+  }
+  op->data.send_status_from_server.status = NUM2INT(code);
+  op->data.send_status_from_server.status_details = StringValueCStr(details);
+  grpc_rb_md_ary_convert(metadata_hash, md_ary);
+  op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
+  op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
+}
+
+/* run_batch_stack holds various values used by the
+ * grpc_rb_call_run_batch function */
+typedef struct run_batch_stack {
+  /* The batch ops */
+  grpc_op ops[8];  /* 8 is the maximum number of operations */
+  size_t op_num;   /* tracks the last added operation */
+
+  /* Data being sent */
+  grpc_metadata_array send_metadata;
+  grpc_metadata_array send_trailing_metadata;
+
+  /* Data being received */
+  grpc_byte_buffer *recv_message;
+  grpc_metadata_array recv_metadata;
+  grpc_metadata_array recv_trailing_metadata;
+  int recv_cancelled;
+  grpc_status_code recv_status;
+  char *recv_status_details;
+  size_t recv_status_details_capacity;
+} run_batch_stack;
+
+/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
+ * initialized */
+static void grpc_run_batch_stack_init(run_batch_stack* st) {
+  MEMZERO(st, run_batch_stack, 1);
+  grpc_metadata_array_init(&st->send_metadata);
+  grpc_metadata_array_init(&st->send_trailing_metadata);
+  grpc_metadata_array_init(&st->recv_metadata);
+  grpc_metadata_array_init(&st->recv_trailing_metadata);
+  st->op_num = 0;
+}
+
+/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
+ * cleaned up */
+static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
+  grpc_metadata_array_destroy(&st->send_metadata);
+  grpc_metadata_array_destroy(&st->send_trailing_metadata);
+  grpc_metadata_array_destroy(&st->recv_metadata);
+  grpc_metadata_array_destroy(&st->recv_trailing_metadata);
+  if (st->recv_status_details != NULL) {
+    gpr_free(st->recv_status_details);
+  }
+}
+
+/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
+ * ops_hash */
+static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
+  VALUE this_op = Qnil;
+  VALUE this_value = Qnil;
+  VALUE ops_ary = rb_ary_new();
+  size_t i = 0;
+
+  /* Create a ruby array with just the operation keys */
+  rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
+
+  /* Fill the ops array */
+  for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
+    this_op = rb_ary_entry(ops_ary, i);
+    this_value = rb_hash_aref(ops_hash, this_op);
+    switch(NUM2INT(this_op)) {
+      case GRPC_OP_SEND_INITIAL_METADATA:
+        grpc_rb_md_ary_convert(this_value, &st->send_metadata);
+        st->ops[st->op_num].data.send_initial_metadata.count =
+            st->send_metadata.count;
+        st->ops[st->op_num].data.send_initial_metadata.metadata =
+            st->send_metadata.metadata;
+        break;
+      case GRPC_OP_SEND_MESSAGE:
+        st->ops[st->op_num].data.send_message =
+            grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
+                                     RSTRING_LEN(this_value));
+        break;
+      case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+        break;
+      case GRPC_OP_SEND_STATUS_FROM_SERVER:
+        grpc_rb_op_update_status_from_server(&st->ops[st->op_num],
+                                             &st->send_trailing_metadata,
+                                             this_value);
+        break;
+      case GRPC_OP_RECV_INITIAL_METADATA:
+        st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
+        break;
+      case GRPC_OP_RECV_MESSAGE:
+        st->ops[st->op_num].data.recv_message = &st->recv_message;
+        break;
+      case GRPC_OP_RECV_STATUS_ON_CLIENT:
+        st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
+            &st->recv_trailing_metadata;
+        st->ops[st->op_num].data.recv_status_on_client.status =
+            &st->recv_status;
+        st->ops[st->op_num].data.recv_status_on_client.status_details =
+            &st->recv_status_details;
+        st->ops[st->op_num].data.recv_status_on_client.status_details_capacity =
+            &st->recv_status_details_capacity;
+        break;
+      case GRPC_OP_RECV_CLOSE_ON_SERVER:
+        st->ops[st->op_num].data.recv_close_on_server.cancelled =
+            &st->recv_cancelled;
+        break;
+      default:
+        grpc_run_batch_stack_cleanup(st);
+        rb_raise(rb_eTypeError, "invalid operation : bad value %d",
+                 NUM2INT(this_op));
+    };
+    st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
+    st->op_num++;
+  }
+}
+
+/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
+   after the results have run */
+static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
+  size_t i = 0;
+  VALUE result = rb_struct_new(rb_sBatchResult, Qnil, Qnil, Qnil, Qnil, Qnil,
+                               Qnil, Qnil, Qnil, NULL);
+  for (i = 0; i < st->op_num; i++) {
+    switch(st->ops[i].op) {
+      case GRPC_OP_SEND_INITIAL_METADATA:
+        rb_struct_aset(result, sym_send_metadata, Qtrue);
+        break;
+      case GRPC_OP_SEND_MESSAGE:
+        rb_struct_aset(result, sym_send_message, Qtrue);
+        break;
+      case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+        rb_struct_aset(result, sym_send_close, Qtrue);
+        break;
+      case GRPC_OP_SEND_STATUS_FROM_SERVER:
+        rb_struct_aset(result, sym_send_status, Qtrue);
+        break;
+      case GRPC_OP_RECV_INITIAL_METADATA:
+        rb_struct_aset(result, sym_metadata,
+                       grpc_rb_md_ary_to_h(&st->recv_metadata));
+      case GRPC_OP_RECV_MESSAGE:
+        rb_struct_aset(result, sym_message,
+                       grpc_rb_byte_buffer_to_s(st->recv_message));
+        break;
+      case GRPC_OP_RECV_STATUS_ON_CLIENT:
+        rb_struct_aset(
+            result,
+            sym_status,
+            rb_struct_new(rb_sStatus,
+                          UINT2NUM(st->recv_status),
+                          (st->recv_status_details == NULL
+                           ? Qnil
+                           : rb_str_new2(st->recv_status_details)),
+                          grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
+                          NULL));
+        break;
+      case GRPC_OP_RECV_CLOSE_ON_SERVER:
+        rb_struct_aset(result, sym_send_close, Qtrue);
+        break;
+      default:
+        break;
+    }
+  }
+  return result;
+}
+
 /* call-seq:
-     call.server_accept(completion_queue, finished_tag)
+   cq = CompletionQueue.new
+   ops = {
+     GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
+     GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
+     ...
+   }
+   tag = Object.new
+   timeout = 10
+   call.start_batch(cqueue, tag, timeout, ops)
+
+   Start a batch of operations defined in the array ops; when complete, post a
+   completion of type 'tag' to the completion queue bound to the call.
+
+   Also waits for the batch to complete, until timeout is reached.
+   The order of ops specified in the batch has no significance.
+   Only one operation of each type can be active at once in any given
+   batch */
+static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
+                                    VALUE timeout, VALUE ops_hash) {
+  run_batch_stack st;
+  grpc_call *call = NULL;
+  grpc_event *ev = NULL;
+  grpc_call_error err;
+  VALUE result = Qnil;
+  Data_Get_Struct(self, grpc_call, call);
+
+  /* Validate the ops args, adding them to a ruby array */
+  if (TYPE(ops_hash) != T_HASH) {
+    rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
+    return Qnil;
+  }
+  grpc_run_batch_stack_init(&st);
+  grpc_run_batch_stack_fill_ops(&st, ops_hash);
+
+  /* call grpc_call_start_batch, then wait for it to complete using
+   * pluck_event */
+  err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag));
+  if (err != GRPC_CALL_OK) {
+    grpc_run_batch_stack_cleanup(&st);
+    rb_raise(rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)",
+             grpc_call_error_detail_of(err), err);
+    return;
+  }
+  ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
+  if (ev->data.op_complete != GRPC_OP_OK) {
+    grpc_run_batch_stack_cleanup(&st);
+    rb_raise(rb_eCallError, "start_batch completion failed, (code=%d)",
+             ev->data.op_complete);
+    return;
+  }
+
+  /* Build and return the BatchResult struct result */
+  result = grpc_run_batch_stack_build_result(&st);
+  grpc_run_batch_stack_cleanup(&st);
+  return result;
+}
+
+/* call-seq:
+   call.server_accept(completion_queue, finished_tag)
 
    Accept an incoming RPC, binding a completion queue to it.
    To be called before sending or receiving messages.
 
    REQUIRES: Can be called at most once per call.
-             Can only be called on the server.
+   Can only be called on the server.
    Produces a GRPC_FINISHED event with finished_tag when the call has been
-       completed (there may be other events for the call pending at this
-       time) */
+   completed (there may be other events for the call pending at this
+   time) */
 static VALUE grpc_rb_call_server_accept(VALUE self, VALUE cqueue,
                                         VALUE finished_tag) {
   grpc_call *call = NULL;
@@ -525,6 +938,8 @@ void Init_grpc_call() {
   rb_eCallError =
       rb_define_class_under(rb_mGrpcCore, "CallError", rb_eException);
   rb_cCall = rb_define_class_under(rb_mGrpcCore, "Call", rb_cObject);
+  rb_cMdAry = rb_define_class_under(rb_mGrpcCore, "MetadataArray",
+                                    rb_cObject);
 
   /* Prevent allocation or inialization of the Call class */
   rb_define_alloc_func(rb_cCall, grpc_rb_cannot_alloc);
@@ -532,6 +947,7 @@ void Init_grpc_call() {
   rb_define_method(rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, 1);
 
   /* Add ruby analogues of the Call methods. */
+  rb_define_method(rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
   rb_define_method(rb_cCall, "server_accept", grpc_rb_call_server_accept, 2);
   rb_define_method(rb_cCall, "server_end_initial_metadata",
                    grpc_rb_call_server_end_initial_metadata, -1);
@@ -557,6 +973,28 @@ void Init_grpc_call() {
   id_flags = rb_intern("__flags");
   id_input_md = rb_intern("__input_md");
 
+  /* Ids used in constructing the batch result. */
+  sym_send_message = ID2SYM(rb_intern("send_message"));
+  sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
+  sym_send_close = ID2SYM(rb_intern("send_close"));
+  sym_send_status = ID2SYM(rb_intern("send_status"));
+  sym_message = ID2SYM(rb_intern("message"));
+  sym_status = ID2SYM(rb_intern("status"));
+  sym_cancelled = ID2SYM(rb_intern("cancelled"));
+
+  /* The Struct used to return the run_batch result. */
+  rb_sBatchResult = rb_struct_define(
+      "BatchResult",
+      "send_message",
+      "send_metadata",
+      "send_close",
+      "send_status",
+      "message",
+      "metadata",
+      "status",
+      "cancelled",
+      NULL);
+
   /* The hash for reference counting calls, to ensure they can't be destroyed
    * more than once */
   hash_all_calls = rb_hash_new();
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index bb51759a46..5f79d37454 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -46,6 +46,9 @@ VALUE grpc_rb_wrap_call(grpc_call* c);
 /* Provides the details of an call error */
 const char* grpc_call_error_detail_of(grpc_call_error err);
 
+/* Converts a metadata array to a hash. */
+VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
+
 /* rb_cCall is the Call class whose instances proxy grpc_call. */
 extern VALUE rb_cCall;
 
-- 
GitLab