Skip to content
Snippets Groups Projects
Commit 1ae439e3 authored by Yash Tibrewal's avatar Yash Tibrewal
Browse files

Reviewer comments

parent d4d9e459
No related branches found
No related tags found
No related merge requests found
...@@ -25,24 +25,16 @@ namespace grpc_core { ...@@ -25,24 +25,16 @@ namespace grpc_core {
DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread"); DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread");
struct CallbackWrapper { struct CallbackWrapper {
#ifndef NDEBUG
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc) CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
: callback(std::move(cb)), location(loc) {} : callback(std::move(cb)), location(loc) {}
#else
explicit CallbackWrapper(std::function<void()> cb)
: callback(std::move(cb)) {}
#endif
MultiProducerSingleConsumerQueue::Node mpscq_node; MultiProducerSingleConsumerQueue::Node mpscq_node;
const std::function<void()> callback; const std::function<void()> callback;
#ifndef NDEBUG
const DebugLocation location; const DebugLocation location;
#endif
}; };
void LogicalThread::Run(std::function<void()> callback, void LogicalThread::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) { const grpc_core::DebugLocation& location) {
(void)location;
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]", gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line()); this, location.file(), location.line());
...@@ -58,19 +50,14 @@ void LogicalThread::Run(std::function<void()> callback, ...@@ -58,19 +50,14 @@ void LogicalThread::Run(std::function<void()> callback,
// Loan this thread to the logical thread and drain the queue. // Loan this thread to the logical thread and drain the queue.
DrainQueue(); DrainQueue();
} else { } else {
#ifndef NDEBUG
CallbackWrapper* cb_wrapper = CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location); new CallbackWrapper(std::move(callback), location);
#else
CallbackWrapper* cb_wrapper = new CallbackWrapper(std::move(callback));
#endif
// There already are closures executing on this logical thread. Simply add // There already are closures executing on this logical thread. Simply add
// this closure to the queue. // this closure to the queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper); gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
} }
queue_.Push( queue_.Push(&cb_wrapper->mpscq_node);
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(cb_wrapper));
} }
} }
...@@ -101,17 +88,14 @@ void LogicalThread::DrainQueue() { ...@@ -101,17 +88,14 @@ void LogicalThread::DrainQueue() {
// This can happen either due to a race condition within the mpscq // This can happen either due to a race condition within the mpscq
// implementation or because of a race with Run() // implementation or because of a race with Run()
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
// TODO(yashykt) : The combiner mechanism offloads execution to the
// executor at this point. Figure out if we should replicate that
// behavior here.
gpr_log(GPR_INFO, " Queue returned nullptr, trying again"); gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
} }
} }
#ifndef NDEBUG #ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
" Running item %p : callback scheduled at [%s:%d]", cb_wrapper, cb_wrapper, cb_wrapper->location.file(),
cb_wrapper->location.file(), cb_wrapper->location.line()); cb_wrapper->location.line());
} }
#endif #endif
cb_wrapper->callback(); cb_wrapper->callback();
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
* *
*/ */
#include <memory>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
...@@ -37,59 +39,73 @@ TEST(LogicalThreadTest, ExecuteOne) { ...@@ -37,59 +39,73 @@ TEST(LogicalThreadTest, ExecuteOne) {
gpr_event done; gpr_event done;
gpr_event_init(&done); gpr_event_init(&done);
lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION); lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr); nullptr);
} }
typedef struct { struct ThreadArgs {
size_t ctr; size_t counter;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock; grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock;
gpr_event done; gpr_event done;
} thd_args; };
typedef struct { struct ExecutionArgs {
size_t* ctr; size_t* counter;
size_t value; size_t value;
} ex_args; };
class TestThread {
public:
explicit TestThread(grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock)
: lock_(std::move(lock)),
thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
}
void execute_many_loop(void* a) { ~TestThread() {
thd_args* args = static_cast<thd_args*>(a); EXPECT_NE(gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)),
nullptr);
thread_.Join();
}
private:
static void ExecuteManyLoop(void* arg) {
TestThread* self = static_cast<TestThread*>(arg);
size_t n = 1; size_t n = 1;
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 10000; j++) { for (size_t j = 0; j < 10000; j++) {
ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c))); ExecutionArgs* c = new ExecutionArgs;
c->ctr = &args->ctr; c->counter = &self->counter_;
c->value = n++; c->value = n++;
args->lock->Run( self->lock_->Run(
[c]() { [c]() {
GPR_ASSERT(*c->ctr == c->value - 1); EXPECT_TRUE(*c->counter == c->value - 1);
*c->ctr = c->value; *c->counter = c->value;
gpr_free(c); delete c;
}, },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
// sleep for a little bit, to test other threads picking up the load // sleep for a little bit, to test other threads picking up the load
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
} }
args->lock->Run([args]() { gpr_event_set(&args->done, (void*)1); }, self->lock_->Run([self]() { gpr_event_set(&self->done_, (void*)1); },
DEBUG_LOCATION); DEBUG_LOCATION);
} }
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock_;
grpc_core::Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(LogicalThreadTest, ExecuteMany) { TEST(LogicalThreadTest, ExecuteMany) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
grpc_core::Thread thds[100]; {
thd_args ta[GPR_ARRAY_SIZE(thds)]; std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { for (size_t i = 0; i < 100; ++i) {
ta[i].ctr = 0; threads.push_back(std::unique_ptr<TestThread>(new TestThread(lock)));
ta[i].lock = lock;
gpr_event_init(&ta[i].done);
thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
thds[i].Start();
} }
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
GPR_ASSERT(gpr_event_wait(&ta[i].done,
gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr);
thds[i].Join();
} }
} }
} // namespace } // namespace
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment