From 59c7b88198af8af29deffb2eee7b7b2329e9197e Mon Sep 17 00:00:00 2001 From: Jeff Hostetler Date: Mon, 15 Mar 2021 21:08:23 +0000 Subject: simple-ipc: add win32 implementation Create Windows implementation of "simple-ipc" using named pipes. Signed-off-by: Jeff Hostetler Signed-off-by: Junio C Hamano --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'Makefile') diff --git a/Makefile b/Makefile index 4128b457e1..40d5cab78d 100644 --- a/Makefile +++ b/Makefile @@ -1679,6 +1679,11 @@ else LIB_OBJS += unix-socket.o endif +ifdef USE_WIN32_IPC + LIB_OBJS += compat/simple-ipc/ipc-shared.o + LIB_OBJS += compat/simple-ipc/ipc-win32.o +endif + ifdef NO_ICONV BASIC_CFLAGS += -DNO_ICONV endif -- cgit v1.3-5-g9baa From 9fd19027621fc4f8beb2e57ba37d91465d1906f6 Mon Sep 17 00:00:00 2001 From: Jeff Hostetler Date: Mon, 15 Mar 2021 21:08:27 +0000 Subject: unix-stream-server: create unix domain socket under lock Create a wrapper class for `unix_stream_listen()` that uses a ".lock" lockfile to create the unix domain socket in a race-free manner. Unix domain sockets have a fundamental problem on Unix systems because they persist in the filesystem until they are deleted. This is independent of whether a server is actually listening for connections. Well-behaved servers are expected to delete the socket when they shutdown. A new server cannot easily tell if a found socket is attached to an active server or is leftover cruft from a dead server. The traditional solution used by `unix_stream_listen()` is to force delete the socket pathname and then create a new socket. This solves the latter (cruft) problem, but in the case of the former, it orphans the existing server (by stealing the pathname associated with the socket it is listening on). We cannot directly use a .lock lockfile to create the socket because the socket is created by `bind(2)` rather than the `open(2)` mechanism used by `tempfile.c`. As an alternative, we hold a plain lockfile (".lock") as a mutual exclusion device. Under the lock, we test if an existing socket ("") is has an active server. If not, we create a new socket and begin listening. Then we use "rollback" to delete the lockfile in all cases. This wrapper code conceptually exists at a higher-level than the core unix_stream_connect() and unix_stream_listen() routines that it consumes. It is isolated in a wrapper class for clarity. Signed-off-by: Jeff Hostetler Signed-off-by: Junio C Hamano --- Makefile | 1 + contrib/buildsystems/CMakeLists.txt | 2 +- unix-stream-server.c | 125 ++++++++++++++++++++++++++++++++++++ unix-stream-server.h | 33 ++++++++++ 4 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 unix-stream-server.c create mode 100644 unix-stream-server.h (limited to 'Makefile') diff --git a/Makefile b/Makefile index 40d5cab78d..f29a9e82e1 100644 --- a/Makefile +++ b/Makefile @@ -1677,6 +1677,7 @@ ifdef NO_UNIX_SOCKETS BASIC_CFLAGS += -DNO_UNIX_SOCKETS else LIB_OBJS += unix-socket.o + LIB_OBJS += unix-stream-server.o endif ifdef USE_WIN32_IPC diff --git a/contrib/buildsystems/CMakeLists.txt b/contrib/buildsystems/CMakeLists.txt index 4bd41054ee..6ffbbb6a6f 100644 --- a/contrib/buildsystems/CMakeLists.txt +++ b/contrib/buildsystems/CMakeLists.txt @@ -243,7 +243,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Windows") elseif(CMAKE_SYSTEM_NAME STREQUAL "Linux") add_compile_definitions(PROCFS_EXECUTABLE_PATH="/proc/self/exe" HAVE_DEV_TTY ) - list(APPEND compat_SOURCES unix-socket.c) + list(APPEND compat_SOURCES unix-socket.c unix-stream-server.c) endif() if(CMAKE_SYSTEM_NAME STREQUAL "Windows") diff --git a/unix-stream-server.c b/unix-stream-server.c new file mode 100644 index 0000000000..efa2a207ab --- /dev/null +++ b/unix-stream-server.c @@ -0,0 +1,125 @@ +#include "cache.h" +#include "lockfile.h" +#include "unix-socket.h" +#include "unix-stream-server.h" + +#define DEFAULT_LOCK_TIMEOUT (100) + +/* + * Try to connect to a unix domain socket at `path` (if it exists) and + * see if there is a server listening. + * + * We don't know if the socket exists, whether a server died and + * failed to cleanup, or whether we have a live server listening, so + * we "poke" it. + * + * We immediately hangup without sending/receiving any data because we + * don't know anything about the protocol spoken and don't want to + * block while writing/reading data. It is sufficient to just know + * that someone is listening. + */ +static int is_another_server_alive(const char *path, + const struct unix_stream_listen_opts *opts) +{ + int fd = unix_stream_connect(path, opts->disallow_chdir); + if (fd >= 0) { + close(fd); + return 1; + } + + return 0; +} + +int unix_ss_create(const char *path, + const struct unix_stream_listen_opts *opts, + long timeout_ms, + struct unix_ss_socket **new_server_socket) +{ + struct lock_file lock = LOCK_INIT; + int fd_socket; + struct unix_ss_socket *server_socket; + + *new_server_socket = NULL; + + if (timeout_ms < 0) + timeout_ms = DEFAULT_LOCK_TIMEOUT; + + /* + * Create a lock at ".lock" if we can. + */ + if (hold_lock_file_for_update_timeout(&lock, path, 0, timeout_ms) < 0) + return -1; + + /* + * If another server is listening on "" give up. We do not + * want to create a socket and steal future connections from them. + */ + if (is_another_server_alive(path, opts)) { + rollback_lock_file(&lock); + errno = EADDRINUSE; + return -2; + } + + /* + * Create and bind to a Unix domain socket at "". + */ + fd_socket = unix_stream_listen(path, opts); + if (fd_socket < 0) { + int saved_errno = errno; + rollback_lock_file(&lock); + errno = saved_errno; + return -1; + } + + server_socket = xcalloc(1, sizeof(*server_socket)); + server_socket->path_socket = strdup(path); + server_socket->fd_socket = fd_socket; + lstat(path, &server_socket->st_socket); + + *new_server_socket = server_socket; + + /* + * Always rollback (just delete) ".lock" because we already created + * "" as a socket and do not want to commit_lock to do the atomic + * rename trick. + */ + rollback_lock_file(&lock); + + return 0; +} + +void unix_ss_free(struct unix_ss_socket *server_socket) +{ + if (!server_socket) + return; + + if (server_socket->fd_socket >= 0) { + if (!unix_ss_was_stolen(server_socket)) + unlink(server_socket->path_socket); + close(server_socket->fd_socket); + } + + free(server_socket->path_socket); + free(server_socket); +} + +int unix_ss_was_stolen(struct unix_ss_socket *server_socket) +{ + struct stat st_now; + + if (!server_socket) + return 0; + + if (lstat(server_socket->path_socket, &st_now) == -1) + return 1; + + if (st_now.st_ino != server_socket->st_socket.st_ino) + return 1; + if (st_now.st_dev != server_socket->st_socket.st_dev) + return 1; + + if (!S_ISSOCK(st_now.st_mode)) + return 1; + + return 0; +} diff --git a/unix-stream-server.h b/unix-stream-server.h new file mode 100644 index 0000000000..ae2712ba39 --- /dev/null +++ b/unix-stream-server.h @@ -0,0 +1,33 @@ +#ifndef UNIX_STREAM_SERVER_H +#define UNIX_STREAM_SERVER_H + +#include "unix-socket.h" + +struct unix_ss_socket { + char *path_socket; + struct stat st_socket; + int fd_socket; +}; + +/* + * Create a Unix Domain Socket at the given path under the protection + * of a '.lock' lockfile. + * + * Returns 0 on success, -1 on error, -2 if socket is in use. + */ +int unix_ss_create(const char *path, + const struct unix_stream_listen_opts *opts, + long timeout_ms, + struct unix_ss_socket **server_socket); + +/* + * Close and delete the socket. + */ +void unix_ss_free(struct unix_ss_socket *server_socket); + +/* + * Return 1 if the inode of the pathname to our socket changes. + */ +int unix_ss_was_stolen(struct unix_ss_socket *server_socket); + +#endif /* UNIX_STREAM_SERVER_H */ -- cgit v1.3-5-g9baa From 7cd5dbcaba44914c07a51377e75c4e8bbe31f319 Mon Sep 17 00:00:00 2001 From: Jeff Hostetler Date: Mon, 22 Mar 2021 10:29:47 +0000 Subject: simple-ipc: add Unix domain socket implementation Create Unix domain socket based implementation of "simple-ipc". A set of `ipc_client` routines implement a client library to connect to an `ipc_server` over a Unix domain socket, send a simple request, and receive a single response. Clients use blocking IO on the socket. A set of `ipc_server` routines implement a thread pool to listen for and concurrently service client connections. The server creates a new Unix domain socket at a known location. If a socket already exists with that name, the server tries to determine if another server is already listening on the socket or if the socket is dead. If socket is busy, the server exits with an error rather than stealing the socket. If the socket is dead, the server creates a new one and starts up. If while running, the server detects that its socket has been stolen by another server, it automatically exits. Signed-off-by: Jeff Hostetler Signed-off-by: Junio C Hamano --- Makefile | 2 + compat/simple-ipc/ipc-unix-socket.c | 999 ++++++++++++++++++++++++++++++++++++ contrib/buildsystems/CMakeLists.txt | 2 + simple-ipc.h | 13 +- 4 files changed, 1015 insertions(+), 1 deletion(-) create mode 100644 compat/simple-ipc/ipc-unix-socket.c (limited to 'Makefile') diff --git a/Makefile b/Makefile index f29a9e82e1..8e4bde2d81 100644 --- a/Makefile +++ b/Makefile @@ -1678,6 +1678,8 @@ ifdef NO_UNIX_SOCKETS else LIB_OBJS += unix-socket.o LIB_OBJS += unix-stream-server.o + LIB_OBJS += compat/simple-ipc/ipc-shared.o + LIB_OBJS += compat/simple-ipc/ipc-unix-socket.o endif ifdef USE_WIN32_IPC diff --git a/compat/simple-ipc/ipc-unix-socket.c b/compat/simple-ipc/ipc-unix-socket.c new file mode 100644 index 0000000000..38689b278d --- /dev/null +++ b/compat/simple-ipc/ipc-unix-socket.c @@ -0,0 +1,999 @@ +#include "cache.h" +#include "simple-ipc.h" +#include "strbuf.h" +#include "pkt-line.h" +#include "thread-utils.h" +#include "unix-socket.h" +#include "unix-stream-server.h" + +#ifdef NO_UNIX_SOCKETS +#error compat/simple-ipc/ipc-unix-socket.c requires Unix sockets +#endif + +enum ipc_active_state ipc_get_active_state(const char *path) +{ + enum ipc_active_state state = IPC_STATE__OTHER_ERROR; + struct ipc_client_connect_options options + = IPC_CLIENT_CONNECT_OPTIONS_INIT; + struct stat st; + struct ipc_client_connection *connection_test = NULL; + + options.wait_if_busy = 0; + options.wait_if_not_found = 0; + + if (lstat(path, &st) == -1) { + switch (errno) { + case ENOENT: + case ENOTDIR: + return IPC_STATE__NOT_LISTENING; + default: + return IPC_STATE__INVALID_PATH; + } + } + + /* also complain if a plain file is in the way */ + if ((st.st_mode & S_IFMT) != S_IFSOCK) + return IPC_STATE__INVALID_PATH; + + /* + * Just because the filesystem has a S_IFSOCK type inode + * at `path`, doesn't mean it that there is a server listening. + * Ping it to be sure. + */ + state = ipc_client_try_connect(path, &options, &connection_test); + ipc_client_close_connection(connection_test); + + return state; +} + +/* + * Retry frequency when trying to connect to a server. + * + * This value should be short enough that we don't seriously delay our + * caller, but not fast enough that our spinning puts pressure on the + * system. + */ +#define WAIT_STEP_MS (50) + +/* + * Try to connect to the server. If the server is just starting up or + * is very busy, we may not get a connection the first time. + */ +static enum ipc_active_state connect_to_server( + const char *path, + int timeout_ms, + const struct ipc_client_connect_options *options, + int *pfd) +{ + int k; + + *pfd = -1; + + for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) { + int fd = unix_stream_connect(path, options->uds_disallow_chdir); + + if (fd != -1) { + *pfd = fd; + return IPC_STATE__LISTENING; + } + + if (errno == ENOENT) { + if (!options->wait_if_not_found) + return IPC_STATE__PATH_NOT_FOUND; + + goto sleep_and_try_again; + } + + if (errno == ETIMEDOUT) { + if (!options->wait_if_busy) + return IPC_STATE__NOT_LISTENING; + + goto sleep_and_try_again; + } + + if (errno == ECONNREFUSED) { + if (!options->wait_if_busy) + return IPC_STATE__NOT_LISTENING; + + goto sleep_and_try_again; + } + + return IPC_STATE__OTHER_ERROR; + + sleep_and_try_again: + sleep_millisec(WAIT_STEP_MS); + } + + return IPC_STATE__NOT_LISTENING; +} + +/* + * The total amount of time that we are willing to wait when trying to + * connect to a server. + * + * When the server is first started, it might take a little while for + * it to become ready to service requests. Likewise, the server may + * be very (temporarily) busy and not respond to our connections. + * + * We should gracefully and silently handle those conditions and try + * again for a reasonable time period. + * + * The value chosen here should be long enough for the server + * to reliably heal from the above conditions. + */ +#define MY_CONNECTION_TIMEOUT_MS (1000) + +enum ipc_active_state ipc_client_try_connect( + const char *path, + const struct ipc_client_connect_options *options, + struct ipc_client_connection **p_connection) +{ + enum ipc_active_state state = IPC_STATE__OTHER_ERROR; + int fd = -1; + + *p_connection = NULL; + + trace2_region_enter("ipc-client", "try-connect", NULL); + trace2_data_string("ipc-client", NULL, "try-connect/path", path); + + state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS, + options, &fd); + + trace2_data_intmax("ipc-client", NULL, "try-connect/state", + (intmax_t)state); + trace2_region_leave("ipc-client", "try-connect", NULL); + + if (state == IPC_STATE__LISTENING) { + (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection)); + (*p_connection)->fd = fd; + } + + return state; +} + +void ipc_client_close_connection(struct ipc_client_connection *connection) +{ + if (!connection) + return; + + if (connection->fd != -1) + close(connection->fd); + + free(connection); +} + +int ipc_client_send_command_to_connection( + struct ipc_client_connection *connection, + const char *message, struct strbuf *answer) +{ + int ret = 0; + + strbuf_setlen(answer, 0); + + trace2_region_enter("ipc-client", "send-command", NULL); + + if (write_packetized_from_buf_no_flush(message, strlen(message), + connection->fd) < 0 || + packet_flush_gently(connection->fd) < 0) { + ret = error(_("could not send IPC command")); + goto done; + } + + if (read_packetized_to_strbuf( + connection->fd, answer, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) { + ret = error(_("could not read IPC response")); + goto done; + } + +done: + trace2_region_leave("ipc-client", "send-command", NULL); + return ret; +} + +int ipc_client_send_command(const char *path, + const struct ipc_client_connect_options *options, + const char *message, struct strbuf *answer) +{ + int ret = -1; + enum ipc_active_state state; + struct ipc_client_connection *connection = NULL; + + state = ipc_client_try_connect(path, options, &connection); + + if (state != IPC_STATE__LISTENING) + return ret; + + ret = ipc_client_send_command_to_connection(connection, message, answer); + + ipc_client_close_connection(connection); + + return ret; +} + +static int set_socket_blocking_flag(int fd, int make_nonblocking) +{ + int flags; + + flags = fcntl(fd, F_GETFL, NULL); + + if (flags < 0) + return -1; + + if (make_nonblocking) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + + return fcntl(fd, F_SETFL, flags); +} + +/* + * Magic numbers used to annotate callback instance data. + * These are used to help guard against accidentally passing the + * wrong instance data across multiple levels of callbacks (which + * is easy to do if there are `void*` arguments). + */ +enum magic { + MAGIC_SERVER_REPLY_DATA, + MAGIC_WORKER_THREAD_DATA, + MAGIC_ACCEPT_THREAD_DATA, + MAGIC_SERVER_DATA, +}; + +struct ipc_server_reply_data { + enum magic magic; + int fd; + struct ipc_worker_thread_data *worker_thread_data; +}; + +struct ipc_worker_thread_data { + enum magic magic; + struct ipc_worker_thread_data *next_thread; + struct ipc_server_data *server_data; + pthread_t pthread_id; +}; + +struct ipc_accept_thread_data { + enum magic magic; + struct ipc_server_data *server_data; + + struct unix_ss_socket *server_socket; + + int fd_send_shutdown; + int fd_wait_shutdown; + pthread_t pthread_id; +}; + +/* + * With unix-sockets, the conceptual "ipc-server" is implemented as a single + * controller "accept-thread" thread and a pool of "worker-thread" threads. + * The former does the usual `accept()` loop and dispatches connections + * to an idle worker thread. The worker threads wait in an idle loop for + * a new connection, communicate with the client and relay data to/from + * the `application_cb` and then wait for another connection from the + * server thread. This avoids the overhead of constantly creating and + * destroying threads. + */ +struct ipc_server_data { + enum magic magic; + ipc_server_application_cb *application_cb; + void *application_data; + struct strbuf buf_path; + + struct ipc_accept_thread_data *accept_thread; + struct ipc_worker_thread_data *worker_thread_list; + + pthread_mutex_t work_available_mutex; + pthread_cond_t work_available_cond; + + /* + * Accepted but not yet processed client connections are kept + * in a circular buffer FIFO. The queue is empty when the + * positions are equal. + */ + int *fifo_fds; + int queue_size; + int back_pos; + int front_pos; + + int shutdown_requested; + int is_stopped; +}; + +/* + * Remove and return the oldest queued connection. + * + * Returns -1 if empty. + */ +static int fifo_dequeue(struct ipc_server_data *server_data) +{ + /* ASSERT holding mutex */ + + int fd; + + if (server_data->back_pos == server_data->front_pos) + return -1; + + fd = server_data->fifo_fds[server_data->front_pos]; + server_data->fifo_fds[server_data->front_pos] = -1; + + server_data->front_pos++; + if (server_data->front_pos == server_data->queue_size) + server_data->front_pos = 0; + + return fd; +} + +/* + * Push a new fd onto the back of the queue. + * + * Drop it and return -1 if queue is already full. + */ +static int fifo_enqueue(struct ipc_server_data *server_data, int fd) +{ + /* ASSERT holding mutex */ + + int next_back_pos; + + next_back_pos = server_data->back_pos + 1; + if (next_back_pos == server_data->queue_size) + next_back_pos = 0; + + if (next_back_pos == server_data->front_pos) { + /* Queue is full. Just drop it. */ + close(fd); + return -1; + } + + server_data->fifo_fds[server_data->back_pos] = fd; + server_data->back_pos = next_back_pos; + + return fd; +} + +/* + * Wait for a connection to be queued to the FIFO and return it. + * + * Returns -1 if someone has already requested a shutdown. + */ +static int worker_thread__wait_for_connection( + struct ipc_worker_thread_data *worker_thread_data) +{ + /* ASSERT NOT holding mutex */ + + struct ipc_server_data *server_data = worker_thread_data->server_data; + int fd = -1; + + pthread_mutex_lock(&server_data->work_available_mutex); + for (;;) { + if (server_data->shutdown_requested) + break; + + fd = fifo_dequeue(server_data); + if (fd >= 0) + break; + + pthread_cond_wait(&server_data->work_available_cond, + &server_data->work_available_mutex); + } + pthread_mutex_unlock(&server_data->work_available_mutex); + + return fd; +} + +/* + * Forward declare our reply callback function so that any compiler + * errors are reported when we actually define the function (in addition + * to any errors reported when we try to pass this callback function as + * a parameter in a function call). The former are easier to understand. + */ +static ipc_server_reply_cb do_io_reply_callback; + +/* + * Relay application's response message to the client process. + * (We do not flush at this point because we allow the caller + * to chunk data to the client thru us.) + */ +static int do_io_reply_callback(struct ipc_server_reply_data *reply_data, + const char *response, size_t response_len) +{ + if (reply_data->magic != MAGIC_SERVER_REPLY_DATA) + BUG("reply_cb called with wrong instance data"); + + return write_packetized_from_buf_no_flush(response, response_len, + reply_data->fd); +} + +/* A randomly chosen value. */ +#define MY_WAIT_POLL_TIMEOUT_MS (10) + +/* + * If the client hangs up without sending any data on the wire, just + * quietly close the socket and ignore this client. + * + * This worker thread is committed to reading the IPC request data + * from the client at the other end of this fd. Wait here for the + * client to actually put something on the wire -- because if the + * client just does a ping (connect and hangup without sending any + * data), our use of the pkt-line read routines will spew an error + * message. + * + * Return -1 if the client hung up. + * Return 0 if data (possibly incomplete) is ready. + */ +static int worker_thread__wait_for_io_start( + struct ipc_worker_thread_data *worker_thread_data, + int fd) +{ + struct ipc_server_data *server_data = worker_thread_data->server_data; + struct pollfd pollfd[1]; + int result; + + for (;;) { + pollfd[0].fd = fd; + pollfd[0].events = POLLIN; + + result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS); + if (result < 0) { + if (errno == EINTR) + continue; + goto cleanup; + } + + if (result == 0) { + /* a timeout */ + + int in_shutdown; + + pthread_mutex_lock(&server_data->work_available_mutex); + in_shutdown = server_data->shutdown_requested; + pthread_mutex_unlock(&server_data->work_available_mutex); + + /* + * If a shutdown is already in progress and this + * client has not started talking yet, just drop it. + */ + if (in_shutdown) + goto cleanup; + continue; + } + + if (pollfd[0].revents & POLLHUP) + goto cleanup; + + if (pollfd[0].revents & POLLIN) + return 0; + + goto cleanup; + } + +cleanup: + close(fd); + return -1; +} + +/* + * Receive the request/command from the client and pass it to the + * registered request-callback. The request-callback will compose + * a response and call our reply-callback to send it to the client. + */ +static int worker_thread__do_io( + struct ipc_worker_thread_data *worker_thread_data, + int fd) +{ + /* ASSERT NOT holding lock */ + + struct strbuf buf = STRBUF_INIT; + struct ipc_server_reply_data reply_data; + int ret = 0; + + reply_data.magic = MAGIC_SERVER_REPLY_DATA; + reply_data.worker_thread_data = worker_thread_data; + + reply_data.fd = fd; + + ret = read_packetized_to_strbuf( + reply_data.fd, &buf, + PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR); + if (ret >= 0) { + ret = worker_thread_data->server_data->application_cb( + worker_thread_data->server_data->application_data, + buf.buf, do_io_reply_callback, &reply_data); + + packet_flush_gently(reply_data.fd); + } + else { + /* + * The client probably disconnected/shutdown before it + * could send a well-formed message. Ignore it. + */ + } + + strbuf_release(&buf); + close(reply_data.fd); + + return ret; +} + +/* + * Block SIGPIPE on the current thread (so that we get EPIPE from + * write() rather than an actual signal). + * + * Note that using sigchain_push() and _pop() to control SIGPIPE + * around our IO calls is not thread safe: + * [] It uses a global stack of handler frames. + * [] It uses ALLOC_GROW() to resize it. + * [] Finally, according to the `signal(2)` man-page: + * "The effects of `signal()` in a multithreaded process are unspecified." + */ +static void thread_block_sigpipe(sigset_t *old_set) +{ + sigset_t new_set; + + sigemptyset(&new_set); + sigaddset(&new_set, SIGPIPE); + + sigemptyset(old_set); + pthread_sigmask(SIG_BLOCK, &new_set, old_set); +} + +/* + * Thread proc for an IPC worker thread. It handles a series of + * connections from clients. It pulls the next fd from the queue + * processes it, and then waits for the next client. + * + * Block SIGPIPE in this worker thread for the life of the thread. + * This avoids stray (and sometimes delayed) SIGPIPE signals caused + * by client errors and/or when we are under extremely heavy IO load. + * + * This means that the application callback will have SIGPIPE blocked. + * The callback should not change it. + */ +static void *worker_thread_proc(void *_worker_thread_data) +{ + struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data; + struct ipc_server_data *server_data = worker_thread_data->server_data; + sigset_t old_set; + int fd, io; + int ret; + + trace2_thread_start("ipc-worker"); + + thread_block_sigpipe(&old_set); + + for (;;) { + fd = worker_thread__wait_for_connection(worker_thread_data); + if (fd == -1) + break; /* in shutdown */ + + io = worker_thread__wait_for_io_start(worker_thread_data, fd); + if (io == -1) + continue; /* client hung up without sending anything */ + + ret = worker_thread__do_io(worker_thread_data, fd); + + if (ret == SIMPLE_IPC_QUIT) { + trace2_data_string("ipc-worker", NULL, "queue_stop_async", + "application_quit"); + /* + * The application layer is telling the ipc-server + * layer to shutdown. + * + * We DO NOT have a response to send to the client. + * + * Queue an async stop (to stop the other threads) and + * allow this worker thread to exit now (no sense waiting + * for the thread-pool shutdown signal). + * + * Other non-idle worker threads are allowed to finish + * responding to their current clients. + */ + ipc_server_stop_async(server_data); + break; + } + } + + trace2_thread_exit(); + return NULL; +} + +/* A randomly chosen value. */ +#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000) + +/* + * Accept a new client connection on our socket. This uses non-blocking + * IO so that we can also wait for shutdown requests on our socket-pair + * without actually spinning on a fast timeout. + */ +static int accept_thread__wait_for_connection( + struct ipc_accept_thread_data *accept_thread_data) +{ + struct pollfd pollfd[2]; + int result; + + for (;;) { + pollfd[0].fd = accept_thread_data->fd_wait_shutdown; + pollfd[0].events = POLLIN; + + pollfd[1].fd = accept_thread_data->server_socket->fd_socket; + pollfd[1].events = POLLIN; + + result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS); + if (result < 0) { + if (errno == EINTR) + continue; + return result; + } + + if (result == 0) { + /* a timeout */ + + /* + * If someone deletes or force-creates a new unix + * domain socket at our path, all future clients + * will be routed elsewhere and we silently starve. + * If that happens, just queue a shutdown. + */ + if (unix_ss_was_stolen( + accept_thread_data->server_socket)) { + trace2_data_string("ipc-accept", NULL, + "queue_stop_async", + "socket_stolen"); + ipc_server_stop_async( + accept_thread_data->server_data); + } + continue; + } + + if (pollfd[0].revents & POLLIN) { + /* shutdown message queued to socketpair */ + return -1; + } + + if (pollfd[1].revents & POLLIN) { + /* a connection is available on server_socket */ + + int client_fd = + accept(accept_thread_data->server_socket->fd_socket, + NULL, NULL); + if (client_fd >= 0) + return client_fd; + + /* + * An error here is unlikely -- it probably + * indicates that the connecting process has + * already dropped the connection. + */ + continue; + } + + BUG("unandled poll result errno=%d r[0]=%d r[1]=%d", + errno, pollfd[0].revents, pollfd[1].revents); + } +} + +/* + * Thread proc for the IPC server "accept thread". This waits for + * an incoming socket connection, appends it to the queue of available + * connections, and notifies a worker thread to process it. + * + * Block SIGPIPE in this thread for the life of the thread. This + * avoids any stray SIGPIPE signals when closing pipe fds under + * extremely heavy loads (such as when the fifo queue is full and we + * drop incomming connections). + */ +static void *accept_thread_proc(void *_accept_thread_data) +{ + struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data; + struct ipc_server_data *server_data = accept_thread_data->server_data; + sigset_t old_set; + + trace2_thread_start("ipc-accept"); + + thread_block_sigpipe(&old_set); + + for (;;) { + int client_fd = accept_thread__wait_for_connection( + accept_thread_data); + + pthread_mutex_lock(&server_data->work_available_mutex); + if (server_data->shutdown_requested) { + pthread_mutex_unlock(&server_data->work_available_mutex); + if (client_fd >= 0) + close(client_fd); + break; + } + + if (client_fd < 0) { + /* ignore transient accept() errors */ + } + else { + fifo_enqueue(server_data, client_fd); + pthread_cond_broadcast(&server_data->work_available_cond); + } + pthread_mutex_unlock(&server_data->work_available_mutex); + } + + trace2_thread_exit(); + return NULL; +} + +/* + * We can't predict the connection arrival rate relative to the worker + * processing rate, therefore we allow the "accept-thread" to queue up + * a generous number of connections, since we'd rather have the client + * not unnecessarily timeout if we can avoid it. (The assumption is + * that this will be used for FSMonitor and a few second wait on a + * connection is better than having the client timeout and do the full + * computation itself.) + * + * The FIFO queue size is set to a multiple of the worker pool size. + * This value chosen at random. + */ +#define FIFO_SCALE (100) + +/* + * The backlog value for `listen(2)`. This doesn't need to huge, + * rather just large enough for our "accept-thread" to wake up and + * queue incoming connections onto the FIFO without the kernel + * dropping any. + * + * This value chosen at random. + */ +#define LISTEN_BACKLOG (50) + +static int create_listener_socket( + const char *path, + const struct ipc_server_opts *ipc_opts, + struct unix_ss_socket **new_server_socket) +{ + struct unix_ss_socket *server_socket = NULL; + struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT; + int ret; + + uslg_opts.listen_backlog_size = LISTEN_BACKLOG; + uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir; + + ret = unix_ss_create(path, &uslg_opts, -1, &server_socket); + if (ret) + return ret; + + if (set_socket_blocking_flag(server_socket->fd_socket, 1)) { + int saved_errno = errno; + unix_ss_free(server_socket); + errno = saved_errno; + return -1; + } + + *new_server_socket = server_socket; + + trace2_data_string("ipc-server", NULL, "listen-with-lock", path); + return 0; +} + +static int setup_listener_socket( + const char *path, + const struct ipc_server_opts *ipc_opts, + struct unix_ss_socket **new_server_socket) +{ + int ret, saved_errno; + + trace2_region_enter("ipc-server", "create-listener_socket", NULL); + + ret = create_listener_socket(path, ipc_opts, new_server_socket); + + saved_errno = errno; + trace2_region_leave("ipc-server", "create-listener_socket", NULL); + errno = saved_errno; + + return ret; +} + +/* + * Start IPC server in a pool of background threads. + */ +int ipc_server_run_async(struct ipc_server_data **returned_server_data, + const char *path, const struct ipc_server_opts *opts, + ipc_server_application_cb *application_cb, + void *application_data) +{ + struct unix_ss_socket *server_socket = NULL; + struct ipc_server_data *server_data; + int sv[2]; + int k; + int ret; + int nr_threads = opts->nr_threads; + + *returned_server_data = NULL; + + /* + * Create a socketpair and set sv[1] to non-blocking. This + * will used to send a shutdown message to the accept-thread + * and allows the accept-thread to wait on EITHER a client + * connection or a shutdown request without spinning. + */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) + return -1; + + if (set_socket_blocking_flag(sv[1], 1)) { + int saved_errno = errno; + close(sv[0]); + close(sv[1]); + errno = saved_errno; + return -1; + } + + ret = setup_listener_socket(path, opts, &server_socket); + if (ret) { + int saved_errno = errno; + close(sv[0]); + close(sv[1]); + errno = saved_errno; + return ret; + } + + server_data = xcalloc(1, sizeof(*server_data)); + server_data->magic = MAGIC_SERVER_DATA; + server_data->application_cb = application_cb; + server_data->application_data = application_data; + strbuf_init(&server_data->buf_path, 0); + strbuf_addstr(&server_data->buf_path, path); + + if (nr_threads < 1) + nr_threads = 1; + + pthread_mutex_init(&server_data->work_available_mutex, NULL); + pthread_cond_init(&server_data->work_available_cond, NULL); + + server_data->queue_size = nr_threads * FIFO_SCALE; + CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size); + + server_data->accept_thread = + xcalloc(1, sizeof(*server_data->accept_thread)); + server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA; + server_data->accept_thread->server_data = server_data; + server_data->accept_thread->server_socket = server_socket; + server_data->accept_thread->fd_send_shutdown = sv[0]; + server_data->accept_thread->fd_wait_shutdown = sv[1]; + + if (pthread_create(&server_data->accept_thread->pthread_id, NULL, + accept_thread_proc, server_data->accept_thread)) + die_errno(_("could not start accept_thread '%s'"), path); + + for (k = 0; k < nr_threads; k++) { + struct ipc_worker_thread_data *wtd; + + wtd = xcalloc(1, sizeof(*wtd)); + wtd->magic = MAGIC_WORKER_THREAD_DATA; + wtd->server_data = server_data; + + if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc, + wtd)) { + if (k == 0) + die(_("could not start worker[0] for '%s'"), + path); + /* + * Limp along with the thread pool that we have. + */ + break; + } + + wtd->next_thread = server_data->worker_thread_list; + server_data->worker_thread_list = wtd; + } + + *returned_server_data = server_data; + return 0; +} + +/* + * Gently tell the IPC server treads to shutdown. + * Can be run on any thread. + */ +int ipc_server_stop_async(struct ipc_server_data *server_data) +{ + /* ASSERT NOT holding mutex */ + + int fd; + + if (!server_data) + return 0; + + trace2_region_enter("ipc-server", "server-stop-async", NULL); + + pthread_mutex_lock(&server_data->work_available_mutex); + + server_data->shutdown_requested = 1; + + /* + * Write a byte to the shutdown socket pair to wake up the + * accept-thread. + */ + if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0) + error_errno("could not write to fd_send_shutdown"); + + /* + * Drain the queue of existing connections. + */ + while ((fd = fifo_dequeue(server_data)) != -1) + close(fd); + + /* + * Gently tell worker threads to stop processing new connections + * and exit. (This does not abort in-process conversations.) + */ + pthread_cond_broadcast(&server_data->work_available_cond); + + pthread_mutex_unlock(&server_data->work_available_mutex); + + trace2_region_leave("ipc-server", "server-stop-async", NULL); + + return 0; +} + +/* + * Wait for all IPC server threads to stop. + */ +int ipc_server_await(struct ipc_server_data *server_data) +{ + pthread_join(server_data->accept_thread->pthread_id, NULL); + + if (!server_data->shutdown_requested) + BUG("ipc-server: accept-thread stopped for '%s'", + server_data->buf_path.buf); + + while (server_data->worker_thread_list) { + struct ipc_worker_thread_data *wtd = + server_data->worker_thread_list; + + pthread_join(wtd->pthread_id, NULL); + + server_data->worker_thread_list = wtd->next_thread; + free(wtd); + } + + server_data->is_stopped = 1; + + return 0; +} + +void ipc_server_free(struct ipc_server_data *server_data) +{ + struct ipc_accept_thread_data * accept_thread_data; + + if (!server_data) + return; + + if (!server_data->is_stopped) + BUG("cannot free ipc-server while running for '%s'", + server_data->buf_path.buf); + + accept_thread_data = server_data->accept_thread; + if (accept_thread_data) { + unix_ss_free(accept_thread_data->server_socket); + + if (accept_thread_data->fd_send_shutdown != -1) + close(accept_thread_data->fd_send_shutdown); + if (accept_thread_data->fd_wait_shutdown != -1) + close(accept_thread_data->fd_wait_shutdown); + + free(server_data->accept_thread); + } + + while (server_data->worker_thread_list) { + struct ipc_worker_thread_data *wtd = + server_data->worker_thread_list; + + server_data->worker_thread_list = wtd->next_thread; + free(wtd); + } + + pthread_cond_destroy(&server_data->work_available_cond); + pthread_mutex_destroy(&server_data->work_available_mutex); + + strbuf_release(&server_data->buf_path); + + free(server_data->fifo_fds); + free(server_data); +} diff --git a/contrib/buildsystems/CMakeLists.txt b/contrib/buildsystems/CMakeLists.txt index 6ffbbb6a6f..629a181745 100644 --- a/contrib/buildsystems/CMakeLists.txt +++ b/contrib/buildsystems/CMakeLists.txt @@ -248,6 +248,8 @@ endif() if(CMAKE_SYSTEM_NAME STREQUAL "Windows") list(APPEND compat_SOURCES compat/simple-ipc/ipc-shared.c compat/simple-ipc/ipc-win32.c) +else() + list(APPEND compat_SOURCES compat/simple-ipc/ipc-shared.c compat/simple-ipc/ipc-unix-socket.c) endif() set(EXE_EXTENSION ${CMAKE_EXECUTABLE_SUFFIX}) diff --git a/simple-ipc.h b/simple-ipc.h index ab5619e3d7..dc3606e30b 100644 --- a/simple-ipc.h +++ b/simple-ipc.h @@ -5,7 +5,7 @@ * See Documentation/technical/api-simple-ipc.txt */ -#if defined(GIT_WINDOWS_NATIVE) +#if defined(GIT_WINDOWS_NATIVE) || !defined(NO_UNIX_SOCKETS) #define SUPPORTS_SIMPLE_IPC #endif @@ -62,11 +62,17 @@ struct ipc_client_connect_options { * the service and need to wait for it to become ready. */ unsigned int wait_if_not_found:1; + + /* + * Disallow chdir() when creating a Unix domain socket. + */ + unsigned int uds_disallow_chdir:1; }; #define IPC_CLIENT_CONNECT_OPTIONS_INIT { \ .wait_if_busy = 0, \ .wait_if_not_found = 0, \ + .uds_disallow_chdir = 0, \ } /* @@ -159,6 +165,11 @@ struct ipc_server_data; struct ipc_server_opts { int nr_threads; + + /* + * Disallow chdir() when creating a Unix domain socket. + */ + unsigned int uds_disallow_chdir:1; }; /* -- cgit v1.3-5-g9baa From 36a7eb68760b7a45ce6b16be300dd04fbe9bd029 Mon Sep 17 00:00:00 2001 From: Jeff Hostetler Date: Mon, 22 Mar 2021 10:29:48 +0000 Subject: t0052: add simple-ipc tests and t/helper/test-simple-ipc tool Create t0052-simple-ipc.sh with unit tests for the "simple-ipc" mechanism. Create t/helper/test-simple-ipc test tool to exercise the "simple-ipc" functions. When the tool is invoked with "run-daemon", it runs a server to listen for "simple-ipc" connections on a test socket or named pipe and responds to a set of commands to exercise/stress the communication setup. When the tool is invoked with "start-daemon", it spawns a "run-daemon" command in the background and waits for the server to become ready before exiting. (This helps make unit tests in t0052 more predictable and avoids the need for arbitrary sleeps in the test script.) The tool also has a series of client "send" commands to send commands and data to a server instance. Signed-off-by: Jeff Hostetler Signed-off-by: Junio C Hamano --- Makefile | 1 + t/helper/test-simple-ipc.c | 787 +++++++++++++++++++++++++++++++++++++++++++++ t/helper/test-tool.c | 1 + t/helper/test-tool.h | 1 + t/t0052-simple-ipc.sh | 122 +++++++ 5 files changed, 912 insertions(+) create mode 100644 t/helper/test-simple-ipc.c create mode 100755 t/t0052-simple-ipc.sh (limited to 'Makefile') diff --git a/Makefile b/Makefile index 8e4bde2d81..dfc37b0480 100644 --- a/Makefile +++ b/Makefile @@ -740,6 +740,7 @@ TEST_BUILTINS_OBJS += test-serve-v2.o TEST_BUILTINS_OBJS += test-sha1.o TEST_BUILTINS_OBJS += test-sha256.o TEST_BUILTINS_OBJS += test-sigchain.o +TEST_BUILTINS_OBJS += test-simple-ipc.o TEST_BUILTINS_OBJS += test-strcmp-offset.o TEST_BUILTINS_OBJS += test-string-list.o TEST_BUILTINS_OBJS += test-submodule-config.o diff --git a/t/helper/test-simple-ipc.c b/t/helper/test-simple-ipc.c new file mode 100644 index 0000000000..42040ef81b --- /dev/null +++ b/t/helper/test-simple-ipc.c @@ -0,0 +1,787 @@ +/* + * test-simple-ipc.c: verify that the Inter-Process Communication works. + */ + +#include "test-tool.h" +#include "cache.h" +#include "strbuf.h" +#include "simple-ipc.h" +#include "parse-options.h" +#include "thread-utils.h" +#include "strvec.h" + +#ifndef SUPPORTS_SIMPLE_IPC +int cmd__simple_ipc(int argc, const char **argv) +{ + die("simple IPC not available on this platform"); +} +#else + +/* + * The test daemon defines an "application callback" that supports a + * series of commands (see `test_app_cb()`). + * + * Unknown commands are caught here and we send an error message back + * to the client process. + */ +static int app__unhandled_command(const char *command, + ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + struct strbuf buf = STRBUF_INIT; + int ret; + + strbuf_addf(&buf, "unhandled command: %s", command); + ret = reply_cb(reply_data, buf.buf, buf.len); + strbuf_release(&buf); + + return ret; +} + +/* + * Reply with a single very large buffer. This is to ensure that + * long response are properly handled -- whether the chunking occurs + * in the kernel or in the (probably pkt-line) layer. + */ +#define BIG_ROWS (10000) +static int app__big_command(ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + struct strbuf buf = STRBUF_INIT; + int row; + int ret; + + for (row = 0; row < BIG_ROWS; row++) + strbuf_addf(&buf, "big: %.75d\n", row); + + ret = reply_cb(reply_data, buf.buf, buf.len); + strbuf_release(&buf); + + return ret; +} + +/* + * Reply with a series of lines. This is to ensure that we can incrementally + * compute the response and chunk it to the client. + */ +#define CHUNK_ROWS (10000) +static int app__chunk_command(ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + struct strbuf buf = STRBUF_INIT; + int row; + int ret; + + for (row = 0; row < CHUNK_ROWS; row++) { + strbuf_setlen(&buf, 0); + strbuf_addf(&buf, "big: %.75d\n", row); + ret = reply_cb(reply_data, buf.buf, buf.len); + } + + strbuf_release(&buf); + + return ret; +} + +/* + * Slowly reply with a series of lines. This is to model an expensive to + * compute chunked response (which might happen if this callback is running + * in a thread and is fighting for a lock with other threads). + */ +#define SLOW_ROWS (1000) +#define SLOW_DELAY_MS (10) +static int app__slow_command(ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + struct strbuf buf = STRBUF_INIT; + int row; + int ret; + + for (row = 0; row < SLOW_ROWS; row++) { + strbuf_setlen(&buf, 0); + strbuf_addf(&buf, "big: %.75d\n", row); + ret = reply_cb(reply_data, buf.buf, buf.len); + sleep_millisec(SLOW_DELAY_MS); + } + + strbuf_release(&buf); + + return ret; +} + +/* + * The client sent a command followed by a (possibly very) large buffer. + */ +static int app__sendbytes_command(const char *received, + ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + struct strbuf buf_resp = STRBUF_INIT; + const char *p = "?"; + int len_ballast = 0; + int k; + int errs = 0; + int ret; + + if (skip_prefix(received, "sendbytes ", &p)) + len_ballast = strlen(p); + + /* + * Verify that the ballast is n copies of a single letter. + * And that the multi-threaded IO layer didn't cross the streams. + */ + for (k = 1; k < len_ballast; k++) + if (p[k] != p[0]) + errs++; + + if (errs) + strbuf_addf(&buf_resp, "errs:%d\n", errs); + else + strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast); + + ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len); + + strbuf_release(&buf_resp); + + return ret; +} + +/* + * An arbitrary fixed address to verify that the application instance + * data is handled properly. + */ +static int my_app_data = 42; + +static ipc_server_application_cb test_app_cb; + +/* + * This is the "application callback" that sits on top of the + * "ipc-server". It completely defines the set of commands supported + * by this application. + */ +static int test_app_cb(void *application_data, + const char *command, + ipc_server_reply_cb *reply_cb, + struct ipc_server_reply_data *reply_data) +{ + /* + * Verify that we received the application-data that we passed + * when we started the ipc-server. (We have several layers of + * callbacks calling callbacks and it's easy to get things mixed + * up (especially when some are "void*").) + */ + if (application_data != (void*)&my_app_data) + BUG("application_cb: application_data pointer wrong"); + + if (!strcmp(command, "quit")) { + /* + * The client sent a "quit" command. This is an async + * request for the server to shutdown. + * + * We DO NOT send the client a response message + * (because we have nothing to say and the other + * server threads have not yet stopped). + * + * Tell the ipc-server layer to start shutting down. + * This includes: stop listening for new connections + * on the socket/pipe and telling all worker threads + * to finish/drain their outgoing responses to other + * clients. + * + * This DOES NOT force an immediate sync shutdown. + */ + return SIMPLE_IPC_QUIT; + } + + if (!strcmp(command, "ping")) { + const char *answer = "pong"; + return reply_cb(reply_data, answer, strlen(answer)); + } + + if (!strcmp(command, "big")) + return app__big_command(reply_cb, reply_data); + + if (!strcmp(command, "chunk")) + return app__chunk_command(reply_cb, reply_data); + + if (!strcmp(command, "slow")) + return app__slow_command(reply_cb, reply_data); + + if (starts_with(command, "sendbytes ")) + return app__sendbytes_command(command, reply_cb, reply_data); + + return app__unhandled_command(command, reply_cb, reply_data); +} + +struct cl_args +{ + const char *subcommand; + const char *path; + const char *token; + + int nr_threads; + int max_wait_sec; + int bytecount; + int batchsize; + + char bytevalue; +}; + +static struct cl_args cl_args = { + .subcommand = NULL, + .path = "ipc-test", + .token = NULL, + + .nr_threads = 5, + .max_wait_sec = 60, + .bytecount = 1024, + .batchsize = 10, + + .bytevalue = 'x', +}; + +/* + * This process will run as a simple-ipc server and listen for IPC commands + * from client processes. + */ +static int daemon__run_server(void) +{ + int ret; + + struct ipc_server_opts opts = { + .nr_threads = cl_args.nr_threads, + }; + + /* + * Synchronously run the ipc-server. We don't need any application + * instance data, so pass an arbitrary pointer (that we'll later + * verify made the round trip). + */ + ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data); + if (ret == -2) + error(_("socket/pipe already in use: '%s'"), cl_args.path); + else if (ret == -1) + error_errno(_("could not start server on: '%s'"), cl_args.path); + + return ret; +} + +#ifndef GIT_WINDOWS_NATIVE +/* + * This is adapted from `daemonize()`. Use `fork()` to directly create and + * run the daemon in a child process. + */ +static int spawn_server(pid_t *pid) +{ + struct ipc_server_opts opts = { + .nr_threads = cl_args.nr_threads, + }; + + *pid = fork(); + + switch (*pid) { + case 0: + if (setsid() == -1) + error_errno(_("setsid failed")); + close(0); + close(1); + close(2); + sanitize_stdfds(); + + return ipc_server_run(cl_args.path, &opts, test_app_cb, + (void*)&my_app_data); + + case -1: + return error_errno(_("could not spawn daemon in the background")); + + default: + return 0; + } +} +#else +/* + * Conceptually like `daemonize()` but different because Windows does not + * have `fork(2)`. Spawn a normal Windows child process but without the + * limitations of `start_command()` and `finish_command()`. + */ +static int spawn_server(pid_t *pid) +{ + char test_tool_exe[MAX_PATH]; + struct strvec args = STRVEC_INIT; + int in, out; + + GetModuleFileNameA(NULL, test_tool_exe, MAX_PATH); + + in = open("/dev/null", O_RDONLY); + out = open("/dev/null", O_WRONLY); + + strvec_push(&args, test_tool_exe); + strvec_push(&args, "simple-ipc"); + strvec_push(&args, "run-daemon"); + strvec_pushf(&args, "--name=%s", cl_args.path); + strvec_pushf(&args, "--threads=%d", cl_args.nr_threads); + + *pid = mingw_spawnvpe(args.v[0], args.v, NULL, NULL, in, out, out); + close(in); + close(out); + + strvec_clear(&args); + + if (*pid < 0) + return error(_("could not spawn daemon in the background")); + + return 0; +} +#endif + +/* + * This is adapted from `wait_or_whine()`. Watch the child process and + * let it get started and begin listening for requests on the socket + * before reporting our success. + */ +static int wait_for_server_startup(pid_t pid_child) +{ + int status; + pid_t pid_seen; + enum ipc_active_state s; + time_t time_limit, now; + + time(&time_limit); + time_limit += cl_args.max_wait_sec; + + for (;;) { + pid_seen = waitpid(pid_child, &status, WNOHANG); + + if (pid_seen == -1) + return error_errno(_("waitpid failed")); + + else if (pid_seen == 0) { + /* + * The child is still running (this should be + * the normal case). Try to connect to it on + * the socket and see if it is ready for + * business. + * + * If there is another daemon already running, + * our child will fail to start (possibly + * after a timeout on the lock), but we don't + * care (who responds) if the socket is live. + */ + s = ipc_get_active_state(cl_args.path); + if (s == IPC_STATE__LISTENING) + return 0; + + time(&now); + if (now > time_limit) + return error(_("daemon not online yet")); + + continue; + } + + else if (pid_seen == pid_child) { + /* + * The new child daemon process shutdown while + * it was starting up, so it is not listening + * on the socket. + * + * Try to ping the socket in the odd chance + * that another daemon started (or was already + * running) while our child was starting. + * + * Again, we don't care who services the socket. + */ + s = ipc_get_active_state(cl_args.path); + if (s == IPC_STATE__LISTENING) + return 0; + + /* + * We don't care about the WEXITSTATUS() nor + * any of the WIF*(status) values because + * `cmd__simple_ipc()` does the `!!result` + * trick on all function return values. + * + * So it is sufficient to just report the + * early shutdown as an error. + */ + return error(_("daemon failed to start")); + } + + else + return error(_("waitpid is confused")); + } +} + +/* + * This process will start a simple-ipc server in a background process and + * wait for it to become ready. This is like `daemonize()` but gives us + * more control and better error reporting (and makes it easier to write + * unit tests). + */ +static int daemon__start_server(void) +{ + pid_t pid_child; + int ret; + + /* + * Run the actual daemon in a background process. + */ + ret = spawn_server(&pid_child); + if (pid_child <= 0) + return ret; + + /* + * Let the parent wait for the child process to get started + * and begin listening for requests on the socket. + */ + ret = wait_for_server_startup(pid_child); + + return ret; +} + +/* + * This process will run a quick probe to see if a simple-ipc server + * is active on this path. + * + * Returns 0 if the server is alive. + */ +static int client__probe_server(void) +{ + enum ipc_active_state s; + + s = ipc_get_active_state(cl_args.path); + switch (s) { + case IPC_STATE__LISTENING: + return 0; + + case IPC_STATE__NOT_LISTENING: + return error("no server listening at '%s'", cl_args.path); + + case IPC_STATE__PATH_NOT_FOUND: + return error("path not found '%s'", cl_args.path); + + case IPC_STATE__INVALID_PATH: + return error("invalid pipe/socket name '%s'", cl_args.path); + + case IPC_STATE__OTHER_ERROR: + default: + return error("other error for '%s'", cl_args.path); + } +} + +/* + * Send an IPC command token to an already-running server daemon and + * print the response. + * + * This is a simple 1 word command/token that `test_app_cb()` (in the + * daemon process) will understand. + */ +static int client__send_ipc(void) +{ + const char *command = "(no-command)"; + struct strbuf buf = STRBUF_INIT; + struct ipc_client_connect_options options + = IPC_CLIENT_CONNECT_OPTIONS_INIT; + + if (cl_args.token && *cl_args.token) + command = cl_args.token; + + options.wait_if_busy = 1; + options.wait_if_not_found = 0; + + if (!ipc_client_send_command(cl_args.path, &options, command, &buf)) { + if (buf.len) { + printf("%s\n", buf.buf); + fflush(stdout); + } + strbuf_release(&buf); + + return 0; + } + + return error("failed to send '%s' to '%s'", command, cl_args.path); +} + +/* + * Send an IPC command to an already-running server and ask it to + * shutdown. "send quit" is an async request and queues a shutdown + * event in the server, so we spin and wait here for it to actually + * shutdown to make the unit tests a little easier to write. + */ +static int client__stop_server(void) +{ + int ret; + time_t time_limit, now; + enum ipc_active_state s; + + time(&time_limit); + time_limit += cl_args.max_wait_sec; + + cl_args.token = "quit"; + + ret = client__send_ipc(); + if (ret) + return ret; + + for (;;) { + sleep_millisec(100); + + s = ipc_get_active_state(cl_args.path); + + if (s != IPC_STATE__LISTENING) { + /* + * The socket/pipe is gone and/or has stopped + * responding. Lets assume that the daemon + * process has exited too. + */ + return 0; + } + + time(&now); + if (now > time_limit) + return error(_("daemon has not shutdown yet")); + } +} + +/* + * Send an IPC command followed by ballast to confirm that a large + * message can be sent and that the kernel or pkt-line layers will + * properly chunk it and that the daemon receives the entire message. + */ +static int do_sendbytes(int bytecount, char byte, const char *path, + const struct ipc_client_connect_options *options) +{ + struct strbuf buf_send = STRBUF_INIT; + struct strbuf buf_resp = STRBUF_INIT; + + strbuf_addstr(&buf_send, "sendbytes "); + strbuf_addchars(&buf_send, byte, bytecount); + + if (!ipc_client_send_command(path, options, buf_send.buf, &buf_resp)) { + strbuf_rtrim(&buf_resp); + printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf); + fflush(stdout); + strbuf_release(&buf_send); + strbuf_release(&buf_resp); + + return 0; + } + + return error("client failed to sendbytes(%d, '%c') to '%s'", + bytecount, byte, path); +} + +/* + * Send an IPC command with ballast to an already-running server daemon. + */ +static int client__sendbytes(void) +{ + struct ipc_client_connect_options options + = IPC_CLIENT_CONNECT_OPTIONS_INIT; + + options.wait_if_busy = 1; + options.wait_if_not_found = 0; + options.uds_disallow_chdir = 0; + + return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path, + &options); +} + +struct multiple_thread_data { + pthread_t pthread_id; + struct multiple_thread_data *next; + const char *path; + int bytecount; + int batchsize; + int sum_errors; + int sum_good; + char letter; +}; + +static void *multiple_thread_proc(void *_multiple_thread_data) +{ + struct multiple_thread_data *d = _multiple_thread_data; + int k; + struct ipc_client_connect_options options + = IPC_CLIENT_CONNECT_OPTIONS_INIT; + + options.wait_if_busy = 1; + options.wait_if_not_found = 0; + /* + * A multi-threaded client should not be randomly calling chdir(). + * The test will pass without this restriction because the test is + * not otherwise accessing the filesystem, but it makes us honest. + */ + options.uds_disallow_chdir = 1; + + trace2_thread_start("multiple"); + + for (k = 0; k < d->batchsize; k++) { + if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options)) + d->sum_errors++; + else + d->sum_good++; + } + + trace2_thread_exit(); + return NULL; +} + +/* + * Start a client-side thread pool. Each thread sends a series of + * IPC requests. Each request is on a new connection to the server. + */ +static int client__multiple(void) +{ + struct multiple_thread_data *list = NULL; + int k; + int sum_join_errors = 0; + int sum_thread_errors = 0; + int sum_good = 0; + + for (k = 0; k < cl_args.nr_threads; k++) { + struct multiple_thread_data *d = xcalloc(1, sizeof(*d)); + d->next = list; + d->path = cl_args.path; + d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26); + d->batchsize = cl_args.batchsize; + d->sum_errors = 0; + d->sum_good = 0; + d->letter = 'A' + (k % 26); + + if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) { + warning("failed to create thread[%d] skipping remainder", k); + free(d); + break; + } + + list = d; + } + + while (list) { + struct multiple_thread_data *d = list; + + if (pthread_join(d->pthread_id, NULL)) + sum_join_errors++; + + sum_thread_errors += d->sum_errors; + sum_good += d->sum_good; + + list = d->next; + free(d); + } + + printf("client (good %d) (join %d), (errors %d)\n", + sum_good, sum_join_errors, sum_thread_errors); + + return (sum_join_errors + sum_thread_errors) ? 1 : 0; +} + +int cmd__simple_ipc(int argc, const char **argv) +{ + const char * const simple_ipc_usage[] = { + N_("test-helper simple-ipc is-active [] []"), + N_("test-helper simple-ipc run-daemon [] []"), + N_("test-helper simple-ipc start-daemon [] [] []"), + N_("test-helper simple-ipc stop-daemon [] []"), + N_("test-helper simple-ipc send [] []"), + N_("test-helper simple-ipc sendbytes [] [] []"), + N_("test-helper simple-ipc multiple [] [] [] []"), + NULL + }; + + const char *bytevalue = NULL; + + struct option options[] = { +#ifndef GIT_WINDOWS_NATIVE + OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")), +#else + OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")), +#endif + OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")), + OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")), + + OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")), + OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")), + + OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")), + OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")), + + OPT_END() + }; + + if (argc < 2) + usage_with_options(simple_ipc_usage, options); + + if (argc == 2 && !strcmp(argv[1], "-h")) + usage_with_options(simple_ipc_usage, options); + + if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC")) + return 0; + + cl_args.subcommand = argv[1]; + + argc--; + argv++; + + argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0); + + if (cl_args.nr_threads < 1) + cl_args.nr_threads = 1; + if (cl_args.max_wait_sec < 0) + cl_args.max_wait_sec = 0; + if (cl_args.bytecount < 1) + cl_args.bytecount = 1; + if (cl_args.batchsize < 1) + cl_args.batchsize = 1; + + if (bytevalue && *bytevalue) + cl_args.bytevalue = bytevalue[0]; + + /* + * Use '!!' on all dispatch functions to map from `error()` style + * (returns -1) style to `test_must_fail` style (expects 1). This + * makes shell error messages less confusing. + */ + + if (!strcmp(cl_args.subcommand, "is-active")) + return !!client__probe_server(); + + if (!strcmp(cl_args.subcommand, "run-daemon")) + return !!daemon__run_server(); + + if (!strcmp(cl_args.subcommand, "start-daemon")) + return !!daemon__start_server(); + + /* + * Client commands follow. Ensure a server is running before + * sending any data. This might be overkill, but then again + * this is a test harness. + */ + + if (!strcmp(cl_args.subcommand, "stop-daemon")) { + if (client__probe_server()) + return 1; + return !!client__stop_server(); + } + + if (!strcmp(cl_args.subcommand, "send")) { + if (client__probe_server()) + return 1; + return !!client__send_ipc(); + } + + if (!strcmp(cl_args.subcommand, "sendbytes")) { + if (client__probe_server()) + return 1; + return !!client__sendbytes(); + } + + if (!strcmp(cl_args.subcommand, "multiple")) { + if (client__probe_server()) + return 1; + return !!client__multiple(); + } + + die("Unhandled subcommand: '%s'", cl_args.subcommand); +} +#endif diff --git a/t/helper/test-tool.c b/t/helper/test-tool.c index 9d6d14d929..a409655f03 100644 --- a/t/helper/test-tool.c +++ b/t/helper/test-tool.c @@ -64,6 +64,7 @@ static struct test_cmd cmds[] = { { "sha1", cmd__sha1 }, { "sha256", cmd__sha256 }, { "sigchain", cmd__sigchain }, + { "simple-ipc", cmd__simple_ipc }, { "strcmp-offset", cmd__strcmp_offset }, { "string-list", cmd__string_list }, { "submodule-config", cmd__submodule_config }, diff --git a/t/helper/test-tool.h b/t/helper/test-tool.h index a6470ff62c..564eb3c8e9 100644 --- a/t/helper/test-tool.h +++ b/t/helper/test-tool.h @@ -54,6 +54,7 @@ int cmd__sha1(int argc, const char **argv); int cmd__oid_array(int argc, const char **argv); int cmd__sha256(int argc, const char **argv); int cmd__sigchain(int argc, const char **argv); +int cmd__simple_ipc(int argc, const char **argv); int cmd__strcmp_offset(int argc, const char **argv); int cmd__string_list(int argc, const char **argv); int cmd__submodule_config(int argc, const char **argv); diff --git a/t/t0052-simple-ipc.sh b/t/t0052-simple-ipc.sh new file mode 100755 index 0000000000..ff98be31a5 --- /dev/null +++ b/t/t0052-simple-ipc.sh @@ -0,0 +1,122 @@ +#!/bin/sh + +test_description='simple command server' + +. ./test-lib.sh + +test-tool simple-ipc SUPPORTS_SIMPLE_IPC || { + skip_all='simple IPC not supported on this platform' + test_done +} + +stop_simple_IPC_server () { + test-tool simple-ipc stop-daemon +} + +test_expect_success 'start simple command server' ' + test_atexit stop_simple_IPC_server && + test-tool simple-ipc start-daemon --threads=8 && + test-tool simple-ipc is-active +' + +test_expect_success 'simple command server' ' + test-tool simple-ipc send --token=ping >actual && + echo pong >expect && + test_cmp expect actual +' + +test_expect_success 'servers cannot share the same path' ' + test_must_fail test-tool simple-ipc run-daemon && + test-tool simple-ipc is-active +' + +test_expect_success 'big response' ' + test-tool simple-ipc send --token=big >actual && + test_line_count -ge 10000 actual && + grep -q "big: [0]*9999\$" actual +' + +test_expect_success 'chunk response' ' + test-tool simple-ipc send --token=chunk >actual && + test_line_count -ge 10000 actual && + grep -q "big: [0]*9999\$" actual +' + +test_expect_success 'slow response' ' + test-tool simple-ipc send --token=slow >actual && + test_line_count -ge 100 actual && + grep -q "big: [0]*99\$" actual +' + +# Send an IPC with n=100,000 bytes of ballast. This should be large enough +# to force both the kernel and the pkt-line layer to chunk the message to the +# daemon and for the daemon to receive it in chunks. +# +test_expect_success 'sendbytes' ' + test-tool simple-ipc sendbytes --bytecount=100000 --byte=A >actual && + grep "sent:A00100000 rcvd:A00100000" actual +' + +# Start a series of client threads that each make +# IPC requests to the server. Each ( * ) request +# will open a new connection to the server and randomly bind to a server +# thread. Each client thread exits after completing its batch. So the +# total number of live client threads will be smaller than the total. +# Each request will send a message containing at least bytes +# of ballast. (Responses are small.) +# +# The purpose here is to test threading in the server and responding to +# many concurrent client requests (regardless of whether they come from +# 1 client process or many). And to test that the server side of the +# named pipe/socket is stable. (On Windows this means that the server +# pipe is properly recycled.) +# +# On Windows it also lets us adjust the connection timeout in the +# `ipc_client_send_command()`. +# +# Note it is easy to drive the system into failure by requesting an +# insane number of threads on client or server and/or increasing the +# per-thread batchsize or the per-request bytecount (ballast). +# On Windows these failures look like "pipe is busy" errors. +# So I've chosen fairly conservative values for now. +# +# We expect output of the form "sent: ..." +# With terms (7, 19, 13) we expect: +# in [A-G] +# in [19+0 .. 19+(13-1)] +# and (7 * 13) successful responses. +# +test_expect_success 'stress test threads' ' + test-tool simple-ipc multiple \ + --threads=7 \ + --bytecount=19 \ + --batchsize=13 \ + >actual && + test_line_count = 92 actual && + grep "good 91" actual && + grep "sent:A" actual_a && + cat >expect_a <<-EOF && + sent:A00000019 rcvd:A00000019 + sent:A00000020 rcvd:A00000020 + sent:A00000021 rcvd:A00000021 + sent:A00000022 rcvd:A00000022 + sent:A00000023 rcvd:A00000023 + sent:A00000024 rcvd:A00000024 + sent:A00000025 rcvd:A00000025 + sent:A00000026 rcvd:A00000026 + sent:A00000027 rcvd:A00000027 + sent:A00000028 rcvd:A00000028 + sent:A00000029 rcvd:A00000029 + sent:A00000030 rcvd:A00000030 + sent:A00000031 rcvd:A00000031 + EOF + test_cmp expect_a actual_a +' + +test_expect_success 'stop-daemon works' ' + test-tool simple-ipc stop-daemon && + test_must_fail test-tool simple-ipc is-active && + test_must_fail test-tool simple-ipc send --token=ping +' + +test_done -- cgit v1.3-5-g9baa