diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index daf371d03eee45b8f02a32a93336eef8a2d72127..4247a1c12ba120b1ead4326a851a4fa895d0ca75 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -98,7 +98,8 @@ retry: if (errno == EINTR) { goto retry; } - gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); + gpr_log(GPR_ERROR, "waitpid failed for pid %d: %s", p->pid, + strerror(errno)); return -1; } p->joined = true; diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 106509ab83e9e82fcaaf570064cc68304ff72511..b7b2553f12d8d43607954f9479a7d5d4ea273df0 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -31,7 +31,11 @@ * */ +#include <signal.h> +#include <string.h> + #include <memory> +#include <mutex> #include <sstream> #include <string> @@ -43,6 +47,11 @@ using grpc::SubProcess; +constexpr auto kNumWorkers = 2; + +static SubProcess* g_driver; +static SubProcess* g_workers[kNumWorkers]; + template <class T> std::string as_string(const T& val) { std::ostringstream out; @@ -50,6 +59,24 @@ std::string as_string(const T& val) { return out.str(); } +static void sighandler(int sig) { + const int errno_saved = errno; + if (g_driver != NULL) g_driver->Interrupt(); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); + } + errno = errno_saved; +} + +static void register_sighandler() { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = sighandler; + + sigaction(SIGINT, &act, NULL); + sigaction(SIGTERM, &act, NULL); +} + static void LogStatus(int status, const char* label) { if (WIFEXITED(status)) { gpr_log(GPR_INFO, "%s: subprocess exited with status %d", label, @@ -63,8 +90,7 @@ static void LogStatus(int status, const char* label) { } int main(int argc, char** argv) { - typedef std::unique_ptr<SubProcess> SubProcessPtr; - std::vector<SubProcessPtr> jobs; + register_sighandler(); std::string my_bin = argv[0]; std::string bin_dir = my_bin.substr(0, my_bin.rfind('/')); @@ -72,11 +98,11 @@ int main(int argc, char** argv) { std::ostringstream env; bool first = true; - for (int i = 0; i < 2; i++) { - auto port = grpc_pick_unused_port_or_die(); + for (int i = 0; i < kNumWorkers; i++) { + const auto port = grpc_pick_unused_port_or_die(); std::vector<std::string> args = {bin_dir + "/qps_worker", "-driver_port", as_string(port)}; - jobs.emplace_back(new SubProcess(args)); + g_workers[i] = new SubProcess(args); if (!first) env << ","; env << "localhost:" << port; first = false; @@ -87,18 +113,27 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - int status = SubProcess(args).Join(); - if (status != 0) { - LogStatus(status, "driver"); - } - for (auto it = jobs.begin(); it != jobs.end(); ++it) { - (*it)->Interrupt(); + g_driver = new SubProcess(args); + const int driver_join_status = g_driver->Join(); + if (driver_join_status != 0) { + LogStatus(driver_join_status, "driver"); } - for (auto it = jobs.begin(); it != jobs.end(); ++it) { - status = (*it)->Join(); - if (status != 0) { - LogStatus(status, "worker"); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); + } + + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) { + const int worker_status = g_workers[i]->Join(); + if (worker_status != 0) { + LogStatus(worker_status, "worker"); + } } } + + delete g_driver; + g_driver = NULL; + for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i]; + GPR_ASSERT(driver_join_status == 0); }