From 6919c7595b62d2a87f46476abe4c75def8f0070d Mon Sep 17 00:00:00 2001
From: temiola <temiola@google.com>
Date: Wed, 10 Dec 2014 13:22:00 -0800
Subject: [PATCH] Adds support for metadata to the surface APIs

- received metadata is visible on the ActiveCall object
- metadata to send is keyword args on ActiveCall objects

Also
- fixes a typo that meant that the wrong error error code might be returned
- fixes bad references in some tests that are only visible when run via rspec
	Change on 2014/12/10 by temiola <temiola@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81811008
---
 src/ruby/ext/grpc/rb_event.c              |   5 +-
 src/ruby/ext/grpc/rb_metadata.c           |   3 +-
 src/ruby/lib/grpc/generic/active_call.rb  |  74 +++--
 src/ruby/lib/grpc/generic/client_stub.rb  |  81 ++++--
 src/ruby/lib/grpc/generic/rpc_server.rb   |   1 +
 src/ruby/spec/generic/active_call_spec.rb | 157 +++++++----
 src/ruby/spec/generic/client_stub_spec.rb | 313 ++++++++++++----------
 src/ruby/spec/generic/rpc_server_spec.rb  |   7 +-
 8 files changed, 409 insertions(+), 232 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c
index 6708ea397c..93f36f80e8 100644
--- a/src/ruby/ext/grpc/rb_event.c
+++ b/src/ruby/ext/grpc/rb_event.c
@@ -205,7 +205,7 @@ static VALUE grpc_rb_event_result(VALUE self) {
         return Qnil;
       }
       rb_raise(rb_eEventError, "write failed, not sure why (code=%d)",
-               event->data.invoke_accepted);
+               event->data.write_accepted);
       break;
 
     case GRPC_CLIENT_METADATA_READ:
@@ -263,7 +263,8 @@ void Init_google_rpc_event() {
   rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
 
   /* Constants representing the completion types */
-  rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore, "CompletionType");
+  rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore,
+                                              "CompletionType");
   rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN",
                   INT2NUM(GRPC_QUEUE_SHUTDOWN));
   rb_define_const(rb_mCompletionType, "READ", INT2NUM(GRPC_READ));
diff --git a/src/ruby/ext/grpc/rb_metadata.c b/src/ruby/ext/grpc/rb_metadata.c
index 733a53a7e9..dcacc4a976 100644
--- a/src/ruby/ext/grpc/rb_metadata.c
+++ b/src/ruby/ext/grpc/rb_metadata.c
@@ -189,7 +189,8 @@ static VALUE grpc_rb_metadata_value(VALUE self) {
 /* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */
 VALUE rb_cMetadata = Qnil;
 void Init_google_rpc_metadata() {
-  rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata", rb_cObject);
+  rb_cMetadata = rb_define_class_under(rb_mGoogleRpcCore, "Metadata",
+                                       rb_cObject);
 
   /* Allocates an object managed by the ruby runtime */
   rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc);
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index de35f278a1..95cc7fc708 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -31,7 +31,9 @@ require 'forwardable'
 require 'grpc'
 require 'grpc/generic/bidi_call'
 
-def assert_event_type(got, want)
+def assert_event_type(ev, want)
+  raise OutOfTime if ev.nil?
+  got = ev.type
   raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want
 end
 
@@ -52,21 +54,28 @@ module GRPC
     #
     # deadline is the absolute deadline for the call.
     #
+    # == Keyword Arguments ==
+    # any keyword arguments are treated as metadata to be sent to the server
+    # if a keyword value is a list, multiple metadata for it's key are sent
+    #
     # @param call [Call] a call on which to start and invocation
     # @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED
     # @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED
-    def self.client_start_invoke(call, q, deadline)
+    def self.client_start_invoke(call, q, deadline, **kw)
       raise ArgumentError.new('not a call') unless call.is_a?Core::Call
       if !q.is_a?Core::CompletionQueue
         raise ArgumentError.new('not a CompletionQueue')
       end
+      call.add_metadata(kw) if kw.length > 0
       invoke_accepted, client_metadata_read = Object.new, Object.new
       finished_tag = Object.new
       call.start_invoke(q, invoke_accepted, client_metadata_read, finished_tag)
+
       # wait for the invocation to be accepted
       ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
       raise OutOfTime if ev.nil?
-      finished_tag
+
+      [finished_tag, client_metadata_read]
     end
 
     # Creates an ActiveCall.
@@ -91,9 +100,11 @@ module GRPC
     # @param deadline [Fixnum] the deadline for the call to complete
     # @param finished_tag [Object] the object used as the call's finish tag,
     #                              if the call has begun
+    # @param read_metadata_tag [Object] the object used as the call's finish
+    #                                   tag, if the call has begun
     # @param started [true|false] (default true) indicates if the call has begun
     def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
-                   started: true)
+                   read_metadata_tag: nil, started: true)
       raise ArgumentError.new('not a call') unless call.is_a?Core::Call
       if !q.is_a?Core::CompletionQueue
         raise ArgumentError.new('not a CompletionQueue')
@@ -102,6 +113,7 @@ module GRPC
       @cq = q
       @deadline = deadline
       @finished_tag = finished_tag
+      @read_metadata_tag = read_metadata_tag
       @marshal = marshal
       @started = started
       @unmarshal = unmarshal
@@ -180,7 +192,7 @@ module GRPC
     def writes_done(assert_finished=true)
       @call.writes_done(self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev.type, FINISH_ACCEPTED)
+      assert_event_type(ev, FINISH_ACCEPTED)
       logger.debug("Writes done: waiting for finish? #{assert_finished}")
       if assert_finished
         ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
@@ -229,7 +241,7 @@ module GRPC
       # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
       # until the flow control allows another send on this call.
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev.type, WRITE_ACCEPTED)
+      assert_event_type(ev, WRITE_ACCEPTED)
       ev = nil
     end
 
@@ -243,7 +255,7 @@ module GRPC
       assert_queue_is_ready
       @call.start_write_status(Core::Status.new(code, details), self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev.type, FINISH_ACCEPTED)
+      assert_event_type(ev, FINISH_ACCEPTED)
       logger.debug("Status sent: #{code}:'#{details}'")
       if assert_finished
         return finished
@@ -257,9 +269,16 @@ module GRPC
     # a READ, it returns the response after unmarshalling it. On
     # FINISHED, it returns nil if the status is OK, otherwise raising BadStatus
     def remote_read
+      if @call.metadata.nil? && !@read_metadata_tag.nil?
+        ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
+        assert_event_type(ev, CLIENT_METADATA_READ)
+        @call.metadata = ev.result
+        @read_metadata_tag = nil
+      end
+
       @call.start_read(self)
       ev = @cq.pluck(self, INFINITE_FUTURE)
-      assert_event_type(ev.type, READ)
+      assert_event_type(ev, READ)
       logger.debug("received req: #{ev.result.inspect}")
       if !ev.result.nil?
         logger.debug("received req.to_s: #{ev.result.to_s}")
@@ -333,10 +352,15 @@ module GRPC
 
     # request_response sends a request to a GRPC server, and returns the
     # response.
+    #
+    # == Keyword Arguments ==
+    # any keyword arguments are treated as metadata to be sent to the server
+    # if a keyword value is a list, multiple metadata for it's key are sent
+    #
     # @param req [Object] the request sent to the server
     # @return [Object] the response received from the server
-    def request_response(req)
-      start_call unless @started
+    def request_response(req, **kw)
+      start_call(**kw) unless @started
       remote_send(req)
       writes_done(false)
       response = remote_read
@@ -354,10 +378,14 @@ module GRPC
     # array of marshallable objects; in typical case it will be an Enumerable
     # that allows dynamic construction of the marshallable objects.
     #
+    # == Keyword Arguments ==
+    # any keyword arguments are treated as metadata to be sent to the server
+    # if a keyword value is a list, multiple metadata for it's key are sent
+    #
     # @param requests [Object] an Enumerable of requests to send
     # @return [Object] the response received from the server
-    def client_streamer(requests)
-      start_call unless @started
+    def client_streamer(requests, **kw)
+      start_call(**kw) unless @started
       requests.each { |r| remote_send(r) }
       writes_done(false)
       response = remote_read
@@ -377,10 +405,15 @@ module GRPC
     # it is executed with each response as the argument and no result is
     # returned.
     #
+    # == Keyword Arguments ==
+    # any keyword arguments are treated as metadata to be sent to the server
+    # if a keyword value is a list, multiple metadata for it's key are sent
+    # any keyword arguments are treated as metadata to be sent to the server.
+    #
     # @param req [Object] the request sent to the server
     # @return [Enumerator|nil] a response Enumerator
-    def server_streamer(req)
-      start_call unless @started
+    def server_streamer(req, **kw)
+      start_call(**kw) unless @started
       remote_send(req)
       writes_done(false)
       replies = enum_for(:each_remote_read_then_finish)
@@ -410,10 +443,14 @@ module GRPC
     # the_call#writes_done has been called, otherwise the block will loop
     # forever.
     #
+    # == Keyword Arguments ==
+    # any keyword arguments are treated as metadata to be sent to the server
+    # if a keyword value is a list, multiple metadata for it's key are sent
+    #
     # @param requests [Object] an Enumerable of requests to send
     # @return [Enumerator, nil] a response Enumerator
-    def bidi_streamer(requests, &blk)
-      start_call unless @started
+    def bidi_streamer(requests, **kw, &blk)
+      start_call(**kw) unless @started
       bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
                         @finished_tag)
       bd.run_on_client(requests, &blk)
@@ -438,8 +475,9 @@ module GRPC
 
     private
 
-    def start_call
-      @finished_tag = ActiveCall.client_start_invoke(@call, @cq, @deadline)
+    def start_call(**kw)
+      tags = ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw)
+      @finished_tag, @read_metadata_tag = tags
       @started = true
     end
 
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 144f9d93ec..b0e72262ff 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -50,7 +50,7 @@ module GRPC
     # Any arbitrary keyword arguments are treated as channel arguments used to
     # configure the RPC connection to the host.
     #
-    # There are two specific keywords are that not used to configure the
+    # There are some specific keyword args that are not used to configure the
     # channel:
     #
     # - :channel_override
@@ -61,21 +61,30 @@ module GRPC
     # - :deadline
     # when present, this is the default deadline used for calls
     #
+    # - :update_metadata
+    # when present, this a func that takes a hash and returns a hash
+    # it can be used to update metadata, i.e, remove, change or update
+    # amend metadata values.
+    #
     # @param host [String] the host the stub connects to
     # @param q [Core::CompletionQueue] used to wait for events
     # @param channel_override [Core::Channel] a pre-created channel
     # @param deadline [Number] the default deadline to use in requests
     # @param creds [Core::Credentials] secures and/or authenticates the channel
-    # @param kw [KeywordArgs] the channel arguments
+    # @param update_metadata a func that updates metadata as described above
+    # @param kw [KeywordArgs]the channel arguments
     def initialize(host, q,
                    channel_override:nil,
-                   deadline:DEFAULT_DEADLINE,
-                   creds:nil,
+                   deadline: DEFAULT_DEADLINE,
+                   creds: nil,
+                   update_metadata: nil,
                    **kw)
       if !q.is_a?Core::CompletionQueue
         raise ArgumentError.new('not a CompletionQueue')
       end
-      @host = host
+      @queue = q
+
+      # set the channel instance
       if !channel_override.nil?
         ch = channel_override
         raise ArgumentError.new('not a Channel') unless ch.is_a?(Core::Channel)
@@ -86,10 +95,19 @@ module GRPC
       else
         ch = Core::Channel.new(host, kw, creds)
       end
+      @ch = ch
+
+      @update_metadata = nil
+      if !update_metadata.nil?
+        if !update_metadata.is_a?(Proc)
+          raise ArgumentError.new('update_metadata is not a Proc')
+        end
+        @update_metadata = update_metadata
+      end
+
 
+      @host = host
       @deadline = deadline
-      @ch = ch
-      @queue = q
     end
 
     # request_response sends a request to a GRPC server, and returns the
@@ -117,6 +135,11 @@ module GRPC
     # If return_op is true, the call returns an Operation, calling execute
     # on the Operation returns the response.
     #
+    # == Keyword Args ==
+    #
+    # Unspecified keyword arguments are treated as metadata to be sent to the
+    # server.
+    #
     # @param method [String] the RPC method to call on the GRPC server
     # @param req [Object] the request sent to the server
     # @param marshal [Function] f(obj)->string that marshals requests
@@ -125,15 +148,16 @@ module GRPC
     # @param return_op [true|false] (default false) return an Operation if true
     # @return [Object] the response received from the server
     def request_response(method, req, marshal, unmarshal, deadline=nil,
-                         return_op:false)
+                         return_op:false, **kw)
       c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
-      return c.request_response(req) unless return_op
+      md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+      return c.request_response(req, **md) unless return_op
 
       # return the operation view of the active_call; define #execute as a
       # new method for this instance that invokes #request_response.
       op = c.operation
       op.define_singleton_method(:execute) do
-        c.request_response(req)
+        c.request_response(req, **md)
       end
       op
     end
@@ -168,6 +192,11 @@ module GRPC
     #
     # If return_op is true, the call returns the response.
     #
+    # == Keyword Args ==
+    #
+    # Unspecified keyword arguments are treated as metadata to be sent to the
+    # server.
+    #
     # @param method [String] the RPC method to call on the GRPC server
     # @param requests [Object] an Enumerable of requests to send
     # @param marshal [Function] f(obj)->string that marshals requests
@@ -176,15 +205,16 @@ module GRPC
     # @param return_op [true|false] (default false) return an Operation if true
     # @return [Object|Operation] the response received from the server
     def client_streamer(method, requests, marshal, unmarshal, deadline=nil,
-                        return_op:false)
+                        return_op:false, **kw)
       c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
-      return c.client_streamer(requests) unless return_op
+      md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+      return c.client_streamer(requests, **md) unless return_op
 
       # return the operation view of the active_call; define #execute as a
       # new method for this instance that invokes #client_streamer.
       op = c.operation
       op.define_singleton_method(:execute) do
-        c.client_streamer(requests)
+        c.client_streamer(requests, **md)
       end
       op
     end
@@ -227,6 +257,11 @@ module GRPC
     # calls the given block with each response or returns an Enumerator of the
     # responses.
     #
+    # == Keyword Args ==
+    #
+    # Unspecified keyword arguments are treated as metadata to be sent to the
+    # server.
+    #
     # @param method [String] the RPC method to call on the GRPC server
     # @param req [Object] the request sent to the server
     # @param marshal [Function] f(obj)->string that marshals requests
@@ -236,15 +271,16 @@ module GRPC
     # @param blk [Block] when provided, is executed for each response
     # @return [Enumerator|Operation|nil] as discussed above
     def server_streamer(method, req, marshal, unmarshal, deadline=nil,
-                        return_op:false, &blk)
+                        return_op:false, **kw, &blk)
       c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
-      return c.server_streamer(req, &blk) unless return_op
+      md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+      return c.server_streamer(req, **md, &blk) unless return_op
 
       # return the operation view of the active_call; define #execute
       # as a new method for this instance that invokes #server_streamer
       op = c.operation
       op.define_singleton_method(:execute) do
-        c.server_streamer(req, &blk)
+        c.server_streamer(req, **md, &blk)
       end
       op
     end
@@ -313,6 +349,12 @@ module GRPC
     #
     # * the deadline is exceeded
     #
+    #
+    # == Keyword Args ==
+    #
+    # Unspecified keyword arguments are treated as metadata to be sent to the
+    # server.
+    #
     # == Return Value ==
     #
     # if the return_op is false, the return value is an Enumerator of the
@@ -332,15 +374,16 @@ module GRPC
     # @param return_op [true|false] (default false) return an Operation if true
     # @return [Enumerator|nil|Operation] as discussed above
     def bidi_streamer(method, requests, marshal, unmarshal, deadline=nil,
-                      return_op:false, &blk)
+                      return_op:false, **kw, &blk)
       c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
-      return c.bidi_streamer(requests, &blk) unless return_op
+      md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+      return c.bidi_streamer(requests, **md, &blk) unless return_op
 
       # return the operation view of the active_call; define #execute
       # as a new method for this instance that invokes #bidi_streamer
       op = c.operation
       op.define_singleton_method(:execute) do
-        c.bidi_streamer(requests, &blk)
+        c.bidi_streamer(requests, **md, &blk)
       end
       op
     end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ebbf3f9780..76e7838d0f 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -246,6 +246,7 @@ module GRPC
       # immediately
       finished_tag = Object.new
       call_queue = Core::CompletionQueue.new
+      call.metadata = new_server_rpc.metadata  # store the metadata on the call
       call.accept(call_queue, finished_tag)
 
       # Send UNAVAILABLE if there are too many unprocessed jobs
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index ceeef2a1d8..a8ee3c0da8 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -31,14 +31,17 @@ require 'grpc'
 require 'grpc/generic/active_call'
 require_relative '../port_picker'
 
-ActiveCall = GRPC::ActiveCall
+include GRPC::Core::StatusCodes
 
 describe GRPC::ActiveCall do
+  ActiveCall = GRPC::ActiveCall
+  Call = GRPC::Core::Call
+  CompletionType = GRPC::Core::CompletionType
 
   before(:each) do
     @pass_through = Proc.new { |x| x }
     @server_tag = Object.new
-    @server_finished_tag = Object.new
+    @server_done_tag, meta_tag = Object.new
     @tag = Object.new
 
     @client_queue = GRPC::Core::CompletionQueue.new
@@ -58,11 +61,12 @@ describe GRPC::ActiveCall do
   describe 'restricted view methods' do
     before(:each) do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       @client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                     @pass_through, deadline,
-                                    finished_tag: finished_tag)
+                                    finished_tag: done_tag,
+                                    read_metadata_tag: meta_tag)
     end
 
     describe '#multi_req_view' do
@@ -89,11 +93,12 @@ describe GRPC::ActiveCall do
   describe '#remote_send' do
     it 'allows a client to send a payload to the server' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       @client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                     @pass_through, deadline,
-                                    finished_tag: finished_tag)
+                                    finished_tag: done_tag,
+                                    read_metadata_tag: meta_tag)
       msg = 'message is a string'
       @client_call.remote_send(msg)
 
@@ -113,12 +118,13 @@ describe GRPC::ActiveCall do
 
     it 'marshals the payload using the marshal func' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       marshal = Proc.new { |x| 'marshalled:' + x }
       client_call = ActiveCall.new(call, @client_queue, marshal,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
 
@@ -133,14 +139,31 @@ describe GRPC::ActiveCall do
 
   end
 
+  describe '#client_start_invoke' do
+
+    it 'sends keywords as metadata to the server when the are present' do
+      call, pass_through = make_test_call, Proc.new { |x| x }
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline, k1: 'v1',
+                                                          k2: 'v2')
+      @server.request_call(@server_tag)
+      ev = @server_queue.next(deadline)
+      expect(ev).to_not be_nil
+      expect(ev.result.metadata['k1']).to eq('v1')
+      expect(ev.result.metadata['k2']).to eq('v2')
+    end
+
+  end
+
   describe '#remote_read' do
     it 'reads the response sent by a server' do
       call, pass_through = make_test_call, Proc.new { |x| x }
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -148,19 +171,56 @@ describe GRPC::ActiveCall do
       expect(client_call.remote_read).to eq('server_response')
     end
 
