Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
G
Grpc
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
tci-gateway-module
Grpc
Commits
e0341792
Commit
e0341792
authored
Mar 8, 2017
by
Craig Tiller
Browse files
Options
Downloads
Patches
Plain Diff
Remove most usage of the grpc_call lock
parent
c35a5b0e
No related branches found
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/core/lib/surface/call.c
+29
-48
29 additions, 48 deletions
src/core/lib/surface/call.c
with
29 additions
and
48 deletions
src/core/lib/surface/call.c
+
29
−
48
View file @
e0341792
...
@@ -143,9 +143,10 @@ struct grpc_call {
...
@@ -143,9 +143,10 @@ struct grpc_call {
grpc_channel
*
channel
;
grpc_channel
*
channel
;
grpc_call
*
parent
;
grpc_call
*
parent
;
grpc_call
*
first_child
;
grpc_call
*
first_child
;
gpr_atm
has_children
;
gpr_timespec
start_time
;
gpr_timespec
start_time
;
/*
TODO(ctiller): share with cq if possible?
*/
/*
protects first_child, setting has_children, and child next/prev links
*/
gpr_mu
mu
;
gpr_mu
child_list_
mu
;
/* client or server call */
/* client or server call */
bool
is_client
;
bool
is_client
;
...
@@ -161,7 +162,7 @@ struct grpc_call {
...
@@ -161,7 +162,7 @@ struct grpc_call {
bool
receiving_message
;
bool
receiving_message
;
bool
requested_final_op
;
bool
requested_final_op
;
bool
received_final_op
;
bool
received_final_op
;
bool
sent_any_op
;
gpr_atm
num_ops_sent
;
/* have we received initial metadata */
/* have we received initial metadata */
bool
has_initial_md_been_received
;
bool
has_initial_md_been_received
;
...
@@ -275,7 +276,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
...
@@ -275,7 +276,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN
(
"grpc_call_create"
,
0
);
GPR_TIMER_BEGIN
(
"grpc_call_create"
,
0
);
call
=
gpr_zalloc
(
sizeof
(
grpc_call
)
+
channel_stack
->
call_stack_size
);
call
=
gpr_zalloc
(
sizeof
(
grpc_call
)
+
channel_stack
->
call_stack_size
);
*
out_call
=
call
;
*
out_call
=
call
;
gpr_mu_init
(
&
call
->
mu
);
gpr_mu_init
(
&
call
->
child_list_
mu
);
call
->
channel
=
args
->
channel
;
call
->
channel
=
args
->
channel
;
call
->
cq
=
args
->
cq
;
call
->
cq
=
args
->
cq
;
call
->
parent
=
args
->
parent_call
;
call
->
parent
=
args
->
parent_call
;
...
@@ -313,7 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
...
@@ -313,7 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT
(
call
->
is_client
);
GPR_ASSERT
(
call
->
is_client
);
GPR_ASSERT
(
!
args
->
parent_call
->
is_client
);
GPR_ASSERT
(
!
args
->
parent_call
->
is_client
);
gpr_mu_lock
(
&
args
->
parent_call
->
mu
);
gpr_mu_lock
(
&
args
->
parent_call
->
child_list_mu
);
gpr_atm_no_barrier_store
(
&
args
->
parent_call
->
has_children
,
1
);
if
(
args
->
propagation_mask
&
GRPC_PROPAGATE_DEADLINE
)
{
if
(
args
->
propagation_mask
&
GRPC_PROPAGATE_DEADLINE
)
{
send_deadline
=
gpr_time_min
(
send_deadline
=
gpr_time_min
(
...
@@ -353,7 +355,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
...
@@ -353,7 +355,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
call
;
call
;
}
}
gpr_mu_unlock
(
&
args
->
parent_call
->
mu
);
gpr_mu_unlock
(
&
args
->
parent_call
->
child_list_
mu
);
}
}
call
->
send_deadline
=
send_deadline
;
call
->
send_deadline
=
send_deadline
;
...
@@ -435,7 +437,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
...
@@ -435,7 +437,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if
(
c
->
receiving_stream
!=
NULL
)
{
if
(
c
->
receiving_stream
!=
NULL
)
{
grpc_byte_stream_destroy
(
exec_ctx
,
c
->
receiving_stream
);
grpc_byte_stream_destroy
(
exec_ctx
,
c
->
receiving_stream
);
}
}
gpr_mu_destroy
(
&
c
->
mu
);
gpr_mu_destroy
(
&
c
->
child_list_
mu
);
for
(
ii
=
0
;
ii
<
c
->
send_extra_metadata_count
;
ii
++
)
{
for
(
ii
=
0
;
ii
<
c
->
send_extra_metadata_count
;
ii
++
)
{
GRPC_MDELEM_UNREF
(
exec_ctx
,
c
->
send_extra_metadata
[
ii
].
md
);
GRPC_MDELEM_UNREF
(
exec_ctx
,
c
->
send_extra_metadata
[
ii
].
md
);
}
}
...
@@ -473,7 +475,7 @@ void grpc_call_destroy(grpc_call *c) {
...
@@ -473,7 +475,7 @@ void grpc_call_destroy(grpc_call *c) {
GRPC_API_TRACE
(
"grpc_call_destroy(c=%p)"
,
1
,
(
c
));
GRPC_API_TRACE
(
"grpc_call_destroy(c=%p)"
,
1
,
(
c
));
if
(
parent
)
{
if
(
parent
)
{
gpr_mu_lock
(
&
parent
->
mu
);
gpr_mu_lock
(
&
parent
->
child_list_
mu
);
if
(
c
==
parent
->
first_child
)
{
if
(
c
==
parent
->
first_child
)
{
parent
->
first_child
=
c
->
sibling_next
;
parent
->
first_child
=
c
->
sibling_next
;
if
(
c
==
parent
->
first_child
)
{
if
(
c
==
parent
->
first_child
)
{
...
@@ -482,15 +484,13 @@ void grpc_call_destroy(grpc_call *c) {
...
@@ -482,15 +484,13 @@ void grpc_call_destroy(grpc_call *c) {
c
->
sibling_prev
->
sibling_next
=
c
->
sibling_next
;
c
->
sibling_prev
->
sibling_next
=
c
->
sibling_next
;
c
->
sibling_next
->
sibling_prev
=
c
->
sibling_prev
;
c
->
sibling_next
->
sibling_prev
=
c
->
sibling_prev
;
}
}
gpr_mu_unlock
(
&
parent
->
mu
);
gpr_mu_unlock
(
&
parent
->
child_list_
mu
);
GRPC_CALL_INTERNAL_UNREF
(
&
exec_ctx
,
parent
,
"child"
);
GRPC_CALL_INTERNAL_UNREF
(
&
exec_ctx
,
parent
,
"child"
);
}
}
gpr_mu_lock
(
&
c
->
mu
);
GPR_ASSERT
(
!
c
->
destroy_called
);
GPR_ASSERT
(
!
c
->
destroy_called
);
c
->
destroy_called
=
1
;
c
->
destroy_called
=
1
;
cancel
=
c
->
sent_any_op
&&
!
c
->
received_final_op
;
cancel
=
gpr_atm_no_barrier_load
(
&
c
->
num_ops_sent
)
&&
!
c
->
received_final_op
;
gpr_mu_unlock
(
&
c
->
mu
);
if
(
cancel
)
{
if
(
cancel
)
{
cancel_with_error
(
&
exec_ctx
,
c
,
STATUS_FROM_API_OVERRIDE
,
cancel_with_error
(
&
exec_ctx
,
c
,
STATUS_FROM_API_OVERRIDE
,
GRPC_ERROR_CANCELLED
);
GRPC_ERROR_CANCELLED
);
...
@@ -555,10 +555,8 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
...
@@ -555,10 +555,8 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
"c=%p, status=%d, description=%s, reserved=%p)"
,
"c=%p, status=%d, description=%s, reserved=%p)"
,
4
,
(
c
,
(
int
)
status
,
description
,
reserved
));
4
,
(
c
,
(
int
)
status
,
description
,
reserved
));
GPR_ASSERT
(
reserved
==
NULL
);
GPR_ASSERT
(
reserved
==
NULL
);
gpr_mu_lock
(
&
c
->
mu
);
cancel_with_status
(
&
exec_ctx
,
c
,
STATUS_FROM_API_OVERRIDE
,
status
,
cancel_with_status
(
&
exec_ctx
,
c
,
STATUS_FROM_API_OVERRIDE
,
status
,
description
);
description
);
gpr_mu_unlock
(
&
c
->
mu
);
grpc_exec_ctx_finish
(
&
exec_ctx
);
grpc_exec_ctx_finish
(
&
exec_ctx
);
return
GRPC_CALL_OK
;
return
GRPC_CALL_OK
;
}
}
...
@@ -715,9 +713,7 @@ static void set_incoming_compression_algorithm(
...
@@ -715,9 +713,7 @@ static void set_incoming_compression_algorithm(
grpc_compression_algorithm
grpc_call_test_only_get_compression_algorithm
(
grpc_compression_algorithm
grpc_call_test_only_get_compression_algorithm
(
grpc_call
*
call
)
{
grpc_call
*
call
)
{
grpc_compression_algorithm
algorithm
;
grpc_compression_algorithm
algorithm
;
gpr_mu_lock
(
&
call
->
mu
);
algorithm
=
call
->
incoming_compression_algorithm
;
algorithm
=
call
->
incoming_compression_algorithm
;
gpr_mu_unlock
(
&
call
->
mu
);
return
algorithm
;
return
algorithm
;
}
}
...
@@ -729,9 +725,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
...
@@ -729,9 +725,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
uint32_t
grpc_call_test_only_get_message_flags
(
grpc_call
*
call
)
{
uint32_t
grpc_call_test_only_get_message_flags
(
grpc_call
*
call
)
{
uint32_t
flags
;
uint32_t
flags
;
gpr_mu_lock
(
&
call
->
mu
);
flags
=
call
->
test_only_last_message_flags
;
flags
=
call
->
test_only_last_message_flags
;
gpr_mu_unlock
(
&
call
->
mu
);
return
flags
;
return
flags
;
}
}
...
@@ -785,9 +779,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
...
@@ -785,9 +779,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
uint32_t
grpc_call_test_only_get_encodings_accepted_by_peer
(
grpc_call
*
call
)
{
uint32_t
grpc_call_test_only_get_encodings_accepted_by_peer
(
grpc_call
*
call
)
{
uint32_t
encodings_accepted_by_peer
;
uint32_t
encodings_accepted_by_peer
;
gpr_mu_lock
(
&
call
->
mu
);
encodings_accepted_by_peer
=
call
->
encodings_accepted_by_peer
;
encodings_accepted_by_peer
=
call
->
encodings_accepted_by_peer
;
gpr_mu_unlock
(
&
call
->
mu
);
return
encodings_accepted_by_peer
;
return
encodings_accepted_by_peer
;
}
}
...
@@ -1083,8 +1075,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
...
@@ -1083,8 +1075,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
grpc_call
*
call
=
bctl
->
call
;
grpc_call
*
call
=
bctl
->
call
;
grpc_error
*
error
=
consolidate_batch_errors
(
bctl
);
grpc_error
*
error
=
consolidate_batch_errors
(
bctl
);
gpr_mu_lock
(
&
call
->
mu
);
if
(
bctl
->
send_initial_metadata
)
{
if
(
bctl
->
send_initial_metadata
)
{
grpc_metadata_batch_destroy
(
grpc_metadata_batch_destroy
(
exec_ctx
,
exec_ctx
,
...
@@ -1105,6 +1095,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
...
@@ -1105,6 +1095,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
call
->
received_final_op
=
true
;
call
->
received_final_op
=
true
;
/* propagate cancellation to any interested children */
/* propagate cancellation to any interested children */
if
(
gpr_atm_no_barrier_load
(
&
call
->
has_children
))
{
gpr_mu_lock
(
&
call
->
child_list_mu
);
child_call
=
call
->
first_child
;
child_call
=
call
->
first_child
;
if
(
child_call
!=
NULL
)
{
if
(
child_call
!=
NULL
)
{
do
{
do
{
...
@@ -1117,6 +1109,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
...
@@ -1117,6 +1109,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
child_call
=
next_child_call
;
child_call
=
next_child_call
;
}
while
(
child_call
!=
call
->
first_child
);
}
while
(
child_call
!=
call
->
first_child
);
}
}
gpr_mu_unlock
(
&
call
->
child_list_mu
);
}
if
(
call
->
is_client
)
{
if
(
call
->
is_client
)
{
get_final_status
(
call
,
set_status_value_directly
,
get_final_status
(
call
,
set_status_value_directly
,
...
@@ -1130,7 +1124,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
...
@@ -1130,7 +1124,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF
(
error
);
GRPC_ERROR_UNREF
(
error
);
error
=
GRPC_ERROR_NONE
;
error
=
GRPC_ERROR_NONE
;
}
}
gpr_mu_unlock
(
&
call
->
mu
);
if
(
bctl
->
is_notify_tag_closure
)
{
if
(
bctl
->
is_notify_tag_closure
)
{
/* unrefs bctl->error */
/* unrefs bctl->error */
...
@@ -1221,7 +1214,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
...
@@ -1221,7 +1214,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error
*
error
)
{
grpc_error
*
error
)
{
batch_control
*
bctl
=
bctlp
;
batch_control
*
bctl
=
bctlp
;
grpc_call
*
call
=
bctl
->
call
;
grpc_call
*
call
=
bctl
->
call
;
gpr_mu_lock
(
&
bctl
->
call
->
mu
);
if
(
error
!=
GRPC_ERROR_NONE
)
{
if
(
error
!=
GRPC_ERROR_NONE
)
{
if
(
call
->
receiving_stream
!=
NULL
)
{
if
(
call
->
receiving_stream
!=
NULL
)
{
grpc_byte_stream_destroy
(
exec_ctx
,
call
->
receiving_stream
);
grpc_byte_stream_destroy
(
exec_ctx
,
call
->
receiving_stream
);
...
@@ -1233,11 +1225,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
...
@@ -1233,11 +1225,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
if
(
call
->
has_initial_md_been_received
||
error
!=
GRPC_ERROR_NONE
||
if
(
call
->
has_initial_md_been_received
||
error
!=
GRPC_ERROR_NONE
||
call
->
receiving_stream
==
NULL
)
{
call
->
receiving_stream
==
NULL
)
{
gpr_mu_unlock
(
&
bctl
->
call
->
mu
);
process_data_after_md
(
exec_ctx
,
bctlp
);
process_data_after_md
(
exec_ctx
,
bctlp
);
}
else
{
}
else
{
call
->
saved_receiving_stream_ready_bctlp
=
bctlp
;
call
->
saved_receiving_stream_ready_bctlp
=
bctlp
;
gpr_mu_unlock
(
&
bctl
->
call
->
mu
);
}
}
}
}
...
@@ -1309,8 +1299,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
...
@@ -1309,8 +1299,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control
*
bctl
=
bctlp
;
batch_control
*
bctl
=
bctlp
;
grpc_call
*
call
=
bctl
->
call
;
grpc_call
*
call
=
bctl
->
call
;
gpr_mu_lock
(
&
call
->
mu
);
add_batch_error
(
exec_ctx
,
bctl
,
GRPC_ERROR_REF
(
error
),
false
);
add_batch_error
(
exec_ctx
,
bctl
,
GRPC_ERROR_REF
(
error
),
false
);
if
(
error
==
GRPC_ERROR_NONE
)
{
if
(
error
==
GRPC_ERROR_NONE
)
{
grpc_metadata_batch
*
md
=
grpc_metadata_batch
*
md
=
...
@@ -1336,11 +1324,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
...
@@ -1336,11 +1324,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
receiving_stream_ready
,
call
->
saved_receiving_stream_ready_bctlp
,
receiving_stream_ready
,
call
->
saved_receiving_stream_ready_bctlp
,
grpc_schedule_on_exec_ctx
);
grpc_schedule_on_exec_ctx
);
call
->
saved_receiving_stream_ready_bctlp
=
NULL
;
call
->
saved_receiving_stream_ready_bctlp
=
NULL
;
grpc_closure_
sched
(
exec_ctx
,
saved_rsr_closure
,
GRPC_ERROR_REF
(
error
));
grpc_closure_
run
(
exec_ctx
,
saved_rsr_closure
,
GRPC_ERROR_REF
(
error
));
}
}
gpr_mu_unlock
(
&
call
->
mu
);
finish_batch_step
(
exec_ctx
,
bctl
);
finish_batch_step
(
exec_ctx
,
bctl
);
}
}
...
@@ -1393,7 +1379,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
...
@@ -1393,7 +1379,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl
->
notify_tag
=
notify_tag
;
bctl
->
notify_tag
=
notify_tag
;
bctl
->
is_notify_tag_closure
=
(
uint8_t
)(
is_notify_tag_closure
!=
0
);
bctl
->
is_notify_tag_closure
=
(
uint8_t
)(
is_notify_tag_closure
!=
0
);
gpr_mu_lock
(
&
call
->
mu
);
grpc_transport_stream_op
*
stream_op
=
&
bctl
->
op
;
grpc_transport_stream_op
*
stream_op
=
&
bctl
->
op
;
memset
(
stream_op
,
0
,
sizeof
(
*
stream_op
));
memset
(
stream_op
,
0
,
sizeof
(
*
stream_op
));
stream_op
->
covered_by_poller
=
true
;
stream_op
->
covered_by_poller
=
true
;
...
@@ -1679,8 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
...
@@ -1679,8 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init
(
&
bctl
->
finish_batch
,
finish_batch
,
bctl
,
grpc_closure_init
(
&
bctl
->
finish_batch
,
finish_batch
,
bctl
,
grpc_schedule_on_exec_ctx
);
grpc_schedule_on_exec_ctx
);
stream_op
->
on_complete
=
&
bctl
->
finish_batch
;
stream_op
->
on_complete
=
&
bctl
->
finish_batch
;
call
->
sent_any_op
=
true
;
gpr_atm_no_barrier_fetch_add
(
&
call
->
num_ops_sent
,
1
);
gpr_mu_unlock
(
&
call
->
mu
);
execute_op
(
exec_ctx
,
call
,
stream_op
);
execute_op
(
exec_ctx
,
call
,
stream_op
);
...
@@ -1711,7 +1695,6 @@ done_with_error:
...
@@ -1711,7 +1695,6 @@ done_with_error:
if
(
bctl
->
recv_final_op
)
{
if
(
bctl
->
recv_final_op
)
{
call
->
requested_final_op
=
0
;
call
->
requested_final_op
=
0
;
}
}
gpr_mu_unlock
(
&
call
->
mu
);
goto
done
;
goto
done
;
}
}
...
@@ -1760,10 +1743,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
...
@@ -1760,10 +1743,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
grpc_compression_algorithm
grpc_call_compression_for_level
(
grpc_compression_algorithm
grpc_call_compression_for_level
(
grpc_call
*
call
,
grpc_compression_level
level
)
{
grpc_call
*
call
,
grpc_compression_level
level
)
{
gpr_mu_lock
(
&
call
->
mu
);
grpc_compression_algorithm
algo
=
grpc_compression_algorithm
algo
=
compression_algorithm_for_level_locked
(
call
,
level
);
compression_algorithm_for_level_locked
(
call
,
level
);
gpr_mu_unlock
(
&
call
->
mu
);
return
algo
;
return
algo
;
}
}
...
...
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
sign in
to comment