Skip to content
Snippets Groups Projects
Commit c584d995 authored by Mark D. Roth's avatar Mark D. Roth
Browse files

Eliminate some code duplication.

parent a9bd9433
No related branches found
No related tags found
No related merge requests found
......@@ -106,6 +106,31 @@ static void cleanup_args_for_failure_locked(
handshaker->args->args = NULL;
}
// If the handshake failed or we're shutting down, clean up and invoke the
// callback with the error.
static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
http_connect_handshaker* handshaker,
grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
// If we were shut down after an endpoint operation succeeded but
// before the endpoint callback was invoked, we need to generate our
// own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
}
if (!handshaker->shutdown) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(handshaker);
}
// Invoke callback.
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
}
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
......@@ -114,25 +139,7 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
if (error == GRPC_ERROR_NONE) {
// If we were shut down after the write succeeded but before this
// callback was invoked, we need to generate our own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
} else {
GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
}
if (!handshaker->shutdown) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(handshaker);
}
// Invoke callback.
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
gpr_mu_unlock(&handshaker->mu);
http_connect_handshaker_unref(exec_ctx, handshaker);
} else {
......@@ -151,25 +158,9 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
http_connect_handshaker* handshaker = arg;
gpr_mu_lock(&handshaker->mu);
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the write failed or we're shutting down, clean up and invoke the
// If the read failed or we're shutting down, clean up and invoke the
// callback with the error.
if (error == GRPC_ERROR_NONE) {
// If we were shut down after the write succeeded but before this
// callback was invoked, we need to generate our own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
} else {
GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
}
if (!handshaker->shutdown) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(handshaker);
}
handshake_failed_locked(exec_ctx, handshaker, GRPC_ERROR_REF(error));
goto done;
}
// Add buffer to parser.
......@@ -179,7 +170,10 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
error = grpc_http_parser_parse(&handshaker->http_parser,
handshaker->args->read_buffer->slices[i],
&body_start_offset);
if (error != GRPC_ERROR_NONE) goto done;
if (error != GRPC_ERROR_NONE) {
handshake_failed_locked(exec_ctx, handshaker, error);
goto done;
}
if (handshaker->http_parser.state == GRPC_HTTP_BODY) {
// Remove the data we've already read from the read buffer,
// leaving only the leftover bytes (if any).
......@@ -228,10 +222,12 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
handshaker->http_response.status);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
handshake_failed_locked(exec_ctx, handshaker, error);
goto done;
}
done:
// Invoke handshake-done callback.
// Success. Invoke handshake-done callback.
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
done:
gpr_mu_unlock(&handshaker->mu);
http_connect_handshaker_unref(exec_ctx, handshaker);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment