From 515b0a93526a3955f5f76ce6013394f87a6c904c Mon Sep 17 00:00:00 2001
From: Nathaniel Manista <nathaniel@google.com>
Date: Thu, 13 Aug 2015 10:10:18 -0700
Subject: [PATCH] Service-side read without allowance

This compensates for the abstraction mismatch described in issue 2916.
---
 src/python/grpcio/grpc/_links/service.py      | 43 +++++++++++++------
 .../framework/interfaces/links/test_cases.py  |  9 +---
 2 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
index 7783e91824..5c636d61ab 100644
--- a/src/python/grpcio/grpc/_links/service.py
+++ b/src/python/grpcio/grpc/_links/service.py
@@ -44,7 +44,10 @@ from grpc.framework.interfaces.links import links
 @enum.unique
 class _Read(enum.Enum):
   READING = 'reading'
-  AWAITING_ALLOWANCE = 'awaiting allowance'
+  # TODO(issue 2916): This state will again be necessary after eliminating the
+  # "early_read" field of _RPCState and going back to only reading when granted
+  # allowance to read.
+  # AWAITING_ALLOWANCE = 'awaiting allowance'
   CLOSED = 'closed'
 
 
@@ -67,12 +70,15 @@ class _RPCState(object):
 
   def __init__(
       self, request_deserializer, response_serializer, sequence_number, read,
-      allowance, high_write, low_write, premetadataed, terminal_metadata, code,
-      message):
+      early_read, allowance, high_write, low_write, premetadataed,
+      terminal_metadata, code, message):
     self.request_deserializer = request_deserializer
     self.response_serializer = response_serializer
     self.sequence_number = sequence_number
     self.read = read
+    # TODO(issue 2916): Eliminate this by eliminating the necessity of calling
+    # call.read just to advance the RPC.
+    self.early_read = early_read  # A raw (not deserialized) read.
     self.allowance = allowance
     self.high_write = high_write
     self.low_write = low_write
@@ -120,7 +126,7 @@ class _Kernel(object):
 
     call.read(call)
     self._rpc_states[call] = _RPCState(
-        request_deserializer, response_serializer, 1, _Read.READING, 0,
+        request_deserializer, response_serializer, 1, _Read.READING, None, 1,
         _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
     ticket = links.Ticket(
         call, 0, group, method, links.Ticket.Subscription.FULL,
@@ -140,12 +146,15 @@ class _Kernel(object):
       termination = links.Ticket.Termination.COMPLETION
     else:
       if 0 < rpc_state.allowance:
+        payload = rpc_state.request_deserializer(event.bytes)
+        termination = None
         rpc_state.allowance -= 1
         call.read(call)
       else:
-        rpc_state.read = _Read.AWAITING_ALLOWANCE
-      payload = rpc_state.request_deserializer(event.bytes)
-      termination = None
+        rpc_state.early_read = event.bytes
+        return
+        # TODO(issue 2916): Instead of returning:
+        # rpc_state.read = _Read.AWAITING_ALLOWANCE
     ticket = links.Ticket(
         call, rpc_state.sequence_number, None, None, None, None, None, None,
         payload, None, None, None, termination)
@@ -237,12 +246,22 @@ class _Kernel(object):
           rpc_state.premetadataed = True
 
       if ticket.allowance is not None:
-        if rpc_state.read is _Read.AWAITING_ALLOWANCE:
-          rpc_state.allowance += ticket.allowance - 1
-          call.read(call)
-          rpc_state.read = _Read.READING
-        else:
+        if rpc_state.early_read is None:
           rpc_state.allowance += ticket.allowance
+        else:
+          payload = rpc_state.request_deserializer(rpc_state.early_read)
+          rpc_state.allowance += ticket.allowance - 1
+          rpc_state.early_read = None
+          if rpc_state.read is _Read.READING:
+            call.read(call)
+            termination = None
+          else:
+            termination = links.Ticket.Termination.COMPLETION
+          ticket = links.Ticket(
+              call, rpc_state.sequence_number, None, None, None, None, None,
+              None, payload, None, None, None, termination)
+          rpc_state.sequence_number += 1
+          self._relay.add_value(ticket)
 
       if ticket.payload is not None:
         call.write(rpc_state.response_serializer(ticket.payload), call)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
index 26ca035c44..1e575d1a9e 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
@@ -303,16 +303,9 @@ class TransmissionTest(object):
         invocation_message, links.Ticket.Termination.COMPLETION)
     self._invocation_link.accept_ticket(original_invocation_ticket)
 
-    # TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
-    # invocation-side ticket sequence shouldn't require granting allowance for
-    # another payload.
     self._service_mate.block_until_tickets_satisfy(
         at_least_n_payloads_received_predicate(1))
     service_operation_id = self._service_mate.tickets()[0].operation_id
-    self._service_link.accept_ticket(
-        links.Ticket(
-            service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
-            None, 1, None, None, None, None, None, None))
 
     self._service_mate.block_until_tickets_satisfy(terminated)
     self._assert_is_valid_invocation_sequence(
@@ -321,7 +314,7 @@ class TransmissionTest(object):
         invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
 
     original_service_ticket = links.Ticket(
-        service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
+        service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
         timeout, 0, service_initial_metadata, service_payload,
         service_terminal_metadata, service_code, service_message,
         links.Ticket.Termination.COMPLETION)
-- 
GitLab