Commit 532519ac authored by Alistair Veitch

Respond to review comments

parent e28e426a
@@ -31,64 +31,62 @@
  *
  */
-/* Available log space is divided up in blocks of
-   CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the
-   following three data structures:
-     - Free blocks (free_block_list)
-     - Blocks with unread data (dirty_block_list)
-     - Blocks currently attached to cores (core_local_blocks[])
-
-   census_log_start_write() moves a block from core_local_blocks[] to the
-   end of dirty_block_list when block:
-     - is out-of-space OR
-     - has an incomplete record (an incomplete record occurs when a thread calls
-       census_log_start_write() and is context-switched before calling
-       census_log_end_write()
-   So, blocks in dirty_block_list are ordered, from oldest to newest, by time
-   when block is detached from the core.
-
-   census_log_read_next() first iterates over dirty_block_list and then
-   core_local_blocks[]. It moves completely read blocks from dirty_block_list
-   to free_block_list. Blocks in core_local_blocks[] are not freed, even when
-   completely read.
-
-   If log is configured to discard old records and free_block_list is empty,
-   census_log_start_write() iterates over dirty_block_list to allocate a
-   new block. It moves the oldest available block (no pending read/write) to
-   core_local_blocks[].
-
-   core_local_block_struct is used to implement a map from core id to the block
-   associated with that core. This mapping is advisory. It is possible that the
-   block returned by this mapping is no longer associated with that core. This
-   mapping is updated, lazily, by census_log_start_write().
-
-   Locking in block struct:
-
-   Exclusive g_log.lock must be held before calling any functions operatong on
-   block structs except census_log_start_write() and
-   census_log_end_write().
-
-   Writes to a block are serialized via writer_lock.
-   census_log_start_write() acquires this lock and
-   census_log_end_write() releases it. On failure to acquire the lock,
-   writer allocates a new block for the current core and updates
-   core_local_block accordingly.
-
-   Simultaneous read and write access is allowed. Readers can safely read up to
-   committed bytes (bytes_committed).
-
-   reader_lock protects the block, currently being read, from getting recycled.
-   start_read() acquires reader_lock and end_read() releases the lock.
-
-   Read/write access to a block is disabled via try_disable_access(). It returns
-   with both writer_lock and reader_lock held. These locks are subsequently
-   released by enable_access() to enable access to the block.
-
-   A note on naming: Most function/struct names are prepended by cl_
-   (shorthand for census_log). Further, functions that manipulate structures
-   include the name of the structure, which will be passed as the first
-   argument. E.g. cl_block_initialize() will initialize a cl_block.
- */
+// Implements an efficient in-memory log, optimized for multiple writers and
+// a single reader. Available log space is divided up in blocks of
+// CENSUS_LOG_2_MAX_RECORD_SIZE bytes. A block can be in one of the following
+// three data structures:
+//   - Free blocks (free_block_list)
+//   - Blocks with unread data (dirty_block_list)
+//   - Blocks currently attached to cores (core_local_blocks[])
+//
+// census_log_start_write() moves a block from core_local_blocks[] to the end of
+// dirty_block_list when block:
+//   - is out-of-space OR
+//   - has an incomplete record (an incomplete record occurs when a thread calls
+//     census_log_start_write() and is context-switched before calling
+//     census_log_end_write()
+// So, blocks in dirty_block_list are ordered, from oldest to newest, by the
+// time when block is detached from the core.
+//
+// census_log_read_next() first iterates over dirty_block_list and then
+// core_local_blocks[]. It moves completely read blocks from dirty_block_list
+// to free_block_list. Blocks in core_local_blocks[] are not freed, even when
+// completely read.
+//
+// If the log is configured to discard old records and free_block_list is empty,
+// census_log_start_write() iterates over dirty_block_list to allocate a
+// new block. It moves the oldest available block (no pending read/write) to
+// core_local_blocks[].
+//
+// core_local_block_struct is used to implement a map from core id to the block
+// associated with that core. This mapping is advisory. It is possible that the
+// block returned by this mapping is no longer associated with that core. This
+// mapping is updated, lazily, by census_log_start_write().
+//
+// Locking in block struct:
+//
+// Exclusive g_log.lock must be held before calling any functions operating on
+// block structs except census_log_start_write() and census_log_end_write().
+//
+// Writes to a block are serialized via writer_lock. census_log_start_write()
+// acquires this lock and census_log_end_write() releases it. On failure to
+// acquire the lock, writer allocates a new block for the current core and
+// updates core_local_block accordingly.
+//
+// Simultaneous read and write access is allowed. Readers can safely read up to
+// committed bytes (bytes_committed).
+//
+// reader_lock protects the block, currently being read, from getting recycled.
+// start_read() acquires reader_lock and end_read() releases the lock.
+//
+// Read/write access to a block is disabled via try_disable_access(). It returns
+// with both writer_lock and reader_lock held. These locks are subsequently
+// released by enable_access() to enable access to the block.
+//
+// A note on naming: Most function/struct names are prepended by cl_
+// (shorthand for census_log). Further, functions that manipulate structures
+// include the name of the structure, which will be passed as the first
+// argument. E.g. cl_block_initialize() will initialize a cl_block.
 
 #include "src/core/census/log.h"
 #include <grpc/support/alloc.h>
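For reviewers skimming this change, the comment above is the contract the rest of the diff maintains. A minimal usage sketch under that contract; the reader-side census_log_read_next() signature is assumed from the header, and the payload and sizes are made up for illustration:

#include <string.h>
#include "src/core/census/log.h"

// Writer side: reserve space, copy the payload, then commit what was written.
static void log_sample(const void* data, size_t len) {
  void* record = census_log_start_write(len);
  if (record == NULL) return;  // Out of space (or len too large): sample dropped.
  memcpy(record, data, len);
  census_log_end_write(record, len);  // Commits 'len' bytes; len <= reserved size.
}

// Reader side (single reader): restart the iterator, then drain committed data.
static void drain_log(void) {
  census_log_init_reader();
  size_t bytes_available;
  const void* buf;
  while ((buf = census_log_read_next(&bytes_available)) != NULL) {
    // Consume 'bytes_available' bytes starting at 'buf'.
  }
}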
@@ -100,7 +98,7 @@
 #include <stdbool.h>
 #include <string.h>
 
-/* End of platform specific code */
+// End of platform specific code
 
 typedef struct census_log_block_list_struct {
   struct census_log_block_list_struct* next;
@@ -109,29 +107,29 @@ typedef struct census_log_block_list_struct {
 } cl_block_list_struct;
 
 typedef struct census_log_block {
-  /* Pointer to underlying buffer */
+  // Pointer to underlying buffer.
   char* buffer;
   gpr_atm writer_lock;
   gpr_atm reader_lock;
-  /* Keeps completely written bytes. Declared atomic because accessed
-     simultaneously by reader and writer. */
+  // Keeps completely written bytes. Declared atomic because accessed
+  // simultaneously by reader and writer.
   gpr_atm bytes_committed;
-  /* Bytes already read */
+  // Bytes already read.
   size_t bytes_read;
-  /* Links for list */
+  // Links for list.
   cl_block_list_struct link;
-  /* We want this structure to be cacheline aligned. We assume the following
-     sizes for the various parts on 32/64bit systems:
-       type                   32b size   64b size
-       char*                  4          8
-       3x gpr_atm             12         24
-       size_t                 4          8
-       cl_block_list_struct   12         24
-       TOTAL                  32         64
-
-     Depending on the size of our cacheline and the architecture, we
-     selectively add char buffering to this structure. The size is checked
-     via assert in census_log_initialize(). */
+  // We want this structure to be cacheline aligned. We assume the following
+  // sizes for the various parts on 32/64bit systems:
+  //   type                   32b size   64b size
+  //   char*                  4          8
+  //   3x gpr_atm             12         24
+  //   size_t                 4          8
+  //   cl_block_list_struct   12         24
+  //   TOTAL                  32         64
+  //
+  // Depending on the size of our cacheline and the architecture, we
+  // selectively add char buffering to this structure. The size is checked
+  // via assert in census_log_initialize().
 #if defined(GPR_ARCH_64)
 #define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
 #else
@@ -146,17 +144,17 @@ typedef struct census_log_block {
 #endif
 } cl_block;
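A note on the padding above: CL_BLOCK_PAD_SIZE only has to round sizeof(cl_block) up to a multiple of the cacheline size, and census_log_initialize() below re-checks that with GPR_ASSERTs. A sketch of the same arithmetic, where the 64-byte cacheline is an assumption for illustration:

// Illustration only: on a 64-bit build with a 64-byte cacheline, the members
// listed in the comment above already add up to 64 bytes, so
//   CL_BLOCK_PAD_SIZE = GPR_CACHELINE_SIZE - 64 = 64 - 64 = 0  (no padding).
// With a hypothetical 128-byte cacheline the same formula would add
//   128 - 64 = 64 bytes of padding, keeping
//   sizeof(cl_block) % GPR_CACHELINE_SIZE == 0.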
-/* A list of cl_blocks, doubly-linked through cl_block::link. */
+// A list of cl_blocks, doubly-linked through cl_block::link.
 typedef struct census_log_block_list {
-  int32_t count;           /* Number of items in list. */
-  cl_block_list_struct ht; /* head/tail of linked list. */
+  int32_t count;           // Number of items in list.
+  cl_block_list_struct ht; // head/tail of linked list.
 } cl_block_list;
 
-/* Cacheline aligned block pointers to avoid false sharing. Block pointer must
-   be initialized via set_block(), before calling other functions */
+// Cacheline aligned block pointers to avoid false sharing. Block pointer must
+// be initialized via set_block(), before calling other functions
 typedef struct census_log_core_local_block {
   gpr_atm block;
-/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */
+// Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8
 #if defined(GPR_ARCH_64)
 #define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
 #else
@@ -173,40 +171,38 @@ typedef struct census_log_core_local_block {
 struct census_log {
   int discard_old_records;
-  /* Number of cores (aka hardware-contexts) */
+  // Number of cores (aka hardware-contexts)
   unsigned num_cores;
-  /* number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log */
+  // number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log
   uint32_t num_blocks;
-  cl_block* blocks;                       /* Block metadata. */
-  cl_core_local_block* core_local_blocks; /* Keeps core to block mappings. */
+  cl_block* blocks;                       // Block metadata.
+  cl_core_local_block* core_local_blocks; // Keeps core to block mappings.
   gpr_mu lock;
-  int initialized; /* has log been initialized? */
-  /* Keeps the state of the reader iterator. A value of 0 indicates that
-     iterator has reached the end. census_log_init_reader() resets the
-     value to num_core to restart iteration. */
+  int initialized; // has log been initialized?
+  // Keeps the state of the reader iterator. A value of 0 indicates that
+  // iterator has reached the end. census_log_init_reader() resets the value
+  // to num_core to restart iteration.
   uint32_t read_iterator_state;
-  /* Points to the block being read. If non-NULL, the block is locked for
-     reading (block_being_read_->reader_lock is held). */
+  // Points to the block being read. If non-NULL, the block is locked for
+  // reading(block_being_read_->reader_lock is held).
   cl_block* block_being_read;
-  /* A non-zero value indicates that log is full. */
-  gpr_atm is_full;
   char* buffer;
   cl_block_list free_block_list;
   cl_block_list dirty_block_list;
   gpr_atm out_of_space_count;
 };
 
-/* Single internal log */
+// Single internal log.
 static struct census_log g_log;
 
-/* Functions that operate on an atomic memory location used as a lock */
-/* Returns non-zero if lock is acquired */
+// Functions that operate on an atomic memory location used as a lock.
+// Returns non-zero if lock is acquired.
 static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); }
 static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); }
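The two one-liners above are the only lock primitives the rest of the file uses: a non-blocking 0 -> 1 compare-and-swap with acquire semantics, and a release store back to 0. A sketch of the intended calling pattern (illustrative only, not part of the change):

static void example_locked_update(cl_block* block) {
  if (!cl_try_lock(&block->writer_lock)) {
    // Lock is busy: callers in this file never spin; they fall back to
    // another block (or give up) instead of waiting.
    return;
  }
  // ... critical section: safe to touch writer-owned state ...
  cl_unlock(&block->writer_lock);
}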
-/* Functions that operate on cl_core_local_block's */
+// Functions that operate on cl_core_local_block's.
 static void cl_core_local_block_set_block(cl_core_local_block* clb,
                                           cl_block* block) {
@@ -217,7 +213,7 @@ static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) {
   return (cl_block*)gpr_atm_acq_load(&clb->block);
 }
 
-/* Functions that operate on cl_block_list_struct's */
+// Functions that operate on cl_block_list_struct's.
 static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
                                             cl_block* block) {
@@ -225,19 +221,19 @@ static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
   bls->block = block;
 }
 
-/* Functions that operate on cl_block_list's */
+// Functions that operate on cl_block_list's.
 static void cl_block_list_initialize(cl_block_list* list) {
   list->count = 0;
   cl_block_list_struct_initialize(&list->ht, NULL);
 }
 
-/* Returns head of *this, or NULL if empty. */
+// Returns head of *this, or NULL if empty.
 static cl_block* cl_block_list_head(cl_block_list* list) {
   return list->ht.next->block;
 }
 
-/* Insert element *e after *pos. */
+// Insert element *e after *pos.
 static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
                                  cl_block_list_struct* e) {
   list->count++;
@@ -247,24 +243,24 @@ static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
   e->prev->next = e;
 }
 
-/* Insert block at the head of the list */
+// Insert block at the head of the list
 static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) {
   cl_block_list_insert(list, &list->ht, &block->link);
 }
 
-/* Insert block at the tail of the list */
+// Insert block at the tail of the list.
 static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) {
   cl_block_list_insert(list, list->ht.prev, &block->link);
 }
 
-/* Removes block *b. Requires *b be in the list. */
+// Removes block *b. Requires *b be in the list.
 static void cl_block_list_remove(cl_block_list* list, cl_block* b) {
   list->count--;
   b->link.next->prev = b->link.prev;
   b->link.prev->next = b->link.next;
 }
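The list helpers above use list->ht as a circular sentinel: an empty list has ht.next == ht.prev == &ht, and because the sentinel's block pointer is NULL, cl_block_list_head() naturally reports an empty list as NULL. A sketch of the expected behaviour (illustrative only; not part of the change):

static void cl_block_list_example(void) {
  cl_block_list list;
  cl_block a, b;
  cl_block_list_initialize(&list);
  GPR_ASSERT(cl_block_list_head(&list) == NULL);  // Empty: sentinel block is NULL.
  cl_block_list_struct_initialize(&a.link, &a);
  cl_block_list_struct_initialize(&b.link, &b);
  cl_block_list_insert_at_tail(&list, &a);  // list: a
  cl_block_list_insert_at_head(&list, &b);  // list: b, a
  GPR_ASSERT(list.count == 2 && cl_block_list_head(&list) == &b);
  cl_block_list_remove(&list, &a);          // list: b
  GPR_ASSERT(list.count == 1);
}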
-/* Functions that operate on cl_block's */
+// Functions that operate on cl_block's
 static void cl_block_initialize(cl_block* block, char* buffer) {
   block->buffer = buffer;
@@ -275,7 +271,7 @@ static void cl_block_initialize(cl_block* block, char* buffer) {
   cl_block_list_struct_initialize(&block->link, block);
 }
 
-/* Guards against exposing partially written buffer to the reader. */
+// Guards against exposing partially written buffer to the reader.
 static void cl_block_set_bytes_committed(cl_block* block,
                                          size_t bytes_committed) {
   gpr_atm_rel_store(&block->bytes_committed, (gpr_atm)bytes_committed);
@@ -285,13 +281,13 @@ static size_t cl_block_get_bytes_committed(cl_block* block) {
   return (size_t)gpr_atm_acq_load(&block->bytes_committed);
 }
 
-/* Tries to disable future read/write access to this block. Succeeds if:
-   - no in-progress write AND
-   - no in-progress read AND
-   - 'discard_data' set to true OR no unread data
-   On success, clears the block state and returns with writer_lock_ and
-   reader_lock_ held. These locks are released by a subsequent
-   cl_block_access_enable() call. */
+// Tries to disable future read/write access to this block. Succeeds if:
+//   - no in-progress write AND
+//   - no in-progress read AND
+//   - 'discard_data' set to true OR no unread data
+// On success, clears the block state and returns with writer_lock_ and
+// reader_lock_ held. These locks are released by a subsequent
+// cl_block_access_enable() call.
 static bool cl_block_try_disable_access(cl_block* block, int discard_data) {
   if (!cl_try_lock(&block->writer_lock)) {
     return false;
@@ -316,13 +312,12 @@ static void cl_block_enable_access(cl_block* block) {
   cl_unlock(&block->writer_lock);
 }
 
-/* Returns with writer_lock held. */
+// Returns with writer_lock held.
 static void* cl_block_start_write(cl_block* block, size_t size) {
-  size_t bytes_committed;
   if (!cl_try_lock(&block->writer_lock)) {
     return NULL;
   }
-  bytes_committed = cl_block_get_bytes_committed(block);
+  size_t bytes_committed = cl_block_get_bytes_committed(block);
   if (bytes_committed + size > CENSUS_LOG_MAX_RECORD_SIZE) {
     cl_unlock(&block->writer_lock);
     return NULL;
@@ -330,26 +325,26 @@ static void* cl_block_start_write(cl_block* block, size_t size) {
   return block->buffer + bytes_committed;
 }
 
-/* Releases writer_lock and increments committed bytes by 'bytes_written'.
-   'bytes_written' must be <= 'size' specified in the corresponding
-   StartWrite() call. This function is thread-safe. */
+// Releases writer_lock and increments committed bytes by 'bytes_written'.
+// 'bytes_written' must be <= 'size' specified in the corresponding
+// StartWrite() call. This function is thread-safe.
 static void cl_block_end_write(cl_block* block, size_t bytes_written) {
   cl_block_set_bytes_committed(
       block, cl_block_get_bytes_committed(block) + bytes_written);
   cl_unlock(&block->writer_lock);
 }
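The thread-safety claim above rests on the release/acquire pairing of cl_block_set_bytes_committed() and cl_block_get_bytes_committed(): the writer's payload stores happen before the release store of the new count, so a reader that observes the new count through the acquire load also observes the payload bytes. A sketch of that ordering (annotation only, not part of the change):

// Writer:  memcpy(record, payload, len);             // (1) plain stores into the block
//          census_log_end_write(record, len);        // (2) release store of the new count
// Reader:  n = cl_block_get_bytes_committed(block);  // (3) acquire load of the count
//          /* read block->buffer[0 .. n) */          // (4) reads the committed bytes
// If (3) observes the count stored by (2), then (4) is guaranteed to see the
// stores made in (1).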
-/* Returns a pointer to the first unread byte in buffer. The number of bytes
-   available are returned in 'bytes_available'. Acquires reader lock that is
-   released by a subsequent cl_block_end_read() call. Returns NULL if:
-   - read in progress
-   - no data available */
+// Returns a pointer to the first unread byte in buffer. The number of bytes
+// available are returned in 'bytes_available'. Acquires reader lock that is
+// released by a subsequent cl_block_end_read() call. Returns NULL if:
+//   - read in progress
+//   - no data available
 static void* cl_block_start_read(cl_block* block, size_t* bytes_available) {
   if (!cl_try_lock(&block->reader_lock)) {
     return NULL;
   }
-  /* bytes_committed may change from under us. Use bytes_available to update
-     bytes_read below. */
+  // bytes_committed may change from under us. Use bytes_available to update
+  // bytes_read below.
   size_t bytes_committed = cl_block_get_bytes_committed(block);
   GPR_ASSERT(bytes_committed >= block->bytes_read);
   *bytes_available = bytes_committed - block->bytes_read;
@@ -366,10 +361,10 @@ static void cl_block_end_read(cl_block* block) {
   cl_unlock(&block->reader_lock);
 }
 
-/* Internal functions operating on g_log */
+// Internal functions operating on g_log
 
-/* Allocates a new free block (or recycles an available dirty block if log is
-   configured to discard old records). Returns NULL if out-of-space. */
+// Allocates a new free block (or recycles an available dirty block if log is
+// configured to discard old records). Returns NULL if out-of-space.
 static cl_block* cl_allocate_block(void) {
   cl_block* block = cl_block_list_head(&g_log.free_block_list);
   if (block != NULL) {
@@ -377,10 +372,10 @@ static cl_block* cl_allocate_block(void) {
     return block;
   }
   if (!g_log.discard_old_records) {
-    /* No free block and log is configured to keep old records. */
+    // No free block and log is configured to keep old records.
    return NULL;
   }
-  /* Recycle dirty block. Start from the oldest. */
+  // Recycle dirty block. Start from the oldest.
   for (block = cl_block_list_head(&g_log.dirty_block_list); block != NULL;
        block = block->link.next->block) {
     if (cl_block_try_disable_access(block, 1 /* discard data */)) {
@@ -391,15 +386,15 @@ static cl_block* cl_allocate_block(void) {
   return NULL;
 }
 
-/* Allocates a new block and updates core id => block mapping. 'old_block'
-   points to the block that the caller thinks is attached to
-   'core_id'. 'old_block' may be NULL. Returns true if:
-   - allocated a new block OR
-   - 'core_id' => 'old_block' mapping changed (another thread allocated a
-     block before lock was acquired). */
+// Allocates a new block and updates core id => block mapping. 'old_block'
+// points to the block that the caller thinks is attached to
+// 'core_id'. 'old_block' may be NULL. Returns true if:
+//   - allocated a new block OR
+//   - 'core_id' => 'old_block' mapping changed (another thread allocated a
+//     block before lock was acquired).
 static bool cl_allocate_core_local_block(uint32_t core_id,
                                          cl_block* old_block) {
-  /* Now that we have the lock, check if core-local mapping has changed. */
+  // Now that we have the lock, check if core-local mapping has changed.
   cl_core_local_block* core_local_block = &g_log.core_local_blocks[core_id];
   cl_block* block = cl_core_local_block_get_block(core_local_block);
   if ((block != NULL) && (block != old_block)) {
@@ -411,7 +406,6 @@ static bool cl_allocate_core_local_block(uint32_t core_id,
   }
   block = cl_allocate_block();
   if (block == NULL) {
-    gpr_atm_rel_store(&g_log.is_full, 1);
     return false;
   }
   cl_core_local_block_set_block(core_local_block, block);
@@ -425,21 +419,22 @@ static cl_block* cl_get_block(void* record) {
   return &g_log.blocks[index];
 }
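cl_get_block() above maps a record pointer back to the cl_block that owns it. Its body falls mostly outside this hunk, but assuming the blocks are carved consecutively out of g_log.buffer in CENSUS_LOG_MAX_RECORD_SIZE chunks, the lookup is plain pointer arithmetic. A worked example with made-up numbers (suppose CENSUS_LOG_2_MAX_RECORD_SIZE were 14, i.e. 16 KiB blocks):

// offset of the record within the log buffer:  record - g_log.buffer = 40000
// block index:                                 40000 >> 14 = 2
// so the record lives in g_log.blocks[2], whose buffer covers bytes
// [2 * 16384, 3 * 16384) of g_log.buffer.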
-/* Gets the next block to read and tries to free 'prev' block (if not NULL).
-   Returns NULL if reached the end. */
+// Gets the next block to read and tries to free 'prev' block (if not NULL).
+// Returns NULL if reached the end.
 static cl_block* cl_next_block_to_read(cl_block* prev) {
   cl_block* block = NULL;
   if (g_log.read_iterator_state == g_log.num_cores) {
-    /* We are traversing dirty list; find the next dirty block. */
+    // We are traversing dirty list; find the next dirty block.
     if (prev != NULL) {
-      /* Try to free the previous block if there is no unread data. This block
-         may have unread data if previously incomplete record completed between
-         read_next() calls. */
+      // Try to free the previous block if there is no unread data. This
+      // block
+      // may have unread data if previously incomplete record completed
+      // between
+      // read_next() calls.
       block = prev->link.next->block;
       if (cl_block_try_disable_access(prev, 0 /* do not discard data */)) {
         cl_block_list_remove(&g_log.dirty_block_list, prev);
         cl_block_list_insert_at_head(&g_log.free_block_list, prev);
-        gpr_atm_rel_store(&g_log.is_full, 0);
       }
     } else {
       block = cl_block_list_head(&g_log.dirty_block_list);
@@ -447,7 +442,7 @@ static cl_block* cl_next_block_to_read(cl_block* prev) {
     if (block != NULL) {
       return block;
     }
-    /* We are done with the dirty list; moving on to core-local blocks. */
+    // We are done with the dirty list; moving on to core-local blocks.
   }
   while (g_log.read_iterator_state > 0) {
     g_log.read_iterator_state--;
@@ -460,27 +455,26 @@ static cl_block* cl_next_block_to_read(cl_block* prev) {
   return NULL;
 }
 
-#define CL_LOG_2_MB 20 /* 2^20 = 1MB */
+#define CL_LOG_2_MB 20 // 2^20 = 1MB
 
-/* External functions: primary stats_log interface */
+// External functions: primary stats_log interface
 void census_log_initialize(size_t size_in_mb, int discard_old_records) {
-  /* Check cacheline alignment. */
+  // Check cacheline alignment.
   GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
   GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
   GPR_ASSERT(!g_log.initialized);
   g_log.discard_old_records = discard_old_records;
   g_log.num_cores = gpr_cpu_num_cores();
-  /* Ensure that we will not get any overflow in calaculating num_blocks */
+  // Ensure that we will not get any overflow in calaculating num_blocks
   GPR_ASSERT(CL_LOG_2_MB >= CENSUS_LOG_2_MAX_RECORD_SIZE);
   GPR_ASSERT(size_in_mb < 1000);
-  /* Ensure at least as many blocks as there are cores. */
+  // Ensure at least 2x as many blocks as there are cores.
   g_log.num_blocks =
-      (uint32_t)GPR_MAX(g_log.num_cores, (size_in_mb << CL_LOG_2_MB) >>
-                                             CENSUS_LOG_2_MAX_RECORD_SIZE);
+      (uint32_t)GPR_MAX(2 * g_log.num_cores, (size_in_mb << CL_LOG_2_MB) >>
+                                                 CENSUS_LOG_2_MAX_RECORD_SIZE);
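With the change above the log is sized to at least two blocks per core rather than one, presumably so every core can keep a block attached while a comparable amount sits in the dirty or free lists. A worked example with assumed numbers (8 cores, and a block size of 2^14 = 16 KiB chosen purely for illustration):

// size_in_mb = 1:  (1 << 20) >> 14 = 64;  GPR_MAX(2 * 8, 64) = 64 blocks.
// size_in_mb = 0:  (0 << 20) >> 14 = 0;   GPR_MAX(2 * 8, 0)  = 16 blocks,
//                  i.e. the 2x-cores floor is what guarantees a usable log.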
   gpr_mu_init(&g_log.lock);
   g_log.read_iterator_state = 0;
   g_log.block_being_read = NULL;
-  gpr_atm_rel_store(&g_log.is_full, 0);
   g_log.core_local_blocks = (cl_core_local_block*)gpr_malloc_aligned(
       g_log.num_cores * sizeof(cl_core_local_block), GPR_CACHELINE_SIZE_LOG);
   memset(g_log.core_local_blocks, 0,
@@ -515,7 +509,7 @@ void census_log_shutdown(void) {
 }
 
 void* census_log_start_write(size_t size) {
-  /* Used to bound number of times block allocation is attempted. */
+  // Used to bound number of times block allocation is attempted.
   GPR_ASSERT(size > 0);
   GPR_ASSERT(g_log.initialized);
   if (size > CENSUS_LOG_MAX_RECORD_SIZE) {
@@ -530,14 +524,10 @@ void* census_log_start_write(size_t size) {
     if (block && (record = cl_block_start_write(block, size))) {
       return record;
     }
-    /* Need to allocate a new block. We are here if:
-       - No block associated with the core OR
-       - Write in-progress on the block OR
-       - block is out of space */
-    if (gpr_atm_acq_load(&g_log.is_full)) {
-      gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
-      return NULL;
-    }
+    // Need to allocate a new block. We are here if:
+    //   - No block associated with the core OR
+    //   - Write in-progress on the block OR
+    //   - block is out of space
     gpr_mu_lock(&g_log.lock);
     bool allocated = cl_allocate_core_local_block(core_id, block);
     gpr_mu_unlock(&g_log.lock);
@@ -546,7 +536,7 @@ void* census_log_start_write(size_t size) {
       return NULL;
     }
   } while (attempts_remaining--);
-  /* Give up. */
+  // Give up.
   gpr_atm_no_barrier_fetch_add(&g_log.out_of_space_count, 1);
   return NULL;
 }
@@ -559,7 +549,7 @@ void census_log_end_write(void* record, size_t bytes_written) {
 void census_log_init_reader(void) {
   GPR_ASSERT(g_log.initialized);
   gpr_mu_lock(&g_log.lock);
-  /* If a block is locked for reading unlock it. */
+  // If a block is locked for reading unlock it.
   if (g_log.block_being_read != NULL) {
     cl_block_end_read(g_log.block_being_read);
     g_log.block_being_read = NULL;
@@ -594,7 +584,7 @@ size_t census_log_remaining_space(void) {
   size_t space = 0;
   gpr_mu_lock(&g_log.lock);
   if (g_log.discard_old_records) {
-    /* Remaining space is not meaningful; just return the entire log space. */
+    // Remaining space is not meaningful; just return the entire log space.
     space = g_log.num_blocks << CENSUS_LOG_2_MAX_RECORD_SIZE;
   } else {
     GPR_ASSERT(g_log.free_block_list.count >= 0);