Skip to content
Snippets Groups Projects
Commit 1f0c8271 authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Fix asan and tsan bugs. Simplify the code

parent 93b60e05
No related branches found
No related tags found
No related merge requests found
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
#include <atomic> #include <atomic>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/cpp/microbenchmarks/helpers.h" #include "test/cpp/microbenchmarks/helpers.h"
extern "C" { extern "C" {
...@@ -55,8 +57,6 @@ static void* g_tag = (void*)(intptr_t)10; // Some random number ...@@ -55,8 +57,6 @@ static void* g_tag = (void*)(intptr_t)10; // Some random number
static grpc_completion_queue* g_cq; static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable; static grpc_event_engine_vtable g_vtable;
static __thread grpc_cq_completion g_cq_completion;
static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
grpc_closure* closure) { grpc_closure* closure) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
...@@ -75,15 +75,18 @@ static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) { ...@@ -75,15 +75,18 @@ static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
/* Callback when the tag is dequeued from the completion queue. Does nothing */ /* Callback when the tag is dequeued from the completion queue. Does nothing */
static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg, static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg,
grpc_cq_completion* cq_completion) {} grpc_cq_completion* cq_completion) {
gpr_free(cq_completion);
}
/* Queues a completion tag. ZERO polling overhead */ /* Queues a completion tag. ZERO polling overhead */
static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps, static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
grpc_pollset_worker** worker, gpr_timespec now, grpc_pollset_worker** worker, gpr_timespec now,
gpr_timespec deadline) { gpr_timespec deadline) {
gpr_mu_unlock(&ps->mu); gpr_mu_unlock(&ps->mu);
grpc_cq_begin_op(g_cq, g_tag);
grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL, grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
&g_cq_completion); (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&ps->mu); gpr_mu_lock(&ps->mu);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
...@@ -113,7 +116,7 @@ static void teardown() { ...@@ -113,7 +116,7 @@ static void teardown() {
grpc_completion_queue_destroy(g_cq); grpc_completion_queue_destroy(g_cq);
} }
/* A few notes about Multi-theaded benchmarks: /* A few notes about Multi-threaded benchmarks:
Setup: Setup:
The benchmark framework ensures that none of the threads proceed beyond the The benchmark framework ensures that none of the threads proceed beyond the
...@@ -136,11 +139,8 @@ static void BM_Cq_Throughput(benchmark::State& state) { ...@@ -136,11 +139,8 @@ static void BM_Cq_Throughput(benchmark::State& state) {
} }
while (state.KeepRunning()) { while (state.KeepRunning()) {
grpc_cq_begin_op(g_cq, g_tag); GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, NULL).type ==
GRPC_OP_COMPLETE);
/* Note that the tag dequeued by the following might have been enqueued
by another thread. */
grpc_completion_queue_next(g_cq, deadline, NULL);
} }
state.SetItemsProcessed(state.iterations()); state.SetItemsProcessed(state.iterations());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment