Skip to content
Snippets Groups Projects
Commit a47563ca authored by Craig Tiller's avatar Craig Tiller
Browse files

progress

parent 2da58cf6
No related branches found
No related tags found
No related merge requests found
...@@ -74,6 +74,30 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { ...@@ -74,6 +74,30 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
c = next; c = next;
} }
} }
if (exec_ctx->stealing_from_workqueue != NULL) {
if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
exec_ctx->stolen_closure,
exec_ctx->stolen_closure->error);
GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
"exec_ctx_sched");
exec_ctx->stealing_from_workqueue = NULL;
exec_ctx->stolen_closure = NULL;
} else {
grpc_closure *c = exec_ctx->stolen_closure;
GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
"exec_ctx_sched");
exec_ctx->stealing_from_workqueue = NULL;
exec_ctx->stolen_closure = NULL;
grpc_error *error = c->error;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
grpc_exec_ctx_flush(exec_ctx);
return true;
}
}
GPR_TIMER_END("grpc_exec_ctx_flush", 0); GPR_TIMER_END("grpc_exec_ctx_flush", 0);
return did_something; return did_something;
} }
...@@ -88,9 +112,20 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, ...@@ -88,9 +112,20 @@ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_workqueue *offload_target_or_null) { grpc_workqueue *offload_target_or_null) {
if (offload_target_or_null == NULL) { if (offload_target_or_null == NULL) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error); grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
} else { } else if (exec_ctx->stealing_from_workqueue == NULL) {
exec_ctx->stealing_from_workqueue = offload_target_or_null;
closure->error = error;
exec_ctx->stolen_closure = closure;
} else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
} else { /* stealing_from_workqueue == offload_target_or_null */
grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
exec_ctx->stolen_closure,
exec_ctx->stolen_closure->error);
closure->error = error;
exec_ctx->stolen_closure = closure;
GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
} }
} }
......
...@@ -66,6 +66,8 @@ typedef struct grpc_combiner grpc_combiner; ...@@ -66,6 +66,8 @@ typedef struct grpc_combiner grpc_combiner;
*/ */
struct grpc_exec_ctx { struct grpc_exec_ctx {
grpc_closure_list closure_list; grpc_closure_list closure_list;
grpc_workqueue *stealing_from_workqueue;
grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */ /** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner; grpc_combiner *active_combiner;
bool cached_ready_to_finish; bool cached_ready_to_finish;
...@@ -74,7 +76,10 @@ struct grpc_exec_ctx { ...@@ -74,7 +76,10 @@ struct grpc_exec_ctx {
}; };
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check } { \
GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, false, finish_check_arg, \
finish_check \
}
#else #else
struct grpc_exec_ctx { struct grpc_exec_ctx {
bool cached_ready_to_finish; bool cached_ready_to_finish;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment