Skip to content
Snippets Groups Projects
Commit 8268155c authored by Craig Tiller's avatar Craig Tiller
Browse files

Initial cut of slice interning

parent bd39e23f
No related branches found
No related tags found
No related merge requests found
...@@ -79,7 +79,7 @@ GPRAPI grpc_slice grpc_slice_malloc(size_t length); ...@@ -79,7 +79,7 @@ GPRAPI grpc_slice grpc_slice_malloc(size_t length);
/* Intern a slice: /* Intern a slice:
The return value for two invocations of this function with the same sequence The return value for two invocations of this function with the same sequence
of bytes is a slice which points to the same memory */ of bytes is a slice which points to the same memory. */
GPRAPI grpc_slice grpc_slice_intern(grpc_slice slice); GPRAPI grpc_slice grpc_slice_intern(grpc_slice slice);
/* Create a slice by copying a string. /* Create a slice by copying a string.
......
...@@ -32,20 +32,31 @@ ...@@ -32,20 +32,31 @@
*/ */
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/iomgr_internal.h" /* for iomgr_abort_on_leaks() */
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/murmur_hash.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
#define LOG2_SHARD_COUNT 5 #define LOG2_SHARD_COUNT 5
#define SHARD_COUNT (1 << LOG2_SHARD_COUNT) #define SHARD_COUNT (1 << LOG2_SHARD_COUNT)
#define INITIAL_SHARD_CAPACITY 8
#define TABLE_IDX(hash, capacity) (((hash) >> LOG2_SHARD_COUNT) % (capacity)) #define TABLE_IDX(hash, capacity) (((hash) >> LOG2_SHARD_COUNT) % (capacity))
#define SHARD_IDX(hash) ((hash) & ((1 << LOG2_SHARD_COUNT) - 1)) #define SHARD_IDX(hash) ((hash) & ((1 << LOG2_SHARD_COUNT) - 1))
typedef struct interned_slice_refcount { typedef struct interned_slice_refcount {
grpc_slice_refcount base; grpc_slice_refcount base;
uint32_t hash; size_t length;
gpr_atm refcnt; gpr_atm refcnt;
uint32_t hash;
struct interned_slice_refcount *bucket_next; struct interned_slice_refcount *bucket_next;
grpc_slice_refcount *source_refcount;
} interned_slice_refcount; } interned_slice_refcount;
typedef struct slice_shard { typedef struct slice_shard {
...@@ -55,29 +66,89 @@ typedef struct slice_shard { ...@@ -55,29 +66,89 @@ typedef struct slice_shard {
size_t capacity; size_t capacity;
} slice_shard; } slice_shard;
#define REFCOUNT_TO_SLICE(rc) (*(grpc_slice *)((rc) + 1))
/* hash seed: decided at initialization time */ /* hash seed: decided at initialization time */
static uint32_t g_hash_seed; static uint32_t g_hash_seed;
static int g_forced_hash_seed = 0; static int g_forced_hash_seed = 0;
static slice_shard g_shards[SHARD_COUNT]; static slice_shard g_shards[SHARD_COUNT];
/* linearly probed hash tables for static string lookup */ static void interned_slice_ref(void *p) {
static grpc_slice g_static_slices[GRPC_STATIC_MDSTR_COUNT * 2]; interned_slice_refcount *s = p;
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&s->refcnt, 1) > 0);
}
static void interned_slice_destroy(interned_slice_refcount *s) {
slice_shard *shard = &g_shards[SHARD_IDX(s->hash)];
gpr_mu_lock(&shard->mu);
GPR_ASSERT(0 == gpr_atm_no_barrier_load(&s->refcnt));
interned_slice_refcount **prev_next;
interned_slice_refcount *cur;
for (prev_next = &shard->strs[TABLE_IDX(s->hash, shard->capacity)],
cur = *prev_next;
cur != s; prev_next = &cur->bucket_next, cur = cur->bucket_next)
;
*prev_next = cur->bucket_next;
shard->count--;
gpr_free(s);
gpr_mu_unlock(&shard->mu);
}
static void interned_slice_unref(grpc_exec_ctx *exec_ctx, void *p) {
interned_slice_refcount *s = p;
if (1 == gpr_atm_full_fetch_add(&s->refcnt, -1)) {
interned_slice_destroy(s);
}
}
static void grow_shard(slice_shard *shard) {
size_t capacity = shard->capacity * 2;
size_t i;
interned_slice_refcount **strtab;
interned_slice_refcount *s, *next;
GPR_TIMER_BEGIN("grow_strtab", 0);
strtab = gpr_malloc(sizeof(interned_slice_refcount *) * capacity);
memset(strtab, 0, sizeof(interned_slice_refcount *) * capacity);
for (i = 0; i < shard->capacity; i++) {
for (s = shard->strs[i]; s; s = next) {
size_t idx = TABLE_IDX(s->hash, capacity);
next = s->bucket_next;
s->bucket_next = strtab[idx];
strtab[idx] = s;
}
}
gpr_free(shard->strs);
shard->strs = strtab;
shard->capacity = capacity;
GPR_TIMER_END("grow_strtab", 0);
}
static grpc_slice materialize(interned_slice_refcount *s) {
grpc_slice slice;
slice.refcount = &s->base;
slice.data.refcounted.bytes = (uint8_t *)(s + 1);
slice.data.refcounted.length = s->length;
return slice;
}
grpc_slice grpc_slice_intern(grpc_slice slice) { grpc_slice grpc_slice_intern(grpc_slice slice) {
interned_slice_refcount *s;
uint32_t hash = uint32_t hash =
gpr_murmur_hash3(GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice)); gpr_murmur_hash3(GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice),
g_forced_hash_seed);
slice_shard *shard = &g_shards[SHARD_IDX(hash)]; slice_shard *shard = &g_shards[SHARD_IDX(hash)];
gpr_mu_lock(&shard->mu); gpr_mu_lock(&shard->mu);
/* search for an existing string */ /* search for an existing string */
size_t idx = TABLE_IDX(hash, shard->capacity); size_t idx = TABLE_IDX(hash, shard->capacity);
for (interned_slice_refcount *s = shard->strs[idx]; s; s = s->bucket_next) { for (s = shard->strs[idx]; s; s = s->bucket_next) {
if (s->hash == hash && grpc_slice_cmp(slice, REFCOUNT_TO_SLICE(s))) { if (s->hash == hash && grpc_slice_cmp(slice, materialize(s))) {
if (gpr_atm_full_fetch_add(&s->refcnt, 1) == 0) { if (gpr_atm_no_barrier_fetch_add(&s->refcnt, 1) == 0) {
/* If we get here, we've added a ref to something that was about to /* If we get here, we've added a ref to something that was about to
* die - drop it immediately. * die - drop it immediately.
* The *only* possible path here (given the shard mutex) should be to * The *only* possible path here (given the shard mutex) should be to
...@@ -87,45 +158,71 @@ grpc_slice grpc_slice_intern(grpc_slice slice) { ...@@ -87,45 +158,71 @@ grpc_slice grpc_slice_intern(grpc_slice slice) {
} else { } else {
gpr_mu_unlock(&shard->mu); gpr_mu_unlock(&shard->mu);
GPR_TIMER_END("grpc_mdstr_from_buffer", 0); GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
return REFCOUNT_TO_SLICE(s); return materialize(s);
} }
} }
} }
/* not found: create a new string */ /* not found: create a new string */
if (length + 1 < GRPC_SLICE_INLINED_SIZE) { /* string data goes after the internal_string header */
/* string data goes directly into the slice */ s = gpr_malloc(sizeof(*s) + GRPC_SLICE_LENGTH(slice));
s = gpr_malloc(sizeof(internal_string)); gpr_atm_rel_store(&s->refcnt, 1);
gpr_atm_rel_store(&s->refcnt, 1); s->length = GRPC_SLICE_LENGTH(slice);
s->slice.refcount = NULL;
memcpy(s->slice.data.inlined.bytes, buf, length);
s->slice.data.inlined.bytes[length] = 0;
s->slice.data.inlined.length = (uint8_t)length;
} else {
/* string data goes after the internal_string header, and we +1 for null
terminator */
s = gpr_malloc(sizeof(internal_string) + length + 1);
gpr_atm_rel_store(&s->refcnt, 1);
s->refcount.ref = slice_ref;
s->refcount.unref = slice_unref;
s->slice.refcount = &s->refcount;
s->slice.data.refcounted.bytes = (uint8_t *)(s + 1);
s->slice.data.refcounted.length = length;
memcpy(s->slice.data.refcounted.bytes, buf, length);
/* add a null terminator for cheap c string conversion when desired */
s->slice.data.refcounted.bytes[length] = 0;
}
s->has_base64_and_huffman_encoded = 0;
s->hash = hash; s->hash = hash;
s->size_in_decoder_table = SIZE_IN_DECODER_TABLE_NOT_SET; s->base.ref = interned_slice_ref;
s->base.unref = interned_slice_unref;
s->bucket_next = shard->strs[idx]; s->bucket_next = shard->strs[idx];
shard->strs[idx] = s; shard->strs[idx] = s;
memcpy(s + 1, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice));
shard->count++; shard->count++;
if (shard->count > shard->capacity * 2) { if (shard->count > shard->capacity * 2) {
grow_strtab(shard); grow_shard(shard);
} }
gpr_mu_unlock(&shard->mu); gpr_mu_unlock(&shard->mu);
return materialize(s);
}
void grpc_test_only_set_slice_interning_hash_seed(uint32_t seed) {
g_hash_seed = seed;
g_forced_hash_seed = 1;
}
void grpc_slice_intern_init(void) {
for (size_t i = 0; i < SHARD_COUNT; i++) {
slice_shard *shard = &g_shards[i];
gpr_mu_init(&shard->mu);
shard->count = 0;
shard->capacity = INITIAL_SHARD_CAPACITY;
shard->strs = gpr_malloc(sizeof(*shard->strs) * shard->capacity);
memset(shard->strs, 0, sizeof(*shard->strs) * shard->capacity);
}
}
void grpc_slice_intern_shutdown(void) {
for (size_t i = 0; i < SHARD_COUNT; i++) {
slice_shard *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
/* TODO(ctiller): GPR_ASSERT(shard->count == 0); */
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %" PRIuPTR " metadata strings were leaked",
shard->count);
for (size_t j = 0; j < shard->capacity; j++) {
for (interned_slice_refcount *s = shard->strs[j]; s;
s = s->bucket_next) {
char *text =
grpc_dump_slice(materialize(s), GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "LEAKED: %s", text);
gpr_free(text);
}
}
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
}
gpr_free(shard->strs);
}
} }
...@@ -48,5 +48,6 @@ void grpc_slice_buffer_destroy_internal(grpc_exec_ctx *exec_ctx, ...@@ -48,5 +48,6 @@ void grpc_slice_buffer_destroy_internal(grpc_exec_ctx *exec_ctx,
void grpc_slice_intern_init(void); void grpc_slice_intern_init(void);
void grpc_slice_intern_shutdown(void); void grpc_slice_intern_shutdown(void);
void grpc_test_only_set_slice_interning_hash_seed(uint32_t key);
#endif #endif
...@@ -40,6 +40,8 @@ ...@@ -40,6 +40,8 @@
#include <grpc/slice_buffer.h> #include <grpc/slice_buffer.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/lib/support/string.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment