diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index 6b8f336055a9aecb821bf3350e950c82e0a0dd8a..ed4a4438b3f893d2a31c5fe5cdb23039ac206ef0 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,5 +1,5 @@
 # This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-04-17 12:36:26 -0700 using RuboCop version 0.30.0.
+# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0.
 # The point is for the user to remove these configuration records
 # one by one as the offenses are removed from the code base.
 # Note that changes in the inspected code, or installation of new
@@ -7,12 +7,12 @@
 
 # Offense count: 30
 Metrics/AbcSize:
-  Max: 37
+  Max: 40
 
 # Offense count: 3
 # Configuration parameters: CountComments.
 Metrics/ClassLength:
-  Max: 179
+  Max: 184
 
 # Offense count: 35
 # Configuration parameters: CountComments.
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 274372e4d521627ae19758fa49049da7bf5f3960..43ba549905986c5f175a5d7b80ca0ad73f9a5486 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -122,6 +122,12 @@ module GRPC
       @metadata_tag = metadata_tag
     end
 
+    # output_metadata are provides access to hash that can be used to
+    # save metadata to be sent as trailer
+    def output_metadata
+      @output_metadata ||= {}
+    end
+
     # multi_req_view provides a restricted view of this ActiveCall for use
     # in a server client-streaming handler.
     def multi_req_view
@@ -164,10 +170,12 @@ module GRPC
     def finished
       batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
                                      RECV_STATUS_ON_CLIENT => nil)
-      if @call.metadata.nil?
-        @call.metadata = batch_result.metadata
-      elsif !batch_result.metadata.nil?
-        @call.metadata.merge!(batch_result.metadata)
+      unless batch_result.status.nil?
+        if @call.metadata.nil?
+          @call.metadata = batch_result.status.metadata
+        else
+          @call.metadata.merge!(batch_result.status.metadata)
+        end
       end
       batch_result.check_status
     end
@@ -445,12 +453,13 @@ module GRPC
 
     # SingleReqView limits access to an ActiveCall's methods for use in server
     # handlers that receive just one request.
-    SingleReqView = view_class(:cancelled, :deadline, :metadata)
+    SingleReqView = view_class(:cancelled, :deadline, :metadata,
+                               :output_metadata)
 
     # MultiReqView limits access to an ActiveCall's methods for use in
     # server client_streamer handlers.
     MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
-                              :each_remote_read, :metadata)
+                              :each_remote_read, :metadata, :output_metadata)
 
     # Operation limits access to an ActiveCall's methods for use as
     # a Operation on the client.
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 22b80d8938b0a1cf6bef143288a4d66054a5c744..10211ae2397b7a89c1f45f2482d172bb387adf22 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -80,7 +80,7 @@ module GRPC
       else  # is a bidi_stream
         active_call.run_server_bidi(mth)
       end
-      send_status(active_call, OK, 'OK')
+      send_status(active_call, OK, 'OK', **active_call.output_metadata)
     rescue BadStatus => e
       # this is raised by handlers that want GRPC to send an application error
       # code and detail message and some additional app-specific metadata.
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index e5d05c8b9b1518cdbaddc37140e588bb5a26a9f0..083632a080fbe3f4b7a6e1688f1b412500c93c28 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -77,12 +77,12 @@ describe GRPC::RpcDesc do
   end
 
   describe '#run_server_method' do
+    let(:fake_md) { { k1: 'v1', k2: 'v2' } }
     describe 'for request responses' do
       let(:this_desc) { @request_response }
       before(:each) do
         @call = double('active_call')
         allow(@call).to receive(:single_req_view).and_return(@call)
-        allow(@call).to receive(:gc)
       end
 
       it_behaves_like 'it handles errors'
@@ -91,7 +91,9 @@ describe GRPC::RpcDesc do
         req = Object.new
         expect(@call).to receive(:remote_read).once.and_return(req)
         expect(@call).to receive(:remote_send).once.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {})
+        expect(@call).to receive(:output_metadata).and_return(fake_md)
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+                                                         **fake_md)
         this_desc.run_server_method(@call, method(:fake_reqresp))
       end
     end
