Skip to content
Snippets Groups Projects
Commit de8a573b authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge github.com:grpc/grpc into oh_sanity_my_sanity

parents aa272bcf a0433d4e
No related branches found
No related tags found
No related merge requests found
...@@ -55,6 +55,8 @@ typedef struct http_connect_handshaker { ...@@ -55,6 +55,8 @@ typedef struct http_connect_handshaker {
grpc_handshaker base; grpc_handshaker base;
char* proxy_server; char* proxy_server;
grpc_http_header* headers;
size_t num_headers;
gpr_refcount refcount; gpr_refcount refcount;
gpr_mu mu; gpr_mu mu;
...@@ -90,6 +92,11 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, ...@@ -90,6 +92,11 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
gpr_free(handshaker->read_buffer_to_destroy); gpr_free(handshaker->read_buffer_to_destroy);
} }
gpr_free(handshaker->proxy_server); gpr_free(handshaker->proxy_server);
for (size_t i = 0; i < handshaker->num_headers; ++i) {
gpr_free(handshaker->headers[i].key);
gpr_free(handshaker->headers[i].value);
}
gpr_free(handshaker->headers);
grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer);
grpc_http_parser_destroy(&handshaker->http_parser); grpc_http_parser_destroy(&handshaker->http_parser);
grpc_http_response_destroy(&handshaker->http_response); grpc_http_response_destroy(&handshaker->http_response);
...@@ -290,6 +297,8 @@ static void http_connect_handshaker_do_handshake( ...@@ -290,6 +297,8 @@ static void http_connect_handshaker_do_handshake(
request.host = server_name; request.host = server_name;
request.http.method = "CONNECT"; request.http.method = "CONNECT";
request.http.path = server_name; request.http.path = server_name;
request.http.hdrs = handshaker->headers;
request.http.hdr_count = handshaker->num_headers;
request.handshaker = &grpc_httpcli_plaintext; request.handshaker = &grpc_httpcli_plaintext;
grpc_slice request_slice = grpc_httpcli_format_connect_request(&request); grpc_slice request_slice = grpc_httpcli_format_connect_request(&request);
grpc_slice_buffer_add(&handshaker->write_buffer, request_slice); grpc_slice_buffer_add(&handshaker->write_buffer, request_slice);
...@@ -307,7 +316,9 @@ static const grpc_handshaker_vtable http_connect_handshaker_vtable = { ...@@ -307,7 +316,9 @@ static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown, http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
http_connect_handshaker_do_handshake}; http_connect_handshaker_do_handshake};
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
grpc_http_header* headers,
size_t num_headers) {
GPR_ASSERT(proxy_server != NULL); GPR_ASSERT(proxy_server != NULL);
http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker)); http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker));
...@@ -315,6 +326,14 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) { ...@@ -315,6 +326,14 @@ grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
gpr_mu_init(&handshaker->mu); gpr_mu_init(&handshaker->mu);
gpr_ref_init(&handshaker->refcount, 1); gpr_ref_init(&handshaker->refcount, 1);
handshaker->proxy_server = gpr_strdup(proxy_server); handshaker->proxy_server = gpr_strdup(proxy_server);
if (num_headers > 0) {
handshaker->headers = gpr_malloc(sizeof(grpc_http_header) * num_headers);
for (size_t i = 0; i < num_headers; ++i) {
handshaker->headers[i].key = gpr_strdup(headers[i].key);
handshaker->headers[i].value = gpr_strdup(headers[i].value);
}
handshaker->num_headers = num_headers;
}
grpc_slice_buffer_init(&handshaker->write_buffer); grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done, grpc_closure_init(&handshaker->request_done_closure, on_write_done,
handshaker, grpc_schedule_on_exec_ctx); handshaker, grpc_schedule_on_exec_ctx);
...@@ -358,8 +377,9 @@ static void handshaker_factory_add_handshakers( ...@@ -358,8 +377,9 @@ static void handshaker_factory_add_handshakers(
const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) {
char* proxy_name = grpc_get_http_proxy_server(); char* proxy_name = grpc_get_http_proxy_server();
if (proxy_name != NULL) { if (proxy_name != NULL) {
grpc_handshake_manager_add(handshake_mgr, grpc_handshake_manager_add(
grpc_http_connect_handshaker_create(proxy_name)); handshake_mgr,
grpc_http_connect_handshaker_create(proxy_name, NULL, 0));
gpr_free(proxy_name); gpr_free(proxy_name);
} }
} }
......
...@@ -35,9 +35,12 @@ ...@@ -35,9 +35,12 @@
#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H #define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H
#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/http/parser.h"
/// Creates a new HTTP CONNECT handshaker. /// Creates a new HTTP CONNECT handshaker.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server); grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
grpc_http_header* headers,
size_t num_headers);
/// Returns the name of the proxy to use, or NULL if no proxy is configured. /// Returns the name of the proxy to use, or NULL if no proxy is configured.
/// Caller takes ownership of result. /// Caller takes ownership of result.
......
...@@ -68,12 +68,8 @@ class BenchmarkClient: ...@@ -68,12 +68,8 @@ class BenchmarkClient:
else: else:
channel = grpc.insecure_channel(server) channel = grpc.insecure_channel(server)
connected_event = threading.Event() # waits for the channel to be ready before we start sending messages
def wait_for_ready(connectivity): grpc.channel_ready_future(channel).result()
if connectivity == grpc.ChannelConnectivity.READY:
connected_event.set()
channel.subscribe(wait_for_ready, try_to_connect=True)
connected_event.wait()
if config.payload_config.WhichOneof('payload') == 'simple_params': if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False self._generic = False
......
...@@ -110,10 +110,13 @@ def _get_channel(target, args): ...@@ -110,10 +110,13 @@ def _get_channel(target, args):
channel_credentials = grpc.ssl_channel_credentials( channel_credentials = grpc.ssl_channel_credentials(
root_certificates=root_certificates) root_certificates=root_certificates)
options = (('grpc.ssl_target_name_override', args.server_host_override,),) options = (('grpc.ssl_target_name_override', args.server_host_override,),)
return grpc.secure_channel( channel = grpc.secure_channel(target, channel_credentials, options=options)
target, channel_credentials, options=options)
else: else:
return grpc.insecure_channel(target) channel = grpc.insecure_channel(target)
# waits for the channel to be ready before we start sending messages
grpc.channel_ready_future(channel).result()
return channel
def run_test(args): def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases) test_cases = _parse_weighted_test_cases(args.test_cases)
......
...@@ -73,7 +73,6 @@ class H2ProtocolBaseServer(twisted.internet.protocol.Protocol): ...@@ -73,7 +73,6 @@ class H2ProtocolBaseServer(twisted.internet.protocol.Protocol):
def on_connection_lost(self, reason): def on_connection_lost(self, reason):
logging.info('Disconnected %s' % reason) logging.info('Disconnected %s' % reason)
twisted.internet.reactor.callFromThread(twisted.internet.reactor.stop)
def dataReceived(self, data): def dataReceived(self, data):
try: try:
......
...@@ -73,18 +73,32 @@ class H2Factory(twisted.internet.protocol.Factory): ...@@ -73,18 +73,32 @@ class H2Factory(twisted.internet.protocol.Factory):
else: else:
return t().get_base_server() return t().get_base_server()
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--base_port', type=int, default=8080,
help='base port to run the servers (default: 8080). One test server is '
'started on each incrementing port, beginning with base_port, in the '
'following order: goaway,max_streams,ping,rst_after_data,rst_after_header,'
'rst_during_data'
)
return parser.parse_args()
def start_test_servers(base_port):
""" Start one server per test case on incrementing port numbers
beginning with base_port """
index = 0
for test_case in sorted(_TEST_CASE_MAPPING.keys()):
portnum = base_port + index
logging.warning('serving on port %d : %s'%(portnum, test_case))
endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(
twisted.internet.reactor, portnum, backlog=128)
endpoint.listen(H2Factory(test_case))
index += 1
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig( logging.basicConfig(
format='%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s', format='%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s',
level=logging.INFO) level=logging.INFO)
parser = argparse.ArgumentParser() args = parse_arguments()
parser.add_argument('--test_case', choices=sorted(_TEST_CASE_MAPPING.keys()), start_test_servers(args.base_port)
help='test case to run', required=True)
parser.add_argument('--port', type=int, default=8080,
help='port to run the server (default: 8080)')
args = parser.parse_args()
logging.info('Running test case %s on port %d' % (args.test_case, args.port))
endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(
twisted.internet.reactor, args.port, backlog=128)
endpoint.listen(H2Factory(args.test_case))
twisted.internet.reactor.run() twisted.internet.reactor.run()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment