From 385e18810f10ec0ce0a266d25da4e1878c8ce15a Mon Sep 17 00:00:00 2001 From: Patrick Steinhardt Date: Sun, 23 Nov 2025 19:59:36 +0100 Subject: packfile: introduce function to read object info from a store Extract the logic to read object info for a packed object from `do_oid_object_info_extended()` into a standalone function that operates on the packfile store. This function will be used in a subsequent commit. Note that this change allows us to make `find_pack_entry()` an internal implementation detail. As a consequence though we have to move around `packfile_store_freshen_object()` so that it is defined after that function. Signed-off-by: Patrick Steinhardt Signed-off-by: Junio C Hamano --- packfile.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 54 insertions(+), 17 deletions(-) (limited to 'packfile.c') diff --git a/packfile.c b/packfile.c index 40f733dd23..b4bc40d895 100644 --- a/packfile.c +++ b/packfile.c @@ -819,22 +819,6 @@ struct packed_git *packfile_store_load_pack(struct packfile_store *store, return p; } -int packfile_store_freshen_object(struct packfile_store *store, - const struct object_id *oid) -{ - struct pack_entry e; - if (!find_pack_entry(store->odb->repo, oid, &e)) - return 0; - if (e.p->is_cruft) - return 0; - if (e.p->freshened) - return 1; - if (utime(e.p->pack_name, NULL)) - return 0; - e.p->freshened = 1; - return 1; -} - void (*report_garbage)(unsigned seen_bits, const char *path); static void report_helper(const struct string_list *list, @@ -2064,7 +2048,9 @@ static int fill_pack_entry(const struct object_id *oid, return 1; } -int find_pack_entry(struct repository *r, const struct object_id *oid, struct pack_entry *e) +static int find_pack_entry(struct repository *r, + const struct object_id *oid, + struct pack_entry *e) { struct list_head *pos; @@ -2087,6 +2073,57 @@ int find_pack_entry(struct repository *r, const struct object_id *oid, struct pa return 0; } +int 
packfile_store_freshen_object(struct packfile_store *store, + const struct object_id *oid) +{ + struct pack_entry e; + if (!find_pack_entry(store->odb->repo, oid, &e)) + return 0; + if (e.p->is_cruft) + return 0; + if (e.p->freshened) + return 1; + if (utime(e.p->pack_name, NULL)) + return 0; + e.p->freshened = 1; + return 1; +} + +int packfile_store_read_object_info(struct packfile_store *store, + const struct object_id *oid, + struct object_info *oi, + unsigned flags UNUSED) +{ + static struct object_info blank_oi = OBJECT_INFO_INIT; + struct pack_entry e; + int rtype; + + if (!find_pack_entry(store->odb->repo, oid, &e)) + return 1; + + /* + * We know that the caller doesn't actually need the + * information below, so return early. + */ + if (oi == &blank_oi) + return 0; + + rtype = packed_object_info(store->odb->repo, e.p, e.offset, oi); + if (rtype < 0) { + mark_bad_packed_object(e.p, oid); + return -1; + } + + if (oi->whence == OI_PACKED) { + oi->u.packed.offset = e.offset; + oi->u.packed.pack = e.p; + oi->u.packed.is_delta = (rtype == OBJ_REF_DELTA || + rtype == OBJ_OFS_DELTA); + } + + return 0; +} + static void maybe_invalidate_kept_pack_cache(struct repository *r, unsigned flags) { -- cgit v1.3 From 8c1b84bc977bf1e4515efe0386de87257ec28689 Mon Sep 17 00:00:00 2001 From: Patrick Steinhardt Date: Sun, 23 Nov 2025 19:59:41 +0100 Subject: streaming: move logic to read packed objects streams into backend Move the logic to read packed object streams into the respective subsystem. 
Signed-off-by: Patrick Steinhardt Signed-off-by: Junio C Hamano --- packfile.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ packfile.h | 5 +++ streaming.c | 136 +----------------------------------------------------------- 3 files changed, 134 insertions(+), 135 deletions(-) (limited to 'packfile.c') diff --git a/packfile.c b/packfile.c index b4bc40d895..ad56ce0b90 100644 --- a/packfile.c +++ b/packfile.c @@ -20,6 +20,7 @@ #include "tree.h" #include "object-file.h" #include "odb.h" +#include "streaming.h" #include "midx.h" #include "commit-graph.h" #include "pack-revindex.h" @@ -2406,3 +2407,130 @@ void packfile_store_close(struct packfile_store *store) close_pack(p); } } + +struct odb_packed_read_stream { + struct odb_read_stream base; + struct packed_git *pack; + git_zstream z; + enum { + ODB_PACKED_READ_STREAM_UNINITIALIZED, + ODB_PACKED_READ_STREAM_INUSE, + ODB_PACKED_READ_STREAM_DONE, + ODB_PACKED_READ_STREAM_ERROR, + } z_state; + off_t pos; +}; + +static ssize_t read_istream_pack_non_delta(struct odb_read_stream *_st, char *buf, + size_t sz) +{ + struct odb_packed_read_stream *st = (struct odb_packed_read_stream *)_st; + size_t total_read = 0; + + switch (st->z_state) { + case ODB_PACKED_READ_STREAM_UNINITIALIZED: + memset(&st->z, 0, sizeof(st->z)); + git_inflate_init(&st->z); + st->z_state = ODB_PACKED_READ_STREAM_INUSE; + break; + case ODB_PACKED_READ_STREAM_DONE: + return 0; + case ODB_PACKED_READ_STREAM_ERROR: + return -1; + case ODB_PACKED_READ_STREAM_INUSE: + break; + } + + while (total_read < sz) { + int status; + struct pack_window *window = NULL; + unsigned char *mapped; + + mapped = use_pack(st->pack, &window, + st->pos, &st->z.avail_in); + + st->z.next_out = (unsigned char *)buf + total_read; + st->z.avail_out = sz - total_read; + st->z.next_in = mapped; + status = git_inflate(&st->z, Z_FINISH); + + st->pos += st->z.next_in - mapped; + total_read = st->z.next_out - (unsigned char *)buf; + unuse_pack(&window); + + if (status == 
Z_STREAM_END) { + git_inflate_end(&st->z); + st->z_state = ODB_PACKED_READ_STREAM_DONE; + break; + } + + /* + * Unlike the loose object case, we do not have to worry here + * about running out of input bytes and spinning infinitely. If + * we get Z_BUF_ERROR due to too few input bytes, then we'll + * replenish them in the next use_pack() call when we loop. If + * we truly hit the end of the pack (i.e., because it's corrupt + * or truncated), then use_pack() catches that and will die(). + */ + if (status != Z_OK && status != Z_BUF_ERROR) { + git_inflate_end(&st->z); + st->z_state = ODB_PACKED_READ_STREAM_ERROR; + return -1; + } + } + return total_read; +} + +static int close_istream_pack_non_delta(struct odb_read_stream *_st) +{ + struct odb_packed_read_stream *st = (struct odb_packed_read_stream *)_st; + if (st->z_state == ODB_PACKED_READ_STREAM_INUSE) + git_inflate_end(&st->z); + return 0; +} + +int packfile_store_read_object_stream(struct odb_read_stream **out, + struct packfile_store *store, + const struct object_id *oid) +{ + struct odb_packed_read_stream *stream; + struct pack_window *window = NULL; + struct object_info oi = OBJECT_INFO_INIT; + enum object_type in_pack_type; + unsigned long size; + + oi.sizep = &size; + + if (packfile_store_read_object_info(store, oid, &oi, 0) || + oi.u.packed.is_delta || + repo_settings_get_big_file_threshold(store->odb->repo) >= size) + return -1; + + in_pack_type = unpack_object_header(oi.u.packed.pack, + &window, + &oi.u.packed.offset, + &size); + unuse_pack(&window); + switch (in_pack_type) { + default: + return -1; /* we do not do deltas for now */ + case OBJ_COMMIT: + case OBJ_TREE: + case OBJ_BLOB: + case OBJ_TAG: + break; + } + + CALLOC_ARRAY(stream, 1); + stream->base.close = close_istream_pack_non_delta; + stream->base.read = read_istream_pack_non_delta; + stream->base.type = in_pack_type; + stream->base.size = size; + stream->z_state = ODB_PACKED_READ_STREAM_UNINITIALIZED; + stream->pack = oi.u.packed.pack; + 
stream->pos = oi.u.packed.offset; + + *out = &stream->base; + + return 0; +} diff --git a/packfile.h b/packfile.h index 0a98bddd81..3fcc5ae6e0 100644 --- a/packfile.h +++ b/packfile.h @@ -8,6 +8,7 @@ /* in odb.h */ struct object_info; +struct odb_read_stream; struct packed_git { struct hashmap_entry packmap_ent; @@ -144,6 +145,10 @@ void packfile_store_add_pack(struct packfile_store *store, #define repo_for_each_pack(repo, p) \ for (p = packfile_store_get_packs(repo->objects->packfiles); p; p = p->next) +int packfile_store_read_object_stream(struct odb_read_stream **out, + struct packfile_store *store, + const struct object_id *oid); + /* * Try to read the object identified by its ID from the object store and * populate the object info with its data. Returns 1 in case the object was diff --git a/streaming.c b/streaming.c index d5acc1c396..3140728a70 100644 --- a/streaming.c +++ b/streaming.c @@ -114,140 +114,6 @@ static struct odb_read_stream *attach_stream_filter(struct odb_read_stream *st, return &fs->base; } -/***************************************************************** - * - * Non-delta packed object stream - * - *****************************************************************/ - -struct odb_packed_read_stream { - struct odb_read_stream base; - struct packed_git *pack; - git_zstream z; - enum { - ODB_PACKED_READ_STREAM_UNINITIALIZED, - ODB_PACKED_READ_STREAM_INUSE, - ODB_PACKED_READ_STREAM_DONE, - ODB_PACKED_READ_STREAM_ERROR, - } z_state; - off_t pos; -}; - -static ssize_t read_istream_pack_non_delta(struct odb_read_stream *_st, char *buf, - size_t sz) -{ - struct odb_packed_read_stream *st = (struct odb_packed_read_stream *)_st; - size_t total_read = 0; - - switch (st->z_state) { - case ODB_PACKED_READ_STREAM_UNINITIALIZED: - memset(&st->z, 0, sizeof(st->z)); - git_inflate_init(&st->z); - st->z_state = ODB_PACKED_READ_STREAM_INUSE; - break; - case ODB_PACKED_READ_STREAM_DONE: - return 0; - case ODB_PACKED_READ_STREAM_ERROR: - return -1; - case 
ODB_PACKED_READ_STREAM_INUSE: - break; - } - - while (total_read < sz) { - int status; - struct pack_window *window = NULL; - unsigned char *mapped; - - mapped = use_pack(st->pack, &window, - st->pos, &st->z.avail_in); - - st->z.next_out = (unsigned char *)buf + total_read; - st->z.avail_out = sz - total_read; - st->z.next_in = mapped; - status = git_inflate(&st->z, Z_FINISH); - - st->pos += st->z.next_in - mapped; - total_read = st->z.next_out - (unsigned char *)buf; - unuse_pack(&window); - - if (status == Z_STREAM_END) { - git_inflate_end(&st->z); - st->z_state = ODB_PACKED_READ_STREAM_DONE; - break; - } - - /* - * Unlike the loose object case, we do not have to worry here - * about running out of input bytes and spinning infinitely. If - * we get Z_BUF_ERROR due to too few input bytes, then we'll - * replenish them in the next use_pack() call when we loop. If - * we truly hit the end of the pack (i.e., because it's corrupt - * or truncated), then use_pack() catches that and will die(). 
- */ - if (status != Z_OK && status != Z_BUF_ERROR) { - git_inflate_end(&st->z); - st->z_state = ODB_PACKED_READ_STREAM_ERROR; - return -1; - } - } - return total_read; -} - -static int close_istream_pack_non_delta(struct odb_read_stream *_st) -{ - struct odb_packed_read_stream *st = (struct odb_packed_read_stream *)_st; - if (st->z_state == ODB_PACKED_READ_STREAM_INUSE) - git_inflate_end(&st->z); - return 0; -} - -static int open_istream_pack_non_delta(struct odb_read_stream **out, - struct object_database *odb, - const struct object_id *oid) -{ - struct odb_packed_read_stream *stream; - struct pack_window *window = NULL; - struct object_info oi = OBJECT_INFO_INIT; - enum object_type in_pack_type; - unsigned long size; - - oi.sizep = &size; - - if (packfile_store_read_object_info(odb->packfiles, oid, &oi, 0) || - oi.u.packed.is_delta || - repo_settings_get_big_file_threshold(odb->repo) >= size) - return -1; - - in_pack_type = unpack_object_header(oi.u.packed.pack, - &window, - &oi.u.packed.offset, - &size); - unuse_pack(&window); - switch (in_pack_type) { - default: - return -1; /* we do not do deltas for now */ - case OBJ_COMMIT: - case OBJ_TREE: - case OBJ_BLOB: - case OBJ_TAG: - break; - } - - CALLOC_ARRAY(stream, 1); - stream->base.close = close_istream_pack_non_delta; - stream->base.read = read_istream_pack_non_delta; - stream->base.type = in_pack_type; - stream->base.size = size; - stream->z_state = ODB_PACKED_READ_STREAM_UNINITIALIZED; - stream->pack = oi.u.packed.pack; - stream->pos = oi.u.packed.offset; - - *out = &stream->base; - - return 0; -} - - /***************************************************************** * * In-core stream @@ -319,7 +185,7 @@ static int istream_source(struct odb_read_stream **out, { struct odb_source *source; - if (!open_istream_pack_non_delta(out, r->objects, oid)) + if (!packfile_store_read_object_stream(out, r->objects->packfiles, oid)) return 0; odb_prepare_alternates(r->objects); -- cgit v1.3 From 
1599b68d5e960a12f5ac624f81c70ece317db5a6 Mon Sep 17 00:00:00 2001 From: Patrick Steinhardt Date: Sun, 23 Nov 2025 19:59:43 +0100 Subject: streaming: move into object database subsystem The "streaming" terminology is somewhat generic, so it may not be immediately obvious that "streaming.{c,h}" is specific to the object database. Rectify this by moving it into the "odb/" directory so that it can be immediately attributed to the object subsystem. Signed-off-by: Patrick Steinhardt Signed-off-by: Junio C Hamano --- Makefile | 2 +- archive-tar.c | 2 +- archive-zip.c | 2 +- builtin/cat-file.c | 2 +- builtin/fsck.c | 2 +- builtin/index-pack.c | 2 +- builtin/log.c | 2 +- builtin/pack-objects.c | 2 +- entry.c | 2 +- meson.build | 2 +- object-file.c | 2 +- odb/streaming.c | 299 +++++++++++++++++++++++++++++++++++++++++++++++++ odb/streaming.h | 70 ++++++++++++ packfile.c | 2 +- parallel-checkout.c | 2 +- streaming.c | 299 ------------------------------------------------- streaming.h | 70 ------------ 17 files changed, 382 insertions(+), 382 deletions(-) create mode 100644 odb/streaming.c create mode 100644 odb/streaming.h delete mode 100644 streaming.c delete mode 100644 streaming.h (limited to 'packfile.c') diff --git a/Makefile b/Makefile index 7e0f77e298..6d8dcc4622 100644 --- a/Makefile +++ b/Makefile @@ -1201,6 +1201,7 @@ LIB_OBJS += object-file.o LIB_OBJS += object-name.o LIB_OBJS += object.o LIB_OBJS += odb.o +LIB_OBJS += odb/streaming.o LIB_OBJS += oid-array.o LIB_OBJS += oidmap.o LIB_OBJS += oidset.o @@ -1294,7 +1295,6 @@ LIB_OBJS += split-index.o LIB_OBJS += stable-qsort.o LIB_OBJS += statinfo.o LIB_OBJS += strbuf.o -LIB_OBJS += streaming.o LIB_OBJS += string-list.o LIB_OBJS += strmap.o LIB_OBJS += strvec.o diff --git a/archive-tar.c b/archive-tar.c index 4d87b28504..494b9f0667 100644 --- a/archive-tar.c +++ b/archive-tar.c @@ -12,8 +12,8 @@ #include "tar.h" #include "archive.h" #include "odb.h" +#include "odb/streaming.h" #include "strbuf.h" -#include "streaming.h" 
#include "run-command.h" #include "write-or-die.h" diff --git a/archive-zip.c b/archive-zip.c index c44684aebc..a0bdc2fe3b 100644 --- a/archive-zip.c +++ b/archive-zip.c @@ -10,9 +10,9 @@ #include "gettext.h" #include "git-zlib.h" #include "hex.h" -#include "streaming.h" #include "utf8.h" #include "odb.h" +#include "odb/streaming.h" #include "strbuf.h" #include "userdiff.h" #include "write-or-die.h" diff --git a/builtin/cat-file.c b/builtin/cat-file.c index 120d626d66..505ddaa12f 100644 --- a/builtin/cat-file.c +++ b/builtin/cat-file.c @@ -18,13 +18,13 @@ #include "list-objects-filter-options.h" #include "parse-options.h" #include "userdiff.h" -#include "streaming.h" #include "oid-array.h" #include "packfile.h" #include "pack-bitmap.h" #include "object-file.h" #include "object-name.h" #include "odb.h" +#include "odb/streaming.h" #include "replace-object.h" #include "promisor-remote.h" #include "mailmap.h" diff --git a/builtin/fsck.c b/builtin/fsck.c index 1a348d43c2..c7d2eea287 100644 --- a/builtin/fsck.c +++ b/builtin/fsck.c @@ -13,11 +13,11 @@ #include "fsck.h" #include "parse-options.h" #include "progress.h" -#include "streaming.h" #include "packfile.h" #include "object-file.h" #include "object-name.h" #include "odb.h" +#include "odb/streaming.h" #include "path.h" #include "read-cache-ll.h" #include "replace-object.h" diff --git a/builtin/index-pack.c b/builtin/index-pack.c index fb76ef0f4c..581023495f 100644 --- a/builtin/index-pack.c +++ b/builtin/index-pack.c @@ -16,12 +16,12 @@ #include "progress.h" #include "fsck.h" #include "strbuf.h" -#include "streaming.h" #include "thread-utils.h" #include "packfile.h" #include "pack-revindex.h" #include "object-file.h" #include "odb.h" +#include "odb/streaming.h" #include "oid-array.h" #include "oidset.h" #include "path.h" diff --git a/builtin/log.c b/builtin/log.c index e7b83a6e00..d4cf9c59c8 100644 --- a/builtin/log.c +++ b/builtin/log.c @@ -16,6 +16,7 @@ #include "refs.h" #include "object-name.h" #include "odb.h" 
+#include "odb/streaming.h" #include "pager.h" #include "color.h" #include "commit.h" @@ -35,7 +36,6 @@ #include "parse-options.h" #include "line-log.h" #include "branch.h" -#include "streaming.h" #include "version.h" #include "mailmap.h" #include "progress.h" diff --git a/builtin/pack-objects.c b/builtin/pack-objects.c index 1353c2384c..f109e26786 100644 --- a/builtin/pack-objects.c +++ b/builtin/pack-objects.c @@ -22,7 +22,6 @@ #include "pack-objects.h" #include "progress.h" #include "refs.h" -#include "streaming.h" #include "thread-utils.h" #include "pack-bitmap.h" #include "delta-islands.h" @@ -33,6 +32,7 @@ #include "packfile.h" #include "object-file.h" #include "odb.h" +#include "odb/streaming.h" #include "replace-object.h" #include "dir.h" #include "midx.h" diff --git a/entry.c b/entry.c index 38dfe670f7..7817aee362 100644 --- a/entry.c +++ b/entry.c @@ -2,13 +2,13 @@ #include "git-compat-util.h" #include "odb.h" +#include "odb/streaming.h" #include "dir.h" #include "environment.h" #include "gettext.h" #include "hex.h" #include "name-hash.h" #include "sparse-index.h" -#include "streaming.h" #include "submodule.h" #include "symlinks.h" #include "progress.h" diff --git a/meson.build b/meson.build index 1f95a06edb..fc82929b37 100644 --- a/meson.build +++ b/meson.build @@ -397,6 +397,7 @@ libgit_sources = [ 'object-name.c', 'object.c', 'odb.c', + 'odb/streaming.c', 'oid-array.c', 'oidmap.c', 'oidset.c', @@ -490,7 +491,6 @@ libgit_sources = [ 'stable-qsort.c', 'statinfo.c', 'strbuf.c', - 'streaming.c', 'string-list.c', 'strmap.c', 'strvec.c', diff --git a/object-file.c b/object-file.c index 9ba40a848c..9601fdb12d 100644 --- a/object-file.c +++ b/object-file.c @@ -20,13 +20,13 @@ #include "object-file-convert.h" #include "object-file.h" #include "odb.h" +#include "odb/streaming.h" #include "oidtree.h" #include "pack.h" #include "packfile.h" #include "path.h" #include "read-cache-ll.h" #include "setup.h" -#include "streaming.h" #include "tempfile.h" #include 
"tmp-objdir.h" diff --git a/odb/streaming.c b/odb/streaming.c new file mode 100644 index 0000000000..7ef58adaa2 --- /dev/null +++ b/odb/streaming.c @@ -0,0 +1,299 @@ +/* + * Copyright (c) 2011, Google Inc. + */ + +#include "git-compat-util.h" +#include "convert.h" +#include "environment.h" +#include "repository.h" +#include "object-file.h" +#include "odb.h" +#include "odb/streaming.h" +#include "replace-object.h" +#include "packfile.h" + +#define FILTER_BUFFER (1024*16) + +/***************************************************************** + * + * Filtered stream + * + *****************************************************************/ + +struct odb_filtered_read_stream { + struct odb_read_stream base; + struct odb_read_stream *upstream; + struct stream_filter *filter; + char ibuf[FILTER_BUFFER]; + char obuf[FILTER_BUFFER]; + int i_end, i_ptr; + int o_end, o_ptr; + int input_finished; +}; + +static int close_istream_filtered(struct odb_read_stream *_fs) +{ + struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs; + free_stream_filter(fs->filter); + return odb_read_stream_close(fs->upstream); +} + +static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf, + size_t sz) +{ + struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs; + size_t filled = 0; + + while (sz) { + /* do we already have filtered output? */ + if (fs->o_ptr < fs->o_end) { + size_t to_move = fs->o_end - fs->o_ptr; + if (sz < to_move) + to_move = sz; + memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move); + fs->o_ptr += to_move; + sz -= to_move; + filled += to_move; + continue; + } + fs->o_end = fs->o_ptr = 0; + + /* do we have anything to feed the filter with? 
*/ + if (fs->i_ptr < fs->i_end) { + size_t to_feed = fs->i_end - fs->i_ptr; + size_t to_receive = FILTER_BUFFER; + if (stream_filter(fs->filter, + fs->ibuf + fs->i_ptr, &to_feed, + fs->obuf, &to_receive)) + return -1; + fs->i_ptr = fs->i_end - to_feed; + fs->o_end = FILTER_BUFFER - to_receive; + continue; + } + + /* tell the filter to drain upon no more input */ + if (fs->input_finished) { + size_t to_receive = FILTER_BUFFER; + if (stream_filter(fs->filter, + NULL, NULL, + fs->obuf, &to_receive)) + return -1; + fs->o_end = FILTER_BUFFER - to_receive; + if (!fs->o_end) + break; + continue; + } + fs->i_end = fs->i_ptr = 0; + + /* refill the input from the upstream */ + if (!fs->input_finished) { + fs->i_end = odb_read_stream_read(fs->upstream, fs->ibuf, FILTER_BUFFER); + if (fs->i_end < 0) + return -1; + if (fs->i_end) + continue; + } + fs->input_finished = 1; + } + return filled; +} + +static struct odb_read_stream *attach_stream_filter(struct odb_read_stream *st, + struct stream_filter *filter) +{ + struct odb_filtered_read_stream *fs; + + CALLOC_ARRAY(fs, 1); + fs->base.close = close_istream_filtered; + fs->base.read = read_istream_filtered; + fs->upstream = st; + fs->filter = filter; + fs->base.size = -1; /* unknown */ + fs->base.type = st->type; + + return &fs->base; +} + +/***************************************************************** + * + * In-core stream + * + *****************************************************************/ + +struct odb_incore_read_stream { + struct odb_read_stream base; + char *buf; /* from odb_read_object_info_extended() */ + unsigned long read_ptr; +}; + +static int close_istream_incore(struct odb_read_stream *_st) +{ + struct odb_incore_read_stream *st = (struct odb_incore_read_stream *)_st; + free(st->buf); + return 0; +} + +static ssize_t read_istream_incore(struct odb_read_stream *_st, char *buf, size_t sz) +{ + struct odb_incore_read_stream *st = (struct odb_incore_read_stream *)_st; + size_t read_size = sz; + size_t remainder 
= st->base.size - st->read_ptr; + + if (remainder <= read_size) + read_size = remainder; + if (read_size) { + memcpy(buf, st->buf + st->read_ptr, read_size); + st->read_ptr += read_size; + } + return read_size; +} + +static int open_istream_incore(struct odb_read_stream **out, + struct object_database *odb, + const struct object_id *oid) +{ + struct object_info oi = OBJECT_INFO_INIT; + struct odb_incore_read_stream stream = { + .base.close = close_istream_incore, + .base.read = read_istream_incore, + }; + struct odb_incore_read_stream *st; + int ret; + + oi.typep = &stream.base.type; + oi.sizep = &stream.base.size; + oi.contentp = (void **)&stream.buf; + ret = odb_read_object_info_extended(odb, oid, &oi, + OBJECT_INFO_DIE_IF_CORRUPT); + if (ret) + return ret; + + CALLOC_ARRAY(st, 1); + *st = stream; + *out = &st->base; + + return 0; +} + +/***************************************************************************** + * static helpers variables and functions for users of streaming interface + *****************************************************************************/ + +static int istream_source(struct odb_read_stream **out, + struct object_database *odb, + const struct object_id *oid) +{ + struct odb_source *source; + + if (!packfile_store_read_object_stream(out, odb->packfiles, oid)) + return 0; + + odb_prepare_alternates(odb); + for (source = odb->sources; source; source = source->next) + if (!odb_source_loose_read_object_stream(out, source, oid)) + return 0; + + return open_istream_incore(out, odb, oid); +} + +/**************************************************************** + * Users of streaming interface + ****************************************************************/ + +int odb_read_stream_close(struct odb_read_stream *st) +{ + int r = st->close(st); + free(st); + return r; +} + +ssize_t odb_read_stream_read(struct odb_read_stream *st, void *buf, size_t sz) +{ + return st->read(st, buf, sz); +} + +struct odb_read_stream *odb_read_stream_open(struct 
object_database *odb, + const struct object_id *oid, + enum object_type *type, + unsigned long *size, + struct stream_filter *filter) +{ + struct odb_read_stream *st; + const struct object_id *real = lookup_replace_object(odb->repo, oid); + int ret = istream_source(&st, odb, real); + + if (ret) + return NULL; + + if (filter) { + /* Add "&& !is_null_stream_filter(filter)" for performance */ + struct odb_read_stream *nst = attach_stream_filter(st, filter); + if (!nst) { + odb_read_stream_close(st); + return NULL; + } + st = nst; + } + + *size = st->size; + *type = st->type; + return st; +} + +int odb_stream_blob_to_fd(struct object_database *odb, + int fd, + const struct object_id *oid, + struct stream_filter *filter, + int can_seek) +{ + struct odb_read_stream *st; + enum object_type type; + unsigned long sz; + ssize_t kept = 0; + int result = -1; + + st = odb_read_stream_open(odb, oid, &type, &sz, filter); + if (!st) { + if (filter) + free_stream_filter(filter); + return result; + } + if (type != OBJ_BLOB) + goto close_and_exit; + for (;;) { + char buf[1024 * 16]; + ssize_t wrote, holeto; + ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf)); + + if (readlen < 0) + goto close_and_exit; + if (!readlen) + break; + if (can_seek && sizeof(buf) == readlen) { + for (holeto = 0; holeto < readlen; holeto++) + if (buf[holeto]) + break; + if (readlen == holeto) { + kept += holeto; + continue; + } + } + + if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1) + goto close_and_exit; + else + kept = 0; + wrote = write_in_full(fd, buf, readlen); + + if (wrote < 0) + goto close_and_exit; + } + if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 || + xwrite(fd, "", 1) != 1)) + goto close_and_exit; + result = 0; + + close_and_exit: + odb_read_stream_close(st); + return result; +} diff --git a/odb/streaming.h b/odb/streaming.h new file mode 100644 index 0000000000..7cb55213b7 --- /dev/null +++ b/odb/streaming.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2011, Google Inc. 
+ */ +#ifndef STREAMING_H +#define STREAMING_H 1 + +#include "object.h" + +struct object_database; +struct odb_read_stream; +struct stream_filter; + +typedef int (*odb_read_stream_close_fn)(struct odb_read_stream *); +typedef ssize_t (*odb_read_stream_read_fn)(struct odb_read_stream *, char *, size_t); + +/* + * A stream that can be used to read an object from the object database without + * loading all of it into memory. + */ +struct odb_read_stream { + odb_read_stream_close_fn close; + odb_read_stream_read_fn read; + enum object_type type; + unsigned long size; /* inflated size of full object */ +}; + +/* + * Create a new object stream for the given object database. Populates the type + * and size pointers with the object's info. An optional filter can be used to + * transform the object's content. + * + * Returns the stream on success, a `NULL` pointer otherwise. + */ +struct odb_read_stream *odb_read_stream_open(struct object_database *odb, + const struct object_id *oid, + enum object_type *type, + unsigned long *size, + struct stream_filter *filter); + +/* + * Close the given read stream and release all resources associated with it. + * Returns 0 on success, a negative error code otherwise. + */ +int odb_read_stream_close(struct odb_read_stream *stream); + +/* + * Read data from the stream into the buffer. Returns 0 on EOF and the number + * of bytes read on success. Returns a negative error code in case reading from + * the stream fails. + */ +ssize_t odb_read_stream_read(struct odb_read_stream *stream, void *buf, size_t len); + +/* + * Look up the object by its ID and write the full contents to the file + * descriptor. The object must be a blob, or the function will fail. When + * provided, the filter is used to transform the blob contents. + * + * `can_seek` should be set to 1 in case the given file descriptor can be + * seek(3p)'d on. This is used to support files with holes in case a + * significant portion of the blob contains NUL bytes. 
+ * + * Returns a negative error code on failure, 0 on success. + */ +int odb_stream_blob_to_fd(struct object_database *odb, + int fd, + const struct object_id *oid, + struct stream_filter *filter, + int can_seek); + +#endif /* STREAMING_H */ diff --git a/packfile.c b/packfile.c index ad56ce0b90..7a16aaa90d 100644 --- a/packfile.c +++ b/packfile.c @@ -20,7 +20,7 @@ #include "tree.h" #include "object-file.h" #include "odb.h" -#include "streaming.h" +#include "odb/streaming.h" #include "midx.h" #include "commit-graph.h" #include "pack-revindex.h" diff --git a/parallel-checkout.c b/parallel-checkout.c index 1cb6701b92..0bf4bd6d4a 100644 --- a/parallel-checkout.c +++ b/parallel-checkout.c @@ -13,7 +13,7 @@ #include "read-cache-ll.h" #include "run-command.h" #include "sigchain.h" -#include "streaming.h" +#include "odb/streaming.h" #include "symlinks.h" #include "thread-utils.h" #include "trace2.h" diff --git a/streaming.c b/streaming.c deleted file mode 100644 index 06993a751c..0000000000 --- a/streaming.c +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Copyright (c) 2011, Google Inc. 
- */ - -#include "git-compat-util.h" -#include "convert.h" -#include "environment.h" -#include "streaming.h" -#include "repository.h" -#include "object-file.h" -#include "odb.h" -#include "replace-object.h" -#include "packfile.h" - -#define FILTER_BUFFER (1024*16) - -/***************************************************************** - * - * Filtered stream - * - *****************************************************************/ - -struct odb_filtered_read_stream { - struct odb_read_stream base; - struct odb_read_stream *upstream; - struct stream_filter *filter; - char ibuf[FILTER_BUFFER]; - char obuf[FILTER_BUFFER]; - int i_end, i_ptr; - int o_end, o_ptr; - int input_finished; -}; - -static int close_istream_filtered(struct odb_read_stream *_fs) -{ - struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs; - free_stream_filter(fs->filter); - return odb_read_stream_close(fs->upstream); -} - -static ssize_t read_istream_filtered(struct odb_read_stream *_fs, char *buf, - size_t sz) -{ - struct odb_filtered_read_stream *fs = (struct odb_filtered_read_stream *)_fs; - size_t filled = 0; - - while (sz) { - /* do we already have filtered output? */ - if (fs->o_ptr < fs->o_end) { - size_t to_move = fs->o_end - fs->o_ptr; - if (sz < to_move) - to_move = sz; - memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move); - fs->o_ptr += to_move; - sz -= to_move; - filled += to_move; - continue; - } - fs->o_end = fs->o_ptr = 0; - - /* do we have anything to feed the filter with? 
*/ - if (fs->i_ptr < fs->i_end) { - size_t to_feed = fs->i_end - fs->i_ptr; - size_t to_receive = FILTER_BUFFER; - if (stream_filter(fs->filter, - fs->ibuf + fs->i_ptr, &to_feed, - fs->obuf, &to_receive)) - return -1; - fs->i_ptr = fs->i_end - to_feed; - fs->o_end = FILTER_BUFFER - to_receive; - continue; - } - - /* tell the filter to drain upon no more input */ - if (fs->input_finished) { - size_t to_receive = FILTER_BUFFER; - if (stream_filter(fs->filter, - NULL, NULL, - fs->obuf, &to_receive)) - return -1; - fs->o_end = FILTER_BUFFER - to_receive; - if (!fs->o_end) - break; - continue; - } - fs->i_end = fs->i_ptr = 0; - - /* refill the input from the upstream */ - if (!fs->input_finished) { - fs->i_end = odb_read_stream_read(fs->upstream, fs->ibuf, FILTER_BUFFER); - if (fs->i_end < 0) - return -1; - if (fs->i_end) - continue; - } - fs->input_finished = 1; - } - return filled; -} - -static struct odb_read_stream *attach_stream_filter(struct odb_read_stream *st, - struct stream_filter *filter) -{ - struct odb_filtered_read_stream *fs; - - CALLOC_ARRAY(fs, 1); - fs->base.close = close_istream_filtered; - fs->base.read = read_istream_filtered; - fs->upstream = st; - fs->filter = filter; - fs->base.size = -1; /* unknown */ - fs->base.type = st->type; - - return &fs->base; -} - -/***************************************************************** - * - * In-core stream - * - *****************************************************************/ - -struct odb_incore_read_stream { - struct odb_read_stream base; - char *buf; /* from odb_read_object_info_extended() */ - unsigned long read_ptr; -}; - -static int close_istream_incore(struct odb_read_stream *_st) -{ - struct odb_incore_read_stream *st = (struct odb_incore_read_stream *)_st; - free(st->buf); - return 0; -} - -static ssize_t read_istream_incore(struct odb_read_stream *_st, char *buf, size_t sz) -{ - struct odb_incore_read_stream *st = (struct odb_incore_read_stream *)_st; - size_t read_size = sz; - size_t remainder 
= st->base.size - st->read_ptr; - - if (remainder <= read_size) - read_size = remainder; - if (read_size) { - memcpy(buf, st->buf + st->read_ptr, read_size); - st->read_ptr += read_size; - } - return read_size; -} - -static int open_istream_incore(struct odb_read_stream **out, - struct object_database *odb, - const struct object_id *oid) -{ - struct object_info oi = OBJECT_INFO_INIT; - struct odb_incore_read_stream stream = { - .base.close = close_istream_incore, - .base.read = read_istream_incore, - }; - struct odb_incore_read_stream *st; - int ret; - - oi.typep = &stream.base.type; - oi.sizep = &stream.base.size; - oi.contentp = (void **)&stream.buf; - ret = odb_read_object_info_extended(odb, oid, &oi, - OBJECT_INFO_DIE_IF_CORRUPT); - if (ret) - return ret; - - CALLOC_ARRAY(st, 1); - *st = stream; - *out = &st->base; - - return 0; -} - -/***************************************************************************** - * static helpers variables and functions for users of streaming interface - *****************************************************************************/ - -static int istream_source(struct odb_read_stream **out, - struct object_database *odb, - const struct object_id *oid) -{ - struct odb_source *source; - - if (!packfile_store_read_object_stream(out, odb->packfiles, oid)) - return 0; - - odb_prepare_alternates(odb); - for (source = odb->sources; source; source = source->next) - if (!odb_source_loose_read_object_stream(out, source, oid)) - return 0; - - return open_istream_incore(out, odb, oid); -} - -/**************************************************************** - * Users of streaming interface - ****************************************************************/ - -int odb_read_stream_close(struct odb_read_stream *st) -{ - int r = st->close(st); - free(st); - return r; -} - -ssize_t odb_read_stream_read(struct odb_read_stream *st, void *buf, size_t sz) -{ - return st->read(st, buf, sz); -} - -struct odb_read_stream *odb_read_stream_open(struct 
object_database *odb, - const struct object_id *oid, - enum object_type *type, - unsigned long *size, - struct stream_filter *filter) -{ - struct odb_read_stream *st; - const struct object_id *real = lookup_replace_object(odb->repo, oid); - int ret = istream_source(&st, odb, real); - - if (ret) - return NULL; - - if (filter) { - /* Add "&& !is_null_stream_filter(filter)" for performance */ - struct odb_read_stream *nst = attach_stream_filter(st, filter); - if (!nst) { - odb_read_stream_close(st); - return NULL; - } - st = nst; - } - - *size = st->size; - *type = st->type; - return st; -} - -int odb_stream_blob_to_fd(struct object_database *odb, - int fd, - const struct object_id *oid, - struct stream_filter *filter, - int can_seek) -{ - struct odb_read_stream *st; - enum object_type type; - unsigned long sz; - ssize_t kept = 0; - int result = -1; - - st = odb_read_stream_open(odb, oid, &type, &sz, filter); - if (!st) { - if (filter) - free_stream_filter(filter); - return result; - } - if (type != OBJ_BLOB) - goto close_and_exit; - for (;;) { - char buf[1024 * 16]; - ssize_t wrote, holeto; - ssize_t readlen = odb_read_stream_read(st, buf, sizeof(buf)); - - if (readlen < 0) - goto close_and_exit; - if (!readlen) - break; - if (can_seek && sizeof(buf) == readlen) { - for (holeto = 0; holeto < readlen; holeto++) - if (buf[holeto]) - break; - if (readlen == holeto) { - kept += holeto; - continue; - } - } - - if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1) - goto close_and_exit; - else - kept = 0; - wrote = write_in_full(fd, buf, readlen); - - if (wrote < 0) - goto close_and_exit; - } - if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 || - xwrite(fd, "", 1) != 1)) - goto close_and_exit; - result = 0; - - close_and_exit: - odb_read_stream_close(st); - return result; -} diff --git a/streaming.h b/streaming.h deleted file mode 100644 index 7cb55213b7..0000000000 --- a/streaming.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2011, Google Inc. 
- */ -#ifndef STREAMING_H -#define STREAMING_H 1 - -#include "object.h" - -struct object_database; -struct odb_read_stream; -struct stream_filter; - -typedef int (*odb_read_stream_close_fn)(struct odb_read_stream *); -typedef ssize_t (*odb_read_stream_read_fn)(struct odb_read_stream *, char *, size_t); - -/* - * A stream that can be used to read an object from the object database without - * loading all of it into memory. - */ -struct odb_read_stream { - odb_read_stream_close_fn close; - odb_read_stream_read_fn read; - enum object_type type; - unsigned long size; /* inflated size of full object */ -}; - -/* - * Create a new object stream for the given object database. Populates the type - * and size pointers with the object's info. An optional filter can be used to - * transform the object's content. - * - * Returns the stream on success, a `NULL` pointer otherwise. - */ -struct odb_read_stream *odb_read_stream_open(struct object_database *odb, - const struct object_id *oid, - enum object_type *type, - unsigned long *size, - struct stream_filter *filter); - -/* - * Close the given read stream and release all resources associated with it. - * Returns 0 on success, a negative error code otherwise. - */ -int odb_read_stream_close(struct odb_read_stream *stream); - -/* - * Read data from the stream into the buffer. Returns 0 on EOF and the number - * of bytes read on success. Returns a negative error code in case reading from - * the stream fails. - */ -ssize_t odb_read_stream_read(struct odb_read_stream *stream, void *buf, size_t len); - -/* - * Look up the object by its ID and write the full contents to the file - * descriptor. The object must be a blob, or the function will fail. When - * provided, the filter is used to transform the blob contents. - * - * `can_seek` should be set to 1 in case the given file descriptor can be - * seek(3p)'d on. This is used to support files with holes in case a - * significant portion of the blob contains NUL bytes. 
- * - * Returns a negative error code on failure, 0 on success. - */ -int odb_stream_blob_to_fd(struct object_database *odb, - int fd, - const struct object_id *oid, - struct stream_filter *filter, - int can_seek); - -#endif /* STREAMING_H */ -- cgit v1.3