From dee82860688cdcd35cbf0ce7e74736e473c5b697 Mon Sep 17 00:00:00 2001 From: Adrian Ratiu Date: Wed, 28 Jan 2026 23:39:17 +0200 Subject: run-command: add helper for pp child states There is a recurring pattern of testing parallel process child states and file descriptors to determine if a child is running, receiving any input or if it's ready for cleanup. Name the pp_child structure and introduce a helper to make the checks more readable. Suggested-by: Junio C Hamano Signed-off-by: Adrian Ratiu Signed-off-by: Junio C Hamano --- run-command.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) (limited to 'run-command.c') diff --git a/run-command.c b/run-command.c index e3e02475cc..3989673569 100644 --- a/run-command.c +++ b/run-command.c @@ -1478,15 +1478,22 @@ enum child_state { GIT_CP_WAIT_CLEANUP, }; +struct parallel_child { + enum child_state state; + struct child_process process; + struct strbuf err; + void *data; +}; + +static int child_is_working(const struct parallel_child *pp_child) +{ + return pp_child->state == GIT_CP_WORKING; +} + struct parallel_processes { size_t nr_processes; - struct { - enum child_state state; - struct child_process process; - struct strbuf err; - void *data; - } *children; + struct parallel_child *children; /* * The struct pollfd is logically part of *children, * but the system call expects it as its own array. @@ -1509,7 +1516,7 @@ static void kill_children(const struct parallel_processes *pp, int signo) { for (size_t i = 0; i < opts->processes; i++) - if (pp->children[i].state == GIT_CP_WORKING) + if (child_is_working(&pp->children[i])) kill(pp->children[i].process.pid, signo); } @@ -1665,7 +1672,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp, /* Buffer output from all pipes. */ for (size_t i = 0; i < opts->processes; i++) { - if (pp->children[i].state == GIT_CP_WORKING && + if (child_is_working(&pp->children[i]) && pp->pfd[i].revents & (POLLIN | POLLHUP)) { int n = strbuf_read_once(&pp->children[i].err, pp->children[i].process.err, 0); @@ -1683,7 +1690,7 @@ static void pp_output(const struct parallel_processes *pp) { size_t i = pp->output_owner; - if (pp->children[i].state == GIT_CP_WORKING && + if (child_is_working(&pp->children[i]) && pp->children[i].err.len) { strbuf_write(&pp->children[i].err, stderr); strbuf_reset(&pp->children[i].err); @@ -1748,7 +1755,7 @@ static int pp_collect_finished(struct parallel_processes *pp, * running process time. */ for (i = 0; i < n; i++) - if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) + if (child_is_working(&pp->children[(pp->output_owner + i) % n])) break; pp->output_owner = (pp->output_owner + i) % n; } -- cgit v1.3 From ec0becacc9847406f2b0147a81f62e023b006351 Mon Sep 17 00:00:00 2001 From: Emily Shaffer Date: Wed, 28 Jan 2026 23:39:18 +0200 Subject: run-command: add stdin callback for parallelization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a user of the run_processes_parallel() API wants to pipe a large amount of information to the stdin of each parallel command, that data could exceed the pipe buffer of the process's stdin and can be too big to store in-memory via strbuf & friends or to slurp to a file. Generally this is solved by repeatedly writing to child_process.in between calls to start_command() and finish_command(). For a specific pre-existing example of this, see transport.c:run_pre_push_hook(). This adds a generic callback API to run_processes_parallel() to do exactly that in a unified manner, similar to the existing callback APIs, which can then be used by hooks.h to convert the remaining hooks to the new, simpler parallel interface. Signed-off-by: Emily Shaffer Signed-off-by: Ævar Arnfjörð Bjarmason Signed-off-by: Adrian Ratiu Signed-off-by: Junio C Hamano --- run-command.c | 87 +++++++++++++++++++++++++++++++++++++++++---- run-command.h | 21 +++++++++++ t/helper/test-run-command.c | 52 +++++++++++++++++++++++++-- t/t0061-run-command.sh | 31 ++++++++++++++++ 4 files changed, 182 insertions(+), 9 deletions(-) (limited to 'run-command.c') diff --git a/run-command.c b/run-command.c index 3989673569..aaf0e4ecee 100644 --- a/run-command.c +++ b/run-command.c @@ -1490,6 +1490,16 @@ static int child_is_working(const struct parallel_child *pp_child) return pp_child->state == GIT_CP_WORKING; } +static int child_is_ready_for_cleanup(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && !pp_child->process.in; +} + +static int child_is_receiving_input(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && pp_child->process.in > 0; +} + struct parallel_processes { size_t nr_processes; @@ -1659,6 +1669,44 @@ static int pp_start_one(struct parallel_processes *pp, return 0; } +static void pp_buffer_stdin(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) +{ + /* Buffer stdin for each pipe. */ + for (size_t i = 0; i < opts->processes; i++) { + struct child_process *proc = &pp->children[i].process; + int ret; + + if (!child_is_receiving_input(&pp->children[i])) + continue; + + /* + * child input is provided via path_to_stdin when the feed_pipe cb is + * missing, so we just signal an EOF. + */ + if (!opts->feed_pipe) { + close(proc->in); + proc->in = 0; + continue; + } + + /** + * Feed the pipe: + * ret < 0 means error + * ret == 0 means there is more data to be fed + * ret > 0 means feeding finished + */ + ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data); + if (ret < 0) + die_errno("feed_pipe"); + + if (ret) { + close(proc->in); + proc->in = 0; + } + } +} + static void pp_buffer_stderr(struct parallel_processes *pp, const struct run_process_parallel_opts *opts, int output_timeout) @@ -1729,6 +1777,7 @@ static int pp_collect_finished(struct parallel_processes *pp, pp->children[i].state = GIT_CP_FREE; if (pp->pfd) pp->pfd[i].fd = -1; + pp->children[i].process.in = 0; child_process_init(&pp->children[i].process); if (opts->ungroup) { @@ -1763,6 +1812,27 @@ static int pp_collect_finished(struct parallel_processes *pp, return result; } +static void pp_handle_child_IO(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int output_timeout) +{ + /* + * First push input, if any (it might no-op), to child tasks to avoid them blocking + * after input. This also prevents deadlocks when ungrouping below, if a child blocks + * while the parent also waits for them to finish. + */ + pp_buffer_stdin(pp, opts); + + if (opts->ungroup) { + for (size_t i = 0; i < opts->processes; i++) + if (child_is_ready_for_cleanup(&pp->children[i])) + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else { + pp_buffer_stderr(pp, opts, output_timeout); + pp_output(pp); + } +} + void run_processes_parallel(const struct run_process_parallel_opts *opts) { int i, code; @@ -1782,6 +1852,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) "max:%"PRIuMAX, (uintmax_t)opts->processes); + /* + * Child tasks might receive input via stdin, terminating early (or not), so + * ignore the default SIGPIPE which gets handled by each feed_pipe_fn which + * actually writes the data to children stdin fds. + */ + sigchain_push(SIGPIPE, SIG_IGN); + pp_init(&pp, opts, &pp_sig); while (1) { for (i = 0; @@ -1799,13 +1876,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) } if (!pp.nr_processes) break; - if (opts->ungroup) { - for (size_t i = 0; i < opts->processes; i++) - pp.children[i].state = GIT_CP_WAIT_CLEANUP; - } else { - pp_buffer_stderr(&pp, opts, output_timeout); - pp_output(&pp); - } + pp_handle_child_IO(&pp, opts, output_timeout); code = pp_collect_finished(&pp, opts); if (code) { pp.shutdown = 1; @@ -1816,6 +1887,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) pp_cleanup(&pp, opts); + sigchain_pop(SIGPIPE); + if (do_trace2) trace2_region_leave(tr2_category, tr2_label, NULL); } diff --git a/run-command.h b/run-command.h index 0df25e445f..e1ca965b5b 100644 --- a/run-command.h +++ b/run-command.h @@ -420,6 +420,21 @@ typedef int (*start_failure_fn)(struct strbuf *out, void *pp_cb, void *pp_task_cb); +/** + * This callback is repeatedly called on every child process who requests + * start_command() to create a pipe by setting child_process.in < 0. + * + * pp_cb is the callback cookie as passed into run_processes_parallel, and + * pp_task_cb is the callback cookie as passed into get_next_task_fn. + * + * Returns < 0 for error + * Returns == 0 when there is more data to be fed (will be called again) + * Returns > 0 when finished (child closed fd or no more data to be fed) + */ +typedef int (*feed_pipe_fn)(int child_in, + void *pp_cb, + void *pp_task_cb); + /** * This callback is called on every child process that finished processing. * @@ -473,6 +488,12 @@ struct run_process_parallel_opts */ start_failure_fn start_failure; + /* + * feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any + * special handling. + */ + feed_pipe_fn feed_pipe; + /** * task_finished: See task_finished_fn() above. This can be * NULL to omit any special handling. diff --git a/t/helper/test-run-command.c b/t/helper/test-run-command.c index 3719f23cc2..4a56456894 100644 --- a/t/helper/test-run-command.c +++ b/t/helper/test-run-command.c @@ -23,19 +23,26 @@ static int number_callbacks; static int parallel_next(struct child_process *cp, struct strbuf *err, void *cb, - void **task_cb UNUSED) + void **task_cb) { struct child_process *d = cb; if (number_callbacks >= 4) return 0; strvec_pushv(&cp->args, d->args.v); + cp->in = d->in; + cp->no_stdin = d->no_stdin; if (err) strbuf_addstr(err, "preloaded output of a child\n"); else fprintf(stderr, "preloaded output of a child\n"); number_callbacks++; + + /* test_stdin callback will use this to count remaining lines */ + *task_cb = xmalloc(sizeof(int)); + *(int*)(*task_cb) = 2; + return 1; } @@ -54,15 +61,48 @@ static int no_job(struct child_process *cp UNUSED, static int task_finished(int result UNUSED, struct strbuf *err, void *pp_cb UNUSED, - void *pp_task_cb UNUSED) + void *pp_task_cb) { if (err) strbuf_addstr(err, "asking for a quick stop\n"); else fprintf(stderr, "asking for a quick stop\n"); + + FREE_AND_NULL(pp_task_cb); + return 1; } +static int task_finished_quiet(int result UNUSED, + struct strbuf *err UNUSED, + void *pp_cb UNUSED, + void *pp_task_cb) +{ + FREE_AND_NULL(pp_task_cb); + return 0; +} + +static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb) +{ + int *lines_remaining = task_cb; + + if (*lines_remaining) { + struct strbuf buf = STRBUF_INIT; + strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining)); + if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) { + if (errno == EPIPE) { + /* child closed stdin, nothing more to do */ + strbuf_release(&buf); + return 1; + } + die_errno("write"); + } + strbuf_release(&buf); + } + + return !(*lines_remaining); +} + struct testsuite { struct string_list tests, failed; int next; @@ -157,6 +197,7 @@ static int testsuite(int argc, const char **argv) struct run_process_parallel_opts opts = { .get_next_task = next_test, .start_failure = test_failed, + .feed_pipe = test_stdin_pipe_feed, .task_finished = test_finished, .data = &suite, }; @@ -460,12 +501,19 @@ int cmd__run_command(int argc, const char **argv) if (!strcmp(argv[1], "run-command-parallel")) { opts.get_next_task = parallel_next; + opts.task_finished = task_finished_quiet; } else if (!strcmp(argv[1], "run-command-abort")) { opts.get_next_task = parallel_next; opts.task_finished = task_finished; } else if (!strcmp(argv[1], "run-command-no-jobs")) { opts.get_next_task = no_job; opts.task_finished = task_finished; + } else if (!strcmp(argv[1], "run-command-stdin")) { + proc.in = -1; + proc.no_stdin = 0; + opts.get_next_task = parallel_next; + opts.task_finished = task_finished_quiet; + opts.feed_pipe = test_stdin_pipe_feed; } else { ret = 1; fprintf(stderr, "check usage\n"); diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh index 76d4936a87..2f77fde0d9 100755 --- a/t/t0061-run-command.sh +++ b/t/t0061-run-command.sh @@ -164,6 +164,37 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than test_line_count = 4 err ' +test_expect_success 'run_command listens to stdin' ' + cat >expect <<-\EOF && + preloaded output of a child + listening for stdin: + sample stdin 1 + sample stdin 0 + preloaded output of a child + listening for stdin: + sample stdin 1 + sample stdin 0 + preloaded output of a child + listening for stdin: + sample stdin 1 + sample stdin 0 + preloaded output of a child + listening for stdin: + sample stdin 1 + sample stdin 0 + EOF + + write_script stdin-script <<-\EOF && + echo "listening for stdin:" + while read line + do + echo "$line" + done + EOF + test-tool run-command run-command-stdin 2 ./stdin-script 2>actual && + test_cmp expect actual +' + cat >expect <<-EOF preloaded output of a child asking for a quick stop -- cgit v1.3 From c45a34e12e8699f656ec3613b6ba158c1a57c5e8 Mon Sep 17 00:00:00 2001 From: Adrian Ratiu Date: Wed, 28 Jan 2026 23:39:25 +0200 Subject: run-command: poll child input in addition to output Child input feeding might hit the 100ms output poll timeout as a side-effect of the ungroup=0 design when feeding multiple children in parallel and buffering their outputs. This throttles the write throughput as reported by Kristoffer. Peff also noted that the parent might block if the write pipe is full and cause a deadlock if both parent + child wait for one another. Thus we refactor the run-command I/O loop so it polls on both child input and output fds to eliminate the risk of artificial 100ms latencies and unnecessarily blocking the main process. This ensures that parallel hooks are fed data ASAP while maintaining responsiveness for (sideband) output. It's worth noting that in our current design, sequential execution is not affected by this because it still uses the ungroup=1 behavior, so there are no run-command induced buffering delays since the child sequentially outputs directly to the parent-inherited fds. Reported-by: Kristoffer Haugsbakk Suggested-by: Jeff King Signed-off-by: Adrian Ratiu Signed-off-by: Junio C Hamano --- run-command.c | 80 +++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 18 deletions(-) (limited to 'run-command.c') diff --git a/run-command.c b/run-command.c index aaf0e4ecee..3356463d43 100644 --- a/run-command.c +++ b/run-command.c @@ -1499,6 +1499,14 @@ static int child_is_receiving_input(const struct parallel_child *pp_child) { return child_is_working(pp_child) && pp_child->process.in > 0; } +static int child_is_sending_output(const struct parallel_child *pp_child) +{ + /* + * all pp children which buffer output through run_command via ungroup=0 + * redirect stdout to stderr, so we just need to check process.err. + */ + return child_is_working(pp_child) && pp_child->process.err > 0; +} struct parallel_processes { size_t nr_processes; @@ -1562,7 +1570,7 @@ static void pp_init(struct parallel_processes *pp, CALLOC_ARRAY(pp->children, n); if (!opts->ungroup) - CALLOC_ARRAY(pp->pfd, n); + CALLOC_ARRAY(pp->pfd, n * 2); for (size_t i = 0; i < n; i++) { strbuf_init(&pp->children[i].err, 0); @@ -1707,21 +1715,63 @@ static void pp_buffer_stdin(struct parallel_processes *pp, } } -static void pp_buffer_stderr(struct parallel_processes *pp, - const struct run_process_parallel_opts *opts, - int output_timeout) +static void pp_buffer_io(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int timeout) { - while (poll(pp->pfd, opts->processes, output_timeout) < 0) { + /* for each potential child slot, prepare two pollfd entries */ + for (size_t i = 0; i < opts->processes; i++) { + if (child_is_sending_output(&pp->children[i])) { + pp->pfd[2*i].fd = pp->children[i].process.err; + pp->pfd[2*i].events = POLLIN | POLLHUP; + } else { + pp->pfd[2*i].fd = -1; + } + + if (child_is_receiving_input(&pp->children[i])) { + pp->pfd[2*i+1].fd = pp->children[i].process.in; + pp->pfd[2*i+1].events = POLLOUT; + } else { + pp->pfd[2*i+1].fd = -1; + } + } + + while (poll(pp->pfd, opts->processes * 2, timeout) < 0) { if (errno == EINTR) continue; pp_cleanup(pp, opts); die_errno("poll"); } - /* Buffer output from all pipes. */ for (size_t i = 0; i < opts->processes; i++) { + /* Handle input feeding (stdin) */ + if (pp->pfd[2*i+1].revents & (POLLOUT | POLLHUP | POLLERR)) { + if (opts->feed_pipe) { + int ret = opts->feed_pipe(pp->children[i].process.in, + opts->data, + pp->children[i].data); + if (ret < 0) + die_errno("feed_pipe"); + if (ret) { + /* done feeding */ + close(pp->children[i].process.in); + pp->children[i].process.in = 0; + } + } else { + /* + * No feed_pipe means there is nothing to do, so + * close the fd. Child input can be fed by other + * methods, such as opts->path_to_stdin which + * slurps a file via dup2, so clean up here. + */ + close(pp->children[i].process.in); + pp->children[i].process.in = 0; + } + } + + /* Handle output reading (stderr) */ if (child_is_working(&pp->children[i]) && - pp->pfd[i].revents & (POLLIN | POLLHUP)) { + pp->pfd[2*i].revents & (POLLIN | POLLHUP)) { int n = strbuf_read_once(&pp->children[i].err, pp->children[i].process.err, 0); if (n == 0) { @@ -1814,21 +1864,15 @@ static int pp_collect_finished(struct parallel_processes *pp, static void pp_handle_child_IO(struct parallel_processes *pp, const struct run_process_parallel_opts *opts, - int output_timeout) + int timeout) { - /* - * First push input, if any (it might no-op), to child tasks to avoid them blocking - * after input. This also prevents deadlocks when ungrouping below, if a child blocks - * while the parent also waits for them to finish. - */ - pp_buffer_stdin(pp, opts); - if (opts->ungroup) { + pp_buffer_stdin(pp, opts); for (size_t i = 0; i < opts->processes; i++) if (child_is_ready_for_cleanup(&pp->children[i])) pp->children[i].state = GIT_CP_WAIT_CLEANUP; } else { - pp_buffer_stderr(pp, opts, output_timeout); + pp_buffer_io(pp, opts, timeout); pp_output(pp); } } @@ -1836,7 +1880,7 @@ static void pp_handle_child_IO(struct parallel_processes *pp, void run_processes_parallel(const struct run_process_parallel_opts *opts) { int i, code; - int output_timeout = 100; + int timeout = 100; int spawn_cap = 4; struct parallel_processes_for_signal pp_sig; struct parallel_processes pp = { @@ -1876,7 +1920,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) } if (!pp.nr_processes) break; - pp_handle_child_IO(&pp, opts, output_timeout); + pp_handle_child_IO(&pp, opts, timeout); code = pp_collect_finished(&pp, opts); if (code) { pp.shutdown = 1; -- cgit v1.3