From b779a954b1f3b15ce4ec034e02f343c848012d31 Mon Sep 17 00:00:00 2001
From: Arjun Roy <arjunroy@google.com>
Date: Fri, 19 Apr 2019 17:59:42 -0700
Subject: [PATCH] Inlined md_ref/unref and saved some unnecessary ops

---
 src/core/lib/transport/metadata.cc | 449 ++++++++++++-----------------
 src/core/lib/transport/metadata.h  | 199 ++++++++++++-
 2 files changed, 380 insertions(+), 268 deletions(-)

diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc
index b7e7fd40c0..03bd2056fb 100644
--- a/src/core/lib/transport/metadata.cc
+++ b/src/core/lib/transport/metadata.cc
@@ -41,6 +41,10 @@
 #include "src/core/lib/slice/slice_string_helpers.h"
 #include "src/core/lib/transport/static_metadata.h"
 
+using grpc_core::AllocatedMetadata;
+using grpc_core::InternedMetadata;
+using grpc_core::UserData;
+
 /* There are two kinds of mdelem and mdstr instances.
  * Static instances are declared in static_metadata.{h,c} and
  * are initialized by grpc_mdctx_global_init().
@@ -54,13 +58,40 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_metadata(false, "metadata");
 
 #ifndef NDEBUG
 #define DEBUG_ARGS , const char *file, int line
-#define FWD_DEBUG_ARGS , file, line
-#define REF_MD_LOCKED(shard, s) ref_md_locked((shard), (s), __FILE__, __LINE__)
-#else
+#define FWD_DEBUG_ARGS file, line
+
+void grpc_mdelem_trace_ref(void* md, const grpc_slice& key,
+                           const grpc_slice& value, intptr_t refcnt,
+                           const char* file, int line) {
+  if (grpc_trace_metadata.enabled()) {
+    char* key_str = grpc_slice_to_c_string(key);
+    char* value_str = grpc_slice_to_c_string(value);
+    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+            "ELM   REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", md, refcnt,
+            refcnt + 1, key_str, value_str);
+    gpr_free(key_str);
+    gpr_free(value_str);
+  }
+}
+
+void grpc_mdelem_trace_unref(void* md, const grpc_slice& key,
+                             const grpc_slice& value, intptr_t refcnt,
+                             const char* file, int line) {
+  if (grpc_trace_metadata.enabled()) {
+    char* key_str = grpc_slice_to_c_string(key);
+    char* value_str = grpc_slice_to_c_string(value);
+    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+            "ELM   UNREF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", md,
+            refcnt, refcnt - 1, key_str, value_str);
+    gpr_free(key_str);
+    gpr_free(value_str);
+  }
+}
+
+#else  // ifndef NDEBUG
 #define DEBUG_ARGS
 #define FWD_DEBUG_ARGS
-#define REF_MD_LOCKED(shard, s) ref_md_locked((shard), (s))
-#endif
+#endif  // ifndef NDEBUG
 
 #define INITIAL_SHARD_CAPACITY 8
 #define LOG2_SHARD_COUNT 4
@@ -69,43 +100,87 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_metadata(false, "metadata");
 #define TABLE_IDX(hash, capacity) (((hash) >> (LOG2_SHARD_COUNT)) % (capacity))
 #define SHARD_IDX(hash) ((hash) & ((1 << (LOG2_SHARD_COUNT)) - 1))
 
-typedef void (*destroy_user_data_func)(void* user_data);
-
-struct UserData {
-  gpr_mu mu_user_data;
-  gpr_atm destroy_user_data;
-  gpr_atm user_data;
-};
-
-/* Shadow structure for grpc_mdelem_data for interned elements */
-typedef struct interned_metadata {
-  /* must be byte compatible with grpc_mdelem_data */
-  grpc_slice key;
-  grpc_slice value;
-
-  /* private only data */
-  gpr_atm refcnt;
-
-  UserData user_data;
+AllocatedMetadata::AllocatedMetadata(const grpc_slice& key,
+                                     const grpc_slice& value)
+    : key_(grpc_slice_ref_internal(key)),
+      value_(grpc_slice_ref_internal(value)),
+      refcnt_(1) {
+#ifndef NDEBUG
+  if (grpc_trace_metadata.enabled()) {
+    char* key_str = grpc_slice_to_c_string(key_);
+    char* value_str = grpc_slice_to_c_string(value_);
+    gpr_log(GPR_DEBUG, "ELM ALLOC:%p:%" PRIdPTR ": '%s' = '%s'", this,
+            RefValue(), key_str, value_str);
+    gpr_free(key_str);
+    gpr_free(value_str);
+  }
+#endif
+}
 
-  struct interned_metadata* bucket_next;
-} interned_metadata;
+AllocatedMetadata::~AllocatedMetadata() {
+  grpc_slice_unref_internal(key_);
+  grpc_slice_unref_internal(value_);
+  if (user_data_.user_data) {
+    destroy_user_data_func destroy_user_data =
+        (destroy_user_data_func)gpr_atm_no_barrier_load(
+            &user_data_.destroy_user_data);
+    destroy_user_data((void*)user_data_.user_data);
+  }
+}
 
