Skip to content
Snippets Groups Projects
Commit 4bf41d89 authored by Alistair Veitch's avatar Alistair Veitch
Browse files

Merge pull request #387 from maxwell-demon/census_active_ops

V0 implementation of census_get_active_ops().
parents 15867f19 2bfbfe8b
No related branches found
No related tags found
No related merge requests found
...@@ -141,7 +141,7 @@ static void record_stats(census_ht* store, census_op_id op_id, ...@@ -141,7 +141,7 @@ static void record_stats(census_ht* store, census_op_id op_id,
const census_rpc_stats* stats) { const census_rpc_stats* stats) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
if (store != NULL) { if (store != NULL) {
trace_obj* trace = NULL; census_trace_obj* trace = NULL;
census_internal_lock_trace_store(); census_internal_lock_trace_store();
trace = census_get_trace_obj_locked(op_id); trace = census_get_trace_obj_locked(op_id);
if (trace != NULL) { if (trace != NULL) {
......
...@@ -32,38 +32,22 @@ ...@@ -32,38 +32,22 @@
*/ */
#include "src/core/statistics/census_interface.h" #include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_tracing.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/hash_table.h" #include "src/core/statistics/hash_table.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/time.h>
void census_trace_obj_destroy(census_trace_obj* obj) {
/* Struct for a trace annotation. */ census_trace_annotation* p = obj->annotations;
typedef struct annotation {
gpr_timespec ts; /* timestamp of the annotation */
char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */
struct annotation* next;
} annotation;
typedef struct trace_obj {
census_op_id id;
gpr_timespec ts;
census_rpc_stats rpc_stats;
char* method;
annotation* annotations;
} trace_obj;
static void trace_obj_destroy(trace_obj* obj) {
annotation* p = obj->annotations;
while (p != NULL) { while (p != NULL) {
annotation* next = p->next; census_trace_annotation* next = p->next;
gpr_free(p); gpr_free(p);
p = next; p = next;
} }
...@@ -71,7 +55,9 @@ static void trace_obj_destroy(trace_obj* obj) { ...@@ -71,7 +55,9 @@ static void trace_obj_destroy(trace_obj* obj) {
gpr_free(obj); gpr_free(obj);
} }
static void delete_trace_obj(void* obj) { trace_obj_destroy((trace_obj*)obj); } static void delete_trace_obj(void* obj) {
census_trace_obj_destroy((census_trace_obj*)obj);
}
static const census_ht_option ht_opt = { static const census_ht_option ht_opt = {
CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */, CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */,
...@@ -103,8 +89,8 @@ static void init_mutex_once(void) { ...@@ -103,8 +89,8 @@ static void init_mutex_once(void) {
census_op_id census_tracing_start_op(void) { census_op_id census_tracing_start_op(void) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
{ {
trace_obj* ret = (trace_obj*)gpr_malloc(sizeof(trace_obj)); census_trace_obj* ret = gpr_malloc(sizeof(census_trace_obj));
memset(ret, 0, sizeof(trace_obj)); memset(ret, 0, sizeof(census_trace_obj));
g_id++; g_id++;
memcpy(&ret->id, &g_id, sizeof(census_op_id)); memcpy(&ret->id, &g_id, sizeof(census_op_id));
ret->rpc_stats.cnt = 1; ret->rpc_stats.cnt = 1;
...@@ -118,7 +104,7 @@ census_op_id census_tracing_start_op(void) { ...@@ -118,7 +104,7 @@ census_op_id census_tracing_start_op(void) {
int census_add_method_tag(census_op_id op_id, const char* method) { int census_add_method_tag(census_op_id op_id, const char* method) {
int ret = 0; int ret = 0;
trace_obj* trace = NULL; census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace == NULL) { if (trace == NULL) {
...@@ -131,11 +117,11 @@ int census_add_method_tag(census_op_id op_id, const char* method) { ...@@ -131,11 +117,11 @@ int census_add_method_tag(census_op_id op_id, const char* method) {
} }
void census_tracing_print(census_op_id op_id, const char* anno_txt) { void census_tracing_print(census_op_id op_id, const char* anno_txt) {
trace_obj* trace = NULL; census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) { if (trace != NULL) {
annotation* anno = gpr_malloc(sizeof(annotation)); census_trace_annotation* anno = gpr_malloc(sizeof(census_trace_annotation));
anno->ts = gpr_now(); anno->ts = gpr_now();
{ {
char* d = anno->txt; char* d = anno->txt;
...@@ -153,7 +139,7 @@ void census_tracing_print(census_op_id op_id, const char* anno_txt) { ...@@ -153,7 +139,7 @@ void census_tracing_print(census_op_id op_id, const char* anno_txt) {
} }
void census_tracing_end_op(census_op_id op_id) { void census_tracing_end_op(census_op_id op_id) {
trace_obj* trace = NULL; census_trace_obj* trace = NULL;
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id)); trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) { if (trace != NULL) {
...@@ -196,14 +182,58 @@ void census_internal_lock_trace_store(void) { gpr_mu_lock(&g_mu); } ...@@ -196,14 +182,58 @@ void census_internal_lock_trace_store(void) { gpr_mu_lock(&g_mu); }
void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); } void census_internal_unlock_trace_store(void) { gpr_mu_unlock(&g_mu); }
trace_obj* census_get_trace_obj_locked(census_op_id op_id) { census_trace_obj* census_get_trace_obj_locked(census_op_id op_id) {
if (g_trace_store == NULL) { if (g_trace_store == NULL) {
gpr_log(GPR_ERROR, "Census trace store is not initialized."); gpr_log(GPR_ERROR, "Census trace store is not initialized.");
return NULL; return NULL;
} }
return (trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id)); return (census_trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id));
} }
const char* census_get_trace_method_name(const trace_obj* trace) { const char* census_get_trace_method_name(const census_trace_obj* trace) {
return (const char*)trace->method; return trace->method;
}
static census_trace_annotation* dup_annotation_chain(
census_trace_annotation* from) {
census_trace_annotation *ret = NULL;
census_trace_annotation **to = &ret;
for (; from != NULL; from = from->next) {
*to = gpr_malloc(sizeof(census_trace_annotation));
memcpy(*to, from, sizeof(census_trace_annotation));
to = &(*to)->next;
}
return ret;
}
static census_trace_obj* trace_obj_dup(census_trace_obj* from) {
census_trace_obj* to = NULL;
GPR_ASSERT(from != NULL);
to = gpr_malloc(sizeof(census_trace_obj));
to->id = from->id;
to->ts = from->ts;
to->rpc_stats = from->rpc_stats;
to->method = gpr_strdup(from->method);
to->annotations = dup_annotation_chain(from->annotations);
return to;
}
census_trace_obj** census_get_active_ops(int* num_active_ops) {
census_trace_obj** ret = NULL;
gpr_mu_lock(&g_mu);
if (g_trace_store != NULL) {
size_t n = 0;
census_ht_kv* all_kvs = census_ht_get_all_elements(g_trace_store, &n);
*num_active_ops = (int)n;
if (n != 0 ) {
size_t i = 0;
ret = gpr_malloc(sizeof(census_trace_obj *) * n);
for (i = 0; i < n; i++) {
ret[i] = trace_obj_dup((census_trace_obj*)all_kvs[i].v);
}
}
gpr_free(all_kvs);
}
gpr_mu_unlock(&g_mu);
return ret;
} }
...@@ -34,12 +34,35 @@ ...@@ -34,12 +34,35 @@
#ifndef __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ #ifndef __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
#define __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ #define __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
#include <grpc/support/time.h>
#include "src/core/statistics/census_rpc_stats.h"
/* WARNING: The data structures and APIs provided by this file are for GRPC
library's internal use ONLY. They might be changed in backward-incompatible
ways and are not subject to any deprecation policy.
They are not recommended for external use.
*/
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/* Opaque structure for trace object */ /* Struct for a trace annotation. */
typedef struct trace_obj trace_obj; typedef struct census_trace_annotation {
gpr_timespec ts; /* timestamp of the annotation */
char txt[CENSUS_MAX_ANNOTATION_LENGTH + 1]; /* actual txt annotation */
struct census_trace_annotation* next;
} census_trace_annotation;
typedef struct census_trace_obj {
census_op_id id;
gpr_timespec ts;
census_rpc_stats rpc_stats;
char* method;
census_trace_annotation* annotations;
} census_trace_obj;
/* Deletes trace object. */
void census_trace_obj_destroy(census_trace_obj* obj);
/* Initializes trace store. This function is thread safe. */ /* Initializes trace store. This function is thread safe. */
void census_tracing_init(void); void census_tracing_init(void);
...@@ -50,15 +73,21 @@ void census_tracing_shutdown(void); ...@@ -50,15 +73,21 @@ void census_tracing_shutdown(void);
/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store /* Gets trace obj corresponding to the input op_id. Returns NULL if trace store
is not initialized or trace obj is not found. Requires trace store being is not initialized or trace obj is not found. Requires trace store being
locked before calling this function. */ locked before calling this function. */
trace_obj* census_get_trace_obj_locked(census_op_id op_id); census_trace_obj* census_get_trace_obj_locked(census_op_id op_id);
/* The following two functions acquire and release the trace store global lock. /* The following two functions acquire and release the trace store global lock.
They are for census internal use only. */ They are for census internal use only. */
void census_internal_lock_trace_store(void); void census_internal_lock_trace_store(void);
void census_internal_unlock_trace_store(void); void census_internal_unlock_trace_store(void);
/* Gets method tag name associated with the input trace object. */ /* Gets method name associated with the input trace object. */
const char* census_get_trace_method_name(const trace_obj* trace); const char* census_get_trace_method_name(const census_trace_obj* trace);
/* Returns an array of pointers to trace objects of currently active operations
and fills in number of active operations. Returns NULL if there are no active
operations.
Caller owns the returned objects. */
census_trace_obj** census_get_active_ops(int* num_active_ops);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,10 +32,12 @@ ...@@ -32,10 +32,12 @@
*/ */
#include <string.h> #include <string.h>
#include <stdio.h>
#include "src/core/statistics/census_interface.h" #include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_tracing.h" #include "src/core/statistics/census_tracing.h"
#include "src/core/statistics/census_tracing.h" #include "src/core/statistics/census_tracing.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
...@@ -172,6 +174,74 @@ static void test_trace_print(void) { ...@@ -172,6 +174,74 @@ static void test_trace_print(void) {
census_tracing_shutdown(); census_tracing_shutdown();
} }
/* Returns 1 if two ids are equal, otherwise returns 0. */
static int ids_equal(census_op_id id1, census_op_id id2) {
return (id1.upper == id2.upper) && (id1.lower == id2.lower);
}
static void test_get_active_ops(void) {
census_op_id id_1, id_2, id_3;
census_trace_obj** active_ops;
const char* annotation_txt[] = {"annotation 1", "a2"};
int i = 0;
int n = 0;
gpr_log(GPR_INFO, "test_get_active_ops");
census_tracing_init();
/* No active ops before calling start_op(). */
active_ops = census_get_active_ops(&n);
GPR_ASSERT(active_ops == NULL);
GPR_ASSERT(n == 0);
/* Starts one op */
id_1 = census_tracing_start_op();
census_add_method_tag(id_1, "foo_1");
active_ops = census_get_active_ops(&n);
GPR_ASSERT(active_ops != NULL);
GPR_ASSERT(n == 1);
GPR_ASSERT(ids_equal(active_ops[0]->id, id_1));
census_trace_obj_destroy(active_ops[0]);
gpr_free(active_ops);
active_ops = NULL;
/* Start the second and the third ops */
id_2 = census_tracing_start_op();
census_add_method_tag(id_2, "foo_2");
id_3 = census_tracing_start_op();
census_add_method_tag(id_3, "foo_3");
active_ops = census_get_active_ops(&n);
GPR_ASSERT(n == 3);
for (i = 0; i < 3; i++) {
census_trace_obj_destroy(active_ops[i]);
}
gpr_free(active_ops);
active_ops = NULL;
/* End the second op and add annotations to the third ops*/
census_tracing_end_op(id_2);
census_tracing_print(id_3, annotation_txt[0]);
census_tracing_print(id_3, annotation_txt[1]);
active_ops = census_get_active_ops(&n);
GPR_ASSERT(active_ops != NULL);
GPR_ASSERT(n == 2);
for (i = 0; i < 2; i++) {
census_trace_obj_destroy(active_ops[i]);
}
gpr_free(active_ops);
active_ops = NULL;
/* End all ops. */
census_tracing_end_op(id_1);
census_tracing_end_op(id_3);
active_ops = census_get_active_ops(&n);
GPR_ASSERT(active_ops == NULL);
GPR_ASSERT(n == 0);
census_tracing_shutdown();
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
test_init_shutdown(); test_init_shutdown();
...@@ -180,5 +250,6 @@ int main(int argc, char** argv) { ...@@ -180,5 +250,6 @@ int main(int argc, char** argv) {
test_concurrency(); test_concurrency();
test_add_method_tag_to_unknown_op_id(); test_add_method_tag_to_unknown_op_id();
test_trace_print(); test_trace_print();
test_get_active_ops();
return 0; return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment