diff options
| author | Emily Shaffer <emilyshaffer@google.com> | 2026-01-28 23:39:18 +0200 |
|---|---|---|
| committer | Junio C Hamano <gitster@pobox.com> | 2026-01-28 15:47:02 -0800 |
| commit | ec0becacc9847406f2b0147a81f62e023b006351 (patch) | |
| tree | b946638740d07c099873886bba58ad8661e68411 /run-command.c | |
| parent | dee82860688cdcd35cbf0ce7e74736e473c5b697 (diff) | |
| download | git-ec0becacc9847406f2b0147a81f62e023b006351.tar.xz | |
run-command: add stdin callback for parallelization
If a user of the run_processes_parallel() API wants to pipe a large
amount of information to the stdin of each parallel command, that
data could exceed the pipe buffer of the process's stdin and can be
too big to store in-memory via strbuf & friends or to slurp to a file.
Generally this is solved by repeatedly writing to child_process.in
between calls to start_command() and finish_command(). For a specific
pre-existing example of this, see transport.c:run_pre_push_hook().
This adds a generic callback API to run_processes_parallel() to do
exactly that in a unified manner, similar to the existing callback APIs,
which can then be used by hooks.h to convert the remaining hooks to the
new, simpler parallel interface.
Signed-off-by: Emily Shaffer <emilyshaffer@google.com>
Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
Signed-off-by: Adrian Ratiu <adrian.ratiu@collabora.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
Diffstat (limited to 'run-command.c')
| -rw-r--r-- | run-command.c | 87 |
1 files changed, 80 insertions, 7 deletions
diff --git a/run-command.c b/run-command.c index 3989673569..aaf0e4ecee 100644 --- a/run-command.c +++ b/run-command.c @@ -1490,6 +1490,16 @@ static int child_is_working(const struct parallel_child *pp_child) return pp_child->state == GIT_CP_WORKING; } +static int child_is_ready_for_cleanup(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && !pp_child->process.in; +} + +static int child_is_receiving_input(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && pp_child->process.in > 0; +} + struct parallel_processes { size_t nr_processes; @@ -1659,6 +1669,44 @@ static int pp_start_one(struct parallel_processes *pp, return 0; } +static void pp_buffer_stdin(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) +{ + /* Buffer stdin for each pipe. */ + for (size_t i = 0; i < opts->processes; i++) { + struct child_process *proc = &pp->children[i].process; + int ret; + + if (!child_is_receiving_input(&pp->children[i])) + continue; + + /* + * child input is provided via path_to_stdin when the feed_pipe cb is + * missing, so we just signal an EOF. + */ + if (!opts->feed_pipe) { + close(proc->in); + proc->in = 0; + continue; + } + + /** + * Feed the pipe: + * ret < 0 means error + * ret == 0 means there is more data to be fed + * ret > 0 means feeding finished + */ + ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data); + if (ret < 0) + die_errno("feed_pipe"); + + if (ret) { + close(proc->in); + proc->in = 0; + } + } +} + static void pp_buffer_stderr(struct parallel_processes *pp, const struct run_process_parallel_opts *opts, int output_timeout) @@ -1729,6 +1777,7 @@ static int pp_collect_finished(struct parallel_processes *pp, pp->children[i].state = GIT_CP_FREE; if (pp->pfd) pp->pfd[i].fd = -1; + pp->children[i].process.in = 0; child_process_init(&pp->children[i].process); if (opts->ungroup) { @@ -1763,6 +1812,27 @@ static int pp_collect_finished(struct parallel_processes *pp, return result; } +static void pp_handle_child_IO(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int output_timeout) +{ + /* + * First push input, if any (it might no-op), to child tasks to avoid them blocking + * after input. This also prevents deadlocks when ungrouping below, if a child blocks + * while the parent also waits for them to finish. + */ + pp_buffer_stdin(pp, opts); + + if (opts->ungroup) { + for (size_t i = 0; i < opts->processes; i++) + if (child_is_ready_for_cleanup(&pp->children[i])) + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else { + pp_buffer_stderr(pp, opts, output_timeout); + pp_output(pp); + } +} + void run_processes_parallel(const struct run_process_parallel_opts *opts) { int i, code; @@ -1782,6 +1852,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) "max:%"PRIuMAX, (uintmax_t)opts->processes); + /* + * Child tasks might receive input via stdin, terminating early (or not), so + * ignore the default SIGPIPE which gets handled by each feed_pipe_fn which + * actually writes the data to children stdin fds. + */ + sigchain_push(SIGPIPE, SIG_IGN); + pp_init(&pp, opts, &pp_sig); while (1) { for (i = 0; @@ -1799,13 +1876,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) } if (!pp.nr_processes) break; - if (opts->ungroup) { - for (size_t i = 0; i < opts->processes; i++) - pp.children[i].state = GIT_CP_WAIT_CLEANUP; - } else { - pp_buffer_stderr(&pp, opts, output_timeout); - pp_output(&pp); - } + pp_handle_child_IO(&pp, opts, output_timeout); code = pp_collect_finished(&pp, opts); if (code) { pp.shutdown = 1; @@ -1816,6 +1887,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) pp_cleanup(&pp, opts); + sigchain_pop(SIGPIPE); + if (do_trace2) trace2_region_leave(tr2_category, tr2_label, NULL); } |
