Skip to content
Snippets Groups Projects
Commit 3cce2ecb authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Merge pull request #4859 from ctiller/sceq

Subchannel Sharing  [The interop tests failures and Basic test failures are unrelated to this change. Merging]
parents ac94c430 c0ce00fd
Branches
Tags
No related merge requests found
Showing
with 520 additions and 35 deletions
...@@ -194,6 +194,7 @@ cc_library( ...@@ -194,6 +194,7 @@ cc_library(
"src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h", "src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h", "src/core/client_config/subchannel_factory.h",
"src/core/client_config/subchannel_index.h",
"src/core/client_config/uri_parser.h", "src/core/client_config/uri_parser.h",
"src/core/compression/algorithm_metadata.h", "src/core/compression/algorithm_metadata.h",
"src/core/compression/message_compress.h", "src/core/compression/message_compress.h",
...@@ -332,6 +333,7 @@ cc_library( ...@@ -332,6 +333,7 @@ cc_library(
"src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c", "src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c", "src/core/client_config/subchannel_factory.c",
"src/core/client_config/subchannel_index.c",
"src/core/client_config/uri_parser.c", "src/core/client_config/uri_parser.c",
"src/core/compression/algorithm.c", "src/core/compression/algorithm.c",
"src/core/compression/message_compress.c", "src/core/compression/message_compress.c",
...@@ -498,6 +500,7 @@ cc_library( ...@@ -498,6 +500,7 @@ cc_library(
"src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h", "src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h", "src/core/client_config/subchannel_factory.h",
"src/core/client_config/subchannel_index.h",
"src/core/client_config/uri_parser.h", "src/core/client_config/uri_parser.h",
"src/core/compression/algorithm_metadata.h", "src/core/compression/algorithm_metadata.h",
"src/core/compression/message_compress.h", "src/core/compression/message_compress.h",
...@@ -616,6 +619,7 @@ cc_library( ...@@ -616,6 +619,7 @@ cc_library(
"src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c", "src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c", "src/core/client_config/subchannel_factory.c",
"src/core/client_config/subchannel_index.c",
"src/core/client_config/uri_parser.c", "src/core/client_config/uri_parser.c",
"src/core/compression/algorithm.c", "src/core/compression/algorithm.c",
"src/core/compression/message_compress.c", "src/core/compression/message_compress.c",
...@@ -1293,6 +1297,7 @@ objc_library( ...@@ -1293,6 +1297,7 @@ objc_library(
"src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c", "src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c", "src/core/client_config/subchannel_factory.c",
"src/core/client_config/subchannel_index.c",
"src/core/client_config/uri_parser.c", "src/core/client_config/uri_parser.c",
"src/core/compression/algorithm.c", "src/core/compression/algorithm.c",
"src/core/compression/message_compress.c", "src/core/compression/message_compress.c",
...@@ -1454,6 +1459,7 @@ objc_library( ...@@ -1454,6 +1459,7 @@ objc_library(
"src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h", "src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h", "src/core/client_config/subchannel_factory.h",
"src/core/client_config/subchannel_index.h",
"src/core/client_config/uri_parser.h", "src/core/client_config/uri_parser.h",
"src/core/compression/algorithm_metadata.h", "src/core/compression/algorithm_metadata.h",
"src/core/compression/message_compress.h", "src/core/compression/message_compress.h",
......
...@@ -2366,6 +2366,7 @@ LIBGRPC_SRC = \ ...@@ -2366,6 +2366,7 @@ LIBGRPC_SRC = \
src/core/client_config/resolvers/sockaddr_resolver.c \ src/core/client_config/resolvers/sockaddr_resolver.c \
src/core/client_config/subchannel.c \ src/core/client_config/subchannel.c \
src/core/client_config/subchannel_factory.c \ src/core/client_config/subchannel_factory.c \
src/core/client_config/subchannel_index.c \
src/core/client_config/uri_parser.c \ src/core/client_config/uri_parser.c \
src/core/compression/algorithm.c \ src/core/compression/algorithm.c \
src/core/compression/message_compress.c \ src/core/compression/message_compress.c \
...@@ -2651,6 +2652,7 @@ LIBGRPC_UNSECURE_SRC = \ ...@@ -2651,6 +2652,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/client_config/resolvers/sockaddr_resolver.c \ src/core/client_config/resolvers/sockaddr_resolver.c \
src/core/client_config/subchannel.c \ src/core/client_config/subchannel.c \
src/core/client_config/subchannel_factory.c \ src/core/client_config/subchannel_factory.c \
src/core/client_config/subchannel_index.c \
src/core/client_config/uri_parser.c \ src/core/client_config/uri_parser.c \
src/core/compression/algorithm.c \ src/core/compression/algorithm.c \
src/core/compression/message_compress.c \ src/core/compression/message_compress.c \
......
...@@ -605,6 +605,7 @@ ...@@ -605,6 +605,7 @@
'src/core/client_config/resolvers/sockaddr_resolver.c', 'src/core/client_config/resolvers/sockaddr_resolver.c',
'src/core/client_config/subchannel.c', 'src/core/client_config/subchannel.c',
'src/core/client_config/subchannel_factory.c', 'src/core/client_config/subchannel_factory.c',
'src/core/client_config/subchannel_index.c',
'src/core/client_config/uri_parser.c', 'src/core/client_config/uri_parser.c',
'src/core/compression/algorithm.c', 'src/core/compression/algorithm.c',
'src/core/compression/message_compress.c', 'src/core/compression/message_compress.c',
......
...@@ -270,6 +270,7 @@ filegroups: ...@@ -270,6 +270,7 @@ filegroups:
- src/core/client_config/resolvers/sockaddr_resolver.h - src/core/client_config/resolvers/sockaddr_resolver.h
- src/core/client_config/subchannel.h - src/core/client_config/subchannel.h
- src/core/client_config/subchannel_factory.h - src/core/client_config/subchannel_factory.h
- src/core/client_config/subchannel_index.h
- src/core/client_config/uri_parser.h - src/core/client_config/uri_parser.h
- src/core/compression/algorithm_metadata.h - src/core/compression/algorithm_metadata.h
- src/core/compression/message_compress.h - src/core/compression/message_compress.h
...@@ -385,6 +386,7 @@ filegroups: ...@@ -385,6 +386,7 @@ filegroups:
- src/core/client_config/resolvers/sockaddr_resolver.c - src/core/client_config/resolvers/sockaddr_resolver.c
- src/core/client_config/subchannel.c - src/core/client_config/subchannel.c
- src/core/client_config/subchannel_factory.c - src/core/client_config/subchannel_factory.c
- src/core/client_config/subchannel_index.c
- src/core/client_config/uri_parser.c - src/core/client_config/uri_parser.c
- src/core/compression/algorithm.c - src/core/compression/algorithm.c
- src/core/compression/message_compress.c - src/core/compression/message_compress.c
...@@ -2627,8 +2629,8 @@ configs: ...@@ -2627,8 +2629,8 @@ configs:
LDXX: clang++ LDXX: clang++
compile_the_world: true compile_the_world: true
test_environ: test_environ:
ASAN_OPTIONS: suppressions=tools/asan_suppressions.txt:detect_leaks=1:color=always ASAN_OPTIONS: detect_leaks=1:color=always
LSAN_OPTIONS: suppressions=tools/asan_suppressions.txt:report_objects=1 LSAN_OPTIONS: suppressions=tools/lsan_suppressions.txt:report_objects=1
timeout_multiplier: 1.5 timeout_multiplier: 1.5
asan-noleaks: asan-noleaks:
CC: clang CC: clang
......
...@@ -198,6 +198,7 @@ Pod::Spec.new do |s| ...@@ -198,6 +198,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolvers/sockaddr_resolver.h', 'src/core/client_config/resolvers/sockaddr_resolver.h',
'src/core/client_config/subchannel.h', 'src/core/client_config/subchannel.h',
'src/core/client_config/subchannel_factory.h', 'src/core/client_config/subchannel_factory.h',
'src/core/client_config/subchannel_index.h',
'src/core/client_config/uri_parser.h', 'src/core/client_config/uri_parser.h',
'src/core/compression/algorithm_metadata.h', 'src/core/compression/algorithm_metadata.h',
'src/core/compression/message_compress.h', 'src/core/compression/message_compress.h',
...@@ -349,6 +350,7 @@ Pod::Spec.new do |s| ...@@ -349,6 +350,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolvers/sockaddr_resolver.c', 'src/core/client_config/resolvers/sockaddr_resolver.c',
'src/core/client_config/subchannel.c', 'src/core/client_config/subchannel.c',
'src/core/client_config/subchannel_factory.c', 'src/core/client_config/subchannel_factory.c',
'src/core/client_config/subchannel_index.c',
'src/core/client_config/uri_parser.c', 'src/core/client_config/uri_parser.c',
'src/core/compression/algorithm.c', 'src/core/compression/algorithm.c',
'src/core/compression/message_compress.c', 'src/core/compression/message_compress.c',
...@@ -506,6 +508,7 @@ Pod::Spec.new do |s| ...@@ -506,6 +508,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolvers/sockaddr_resolver.h', 'src/core/client_config/resolvers/sockaddr_resolver.h',
'src/core/client_config/subchannel.h', 'src/core/client_config/subchannel.h',
'src/core/client_config/subchannel_factory.h', 'src/core/client_config/subchannel_factory.h',
'src/core/client_config/subchannel_index.h',
'src/core/client_config/uri_parser.h', 'src/core/client_config/uri_parser.h',
'src/core/compression/algorithm_metadata.h', 'src/core/compression/algorithm_metadata.h',
'src/core/compression/message_compress.h', 'src/core/compression/message_compress.h',
......
...@@ -194,6 +194,7 @@ Gem::Specification.new do |s| ...@@ -194,6 +194,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.h ) s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.h )
s.files += %w( src/core/client_config/subchannel.h ) s.files += %w( src/core/client_config/subchannel.h )
s.files += %w( src/core/client_config/subchannel_factory.h ) s.files += %w( src/core/client_config/subchannel_factory.h )
s.files += %w( src/core/client_config/subchannel_index.h )
s.files += %w( src/core/client_config/uri_parser.h ) s.files += %w( src/core/client_config/uri_parser.h )
s.files += %w( src/core/compression/algorithm_metadata.h ) s.files += %w( src/core/compression/algorithm_metadata.h )
s.files += %w( src/core/compression/message_compress.h ) s.files += %w( src/core/compression/message_compress.h )
...@@ -332,6 +333,7 @@ Gem::Specification.new do |s| ...@@ -332,6 +333,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.c ) s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.c )
s.files += %w( src/core/client_config/subchannel.c ) s.files += %w( src/core/client_config/subchannel.c )
s.files += %w( src/core/client_config/subchannel_factory.c ) s.files += %w( src/core/client_config/subchannel_factory.c )
s.files += %w( src/core/client_config/subchannel_index.c )
s.files += %w( src/core/client_config/uri_parser.c ) s.files += %w( src/core/client_config/uri_parser.c )
s.files += %w( src/core/compression/algorithm.c ) s.files += %w( src/core/compression/algorithm.c )
s.files += %w( src/core/compression/message_compress.c ) s.files += %w( src/core/compression/message_compress.c )
......
...@@ -68,6 +68,12 @@ typedef enum { ...@@ -68,6 +68,12 @@ typedef enum {
GRPC_ARG_POINTER GRPC_ARG_POINTER
} grpc_arg_type; } grpc_arg_type;
typedef struct grpc_arg_pointer_vtable {
void *(*copy)(void *p);
void (*destroy)(void *p);
int (*cmp)(void *p, void *q);
} grpc_arg_pointer_vtable;
/** A single argument... each argument has a key and a value /** A single argument... each argument has a key and a value
A note on naming keys: A note on naming keys:
...@@ -88,8 +94,7 @@ typedef struct { ...@@ -88,8 +94,7 @@ typedef struct {
int integer; int integer;
struct { struct {
void *p; void *p;
void *(*copy)(void *p); const grpc_arg_pointer_vtable *vtable;
void (*destroy)(void *p);
} pointer; } pointer;
} value; } value;
} grpc_arg; } grpc_arg;
......
...@@ -81,11 +81,12 @@ GPRAPI void gpr_avl_unref(gpr_avl avl); ...@@ -81,11 +81,12 @@ GPRAPI void gpr_avl_unref(gpr_avl avl);
if key exists in avl, the new tree's key entry updated if key exists in avl, the new tree's key entry updated
(i.e. a duplicate is not created) */ (i.e. a duplicate is not created) */
GPRAPI gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value); GPRAPI gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value);
/** return a new tree with key deleted */ /** return a new tree with key deleted
implicitly unrefs avl to allow easy chaining. */
GPRAPI gpr_avl gpr_avl_remove(gpr_avl avl, void *key); GPRAPI gpr_avl gpr_avl_remove(gpr_avl avl, void *key);
/** lookup key, and return the associated value. /** lookup key, and return the associated value.
does not mutate avl. does not mutate avl.
returns NULL if key is not found. */ returns NULL if key is not found. */
GPRAPI void *gpr_avl_get(gpr_avl avl, void *key); GPRAPI void *gpr_avl_get(gpr_avl avl, void *key);
#endif #endif /* GRPC_SUPPORT_AVL_H */
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -72,4 +72,6 @@ ...@@ -72,4 +72,6 @@
0x0f0f0f0f) % \ 0x0f0f0f0f) % \
255) 255)
#define GPR_ICMP(a, b) ((a) < (b) ? -1 : ((a) > (b) ? 1 : 0))
#endif /* GRPC_SUPPORT_USEFUL_H */ #endif /* GRPC_SUPPORT_USEFUL_H */
...@@ -139,6 +139,7 @@ ...@@ -139,6 +139,7 @@
"src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h", "src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h", "src/core/client_config/subchannel_factory.h",
"src/core/client_config/subchannel_index.h",
"src/core/client_config/uri_parser.h", "src/core/client_config/uri_parser.h",
"src/core/compression/algorithm_metadata.h", "src/core/compression/algorithm_metadata.h",
"src/core/compression/message_compress.h", "src/core/compression/message_compress.h",
...@@ -277,6 +278,7 @@ ...@@ -277,6 +278,7 @@
"src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c", "src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c", "src/core/client_config/subchannel_factory.c",
"src/core/client_config/subchannel_index.c",
"src/core/client_config/uri_parser.c", "src/core/client_config/uri_parser.c",
"src/core/compression/algorithm.c", "src/core/compression/algorithm.c",
"src/core/compression/message_compress.c", "src/core/compression/message_compress.c",
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <grpc/census.h> #include <grpc/census.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
...@@ -55,9 +56,8 @@ static grpc_arg copy_arg(const grpc_arg *src) { ...@@ -55,9 +56,8 @@ static grpc_arg copy_arg(const grpc_arg *src) {
break; break;
case GRPC_ARG_POINTER: case GRPC_ARG_POINTER:
dst.value.pointer = src->value.pointer; dst.value.pointer = src->value.pointer;
dst.value.pointer.p = src->value.pointer.copy dst.value.pointer.p =
? src->value.pointer.copy(src->value.pointer.p) src->value.pointer.vtable->copy(src->value.pointer.p);
: src->value.pointer.p;
break; break;
} }
return dst; return dst;
...@@ -94,6 +94,58 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, ...@@ -94,6 +94,58 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
return grpc_channel_args_copy_and_add(a, b->args, b->num_args); return grpc_channel_args_copy_and_add(a, b->args, b->num_args);
} }
static int cmp_arg(const grpc_arg *a, const grpc_arg *b) {
int c = GPR_ICMP(a->type, b->type);
if (c != 0) return c;
c = strcmp(a->key, b->key);
if (c != 0) return c;
switch (a->type) {
case GRPC_ARG_STRING:
return strcmp(a->value.string, b->value.string);
case GRPC_ARG_INTEGER:
return GPR_ICMP(a->value.integer, b->value.integer);
case GRPC_ARG_POINTER:
c = GPR_ICMP(a->value.pointer.p, b->value.pointer.p);
if (c != 0) {
c = GPR_ICMP(a->value.pointer.vtable, b->value.pointer.vtable);
if (c == 0) {
c = a->value.pointer.vtable->cmp(a->value.pointer.p,
b->value.pointer.p);
}
}
return c;
}
GPR_UNREACHABLE_CODE(return 0);
}
/* stabilizing comparison function: since channel_args ordering matters for
* keys with the same name, we need to preserve that ordering */
static int cmp_key_stable(const void *ap, const void *bp) {
const grpc_arg *const *a = ap;
const grpc_arg *const *b = bp;
int c = strcmp((*a)->key, (*b)->key);
if (c == 0) c = GPR_ICMP(*a, *b);
return c;
}
grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a) {
grpc_arg **args = gpr_malloc(sizeof(grpc_arg *) * a->num_args);
for (size_t i = 0; i < a->num_args; i++) {
args[i] = &a->args[i];
}
qsort(args, a->num_args, sizeof(grpc_arg *), cmp_key_stable);
grpc_channel_args *b = gpr_malloc(sizeof(grpc_channel_args));
b->num_args = a->num_args;
b->args = gpr_malloc(sizeof(grpc_arg) * b->num_args);
for (size_t i = 0; i < a->num_args; i++) {
b->args[i] = copy_arg(args[i]);
}
gpr_free(args);
return b;
}
void grpc_channel_args_destroy(grpc_channel_args *a) { void grpc_channel_args_destroy(grpc_channel_args *a) {
size_t i; size_t i;
for (i = 0; i < a->num_args; i++) { for (i = 0; i < a->num_args; i++) {
...@@ -104,9 +156,7 @@ void grpc_channel_args_destroy(grpc_channel_args *a) { ...@@ -104,9 +156,7 @@ void grpc_channel_args_destroy(grpc_channel_args *a) {
case GRPC_ARG_INTEGER: case GRPC_ARG_INTEGER:
break; break;
case GRPC_ARG_POINTER: case GRPC_ARG_POINTER:
if (a->args[i].value.pointer.destroy) { a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p);
a->args[i].value.pointer.destroy(a->args[i].value.pointer.p);
}
break; break;
} }
gpr_free(a->args[i].key); gpr_free(a->args[i].key);
...@@ -208,3 +258,14 @@ int grpc_channel_args_compression_algorithm_get_states( ...@@ -208,3 +258,14 @@ int grpc_channel_args_compression_algorithm_get_states(
return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */
} }
} }
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);
if (c != 0) return c;
for (size_t i = 0; i < a->num_args; i++) {
c = cmp_arg(&a->args[i], &b->args[i]);
if (c != 0) return c;
}
return 0;
}
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -40,6 +40,9 @@ ...@@ -40,6 +40,9 @@
/* Copy some arguments */ /* Copy some arguments */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
/* Copy some arguments, stably sorting keys */
grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a);
/** Copy some arguments and add the to_add parameter in the end. /** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
...@@ -85,4 +88,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( ...@@ -85,4 +88,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
int grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a); const grpc_channel_args *a);
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -33,8 +33,9 @@ ...@@ -33,8 +33,9 @@
#include "src/core/client_config/connector.h" #include "src/core/client_config/connector.h"
void grpc_connector_ref(grpc_connector* connector) { grpc_connector* grpc_connector_ref(grpc_connector* connector) {
connector->vtable->ref(connector); connector->vtable->ref(connector);
return connector;
} }
void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) { void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) {
......
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -81,7 +81,7 @@ struct grpc_connector_vtable { ...@@ -81,7 +81,7 @@ struct grpc_connector_vtable {
grpc_connect_out_args *out_args, grpc_closure *notify); grpc_connect_out_args *out_args, grpc_closure *notify);
}; };
void grpc_connector_ref(grpc_connector *connector); grpc_connector *grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector); void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
/** Connect using the connector: max one outstanding call at a time */ /** Connect using the connector: max one outstanding call at a time */
void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
......
...@@ -36,16 +36,17 @@ ...@@ -36,16 +36,17 @@
#include <string.h> #include <string.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include "src/core/channel/channel_args.h" #include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h" #include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h" #include "src/core/channel/connected_channel.h"
#include "src/core/client_config/initial_connect_string.h" #include "src/core/client_config/initial_connect_string.h"
#include "src/core/client_config/subchannel_index.h"
#include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer.h"
#include "src/core/profiling/timers.h" #include "src/core/profiling/timers.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
#include "src/core/transport/connectivity_state.h"
#define INTERNAL_REF_BITS 16 #define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
...@@ -94,6 +95,8 @@ struct grpc_subchannel { ...@@ -94,6 +95,8 @@ struct grpc_subchannel {
struct sockaddr *addr; struct sockaddr *addr;
size_t addr_len; size_t addr_len;
grpc_subchannel_key *key;
/** initial string to send to peer */ /** initial string to send to peer */
gpr_slice initial_connect_string; gpr_slice initial_connect_string;
...@@ -207,6 +210,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -207,6 +210,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector); grpc_connector_unref(exec_ctx, c->connector);
grpc_pollset_set_destroy(&c->pollset_set); grpc_pollset_set_destroy(&c->pollset_set);
grpc_subchannel_key_destroy(exec_ctx, c->key);
gpr_free(c); gpr_free(c);
} }
...@@ -222,22 +226,42 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, ...@@ -222,22 +226,42 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
return old_val; return old_val;
} }
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs; gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF")); 0 REF_MUTATE_PURPOSE("STRONG_REF"));
GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
return c;
} }
void grpc_subchannel_weak_ref(grpc_subchannel *c grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs; gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0); GPR_ASSERT(old_refs != 0);
return c;
}
grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (!c) return NULL;
for (;;) {
gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
if (old_refs >= (1 << INTERNAL_REF_BITS)) {
gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
return c;
}
} else {
return NULL;
}
}
} }
static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connected_subchannel *con; grpc_connected_subchannel *con;
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected); GPR_ASSERT(!c->disconnected);
c->disconnected = 1; c->disconnected = 1;
...@@ -276,10 +300,19 @@ static uint32_t random_seed() { ...@@ -276,10 +300,19 @@ static uint32_t random_seed() {
return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
} }
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
grpc_subchannel *c = gpr_malloc(sizeof(*c)); grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
if (c) {
grpc_subchannel_key_destroy(exec_ctx, key);
return c;
}
c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
c->key = key;
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector; c->connector = connector;
grpc_connector_ref(c->connector); grpc_connector_ref(c->connector);
...@@ -305,7 +338,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, ...@@ -305,7 +338,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel"); "subchannel");
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
return c;
return grpc_subchannel_index_register(exec_ctx, key, c);
} }
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
......
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -48,6 +48,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; ...@@ -48,6 +48,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \ #define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ #define GRPC_SUBCHANNEL_WEAK_REF(p, r) \
...@@ -66,6 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; ...@@ -66,6 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
, const char *file, int line, const char *reason , const char *file, int line, const char *reason
#else #else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
grpc_subchannel_ref_from_weak_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ #define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
...@@ -79,12 +83,14 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; ...@@ -79,12 +83,14 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif #endif
void grpc_subchannel_ref(grpc_subchannel *channel grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_ref(grpc_subchannel *channel grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel grpc_subchannel *channel
...@@ -146,6 +152,8 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( ...@@ -146,6 +152,8 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call); grpc_subchannel_call *subchannel_call);
struct grpc_subchannel_args { struct grpc_subchannel_args {
/* When updating this struct, also update subchannel_index.c */
/** Channel filters for this channel - wrapped factories will likely /** Channel filters for this channel - wrapped factories will likely
want to mutate this */ want to mutate this */
const grpc_channel_filter **filters; const grpc_channel_filter **filters;
...@@ -159,7 +167,8 @@ struct grpc_subchannel_args { ...@@ -159,7 +167,8 @@ struct grpc_subchannel_args {
}; };
/** create a subchannel given a connector */ /** create a subchannel given a connector */
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
grpc_subchannel_args *args); grpc_subchannel_args *args);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */
//
//
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//
#include "src/core/client_config/subchannel_index.h"
#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/tls.h>
#include "src/core/channel/channel_args.h"
// a map of subchannel_key --> subchannel, used for detecting connections
// to the same destination in order to share them
static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
struct grpc_subchannel_key {
grpc_connector *connector;
grpc_subchannel_args args;
};
GPR_TLS_DECL(subchannel_index_exec_ctx);
static void enter_ctx(grpc_exec_ctx *exec_ctx) {
GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0);
gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx);
}
static void leave_ctx(grpc_exec_ctx *exec_ctx) {
GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx);
gpr_tls_set(&subchannel_index_exec_ctx, 0);
}
static grpc_exec_ctx *current_ctx() {
grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx);
GPR_ASSERT(c != NULL);
return c;
}
static grpc_subchannel_key *create_key(
grpc_connector *connector, grpc_subchannel_args *args,
grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
grpc_subchannel_key *k = gpr_malloc(sizeof(*k));
k->connector = grpc_connector_ref(connector);
k->args.filter_count = args->filter_count;
k->args.filters = gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count);
memcpy((grpc_channel_filter *)k->args.filters, args->filters,
sizeof(*k->args.filters) * k->args.filter_count);
k->args.addr_len = args->addr_len;
k->args.addr = gpr_malloc(args->addr_len);
memcpy(k->args.addr, args->addr, k->args.addr_len);
k->args.args = copy_channel_args(args->args);
return k;
}
grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *connector,
grpc_subchannel_args *args) {
return create_key(connector, args, grpc_channel_args_normalize);
}
static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) {
return create_key(k->connector, &k->args, grpc_channel_args_copy);
}
static int subchannel_key_compare(grpc_subchannel_key *a,
grpc_subchannel_key *b) {
int c = GPR_ICMP(a->connector, b->connector);
if (c != 0) return c;
c = GPR_ICMP(a->args.addr_len, b->args.addr_len);
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
c = memcmp(a->args.addr, b->args.addr, a->args.addr_len);
if (c != 0) return c;
c = memcmp(a->args.filters, b->args.filters,
a->args.filter_count * sizeof(*a->args.filters));
return grpc_channel_args_compare(a->args.args, b->args.args);
}
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) {
grpc_connector_unref(exec_ctx, k->connector);
gpr_free(k->args.addr);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
gpr_free(k);
}
static void sck_avl_destroy(void *p) {
grpc_subchannel_key_destroy(current_ctx(), p);
}
static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); }
static long sck_avl_compare(void *a, void *b) {
return subchannel_key_compare(a, b);
}
static void scv_avl_destroy(void *p) {
GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index");
}
static void *scv_avl_copy(void *p) {
GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
return p;
}
static const gpr_avl_vtable subchannel_avl_vtable = {
.destroy_key = sck_avl_destroy,
.copy_key = sck_avl_copy,
.compare_keys = sck_avl_compare,
.destroy_value = scv_avl_destroy,
.copy_value = scv_avl_copy};
void grpc_subchannel_index_init(void) {
g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
}
void grpc_subchannel_index_shutdown(void) {
gpr_mu_destroy(&g_mu);
gpr_avl_unref(g_subchannel_index);
}
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key) {
enter_ctx(exec_ctx);
// Lock, and take a reference to the subchannel index.
// We don't need to do the search under a lock as avl's are immutable.
gpr_mu_lock(&g_mu);
gpr_avl index = gpr_avl_ref(g_subchannel_index);
gpr_mu_unlock(&g_mu);
grpc_subchannel *c =
GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find");
gpr_avl_unref(index);
leave_ctx(exec_ctx);
return c;
}
grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed) {
enter_ctx(exec_ctx);
grpc_subchannel *c = NULL;
while (c == NULL) {
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
gpr_avl index = gpr_avl_ref(g_subchannel_index);
gpr_mu_unlock(&g_mu);
// - Check to see if a subchannel already exists
c = gpr_avl_get(index, key);
if (c != NULL) {
// yes -> we're done
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register");
} else {
// no -> update the avl and compare/swap
gpr_avl updated =
gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key),
GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"));
// it may happen (but it's expected to be unlikely)
// that some other thread has changed the index:
// compare/swap here to check that, and retry as necessary
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
GPR_SWAP(gpr_avl, updated, g_subchannel_index);
c = constructed;
}
gpr_mu_unlock(&g_mu);
gpr_avl_unref(updated);
}
gpr_avl_unref(index);
}
leave_ctx(exec_ctx);
return c;
}
void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed) {
enter_ctx(exec_ctx);
bool done = false;
while (!done) {
// Compare and swap loop:
// - take a reference to the current index
gpr_mu_lock(&g_mu);
gpr_avl index = gpr_avl_ref(g_subchannel_index);
gpr_mu_unlock(&g_mu);
// Check to see if this key still refers to the previously
// registered subchannel
grpc_subchannel *c = gpr_avl_get(index, key);
if (c != constructed) {
gpr_avl_unref(index);
break;
}
// compare and swap the update (some other thread may have
// mutated the index behind us)
gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key);
gpr_mu_lock(&g_mu);
if (index.root == g_subchannel_index.root) {
GPR_SWAP(gpr_avl, updated, g_subchannel_index);
done = true;
}
gpr_mu_unlock(&g_mu);
gpr_avl_unref(updated);
gpr_avl_unref(index);
}
leave_ctx(exec_ctx);
}
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H
#include "src/core/client_config/connector.h"
#include "src/core/client_config/subchannel.h"
/** \file Provides an index of active subchannels so that they can be
shared amongst channels */
typedef struct grpc_subchannel_key grpc_subchannel_key;
/** Create a key that can be used to uniquely identify a subchannel */
grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *con,
grpc_subchannel_args *args);
/** Destroy a subchannel key */
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key);
/** Given a subchannel key, find the subchannel registered for it.
Returns NULL if no such channel exists.
Thread-safe. */
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key);
/** Register a subchannel against a key.
Takes ownership of \a constructed.
Returns the registered subchannel. This may be different from
\a constructed in the case of a registration race. */
grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
/** Remove \a constructed as the registered subchannel for \a key. */
void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key,
grpc_subchannel *constructed);
/** Initialize the subchannel index (global) */
void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */
...@@ -196,14 +196,21 @@ static void *server_credentials_pointer_arg_copy(void *p) { ...@@ -196,14 +196,21 @@ static void *server_credentials_pointer_arg_copy(void *p) {
return grpc_server_credentials_ref(p); return grpc_server_credentials_ref(p);
} }
static int server_credentials_pointer_cmp(void *a, void *b) {
return GPR_ICMP(a, b);
}
static const grpc_arg_pointer_vtable cred_ptr_vtable = {
server_credentials_pointer_arg_copy, server_credentials_pointer_arg_destroy,
server_credentials_pointer_cmp};
grpc_arg grpc_server_credentials_to_arg(grpc_server_credentials *p) { grpc_arg grpc_server_credentials_to_arg(grpc_server_credentials *p) {
grpc_arg arg; grpc_arg arg;
memset(&arg, 0, sizeof(grpc_arg)); memset(&arg, 0, sizeof(grpc_arg));
arg.type = GRPC_ARG_POINTER; arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_SERVER_CREDENTIALS_ARG; arg.key = GRPC_SERVER_CREDENTIALS_ARG;
arg.value.pointer.p = p; arg.value.pointer.p = p;
arg.value.pointer.copy = server_credentials_pointer_arg_copy; arg.value.pointer.vtable = &cred_ptr_vtable;
arg.value.pointer.destroy = server_credentials_pointer_arg_destroy;
return arg; return arg;
} }
......
...@@ -202,12 +202,17 @@ static void *connector_pointer_arg_copy(void *p) { ...@@ -202,12 +202,17 @@ static void *connector_pointer_arg_copy(void *p) {
return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg"); return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg");
} }
static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
static const grpc_arg_pointer_vtable connector_pointer_vtable = {
connector_pointer_arg_copy, connector_pointer_arg_destroy,
connector_pointer_cmp};
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) { grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) {
grpc_arg result; grpc_arg result;
result.type = GRPC_ARG_POINTER; result.type = GRPC_ARG_POINTER;
result.key = GRPC_SECURITY_CONNECTOR_ARG; result.key = GRPC_SECURITY_CONNECTOR_ARG;
result.value.pointer.destroy = connector_pointer_arg_destroy; result.value.pointer.vtable = &connector_pointer_vtable;
result.value.pointer.copy = connector_pointer_arg_copy;
result.value.pointer.p = sc; result.value.pointer.p = sc;
return result; return result;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment