diff --git a/.pylintrc b/.pylintrc
index 1682488cb4a262b86d71765f31758efd3994007f..da2081b87e12f048fc0278dfc43785a29e4ded8c 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -30,6 +30,5 @@
 #TODO: Enable too-many-nested-blocks
 #TODO: Enable super-init-not-called
 #TODO: Enable no-self-use
-#TODO: Enable no-member
 
-disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use,no-member
+disable=missing-docstring,too-few-public-methods,too-many-arguments,no-init,duplicate-code,invalid-name,suppressed-message,locally-disabled,protected-access,no-name-in-module,unused-argument,fixme,wrong-import-order,no-value-for-parameter,cyclic-import,unused-variable,redefined-outer-name,unused-import,too-many-instance-attributes,broad-except,too-many-locals,too-many-lines,redefined-variable-type,next-method-called,import-error,useless-else-on-loop,too-many-return-statements,too-many-nested-blocks,super-init-not-called,no-self-use
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 0fae3effb6352d1c2f055e4a0a7c4316b4e69b98..3c563bcc6fafb565977d0f48b62093440ab05a85 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -153,8 +153,6 @@ struct grpc_call {
   bool destroy_called;
   /** flag indicating that cancellation is inherited */
   bool cancellation_is_inherited;
-  /** bitmask of live batches */
-  uint8_t used_batches;
   /** which ops are in-flight */
   bool sent_initial_metadata;
   bool sending_message;
@@ -1012,25 +1010,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
   return !(flags & invalid_positions);
 }
 
-static batch_control *allocate_batch_control(grpc_call *call) {
-  size_t i;
-  for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
-    if ((call->used_batches & (1 << i)) == 0) {
-      call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
-      return &call->active_batches[i];
-    }
+static int batch_slot_for_op(grpc_op_type type) {
+  switch (type) {
+    case GRPC_OP_SEND_INITIAL_METADATA:
+      return 0;
+    case GRPC_OP_SEND_MESSAGE:
+      return 1;
+    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+    case GRPC_OP_SEND_STATUS_FROM_SERVER:
+      return 2;
+    case GRPC_OP_RECV_INITIAL_METADATA:
+      return 3;
+    case GRPC_OP_RECV_MESSAGE:
+      return 4;
+    case GRPC_OP_RECV_CLOSE_ON_SERVER:
+    case GRPC_OP_RECV_STATUS_ON_CLIENT:
+      return 5;
+  }
+  GPR_UNREACHABLE_CODE(return 123456789);
+}
+
+static batch_control *allocate_batch_control(grpc_call *call,
+                                             const grpc_op *ops,
+                                             size_t num_ops) {
+  int slot = batch_slot_for_op(ops[0].op);
+  for (size_t i = 1; i < num_ops; i++) {
+    int op_slot = batch_slot_for_op(ops[i].op);
+    slot = GPR_MIN(slot, op_slot);
+  }
+  batch_control *bctl = &call->active_batches[slot];
+  if (bctl->call != NULL) {
+    return NULL;
   }
-  return NULL;
+  memset(bctl, 0, sizeof(*bctl));
+  bctl->call = call;
+  return bctl;
 }
 
 static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
                                     grpc_cq_completion *storage) {
   batch_control *bctl = user_data;
   grpc_call *call = bctl->call;
-  gpr_mu_lock(&call->mu);
-  call->used_batches = (uint8_t)(
-      call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
-  gpr_mu_unlock(&call->mu);
+  bctl->call = NULL;
   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
 }
 
@@ -1113,12 +1134,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
 
   if (bctl->is_notify_tag_closure) {
     /* unrefs bctl->error */
+    bctl->call = NULL;
     grpc_closure_run(exec_ctx, bctl->notify_tag, error);
-    gpr_mu_lock(&call->mu);
-    bctl->call->used_batches =
-        (uint8_t)(bctl->call->used_batches &
-                  ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
-    gpr_mu_unlock(&call->mu);
     GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
   } else {
     /* unrefs bctl->error */
@@ -1330,6 +1347,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
   finish_batch_step(exec_ctx, bctl);
 }
 
+static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
+                                  grpc_cq_completion *completion) {
+  gpr_free(completion);
+}
+
 static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
                                         grpc_call *call, const grpc_op *ops,
                                         size_t nops, void *notify_tag,
@@ -1344,32 +1366,34 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
   grpc_metadata compression_md;
 
   GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
-
   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
 
-  /* TODO(ctiller): this feels like it could be made lock-free */
-  gpr_mu_lock(&call->mu);
-  bctl = allocate_batch_control(call);
-  memset(bctl, 0, sizeof(*bctl));
-  bctl->call = call;
-  bctl->notify_tag = notify_tag;
-  bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
-
-  grpc_transport_stream_op *stream_op = &bctl->op;
-  memset(stream_op, 0, sizeof(*stream_op));
-  stream_op->covered_by_poller = true;
-
   if (nops == 0) {
-    GRPC_CALL_INTERNAL_REF(call, "completion");
     if (!is_notify_tag_closure) {
       grpc_cq_begin_op(call->cq, notify_tag);
+      grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+                     free_no_op_completion, NULL,
+                     gpr_malloc(sizeof(grpc_cq_completion)));
+    } else {
+      grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
     }
-    gpr_mu_unlock(&call->mu);
-    post_batch_completion(exec_ctx, bctl);
     error = GRPC_CALL_OK;
     goto done;
   }
 
+  /* TODO(ctiller): this feels like it could be made lock-free */
+  bctl = allocate_batch_control(call, ops, nops);
+  if (bctl == NULL) {
+    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+  }
+  bctl->notify_tag = notify_tag;
+  bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+
+  gpr_mu_lock(&call->mu);
+  grpc_transport_stream_op *stream_op = &bctl->op;
+  memset(stream_op, 0, sizeof(*stream_op));
+  stream_op->covered_by_poller = true;
+
   /* rewrite batch ops into a transport op */
   for (i = 0; i < nops; i++) {
     op = &ops[i];
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 26d93faf75287e6d91ece853118257a5ad8426a4..e602f39aeb09d61eb5dc68b7ff4b420fa25421f3 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -43,7 +43,6 @@ _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
 
 _EMPTY_FLAGS = 0
 _INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
-_EMPTY_METADATA = cygrpc.Metadata(())
 
 _UNARY_UNARY_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata,
                             cygrpc.OperationType.send_message,
@@ -138,8 +137,8 @@ def _abort(state, code, details):
         state.code = code
         state.details = details
         if state.initial_metadata is None:
-            state.initial_metadata = _EMPTY_METADATA
-        state.trailing_metadata = _EMPTY_METADATA
+            state.initial_metadata = _common.EMPTY_METADATA
+        state.trailing_metadata = _common.EMPTY_METADATA
 
 
 def _handle_event(event, state, response_deserializer):
@@ -435,7 +434,7 @@ def _start_unary_request(request, timeout, request_serializer):
     deadline, deadline_timespec = _deadline(timeout)
     serialized_request = _common.serialize(request, request_serializer)
     if serialized_request is None:
-        state = _RPCState((), _EMPTY_METADATA, _EMPTY_METADATA,
+        state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA,
                           grpc.StatusCode.INTERNAL,
                           'Exception serializing request!')
         rendezvous = _Rendezvous(state, None, None, deadline)
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index f9accd75a9f4698561a80457e173cf05cb994a9f..6879e1780baabfbe778a5d71973024c6c9bc2831 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -37,7 +37,7 @@ import six
 import grpc
 from grpc._cython import cygrpc
 
-_EMPTY_METADATA = cygrpc.Metadata(())
+EMPTY_METADATA = cygrpc.Metadata(())
 
 CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
     cygrpc.ConnectivityState.idle:
@@ -107,7 +107,7 @@ def channel_args(options):
 
 
 def cygrpc_metadata(application_metadata):
-    return _EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
+    return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
         cygrpc.Metadatum(encode(key), encode(value))
         for key, value in application_metadata)
 
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index bf2743c16b9d840396b26dc09ed1d0099afa6c54..b8e7ea17f708b3252514e51174e125343505ed1c 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -57,7 +57,6 @@ _CLOSED = 'closed'
 _CANCELLED = 'cancelled'
 
 _EMPTY_FLAGS = 0
-_EMPTY_METADATA = cygrpc.Metadata(())
 
 _UNEXPECTED_EXIT_SERVER_GRACE = 1.0
 
@@ -143,7 +142,7 @@ def _abort(state, call, code, details):
         effective_details = details if state.details is None else state.details
         if state.initial_metadata_allowed:
             operations = (cygrpc.operation_send_initial_metadata(
-                _EMPTY_METADATA, _EMPTY_FLAGS),
+                _common.EMPTY_METADATA, _EMPTY_FLAGS),
                           cygrpc.operation_send_status_from_server(
                               _common.cygrpc_metadata(state.trailing_metadata),
                               effective_code, effective_details, _EMPTY_FLAGS),)
@@ -416,7 +415,7 @@ def _send_response(rpc_event, state, serialized_response):
         else:
             if state.initial_metadata_allowed:
                 operations = (cygrpc.operation_send_initial_metadata(
-                    _EMPTY_METADATA, _EMPTY_FLAGS),
+                    _common.EMPTY_METADATA, _EMPTY_FLAGS),
                               cygrpc.operation_send_message(serialized_response,
                                                             _EMPTY_FLAGS),)
                 state.initial_metadata_allowed = False
@@ -446,8 +445,8 @@ def _status(rpc_event, state, serialized_response):
             ]
             if state.initial_metadata_allowed:
                 operations.append(
-                    cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
-                                                           _EMPTY_FLAGS))
+                    cygrpc.operation_send_initial_metadata(
+                        _common.EMPTY_METADATA, _EMPTY_FLAGS))
             if serialized_response is not None:
                 operations.append(
                     cygrpc.operation_send_message(serialized_response,
@@ -549,12 +548,12 @@ def _find_method_handler(rpc_event, generic_handlers):
 
 
 def _handle_unrecognized_method(rpc_event):
-    operations = (
-        cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, _EMPTY_FLAGS),
-        cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
-        cygrpc.operation_send_status_from_server(
-            _EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
-            b'Method not found!', _EMPTY_FLAGS),)
+    operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA,
+                                                         _EMPTY_FLAGS),
+                  cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+                  cygrpc.operation_send_status_from_server(
+                      _common.EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
+                      b'Method not found!', _EMPTY_FLAGS),)
     rpc_state = _RPCState()
     rpc_event.operation_call.start_server_batch(
         operations, lambda ignored_event: (rpc_state, (),))
diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5171071900712960 b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5171071900712960
new file mode 100644
index 0000000000000000000000000000000000000000..70ca3a336e32e06e4c7fea6a0fed57c9d7e5eb2b
Binary files /dev/null and b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5171071900712960 differ
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ddc7424555d39617b1d55b1cdcab7b1d9cff2b1a..ea90dc66c8316a89497ce91b1c4e17fa7e4cc7ce 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -79725,6 +79725,28 @@
     ], 
     "uses_polling": false
   }, 
+  {
+    "args": [
+      "test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-5171071900712960"
+    ], 
+    "ci_platforms": [
+      "linux"
+    ], 
+    "cpu_cost": 0.1, 
+    "exclude_configs": [
+      "tsan"
+    ], 
+    "exclude_iomgrs": [
+      "uv"
+    ], 
+    "flaky": false, 
+    "language": "c", 
+    "name": "api_fuzzer_one_entry", 
+    "platforms": [
+      "linux"
+    ], 
+    "uses_polling": false
+  }, 
   {
     "args": [
       "test/core/end2end/fuzzers/api_fuzzer_corpus/crash-0597bbdd657fa4ed14443994c9147a1a7bbc205f"