From 59a19a9d5ecc34b60fd6c035a6cb261261dd48fe Mon Sep 17 00:00:00 2001
From: Alexander Polcyn <apolcyn@google.com>
Date: Tue, 18 Jul 2017 17:26:08 -0700
Subject: [PATCH] make sure that client-side view of calls is robust

---
 src/ruby/lib/grpc/generic/active_call.rb  | 17 +++-
 src/ruby/spec/generic/client_stub_spec.rb | 97 ++++++++++++++++++++---
 2 files changed, 104 insertions(+), 10 deletions(-)

diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 96c773a995..4a748a4ac2 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -46,7 +46,7 @@ module GRPC
     extend Forwardable
     attr_reader :deadline, :metadata_sent, :metadata_to_send
     def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
-                   :peer, :peer_cert, :trailing_metadata
+                   :peer, :peer_cert, :trailing_metadata, :status
 
     # client_invoke begins a client invocation.
     #
@@ -105,6 +105,8 @@ module GRPC
       @input_stream_done = false
       @call_finished = false
       @call_finished_mu = Mutex.new
+      @client_call_executed = false
+      @client_call_executed_mu = Mutex.new
     end
 
     # Sends the initial metadata that has yet to be sent.
@@ -327,6 +329,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def request_response(req, metadata: {})
+      raise_error_if_already_executed
       ops = {
         SEND_MESSAGE => @marshal.call(req),
         SEND_CLOSE_FROM_CLIENT => nil,
@@ -369,6 +372,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def client_streamer(requests, metadata: {})
+      raise_error_if_already_executed
       begin
         merge_metadata_and_send_if_not_already_sent(metadata)
         requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
@@ -411,6 +415,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Enumerator|nil] a response Enumerator
     def server_streamer(req, metadata: {})
+      raise_error_if_already_executed
       ops = {
         SEND_MESSAGE => @marshal.call(req),
         SEND_CLOSE_FROM_CLIENT => nil
@@ -468,6 +473,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Enumerator, nil] a response Enumerator
     def bidi_streamer(requests, metadata: {}, &blk)
+      raise_error_if_already_executed
       # Metadata might have already been sent if this is an operation view
       merge_metadata_and_send_if_not_already_sent(metadata)
       bd = BidiCall.new(@call,
@@ -572,6 +578,15 @@ module GRPC
       merge_metadata_to_send(metadata) && send_initial_metadata
     end
 
+    def raise_error_if_already_executed
+      @client_call_executed_mu.synchronize do
+        if @client_call_executed
+          fail GRPC::Core::CallError, 'attempting to re-run a call'
+        end
+        @client_call_executed = true
+      end
+    end
+
     def self.view_class(*visible_methods)
       Class.new do
         extend ::Forwardable
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 3b8f72eda1..7b5e6a95a4 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes
 include GRPC::Core::TimeConsts
 include GRPC::Core::CallOps
 
+# check that methods on a finished/closed call t crash
+def check_op_view_of_finished_client_call_is_robust(op_view)
+  # use read_response_stream to try to iterate through
+  # possible response stream
+  fail('need something to attempt reads') unless block_given?
+  expect do
+    resp = op_view.execute
+    yield resp
+  end.to raise_error(GRPC::Core::CallError)
+
+  expect { op_view.start_call }.to raise_error(RuntimeError)
+
+  expect do
+    op_view.wait
+    op_view.cancel
+
+    op_view.metadata
+    op_view.trailing_metadata
+    op_view.status
+
+    op_view.cancelled?
+    op_view.deadline
+    op_view.write_flag
+    op_view.write_flag = 1
+  end.to_not raise_error
+end
+
 describe 'ClientStub' do
   let(:noop) { proc { |x| x } }
 
@@ -231,15 +258,27 @@ describe 'ClientStub' do
 
       it_behaves_like 'request response'
 
-      it 'sends metadata to the server ok when running start_call first' do
+      def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @pass,
                                   k1: 'v1', k2: 'v2')
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-        expect(get_response(stub)).to eq(@resp)
+        expect(
+          get_response(stub,
+                       run_start_call_first: run_start_call_first)).to eq(@resp)
         th.join
       end
+
+      it 'sends metadata to the server ok when running start_call first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+      end
     end
   end
 
@@ -307,11 +346,23 @@ describe 'ClientStub' do
 
       it_behaves_like 'client streaming'
 
-      it 'sends metadata to the server ok when running start_call first' do
+      def run_op_view_metadata_test(run_start_call_first)
         th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
-        expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+        expect(
+          get_response(@stub,
+                       run_start_call_first: run_start_call_first)).to eq(@resp)
         th.join
       end
+
+      it 'sends metadata to the server ok when running start_call first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+      end
     end
   end
 
@@ -377,7 +428,7 @@ describe 'ClientStub' do
       end
     end
 
-    describe 'without a call operation', test2: true do
+    describe 'without a call operation' do
       def get_responses(stub, unmarshal: noop)
         e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                  metadata: @metadata)
@@ -405,16 +456,30 @@ describe 'ClientStub' do
 
       it_behaves_like 'server streaming'
 
-      it 'should send metadata to the server ok when start_call is run first' do
+      def run_op_view_metadata_test(run_start_call_first)
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @fail,
                                  k1: 'v1', k2: 'v2')
         stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
-        e = get_responses(stub, run_start_call_first: true)
+        e = get_responses(stub, run_start_call_first: run_start_call_first)
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
       end
+
+      it 'should send metadata to the server ok when start_call is run first' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+          responses.each { |r| p r }
+        end
+      end
+
+      it 'does not crash when used after the call has been finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+          responses.each { |r| p r }
+        end
+      end
     end
   end
 
@@ -501,14 +566,28 @@ describe 'ClientStub' do
 
       it_behaves_like 'bidi streaming'
 
-      it 'can run start_call before executing the call' do
+      def run_op_view_metadata_test(run_start_call_first)
         th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                    @pass)
         stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
-        e = get_responses(stub, run_start_call_first: true)
+        e = get_responses(stub, run_start_call_first: run_start_call_first)
         expect(e.collect { |r| r }).to eq(@replys)
         th.join
       end
+
+      it 'can run start_call before executing the call' do
+        run_op_view_metadata_test(true)
+        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+          responses.each { |r| p r }
+        end
+      end
+
+      it 'doesnt crash when op_view used after call has finished' do
+        run_op_view_metadata_test(false)
+        check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+          responses.each { |r| p r }
+        end
+      end
     end
   end
 
-- 
GitLab