+    it 'saves metadata { status=200 } when the server adds no metadata' do
+      call, pass_through = make_test_call, Proc.new { |x| x }
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
+      client_call = ActiveCall.new(call, @client_queue, @pass_through,
+                                   @pass_through, deadline,
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
+      msg = 'message is a string'
+      client_call.remote_send(msg)
+      server_call = expect_server_to_receive(msg)
+      server_call.remote_send('ignore me')
+      expect(client_call.metadata).to be_nil
+      client_call.remote_read
+      expect(client_call.metadata).to eq({':status' => '200'})
+    end
+
+    it 'saves metadata add by the server' do
+      call, pass_through = make_test_call, Proc.new { |x| x }
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
+      client_call = ActiveCall.new(call, @client_queue, @pass_through,
+                                   @pass_through, deadline,
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
+      msg = 'message is a string'
+      client_call.remote_send(msg)
+      server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
+      server_call.remote_send('ignore me')
+      expect(client_call.metadata).to be_nil
+      client_call.remote_read
+      expect(client_call.metadata).to eq({':status' => '200', 'k1' => 'v1',
+                                           'k2' => 'v2'})
+    end
+
+
     it 'get a nil msg before a status when an OK status is sent' do
       call, pass_through = make_test_call, Proc.new { |x| x }
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       client_call.writes_done(false)
       server_call = expect_server_to_receive(msg)
       server_call.remote_send('server_response')
-      server_call.send_status(StatusCodes::OK, 'OK')
+      server_call.send_status(OK, 'OK')
       expect(client_call.remote_read).to eq('server_response')
       res = client_call.remote_read
       expect(res).to be_nil
@@ -169,12 +229,13 @@ describe GRPC::ActiveCall do
 
     it 'unmarshals the response using the unmarshal func' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       unmarshal = Proc.new { |x| 'unmarshalled:' + x }
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    unmarshal, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
 
       # confirm the client receives the unmarshalled message
       msg = 'message is a string'
@@ -196,11 +257,12 @@ describe GRPC::ActiveCall do
 
     it 'the returns an enumerator that can read n responses' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
       msg = 'message is 4a string'
       reply = 'server_response'
       client_call.remote_send(msg)
@@ -215,11 +277,12 @@ describe GRPC::ActiveCall do
 
     it 'the returns an enumerator that stops after an OK Status' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   read_metadata_tag: meta_tag,
+                                   finished_tag: done_tag)
       msg = 'message is a string'
       reply = 'server_response'
       client_call.remote_send(msg)
@@ -231,7 +294,7 @@ describe GRPC::ActiveCall do
         server_call.remote_send(reply)
         expect(e.next).to eq(reply)
       end
-      server_call.send_status(StatusCodes::OK, 'OK')
+      server_call.send_status(OK, 'OK')
       expect { e.next }.to raise_error(StopIteration)
     end
 
@@ -240,34 +303,36 @@ describe GRPC::ActiveCall do
   describe '#writes_done' do
     it 'finishes ok if the server sends a status response' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   finished_tag: done_tag,
+                                   read_metadata_tag: meta_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       expect { client_call.writes_done(false) }.to_not raise_error
       server_call = expect_server_to_receive(msg)
       server_call.remote_send('server_response')
       expect(client_call.remote_read).to eq('server_response')
-      server_call.send_status(StatusCodes::OK, 'status code is OK')
+      server_call.send_status(OK, 'status code is OK')
       expect { server_call.finished }.to_not raise_error
       expect { client_call.finished }.to_not raise_error
     end
 
     it 'finishes ok if the server sends an early status response' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   read_metadata_tag: meta_tag,
+                                   finished_tag: done_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
       server_call.remote_send('server_response')
-      server_call.send_status(StatusCodes::OK, 'status code is OK')
+      server_call.send_status(OK, 'status code is OK')
       expect(client_call.remote_read).to eq('server_response')
       expect { client_call.writes_done(false) }.to_not raise_error
       expect { server_call.finished }.to_not raise_error
@@ -276,16 +341,17 @@ describe GRPC::ActiveCall do
 
     it 'finishes ok if writes_done is true' do
       call = make_test_call
-      finished_tag = ActiveCall.client_start_invoke(call, @client_queue,
-                                                    deadline)
+      done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+                                                          deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: finished_tag)
+                                   read_metadata_tag: meta_tag,
+                                   finished_tag: done_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
       server_call.remote_send('server_response')
-      server_call.send_status(StatusCodes::OK, 'status code is OK')
+      server_call.send_status(OK, 'status code is OK')
       expect(client_call.remote_read).to eq('server_response')
       expect { client_call.writes_done(true) }.to_not raise_error
       expect { server_call.finished }.to_not raise_error
@@ -293,19 +359,20 @@ describe GRPC::ActiveCall do
 
   end
 
-  def expect_server_to_receive(sent_text)
-    c = expect_server_to_be_invoked
+  def expect_server_to_receive(sent_text, **kw)
+    c = expect_server_to_be_invoked(**kw)
     expect(c.remote_read).to eq(sent_text)
     c
   end
 
-  def expect_server_to_be_invoked()
+  def expect_server_to_be_invoked(**kw)
     @server.request_call(@server_tag)
     ev = @server_queue.next(deadline)
-    ev.call.accept(@client_queue, @server_finished_tag)
+    ev.call.add_metadata(kw)
+    ev.call.accept(@client_queue, @server_done_tag)
     ActiveCall.new(ev.call, @client_queue, @pass_through,
                    @pass_through, deadline,
-                   finished_tag: @server_finished_tag)
+                   finished_tag: @server_done_tag)
   end
 
   def make_test_call
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 4b01af9581..c76f3b291e 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -45,7 +45,7 @@ def wakey_thread(&blk)
 end
 
 def load_test_certs
-  test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata')
+  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
   files = ['ca.pem', 'server1.key', 'server1.pem']
   files.map { |f| File.open(File.join(test_root, f)).read }
 end
@@ -136,14 +136,32 @@ describe 'ClientStub' do
       @sent_msg, @resp = 'a_msg', 'a_reply'
     end
 
-    describe 'without a call operation' do
+    shared_examples 'request response' do
 
-      it 'should send a request to/receive a_reply from a server' do
+      it 'should send a request to/receive a reply from a server' do
         host = new_test_host
         th = run_request_response(host, @sent_msg, @resp, @pass)
         stub = GRPC::ClientStub.new(host, @cq)
-        resp = stub.request_response(@method, @sent_msg, NOOP, NOOP)
-        expect(resp).to eq(@resp)
+        expect(get_response(stub)).to eq(@resp)
+        th.join
+      end
+
+      it 'should send metadata to the server ok' do
+        host = new_test_host
+        th = run_request_response(host, @sent_msg, @resp, @pass, k1: 'v1',
+                                  k2: 'v2')
+        stub = GRPC::ClientStub.new(host, @cq)
+        expect(get_response(stub)).to eq(@resp)
+        th.join
+      end
+
+      it 'should update the sent metadata with a provided metadata updater' do
+        host = new_test_host
+        th = run_request_response(host, @sent_msg, @resp, @pass,
+                                  k1: 'updated-v1', k2: 'v2')
+        update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
+        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
+        expect(get_response(stub)).to eq(@resp)
         th.join
       end
 
@@ -151,10 +169,8 @@ describe 'ClientStub' do
         alt_host = new_test_host
         th = run_request_response(alt_host, @sent_msg, @resp, @pass)
         ch = GRPC::Core::Channel.new(alt_host, nil)
-        stub = GRPC::ClientStub.new('ignored-host', @cq,
-                                    channel_override:ch)
-        resp = stub.request_response(@method, @sent_msg, NOOP, NOOP)
-        expect(resp).to eq(@resp)
+        stub = GRPC::ClientStub.new('ignored-host', @cq, channel_override:ch)
+        expect(get_response(stub)).to eq(@resp)
         th.join
       end
 
@@ -162,89 +178,73 @@ describe 'ClientStub' do
         host = new_test_host
         th = run_request_response(host, @sent_msg, @resp, @fail)
         stub = GRPC::ClientStub.new(host, @cq)
-        blk = Proc.new do
-          stub.request_response(@method, @sent_msg, NOOP, NOOP)
-        end
+        blk = Proc.new { get_response(stub) }
         expect(&blk).to raise_error(GRPC::BadStatus)
         th.join
       end
 
     end
 
-    describe 'via a call operation' do
+    describe 'without a call operation' do
 
-      it 'should send a request to/receive a_reply from a server' do
-        host = new_test_host
-        th = run_request_response(host, @sent_msg, @resp, @pass)
-        stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
-                                   return_op:true)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        resp = op.execute()
-        expect(resp).to eq(@resp)
-        th.join
+      def get_response(stub)
+        stub.request_response(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
+                              k2: 'v2')
       end
 
-      it 'should raise an error if the status is not OK' do
-        host = new_test_host
-        th = run_request_response(host, @sent_msg, @resp, @fail)
-        stub = GRPC::ClientStub.new(host, @cq)
+      it_behaves_like 'request response'
+
+    end
+
+    describe 'via a call operation' do
+
+      def get_response(stub)
         op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
-                                   return_op:true)
+                                   return_op:true, k1: 'v1', k2: 'v2')
         expect(op).to be_a(GRPC::ActiveCall::Operation)
-        blk = Proc.new do
-          op.execute()
-        end
-        expect(&blk).to raise_error(GRPC::BadStatus)
-        th.join
+        op.execute()
       end
 
+      it_behaves_like 'request response'
+
     end
 
   end
 
   describe '#client_streamer' do
 
-    before(:each) do
-      @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
-      @resp = 'a_reply'
-    end
+    shared_examples 'client streaming' do
 
-    describe 'without a call operation' do
+      before(:each) do
+        @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
+        @resp = 'a_reply'
+      end
 
       it 'should send requests to/receive a reply from a server' do
         host = new_test_host
         th = run_client_streamer(host, @sent_msgs, @resp, @pass)
         stub = GRPC::ClientStub.new(host, @cq)
-        resp = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP)
-        expect(resp).to eq(@resp)
+        expect(get_response(stub)).to eq(@resp)
         th.join
       end
 
-      it 'should raise an error if the status is not ok' do
+      it 'should send metadata to the server ok' do
         host = new_test_host
-        th = run_client_streamer(host, @sent_msgs, @resp, @fail)
+        th = run_client_streamer(host, @sent_msgs, @resp, @pass, k1: 'v1',
+                                 k2: 'v2')
         stub = GRPC::ClientStub.new(host, @cq)
-        blk = Proc.new do
-          stub.client_streamer(@method, @sent_msgs, NOOP, NOOP)
-        end
-        expect(&blk).to raise_error(BadStatus)
+        expect(get_response(stub)).to eq(@resp)
         th.join
       end
 
-    end
 
-    describe 'via a call operation' do
-
-      it 'should send requests to/receive a reply from a server' do
+      it 'should update the sent metadata with a provided metadata updater' do
         host = new_test_host
-        th = run_client_streamer(host, @sent_msgs, @resp, @pass)
-        stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
-                                  return_op:true)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        resp = op.execute()
-        expect(resp).to eq(@resp)
+        th = run_client_streamer(host, @sent_msgs, @resp, @pass,
+                                 k1: 'updated-v1', k2: 'v2')
+        update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
+        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
+        expect(get_response(stub)).to eq(@resp)
         th.join
       end
 
@@ -252,36 +252,53 @@ describe 'ClientStub' do
         host = new_test_host
         th = run_client_streamer(host, @sent_msgs, @resp, @fail)
         stub = GRPC::ClientStub.new(host, @cq)
+        blk = Proc.new { get_response(stub) }
+        expect(&blk).to raise_error(GRPC::BadStatus)
+        th.join
+      end
+
+    end
+
+    describe 'without a call operation' do
+
+      def get_response(stub)
+        stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, k1: 'v1',
+                             k2: 'v2')
+      end
+
+      it_behaves_like 'client streaming'
+
+    end
+
+    describe 'via a call operation' do
+
+      def get_response(stub)
         op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
-                                  return_op:true)
+                                  return_op:true, k1: 'v1', k2: 'v2')
         expect(op).to be_a(GRPC::ActiveCall::Operation)
-        blk = Proc.new do
-          op.execute()
-        end
-        expect(&blk).to raise_error(BadStatus)
-        th.join
+        resp = op.execute()
       end
 
+      it_behaves_like 'client streaming'
+
     end
 
   end
 
   describe '#server_streamer' do
 
-    before(:each) do
-      @sent_msg = 'a_msg'
-      @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
-    end
+    shared_examples 'server streaming' do
 
-    describe 'without a call operation' do
+      before(:each) do
+        @sent_msg = 'a_msg'
+        @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
+      end
 
       it 'should send a request to/receive replies from a server' do
         host = new_test_host
         th = run_server_streamer(host, @sent_msg, @replys, @pass)
         stub = GRPC::ClientStub.new(host, @cq)
-        e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP)
-        expect(e).to be_a(Enumerator)
-        expect(e.collect { |r| r }).to eq(@replys)
+        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
         th.join
       end
 
@@ -289,60 +306,79 @@ describe 'ClientStub' do
         host = new_test_host
         th = run_server_streamer(host, @sent_msg, @replys, @fail)
         stub = GRPC::ClientStub.new(host, @cq)
-        e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP)
-        expect(e).to be_a(Enumerator)
-        expect { e.collect { |r| r } }.to raise_error(BadStatus)
+        e = get_responses(stub)
+        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
       end
 
-    end
-
-    describe 'via a call operation' do
-
-      it 'should send a request to/receive replies from a server' do
+      it 'should send metadata to the server ok' do
         host = new_test_host
-        th = run_server_streamer(host, @sent_msg, @replys, @pass)
+        th = run_server_streamer(host, @sent_msg, @replys, @fail, k1: 'v1',
+                                 k2: 'v2')
         stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
-                                  return_op:true)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        e = op.execute()
-        expect(e).to be_a(Enumerator)
+        e = get_responses(stub)
+        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
       end
 
-      it 'should raise an error if the status is not ok' do
+      it 'should update the sent metadata with a provided metadata updater' do
         host = new_test_host
-        th = run_server_streamer(host, @sent_msg, @replys, @fail)
-        stub = GRPC::ClientStub.new(host, @cq)
+        th = run_server_streamer(host, @sent_msg, @replys, @pass,
+                                 k1: 'updated-v1', k2: 'v2')
+        update_md = Proc.new { |md| md[:k1] = 'updated-v1'; md }
+        stub = GRPC::ClientStub.new(host, @cq, update_metadata: update_md)
+        e = get_responses(stub)
+        expect(e.collect { |r| r }).to eq(@replys)
+        th.join
+      end
+
+    end
+
+    describe 'without a call operation' do
+
+      def get_responses(stub)
+        e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, k1: 'v1',
+                                 k2: 'v2')
+        expect(e).to be_a(Enumerator)
+        e
+      end
+
+      it_behaves_like 'server streaming'
+
+    end
+
+    describe 'via a call operation' do
+
+      def get_responses(stub)
         op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
-                                  return_op:true)
+                                  return_op:true, k1: 'v1', k2: 'v2')
         expect(op).to be_a(GRPC::ActiveCall::Operation)
         e = op.execute()
         expect(e).to be_a(Enumerator)
-        expect { e.collect { |r| r } }.to raise_error(BadStatus)
-        th.join
+        e
       end
 
+      it_behaves_like 'server streaming'
+
     end
 
   end
 
   describe '#bidi_streamer' do
-    before(:each) do
-      @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
-      @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
-    end
 
-    describe 'without a call operation' do
+    shared_examples 'bidi streaming' do
+
+      before(:each) do
+        @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s }
+        @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s }
+      end
 
       it 'supports sending all the requests first', :bidi => true do
         host = new_test_host
         th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys,
                                                    @pass)
         stub = GRPC::ClientStub.new(host, @cq)
-        e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
-        expect(e).to be_a(Enumerator)
+        e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@replys)
         th.join
       end
@@ -351,8 +387,7 @@ describe 'ClientStub' do
         host = new_test_host
         th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true)
         stub = GRPC::ClientStub.new(host, @cq)
-        e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
-        expect(e).to be_a(Enumerator)
+        e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
@@ -367,68 +402,48 @@ describe 'ClientStub' do
         host = new_test_host
         th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false)
         stub = GRPC::ClientStub.new(host, @cq)
-        e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
-        expect(e).to be_a(Enumerator)
+        e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
       end
 
     end
 
-    describe 'via a call operation' do
+    describe 'without a call operation' do
 
-      it 'supports sending all the requests first', :bidi => true do
-        host = new_test_host
-        th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys,
-                                                   @pass)
-        stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
-                                return_op:true)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        e = op.execute
+      def get_responses(stub)
+        e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
         expect(e).to be_a(Enumerator)
-        expect(e.collect { |r| r }).to eq(@replys)
-        th.join
+        e
       end
 
-      it 'supports client-initiated ping pong', :bidi => true  do
-        host = new_test_host
-        th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true)
-        stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
-                                return_op:true)
-        expect(op).to be_a(GRPC::ActiveCall::Operation)
-        e = op.execute
-        expect(e).to be_a(Enumerator)
-        expect(e.collect { |r| r }).to eq(@sent_msgs)
-        th.join
-      end
+      it_behaves_like 'bidi streaming'
 
-      # disabled because an unresolved wire-protocol implementation feature
-      #
-      # - servers should be able initiate messaging, however, as it stand
-      # servers don't know if all the client metadata has been sent until
-      # they receive a message from the client.  Without receiving all the
-      # metadata, the server does not accept the call, so this test hangs.
-      xit 'supports server-initiated ping pong', :bidi => true do
-        th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false)
-        stub = GRPC::ClientStub.new(host, @cq)
-        op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
-                                return_op:true)
+    end
+
+    describe 'via a call operation' do
+
+      def get_responses(stub)
+        op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, return_op:true)
         expect(op).to be_a(GRPC::ActiveCall::Operation)
         e = op.execute
         expect(e).to be_a(Enumerator)
-        expect(e.collect { |r| r }).to eq(@sent_msgs)
-        th.join
+        e
       end
 
+      it_behaves_like 'bidi streaming'
+
     end
 
   end
 
-  def run_server_streamer(hostname, expected_input, replys, status)
+  def run_server_streamer(hostname, expected_input, replys, status, **kw)
+    wanted_metadata = kw.clone
     wakey_thread do |mtx, cnd|
       c = expect_server_to_be_invoked(hostname, mtx, cnd)
+      wanted_metadata.each do |k, v|
+        expect(c.metadata[k.to_s]).to eq(v)
+      end
       expect(c.remote_read).to eq(expected_input)
       replys.each { |r| c.remote_send(r) }
       c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
@@ -462,19 +477,27 @@ describe 'ClientStub' do
     end
   end
 
-  def run_client_streamer(hostname, expected_inputs, resp, status)
+  def run_client_streamer(hostname, expected_inputs, resp, status, **kw)
+    wanted_metadata = kw.clone
     wakey_thread do |mtx, cnd|
       c = expect_server_to_be_invoked(hostname, mtx, cnd)
       expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
+      wanted_metadata.each do |k, v|
+        expect(c.metadata[k.to_s]).to eq(v)
+      end
       c.remote_send(resp)
       c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
     end
   end
 
-  def run_request_response(hostname, expected_input, resp, status)
+  def run_request_response(hostname, expected_input, resp, status, **kw)
+    wanted_metadata = kw.clone
     wakey_thread do |mtx, cnd|
       c = expect_server_to_be_invoked(hostname, mtx, cnd)
       expect(c.remote_read).to eq(expected_input)
+      wanted_metadata.each do |k, v|
+        expect(c.metadata[k.to_s]).to eq(v)
+      end
       c.remote_send(resp)
       c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
     end
@@ -496,9 +519,11 @@ describe 'ClientStub' do
     test_deadline = Time.now + 10  # fail tests after 10 seconds
     ev = server_queue.pluck(@server_tag, INFINITE_FUTURE)
     raise OutOfTime if ev.nil?
+    server_call = ev.call
+    server_call.metadata = ev.result.metadata
     finished_tag = Object.new
-    ev.call.accept(server_queue, finished_tag)
-    GRPC::ActiveCall.new(ev.call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
+    server_call.accept(server_queue, finished_tag)
+    GRPC::ActiveCall.new(server_call, server_queue, NOOP, NOOP, INFINITE_FUTURE,
                          finished_tag: finished_tag)
   end
 
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index fc579a6c3f..7c9b074abf 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -36,7 +36,7 @@ require 'xray/thread_dump_signal_handler'
 require_relative '../port_picker'
 
 def load_test_certs
-  test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata')
+  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
   files = ['ca.pem', 'server1.key', 'server1.pem']
   files.map { |f| File.open(File.join(test_root, f)).read }
 end
@@ -95,6 +95,7 @@ SlowStub = SlowService.rpc_stub_class
 describe GRPC::RpcServer do
 
   RpcServer = GRPC::RpcServer
+  StatusCodes = GRPC::Core::StatusCodes
 
   before(:each) do
     @method = 'an_rpc_method'
@@ -343,7 +344,7 @@ describe GRPC::RpcServer do
           stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
           stub.request_response('/unknown', req, @marshal, @unmarshal)
         end
-        expect(&blk).to raise_error BadStatus
+        expect(&blk).to raise_error GRPC::BadStatus
         @srv.stop
         t.join
       end
@@ -402,7 +403,7 @@ describe GRPC::RpcServer do
             stub = SlowStub.new(@host, **@client_opts)
             begin
               stub.an_rpc(req)
-            rescue BadStatus => e
+            rescue GRPC::BadStatus => e
               _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
             end
           end
-- 
GitLab