@@ -100,7 +102,6 @@ describe GRPC::RpcDesc do
       before(:each) do
         @call = double('active_call')
         allow(@call).to receive(:multi_req_view).and_return(@call)
-        allow(@call).to receive(:gc)
       end
 
       it 'sends the specified status if BadStatus is raised' do
@@ -125,7 +126,9 @@ describe GRPC::RpcDesc do
 
       it 'sends a response and closes the stream if there no errors' do
         expect(@call).to receive(:remote_send).once.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {})
+        expect(@call).to receive(:output_metadata).and_return(fake_md)
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+                                                         **fake_md)
         @client_streamer.run_server_method(@call, method(:fake_clstream))
       end
     end
@@ -135,7 +138,6 @@ describe GRPC::RpcDesc do
       before(:each) do
         @call = double('active_call')
         allow(@call).to receive(:single_req_view).and_return(@call)
-        allow(@call).to receive(:gc)
       end
 
       it_behaves_like 'it handles errors'
@@ -144,7 +146,9 @@ describe GRPC::RpcDesc do
         req = Object.new
         expect(@call).to receive(:remote_read).once.and_return(req)
         expect(@call).to receive(:remote_send).twice.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {})
+        expect(@call).to receive(:output_metadata).and_return(fake_md)
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+                                                         **fake_md)
         @server_streamer.run_server_method(@call, method(:fake_svstream))
       end
     end
@@ -155,7 +159,6 @@ describe GRPC::RpcDesc do
         enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th')
         allow(enq_th).to receive(:join)
         allow(rwl_th).to receive(:join)
-        allow(@call).to receive(:gc)
       end
 
       it 'sends the specified status if BadStatus is raised' do
@@ -175,7 +178,9 @@ describe GRPC::RpcDesc do
 
       it 'closes the stream if there no errors' do
         expect(@call).to receive(:run_server_bidi)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK', true, {})
+        expect(@call).to receive(:output_metadata).and_return(fake_md)
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+                                                         **fake_md)
         @bidi_streamer.run_server_method(@call, method(:fake_bidistream))
       end
     end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index f1d95be55397246ae34618737c89a27cc7f5ff90..c15d96926bcd7b770080fd81d721a609c2759b8c 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -64,12 +64,14 @@ class EchoService
   rpc :an_rpc, EchoMsg, EchoMsg
   attr_reader :received_md
 
-  def initialize(_default_var = 'ignored')
+  def initialize(**kw)
+    @trailing_metadata = kw
     @received_md = []
   end
 
   def an_rpc(req, call)
     logger.info('echo service received a request')
+    call.output_metadata.update(@trailing_metadata)
     @received_md << call.metadata unless call.metadata.nil?
     req
   end
@@ -534,7 +536,7 @@ describe GRPC::RpcServer do
       end
     end
 
-    context 'with metadata on failing' do
+    context 'with returned metadata on failing' do
       before(:each) do
         server_opts = {
           server_override: @server,
@@ -544,7 +546,7 @@ describe GRPC::RpcServer do
         @srv = RpcServer.new(**server_opts)
       end
 
-      it 'should send receive metadata failed response', server: true do
+      it 'should receive the metadata in the BadStatus', server: true do
         service = FailingService.new
         @srv.handle(service)
         t = Thread.new { @srv.run }
@@ -568,5 +570,32 @@ describe GRPC::RpcServer do
         t.join
       end
     end
+
+    context 'with returned metadata on passing' do
+      before(:each) do
+        server_opts = {
+          server_override: @server,
+          completion_queue_override: @server_queue,
+          poll_period: 1
+        }
+        @srv = RpcServer.new(**server_opts)
+      end
+
+      it 'should send connect metadata to the client', server: true do
+        wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
+        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
+        @srv.handle(service)
+        t = Thread.new { @srv.run }
+        @srv.wait_till_running
+        req = EchoMsg.new
+        stub = EchoStub.new(@host, **client_opts)
+        op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+        expect(op.metadata).to be nil
+        expect(op.execute).to be_a(EchoMsg)
+        expect(op.metadata).to eq(wanted_trailers)
+        @srv.stop
+        t.join
+      end
+    end
   end
 end