diff options
| author | Junio C Hamano <gitster@pobox.com> | 2026-01-06 16:33:53 +0900 |
|---|---|---|
| committer | Junio C Hamano <gitster@pobox.com> | 2026-01-06 16:33:53 +0900 |
| commit | f406b8955295d01089ba2baf35eceadff2d11cae (patch) | |
| tree | 7b53772788307329d2c0e4005b13f2a5621f529e /run-command.c | |
| parent | 1627809eeff75e6ec936fc609e7be46d5eb2fa9e (diff) | |
| parent | c65f26fca46f742e8e457d859a83c4e6ef3c3953 (diff) | |
| download | git-f406b8955295d01089ba2baf35eceadff2d11cae.tar.xz | |
Merge branch 'ar/run-command-hook'
Use hook API to replace ad-hoc invocation of hook scripts with the
run_command() API.
* ar/run-command-hook:
receive-pack: convert receive hooks to hook API
receive-pack: convert update hooks to new API
hooks: allow callers to capture output
run-command: allow capturing of collated output
hook: allow overriding the ungroup option
reference-transaction: use hook API instead of run-command
transport: convert pre-push to hook API
hook: convert 'post-rewrite' hook in sequencer.c to hook API
hook: provide stdin via callback
run-command: add stdin callback for parallelization
run-command: add first helper for pp child states
Diffstat (limited to 'run-command.c')
| -rw-r--r-- | run-command.c | 142 |
1 files changed, 118 insertions, 24 deletions
diff --git a/run-command.c b/run-command.c index e3e02475cc..2d3c2ac55c 100644 --- a/run-command.c +++ b/run-command.c @@ -1478,15 +1478,32 @@ enum child_state { GIT_CP_WAIT_CLEANUP, }; +struct parallel_child { + enum child_state state; + struct child_process process; + struct strbuf err; + void *data; +}; + +static int child_is_working(const struct parallel_child *pp_child) +{ + return pp_child->state == GIT_CP_WORKING; +} + +static int child_is_ready_for_cleanup(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && !pp_child->process.in; +} + +static int child_is_receiving_input(const struct parallel_child *pp_child) +{ + return child_is_working(pp_child) && pp_child->process.in > 0; +} + struct parallel_processes { size_t nr_processes; - struct { - enum child_state state; - struct child_process process; - struct strbuf err; - void *data; - } *children; + struct parallel_child *children; /* * The struct pollfd is logically part of *children, * but the system call expects it as its own array. @@ -1509,7 +1526,7 @@ static void kill_children(const struct parallel_processes *pp, int signo) { for (size_t i = 0; i < opts->processes; i++) - if (pp->children[i].state == GIT_CP_WORKING) + if (child_is_working(&pp->children[i])) kill(pp->children[i].process.pid, signo); } @@ -1578,7 +1595,10 @@ static void pp_cleanup(struct parallel_processes *pp, * When get_next_task added messages to the buffer in its last * iteration, the buffered output is non empty. */ - strbuf_write(&pp->buffered_output, stderr); + if (opts->consume_output) + opts->consume_output(&pp->buffered_output, opts->data); + else + strbuf_write(&pp->buffered_output, stderr); strbuf_release(&pp->buffered_output); sigchain_pop_common(); @@ -1652,6 +1672,44 @@ static int pp_start_one(struct parallel_processes *pp, return 0; } +static void pp_buffer_stdin(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) +{ + /* Buffer stdin for each pipe. */ + for (size_t i = 0; i < opts->processes; i++) { + struct child_process *proc = &pp->children[i].process; + int ret; + + if (!child_is_receiving_input(&pp->children[i])) + continue; + + /* + * child input is provided via path_to_stdin when the feed_pipe cb is + * missing, so we just signal an EOF. + */ + if (!opts->feed_pipe) { + close(proc->in); + proc->in = 0; + continue; + } + + /** + * Feed the pipe: + * ret < 0 means error + * ret == 0 means there is more data to be fed + * ret > 0 means feeding finished + */ + ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data); + if (ret < 0) + die_errno("feed_pipe"); + + if (ret) { + close(proc->in); + proc->in = 0; + } + } +} + static void pp_buffer_stderr(struct parallel_processes *pp, const struct run_process_parallel_opts *opts, int output_timeout) @@ -1665,7 +1723,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp, /* Buffer output from all pipes. */ for (size_t i = 0; i < opts->processes; i++) { - if (pp->children[i].state == GIT_CP_WORKING && + if (child_is_working(&pp->children[i]) && pp->pfd[i].revents & (POLLIN | POLLHUP)) { int n = strbuf_read_once(&pp->children[i].err, pp->children[i].process.err, 0); @@ -1679,13 +1737,17 @@ static void pp_buffer_stderr(struct parallel_processes *pp, } } -static void pp_output(const struct parallel_processes *pp) +static void pp_output(const struct parallel_processes *pp, + const struct run_process_parallel_opts *opts) { size_t i = pp->output_owner; - if (pp->children[i].state == GIT_CP_WORKING && + if (child_is_working(&pp->children[i]) && pp->children[i].err.len) { - strbuf_write(&pp->children[i].err, stderr); + if (opts->consume_output) + opts->consume_output(&pp->children[i].err, opts->data); + else + strbuf_write(&pp->children[i].err, stderr); strbuf_reset(&pp->children[i].err); } } @@ -1722,6 +1784,7 @@ static int pp_collect_finished(struct parallel_processes *pp, pp->children[i].state = GIT_CP_FREE; if (pp->pfd) pp->pfd[i].fd = -1; + pp->children[i].process.in = 0; child_process_init(&pp->children[i].process); if (opts->ungroup) { @@ -1732,11 +1795,15 @@ static int pp_collect_finished(struct parallel_processes *pp, } else { const size_t n = opts->processes; - strbuf_write(&pp->children[i].err, stderr); + /* Output errors, then all other finished child processes */ + if (opts->consume_output) { + opts->consume_output(&pp->children[i].err, opts->data); + opts->consume_output(&pp->buffered_output, opts->data); + } else { + strbuf_write(&pp->children[i].err, stderr); + strbuf_write(&pp->buffered_output, stderr); + } strbuf_reset(&pp->children[i].err); - - /* Output all other finished child processes */ - strbuf_write(&pp->buffered_output, stderr); strbuf_reset(&pp->buffered_output); /* @@ -1748,7 +1815,7 @@ static int pp_collect_finished(struct parallel_processes *pp, * running process time. */ for (i = 0; i < n; i++) - if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) + if (child_is_working(&pp->children[(pp->output_owner + i) % n])) break; pp->output_owner = (pp->output_owner + i) % n; } @@ -1756,6 +1823,27 @@ static int pp_collect_finished(struct parallel_processes *pp, return result; } +static void pp_handle_child_IO(struct parallel_processes *pp, + const struct run_process_parallel_opts *opts, + int output_timeout) +{ + /* + * First push input, if any (it might no-op), to child tasks to avoid them blocking + * after input. This also prevents deadlocks when ungrouping below, if a child blocks + * while the parent also waits for them to finish. + */ + pp_buffer_stdin(pp, opts); + + if (opts->ungroup) { + for (size_t i = 0; i < opts->processes; i++) + if (child_is_ready_for_cleanup(&pp->children[i])) + pp->children[i].state = GIT_CP_WAIT_CLEANUP; + } else { + pp_buffer_stderr(pp, opts, output_timeout); + pp_output(pp, opts); + } +} + void run_processes_parallel(const struct run_process_parallel_opts *opts) { int i, code; @@ -1775,6 +1863,16 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) "max:%"PRIuMAX, (uintmax_t)opts->processes); + if (opts->ungroup && opts->consume_output) + BUG("ungroup and reading output are mutualy exclusive"); + + /* + * Child tasks might receive input via stdin, terminating early (or not), so + * ignore the default SIGPIPE which gets handled by each feed_pipe_fn which + * actually writes the data to children stdin fds. + */ + sigchain_push(SIGPIPE, SIG_IGN); + pp_init(&pp, opts, &pp_sig); while (1) { for (i = 0; @@ -1792,13 +1890,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) } if (!pp.nr_processes) break; - if (opts->ungroup) { - for (size_t i = 0; i < opts->processes; i++) - pp.children[i].state = GIT_CP_WAIT_CLEANUP; - } else { - pp_buffer_stderr(&pp, opts, output_timeout); - pp_output(&pp); - } + pp_handle_child_IO(&pp, opts, output_timeout); code = pp_collect_finished(&pp, opts); if (code) { pp.shutdown = 1; @@ -1809,6 +1901,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts) pp_cleanup(&pp, opts); + sigchain_pop(SIGPIPE); + if (do_trace2) trace2_region_leave(tr2_category, tr2_label, NULL); } |