-/* Shadow structure for grpc_mdelem_data for allocated elements */
-typedef struct allocated_metadata {
-  /* must be byte compatible with grpc_mdelem_data */
-  grpc_slice key;
-  grpc_slice value;
+InternedMetadata::InternedMetadata(const grpc_slice& key,
+                                   const grpc_slice& value, uint32_t hash,
+                                   InternedMetadata* next)
+    : key_(grpc_slice_ref_internal(key)),
+      value_(grpc_slice_ref_internal(value)),
+      refcnt_(1),
+      hash_(hash),
+      link_(next) {
+#ifndef NDEBUG
+  if (grpc_trace_metadata.enabled()) {
+    char* key_str = grpc_slice_to_c_string(key_);
+    char* value_str = grpc_slice_to_c_string(value_);
+    gpr_log(GPR_DEBUG, "ELM   NEW:%p:%" PRIdPTR ": '%s' = '%s'", this,
+            RefValue(), key_str, value_str);
+    gpr_free(key_str);
+    gpr_free(value_str);
+  }
+#endif
+}
 
-  /* private only data */
-  gpr_atm refcnt;
+InternedMetadata::~InternedMetadata() {
+  grpc_slice_unref_internal(key_);
+  grpc_slice_unref_internal(value_);
+  if (user_data_.user_data) {
+    destroy_user_data_func destroy_user_data =
+        (destroy_user_data_func)gpr_atm_no_barrier_load(
+            &user_data_.destroy_user_data);
+    destroy_user_data((void*)user_data_.user_data);
+  }
+}
 
-  UserData user_data;
-} allocated_metadata;
+intptr_t InternedMetadata::CleanupLinkedMetadata(
+    InternedMetadata::BucketLink* head) {
+  intptr_t num_freed = 0;
+  InternedMetadata::BucketLink* prev_next = head;
+  InternedMetadata *md, *next;
+
+  for (md = head->next; md; md = next) {
+    next = md->link_.next;
+    if (md->AllRefsDropped()) {
+      prev_next->next = next;
+      grpc_core::Delete(md);
+      num_freed++;
+    } else {
+      prev_next = &md->link_;
+    }
+  }
+  return num_freed;
+}
 
 typedef struct mdtab_shard {
   gpr_mu mu;
-  interned_metadata** elems;
+  InternedMetadata::BucketLink* elems;
   size_t count;
   size_t capacity;
   /** Estimate of the number of unreferenced mdelems in the hash table.
@@ -126,7 +201,7 @@ void grpc_mdctx_global_init(void) {
     shard->count = 0;
     gpr_atm_no_barrier_store(&shard->free_estimate, 0);
     shard->capacity = INITIAL_SHARD_CAPACITY;
-    shard->elems = static_cast<interned_metadata**>(
+    shard->elems = static_cast<InternedMetadata::BucketLink*>(
         gpr_zalloc(sizeof(*shard->elems) * shard->capacity));
   }
 }
@@ -154,55 +229,29 @@ static int is_mdelem_static(grpc_mdelem e) {
              &grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT];
 }
 
-static void ref_md_locked(mdtab_shard* shard,
-                          interned_metadata* md DEBUG_ARGS) {
+void InternedMetadata::RefWithShardLocked(mdtab_shard* shard) {
 #ifndef NDEBUG
   if (grpc_trace_metadata.enabled()) {
-    char* key_str = grpc_slice_to_c_string(md->key);
-    char* value_str = grpc_slice_to_c_string(md->value);
-    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "ELM   REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", (void*)md,
-            gpr_atm_no_barrier_load(&md->refcnt),
-            gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str);
+    char* key_str = grpc_slice_to_c_string(key_);
+    char* value_str = grpc_slice_to_c_string(value_);
+    intptr_t value = RefValue();
+    gpr_log(__FILE__, __LINE__, GPR_LOG_SEVERITY_DEBUG,
+            "ELM   REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'", this, value,
+            value + 1, key_str, value_str);
     gpr_free(key_str);
     gpr_free(value_str);
   }
 #endif
-  if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) {
+  if (FirstRef()) {
     gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -1);
   }
 }
 
 static void gc_mdtab(mdtab_shard* shard) {
   GPR_TIMER_SCOPE("gc_mdtab", 0);
-
-  size_t i;
-  interned_metadata** prev_next;
-  interned_metadata *md, *next;
-  gpr_atm num_freed = 0;
-
-  for (i = 0; i < shard->capacity; i++) {
-    prev_next = &shard->elems[i];
-    for (md = shard->elems[i]; md; md = next) {
-      void* user_data =
-          (void*)gpr_atm_no_barrier_load(&md->user_data.user_data);
-      next = md->bucket_next;
-      if (gpr_atm_acq_load(&md->refcnt) == 0) {
-        grpc_slice_unref_internal(md->key);
-        grpc_slice_unref_internal(md->value);
-        if (md->user_data.user_data) {
-          ((destroy_user_data_func)gpr_atm_no_barrier_load(
-              &md->user_data.destroy_user_data))(user_data);
-        }
-        gpr_mu_destroy(&md->user_data.mu_user_data);
-        gpr_free(md);
-        *prev_next = next;
-        num_freed++;
-        shard->count--;
-      } else {
-        prev_next = &md->bucket_next;
-      }
-    }
+  intptr_t num_freed = 0;
+  for (size_t i = 0; i < shard->capacity; ++i) {
+    num_freed += InternedMetadata::CleanupLinkedMetadata(&shard->elems[i]);
   }
   gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -num_freed);
 }
@@ -212,22 +261,21 @@ static void grow_mdtab(mdtab_shard* shard) {
 
   size_t capacity = shard->capacity * 2;
   size_t i;
-  interned_metadata** mdtab;
-  interned_metadata *md, *next;
+  InternedMetadata::BucketLink* mdtab;
+  InternedMetadata *md, *next;
   uint32_t hash;
 
-  mdtab = static_cast<interned_metadata**>(
-      gpr_zalloc(sizeof(interned_metadata*) * capacity));
+  mdtab = static_cast<InternedMetadata::BucketLink*>(
+      gpr_zalloc(sizeof(InternedMetadata::BucketLink) * capacity));
 
   for (i = 0; i < shard->capacity; i++) {
-    for (md = shard->elems[i]; md; md = next) {
+    for (md = shard->elems[i].next; md; md = next) {
       size_t idx;
-      hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(md->key),
-                                grpc_slice_hash(md->value));
-      next = md->bucket_next;
+      hash = md->hash();
+      next = md->bucket_next();
       idx = TABLE_IDX(hash, capacity);
-      md->bucket_next = mdtab[idx];
-      mdtab[idx] = md;
+      md->set_bucket_next(mdtab[idx].next);
+      mdtab[idx].next = md;
     }
   }
   gpr_free(shard->elems);
@@ -247,34 +295,22 @@ static void rehash_mdtab(mdtab_shard* shard) {
 grpc_mdelem grpc_mdelem_create(
     const grpc_slice& key, const grpc_slice& value,
     grpc_mdelem_data* compatible_external_backing_store) {
+  // External storage if either slice is not interned and the caller already
+  // created a backing store. If no backing store, we allocate one.
   if (!grpc_slice_is_interned(key) || !grpc_slice_is_interned(value)) {
     if (compatible_external_backing_store != nullptr) {
+      // Caller provided backing store.
       return GRPC_MAKE_MDELEM(compatible_external_backing_store,
                               GRPC_MDELEM_STORAGE_EXTERNAL);
+    } else {
+      // We allocate backing store.
+      return GRPC_MAKE_MDELEM(grpc_core::New<AllocatedMetadata>(key, value),
+                              GRPC_MDELEM_STORAGE_ALLOCATED);
     }
-
-    allocated_metadata* allocated =
-        static_cast<allocated_metadata*>(gpr_malloc(sizeof(*allocated)));
-    allocated->key = grpc_slice_ref_internal(key);
-    allocated->value = grpc_slice_ref_internal(value);
-    gpr_atm_rel_store(&allocated->refcnt, 1);
-    allocated->user_data.user_data = 0;
-    allocated->user_data.destroy_user_data = 0;
-    gpr_mu_init(&allocated->user_data.mu_user_data);
-#ifndef NDEBUG
-    if (grpc_trace_metadata.enabled()) {
-      char* key_str = grpc_slice_to_c_string(allocated->key);
-      char* value_str = grpc_slice_to_c_string(allocated->value);
-      gpr_log(GPR_DEBUG, "ELM ALLOC:%p:%" PRIdPTR ": '%s' = '%s'",
-              (void*)allocated, gpr_atm_no_barrier_load(&allocated->refcnt),
-              key_str, value_str);
-      gpr_free(key_str);
-      gpr_free(value_str);
-    }
-#endif
-    return GRPC_MAKE_MDELEM(allocated, GRPC_MDELEM_STORAGE_ALLOCATED);
   }
 
+  // Not all static slice input yields a statically stored metadata element.
+  // It may be worth documenting why.
   if (GRPC_IS_STATIC_METADATA_STRING(key) &&
       GRPC_IS_STATIC_METADATA_STRING(value)) {
     grpc_mdelem static_elem = grpc_static_mdelem_for_static_strings(
@@ -286,7 +322,7 @@ grpc_mdelem grpc_mdelem_create(
 
   uint32_t hash =
       GRPC_MDSTR_KV_HASH(grpc_slice_hash(key), grpc_slice_hash(value));
-  interned_metadata* md;
+  InternedMetadata* md;
   mdtab_shard* shard = &g_shards[SHARD_IDX(hash)];
   size_t idx;
 
@@ -296,34 +332,18 @@ grpc_mdelem grpc_mdelem_create(
 
   idx = TABLE_IDX(hash, shard->capacity);
   /* search for an existing pair */
-  for (md = shard->elems[idx]; md; md = md->bucket_next) {
-    if (grpc_slice_eq(key, md->key) && grpc_slice_eq(value, md->value)) {
-      REF_MD_LOCKED(shard, md);
+  for (md = shard->elems[idx].next; md; md = md->bucket_next()) {
+    if (grpc_slice_eq(key, md->key()) && grpc_slice_eq(value, md->value())) {
+      md->RefWithShardLocked(shard);
       gpr_mu_unlock(&shard->mu);
       return GRPC_MAKE_MDELEM(md, GRPC_MDELEM_STORAGE_INTERNED);
     }
   }
 
   /* not found: create a new pair */
-  md = static_cast<interned_metadata*>(gpr_malloc(sizeof(interned_metadata)));
-  gpr_atm_rel_store(&md->refcnt, 1);
-  md->key = grpc_slice_ref_internal(key);
-  md->value = grpc_slice_ref_internal(value);
-  md->user_data.user_data = 0;
-  md->user_data.destroy_user_data = 0;
-  md->bucket_next = shard->elems[idx];
-  shard->elems[idx] = md;
-  gpr_mu_init(&md->user_data.mu_user_data);
-#ifndef NDEBUG
-  if (grpc_trace_metadata.enabled()) {
-    char* key_str = grpc_slice_to_c_string(md->key);
-    char* value_str = grpc_slice_to_c_string(md->value);
-    gpr_log(GPR_DEBUG, "ELM   NEW:%p:%" PRIdPTR ": '%s' = '%s'", (void*)md,
-            gpr_atm_no_barrier_load(&md->refcnt), key_str, value_str);
-    gpr_free(key_str);
-    gpr_free(value_str);
-  }
-#endif
+  md = grpc_core::New<InternedMetadata>(key, value, hash,
+                                        shard->elems[idx].next);
+  shard->elems[idx].next = md;
   shard->count++;
 
   if (shard->count > shard->capacity * 2) {
@@ -354,126 +374,6 @@ grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata) {
       changed ? nullptr : reinterpret_cast<grpc_mdelem_data*>(metadata));
 }
 
-grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd DEBUG_ARGS) {
-  switch (GRPC_MDELEM_STORAGE(gmd)) {
-    case GRPC_MDELEM_STORAGE_EXTERNAL:
-    case GRPC_MDELEM_STORAGE_STATIC:
-      break;
-    case GRPC_MDELEM_STORAGE_INTERNED: {
-      interned_metadata* md =
-          reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(gmd);
-#ifndef NDEBUG
-      if (grpc_trace_metadata.enabled()) {
-        char* key_str = grpc_slice_to_c_string(md->key);
-        char* value_str = grpc_slice_to_c_string(md->value);
-        gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-                "ELM   REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'",
-                (void*)md, gpr_atm_no_barrier_load(&md->refcnt),
-                gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str);
-        gpr_free(key_str);
-        gpr_free(value_str);
-      }
-#endif
-      /* we can assume the ref count is >= 1 as the application is calling
-         this function - meaning that no adjustment to mdtab_free is necessary,
-         simplifying the logic here to be just an atomic increment */
-      /* use C assert to have this removed in opt builds */
-      GPR_ASSERT(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
-      gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
-      break;
-    }
-    case GRPC_MDELEM_STORAGE_ALLOCATED: {
-      allocated_metadata* md =
-          reinterpret_cast<allocated_metadata*> GRPC_MDELEM_DATA(gmd);
-#ifndef NDEBUG
-      if (grpc_trace_metadata.enabled()) {
-        char* key_str = grpc_slice_to_c_string(md->key);
-        char* value_str = grpc_slice_to_c_string(md->value);
-        gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-                "ELM   REF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'",
-                (void*)md, gpr_atm_no_barrier_load(&md->refcnt),
-                gpr_atm_no_barrier_load(&md->refcnt) + 1, key_str, value_str);
-        gpr_free(key_str);
-        gpr_free(value_str);
-      }
-#endif
-      /* we can assume the ref count is >= 1 as the application is calling
-         this function - meaning that no adjustment to mdtab_free is necessary,
-         simplifying the logic here to be just an atomic increment */
-      /* use C assert to have this removed in opt builds */
-      gpr_atm_no_barrier_fetch_add(&md->refcnt, 1);
-      break;
-    }
-  }
-  return gmd;
-}
-
-void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) {
-  switch (GRPC_MDELEM_STORAGE(gmd)) {
-    case GRPC_MDELEM_STORAGE_EXTERNAL:
-    case GRPC_MDELEM_STORAGE_STATIC:
-      break;
-    case GRPC_MDELEM_STORAGE_INTERNED: {
-      interned_metadata* md =
-          reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(gmd);
-#ifndef NDEBUG
-      if (grpc_trace_metadata.enabled()) {
-        char* key_str = grpc_slice_to_c_string(md->key);
-        char* value_str = grpc_slice_to_c_string(md->value);
-        gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-                "ELM UNREF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'",
-                (void*)md, gpr_atm_no_barrier_load(&md->refcnt),
-                gpr_atm_no_barrier_load(&md->refcnt) - 1, key_str, value_str);
-        gpr_free(key_str);
-        gpr_free(value_str);
-      }
-#endif
-      uint32_t hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(md->key),
-                                         grpc_slice_hash(md->value));
-      const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1);
-      GPR_ASSERT(prev_refcount >= 1);
-      if (1 == prev_refcount) {
-        /* once the refcount hits zero, some other thread can come along and
-           free md at any time: it's unsafe from this point on to access it */
-        mdtab_shard* shard = &g_shards[SHARD_IDX(hash)];
-        gpr_atm_no_barrier_fetch_add(&shard->free_estimate, 1);
-      }
-      break;
-    }
-    case GRPC_MDELEM_STORAGE_ALLOCATED: {
-      allocated_metadata* md =
-          reinterpret_cast<allocated_metadata*> GRPC_MDELEM_DATA(gmd);
-#ifndef NDEBUG
-      if (grpc_trace_metadata.enabled()) {
-        char* key_str = grpc_slice_to_c_string(md->key);
-        char* value_str = grpc_slice_to_c_string(md->value);
-        gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-                "ELM UNREF:%p:%" PRIdPTR "->%" PRIdPTR ": '%s' = '%s'",
-                (void*)md, gpr_atm_no_barrier_load(&md->refcnt),
-                gpr_atm_no_barrier_load(&md->refcnt) - 1, key_str, value_str);
-        gpr_free(key_str);
-        gpr_free(value_str);
-      }
-#endif
-      const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1);
-      GPR_ASSERT(prev_refcount >= 1);
-      if (1 == prev_refcount) {
-        grpc_slice_unref_internal(md->key);
-        grpc_slice_unref_internal(md->value);
-        if (md->user_data.user_data) {
-          destroy_user_data_func destroy_user_data =
-              (destroy_user_data_func)gpr_atm_no_barrier_load(
-                  &md->user_data.destroy_user_data);
-          destroy_user_data((void*)md->user_data.user_data);
-        }
-        gpr_mu_destroy(&md->user_data.mu_user_data);
-        gpr_free(md);
-      }
-      break;
-    }
-  }
-}
-
 static void* get_user_data(UserData* user_data, void (*destroy_func)(void*)) {
   if (gpr_atm_acq_load(&user_data->destroy_user_data) ==
       (gpr_atm)destroy_func) {
@@ -491,14 +391,12 @@ void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void*)) {
       return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) -
                                                  grpc_static_mdelem_table];
     case GRPC_MDELEM_STORAGE_ALLOCATED: {
-      allocated_metadata* am =
-          reinterpret_cast<allocated_metadata*>(GRPC_MDELEM_DATA(md));
-      return get_user_data(&am->user_data, destroy_func);
+      auto* am = reinterpret_cast<AllocatedMetadata*>(GRPC_MDELEM_DATA(md));
+      return get_user_data(am->user_data(), destroy_func);
     }
     case GRPC_MDELEM_STORAGE_INTERNED: {
-      interned_metadata* im =
-          reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(md);
-      return get_user_data(&im->user_data, destroy_func);
+      auto* im = reinterpret_cast<InternedMetadata*> GRPC_MDELEM_DATA(md);
+      return get_user_data(im->user_data(), destroy_func);
     }
   }
   GPR_UNREACHABLE_CODE(return nullptr);
@@ -507,10 +405,10 @@ void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void*)) {
 static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
                            void* user_data) {
   GPR_ASSERT((user_data == nullptr) == (destroy_func == nullptr));
-  gpr_mu_lock(&ud->mu_user_data);
+  grpc_core::ReleasableMutexLock lock(&ud->mu_user_data);
   if (gpr_atm_no_barrier_load(&ud->destroy_user_data)) {
     /* user data can only be set once */
-    gpr_mu_unlock(&ud->mu_user_data);
+    lock.Unlock();
     if (destroy_func != nullptr) {
       destroy_func(user_data);
     }
@@ -518,7 +416,6 @@ static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
   }
   gpr_atm_no_barrier_store(&ud->user_data, (gpr_atm)user_data);
   gpr_atm_rel_store(&ud->destroy_user_data, (gpr_atm)destroy_func);
-  gpr_mu_unlock(&ud->mu_user_data);
   return user_data;
 }
 
@@ -533,15 +430,13 @@ void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*),
       return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) -
                                                  grpc_static_mdelem_table];
     case GRPC_MDELEM_STORAGE_ALLOCATED: {
-      allocated_metadata* am =
-          reinterpret_cast<allocated_metadata*>(GRPC_MDELEM_DATA(md));
-      return set_user_data(&am->user_data, destroy_func, user_data);
+      auto* am = reinterpret_cast<AllocatedMetadata*>(GRPC_MDELEM_DATA(md));
+      return set_user_data(am->user_data(), destroy_func, user_data);
     }
     case GRPC_MDELEM_STORAGE_INTERNED: {
-      interned_metadata* im =
-          reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(md);
+      auto* im = reinterpret_cast<InternedMetadata*> GRPC_MDELEM_DATA(md);
       GPR_ASSERT(!is_mdelem_static(md));
-      return set_user_data(&im->user_data, destroy_func, user_data);
+      return set_user_data(im->user_data(), destroy_func, user_data);
     }
   }
   GPR_UNREACHABLE_CODE(return nullptr);
@@ -554,3 +449,33 @@ bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b) {
   return grpc_slice_eq(GRPC_MDKEY(a), GRPC_MDKEY(b)) &&
          grpc_slice_eq(GRPC_MDVALUE(a), GRPC_MDVALUE(b));
 }
+
+static void note_disposed_interned_metadata(uint32_t hash) {
+  mdtab_shard* shard = &g_shards[SHARD_IDX(hash)];
+  gpr_atm_no_barrier_fetch_add(&shard->free_estimate, 1);
+}
+
+void grpc_mdelem_do_unref(grpc_mdelem gmd DEBUG_ARGS) {
+  switch (GRPC_MDELEM_STORAGE(gmd)) {
+    case GRPC_MDELEM_STORAGE_EXTERNAL:
+    case GRPC_MDELEM_STORAGE_STATIC:
+      return;
+    case GRPC_MDELEM_STORAGE_INTERNED: {
+      auto* md = reinterpret_cast<InternedMetadata*> GRPC_MDELEM_DATA(gmd);
+      uint32_t hash = md->hash();
+      if (md->Unref(FWD_DEBUG_ARGS)) {
+        /* once the refcount hits zero, some other thread can come along and
+           free md at any time: it's unsafe from this point on to access it */
+        note_disposed_interned_metadata(hash);
+      }
+      break;
+    }
+    case GRPC_MDELEM_STORAGE_ALLOCATED: {
+      auto* md = reinterpret_cast<AllocatedMetadata*> GRPC_MDELEM_DATA(gmd);
+      if (md->Unref(FWD_DEBUG_ARGS)) {
+        grpc_core::Delete(md);
+      }
+      break;
+    }
+  }
+}
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 5e016567eb..d431474d01 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -21,11 +21,15 @@
 
 #include <grpc/support/port_platform.h>
 
+#include "include/grpc/impl/codegen/log.h"
+
 #include <grpc/grpc.h>
 #include <grpc/slice.h>
 
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/atomic.h"
+#include "src/core/lib/gprpp/sync.h"
 
 extern grpc_core::DebugOnlyTraceFlag grpc_trace_metadata;
 
@@ -63,7 +67,7 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_metadata;
 typedef struct grpc_mdelem grpc_mdelem;
 
 /* if changing this, make identical changes in:
-   - interned_metadata, allocated_metadata in metadata.c
+   - grpc_core::{InternedMetadata, AllocatedMetadata}
    - grpc_metadata in grpc_types.h */
 typedef struct grpc_mdelem_data {
   const grpc_slice key;
@@ -143,17 +147,200 @@ void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*if_destroy_func)(void*));
 void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*),
                                 void* user_data);
 
+// Defined in metadata.cc.
+struct mdtab_shard;
+
+#ifndef NDEBUG
+void grpc_mdelem_trace_ref(void* md, const grpc_slice& key,
+                           const grpc_slice& value, intptr_t refcnt,
+                           const char* file, int line);
+void grpc_mdelem_trace_unref(void* md, const grpc_slice& key,
+                             const grpc_slice& value, intptr_t refcnt,
+                             const char* file, int line);
+#endif
+namespace grpc_core {
+
+typedef void (*destroy_user_data_func)(void* user_data);
+
+struct UserData {
+  Mutex mu_user_data;
+  gpr_atm destroy_user_data = 0;
+  gpr_atm user_data = 0;
+};
+
+class InternedMetadata {
+ public:
+  struct BucketLink {
+    explicit BucketLink(InternedMetadata* md) : next(md) {}
+
+    InternedMetadata* next = nullptr;
+  };
+
+  InternedMetadata(const grpc_slice& key, const grpc_slice& value,
+                   uint32_t hash, InternedMetadata* next);
+  ~InternedMetadata();
+
+#ifndef NDEBUG
+  void Ref(const char* file, int line) {
+    grpc_mdelem_trace_ref(this, key_, value_, RefValue(), file, line);
+    const intptr_t prior = refcnt_.FetchAdd(1, MemoryOrder::RELAXED);
+    GPR_ASSERT(prior > 0);
+  }
+  bool Unref(const char* file, int line) {
+    grpc_mdelem_trace_unref(this, key_, value_, RefValue(), file, line);
+    return Unref();
+  }
+#else
+  // We define a naked Ref() in the else-clause to make sure we don't
+  // inadvertently skip the assert on debug builds.
+  void Ref() {
+    /* we can assume the ref count is >= 1 as the application is calling
+       this function - meaning that no adjustment to mdtab_free is necessary,
+       simplifying the logic here to be just an atomic increment */
+    refcnt_.FetchAdd(1, MemoryOrder::RELAXED);
+  }
+#endif  // ifndef NDEBUG
+  bool Unref() {
+    const intptr_t prior = refcnt_.FetchSub(1, MemoryOrder::ACQ_REL);
+    GPR_DEBUG_ASSERT(prior > 0);
+    return prior == 1;
+  }
+
+  void RefWithShardLocked(mdtab_shard* shard);
+  const grpc_slice& key() const { return key_; }
+  const grpc_slice& value() const { return value_; }
+  UserData* user_data() { return &user_data_; }
+  uint32_t hash() { return hash_; }
+  InternedMetadata* bucket_next() { return link_.next; }
+  void set_bucket_next(InternedMetadata* md) { link_.next = md; }
+
+  static intptr_t CleanupLinkedMetadata(BucketLink* head);
+
+ private:
+  bool AllRefsDropped() { return refcnt_.Load(MemoryOrder::ACQUIRE) == 0; }
+  bool FirstRef() { return refcnt_.FetchAdd(1, MemoryOrder::RELAXED) == 0; }
+  intptr_t RefValue() { return refcnt_.Load(MemoryOrder::RELAXED); }
+
+  /* must be byte compatible with grpc_mdelem_data */
+  grpc_slice key_;
+  grpc_slice value_;
+
+  /* private only data */
+  grpc_core::Atomic<intptr_t> refcnt_;
+  uint32_t hash_;
+
+  UserData user_data_;
+
+  BucketLink link_;
+};
+
+/* Shadow structure for grpc_mdelem_data for allocated elements */
+class AllocatedMetadata {
+ public:
+  AllocatedMetadata(const grpc_slice& key, const grpc_slice& value);
+  ~AllocatedMetadata();
+
+  const grpc_slice& key() const { return key_; }
+  const grpc_slice& value() const { return value_; }
+  UserData* user_data() { return &user_data_; }
+
+#ifndef NDEBUG
+  void Ref(const char* file, int line) {
+    grpc_mdelem_trace_ref(this, key_, value_, RefValue(), file, line);
+    Ref();
+  }
+  bool Unref(const char* file, int line) {
+    grpc_mdelem_trace_unref(this, key_, value_, RefValue(), file, line);
+    return Unref();
+  }
+#endif  // ifndef NDEBUG
+  void Ref() {
+    /* we can assume the ref count is >= 1 as the application is calling
+       this function - meaning that no adjustment to mdtab_free is necessary,
+       simplifying the logic here to be just an atomic increment */
+    refcnt_.FetchAdd(1, MemoryOrder::RELAXED);
+  }
+  bool Unref() {
+    const intptr_t prior = refcnt_.FetchSub(1, MemoryOrder::ACQ_REL);
+    GPR_DEBUG_ASSERT(prior > 0);
+    return prior == 1;
+  }
+
+ private:
+  intptr_t RefValue() { return refcnt_.Load(MemoryOrder::RELAXED); }
+
+  /* must be byte compatible with grpc_mdelem_data */
+  grpc_slice key_;
+  grpc_slice value_;
+
+  /* private only data */
+  grpc_core::Atomic<intptr_t> refcnt_;
+
+  UserData user_data_;
+};
+
+}  // namespace grpc_core
+
 #ifndef NDEBUG
 #define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s), __FILE__, __LINE__)
+inline grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd, const char* file,
+                                   int line) {
+#else  // ifndef NDEBUG
+#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s))
+inline grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd) {
+#endif  // ifndef NDEBUG
+  switch (GRPC_MDELEM_STORAGE(gmd)) {
+    case GRPC_MDELEM_STORAGE_EXTERNAL:
+    case GRPC_MDELEM_STORAGE_STATIC:
+      break;
+    case GRPC_MDELEM_STORAGE_INTERNED: {
+      auto* md =
+          reinterpret_cast<grpc_core::InternedMetadata*> GRPC_MDELEM_DATA(gmd);
+      /* use C assert to have this removed in opt builds */
+#ifndef NDEBUG
+      md->Ref(file, line);
+#else
+      md->Ref();
+#endif
+      break;
+    }
+    case GRPC_MDELEM_STORAGE_ALLOCATED: {
+      auto* md =
+          reinterpret_cast<grpc_core::AllocatedMetadata*> GRPC_MDELEM_DATA(gmd);
+#ifndef NDEBUG
+      md->Ref(file, line);
+#else
+      md->Ref();
+#endif
+      break;
+    }
+  }
+  return gmd;
+}
+
+#ifndef NDEBUG
 #define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s), __FILE__, __LINE__)
-grpc_mdelem grpc_mdelem_ref(grpc_mdelem md, const char* file, int line);
-void grpc_mdelem_unref(grpc_mdelem md, const char* file, int line);
+void grpc_mdelem_do_unref(grpc_mdelem gmd, const char* file, int line);
+inline void grpc_mdelem_unref(grpc_mdelem gmd, const char* file, int line) {
 #else
-#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s))
 #define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s))
-grpc_mdelem grpc_mdelem_ref(grpc_mdelem md);
-void grpc_mdelem_unref(grpc_mdelem md);
+void grpc_mdelem_do_unref(grpc_mdelem gmd);
+inline void grpc_mdelem_unref(grpc_mdelem gmd) {
 #endif
+  switch (GRPC_MDELEM_STORAGE(gmd)) {
+    case GRPC_MDELEM_STORAGE_EXTERNAL:
+    case GRPC_MDELEM_STORAGE_STATIC:
+      return;
+    case GRPC_MDELEM_STORAGE_INTERNED:
+    case GRPC_MDELEM_STORAGE_ALLOCATED:
+#ifndef NDEBUG
+      grpc_mdelem_do_unref(gmd, file, line);
+#else
+      grpc_mdelem_do_unref(gmd);
+#endif
+      return;
+  }
+}
 
 #define GRPC_MDNULL GRPC_MAKE_MDELEM(NULL, GRPC_MDELEM_STORAGE_EXTERNAL)
 #define GRPC_MDISNULL(md) (GRPC_MDELEM_DATA(md) == NULL)
-- 
GitLab