Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 228 additions & 46 deletions gvfs-helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -2250,39 +2250,56 @@ struct ph {
};

/*
 * Per-packfile metadata collected during the extraction phase
 * of prefetch installation. After all packfiles are extracted
 * from the multipack, each entry is handed to index-pack.
 */
struct prefetch_entry {
	struct strbuf temp_path_pack;	/* temp file holding the extracted .pack */
	struct strbuf temp_path_idx;	/* derived .idx path (same basename, ".idx" suffix) */
	char hex_checksum[GIT_MAX_HEXSZ + 1];	/* hex of the pack's trailing SHA-1 checksum */
	timestamp_t timestamp;	/* per-pack timestamp from the multipack header */
};

/*
 * Static initializer for a prefetch_entry.  The strbufs must start
 * as STRBUF_INIT so that prefetch_entry_release() is always safe to
 * call, even on an entry that was never populated.
 */
#define PREFETCH_ENTRY_INIT { \
	.temp_path_pack = STRBUF_INIT, \
	.temp_path_idx = STRBUF_INIT, \
	.hex_checksum = {0}, \
	.timestamp = 0, \
}

/*
 * Release the strbuf storage owned by a prefetch_entry.  The entry
 * struct itself is not freed; the caller owns the entries array.
 */
static void prefetch_entry_release(struct prefetch_entry *pe)
{
	strbuf_release(&pe->temp_path_idx);
	strbuf_release(&pe->temp_path_pack);
}

/*
 * Extract the next packfile from the multipack into a temp file.
 * Populate `entry` with the temp path and checksum, then advance
 * the fd past any trailing .idx data.
 *
 * This is the I/O-bound phase that must run sequentially because
 * the multipack is a single stream.
 */
static void extract_packfile_from_multipack(
struct gh__request_params *params,
struct gh__response_status *status,
int fd_multipack,
unsigned short k)
unsigned short k,
struct prefetch_entry *entry)
{
struct ph ph;
struct tempfile *tempfile_pack = NULL;
int result = -1;
int b_no_idx_in_multipack;
struct object_id packfile_checksum;
char hex_checksum[GIT_MAX_HEXSZ + 1];
struct strbuf buf_timestamp = STRBUF_INIT;
struct strbuf temp_path_pack = STRBUF_INIT;
struct strbuf temp_path_idx = STRBUF_INIT;
struct strbuf final_path_pack = STRBUF_INIT;
struct strbuf final_path_idx = STRBUF_INIT;
struct strbuf final_filename = STRBUF_INIT;

if (xread(fd_multipack, &ph, sizeof(ph)) != sizeof(ph)) {
strbuf_addf(&status->error_message,
"could not read header for packfile[%d] in multipack",
k);
status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH;
goto done;
return;
}

ph.timestamp = my_get_le64(ph.timestamp);
Expand All @@ -2293,7 +2310,7 @@ static void extract_packfile_from_multipack(
strbuf_addf(&status->error_message,
"packfile[%d]: zero length packfile?", k);
status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH;
goto done;
return;
}

b_no_idx_in_multipack = (ph.idx_len == maximum_unsigned_value_of_type(uint64_t) ||
Expand All @@ -2306,7 +2323,7 @@ static void extract_packfile_from_multipack(
*/
my_create_tempfile(status, 0, "pack", &tempfile_pack, NULL, NULL);
if (!tempfile_pack)
goto done;
return;

/*
* Copy the current packfile from the open stream and capture
Expand All @@ -2322,29 +2339,23 @@ static void extract_packfile_from_multipack(
GIT_SHA1_RAWSZ);
packfile_checksum.algo = GIT_HASH_SHA1;

if (result < 0){
if (result < 0) {
strbuf_addf(&status->error_message,
"could not extract packfile[%d] from multipack",
k);
goto done;
delete_tempfile(&tempfile_pack);
return;
}
strbuf_addstr(&temp_path_pack, get_tempfile_path(tempfile_pack));
strbuf_addstr(&entry->temp_path_pack, get_tempfile_path(tempfile_pack));
close_tempfile_gently(tempfile_pack);

oid_to_hex_r(hex_checksum, &packfile_checksum);

/*
* Always compute the .idx file from the .pack file.
*/
strbuf_addbuf(&temp_path_idx, &temp_path_pack);
strbuf_strip_suffix(&temp_path_idx, ".pack");
strbuf_addstr(&temp_path_idx, ".idx");
oid_to_hex_r(entry->hex_checksum, &packfile_checksum);
entry->timestamp = (timestamp_t)ph.timestamp;

my_run_index_pack(params, status,
&temp_path_pack, &temp_path_idx,
NULL);
if (status->ec != GH__ERROR_CODE__OK)
goto done;
/* Derive the .idx temp path from the .pack temp path. */
strbuf_addbuf(&entry->temp_path_idx, &entry->temp_path_pack);
strbuf_strip_suffix(&entry->temp_path_idx, ".pack");
strbuf_addstr(&entry->temp_path_idx, ".idx");

if (!b_no_idx_in_multipack) {
/*
Expand All @@ -2356,30 +2367,118 @@ static void extract_packfile_from_multipack(
"could not skip index[%d] in multipack",
k);
status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH;
goto done;
return;
}
}
}

/*
 * Finalize a prefetch packfile after index-pack has already run:
 * compute the final pathnames from the pack's timestamp and
 * checksum, then move the .pack/.idx/.keep set into the ODB.
 *
 * Errors are reported through `status` (presumably set by
 * my_finalize_packfile() on failure — the caller checks status->ec
 * after we return).
 */
static void finalize_prefetch_packfile(struct gh__request_params *params,
				       struct gh__response_status *status,
				       struct prefetch_entry *entry)
{
	struct strbuf buf_timestamp = STRBUF_INIT;
	struct strbuf final_path_pack = STRBUF_INIT;
	struct strbuf final_path_idx = STRBUF_INIT;
	struct strbuf final_filename = STRBUF_INIT;

	/* The final pathname encodes the pack's timestamp and checksum. */
	strbuf_addf(&buf_timestamp, "%u", (unsigned int)entry->timestamp);
	create_final_packfile_pathnames("prefetch", buf_timestamp.buf,
					entry->hex_checksum,
					&final_path_pack, &final_path_idx,
					&final_filename);

	my_finalize_packfile(params, status, 1,
			     &entry->temp_path_pack, &entry->temp_path_idx,
			     &final_path_pack, &final_path_idx,
			     &final_filename);

	strbuf_release(&buf_timestamp);
	strbuf_release(&final_path_pack);
	strbuf_release(&final_path_idx);
	strbuf_release(&final_filename);
}

/* Upper bound on concurrent index-pack child processes in phase 2. */
#define PREFETCH_MAX_WORKERS 4

/*
* Context for parallel index-pack execution.
*
* The run_processes_parallel() callbacks are always called from
* the main thread, so no locking is needed for these fields.
*/
struct prefetch_parallel_ctx {
	struct prefetch_entry *entries;	/* array of np extracted packfiles */
	unsigned short np;	/* total number of entries */
	unsigned short next;	/* index of the next entry to dispatch */

	struct gh__request_params *params;
	struct gh__response_status *status;	/* first error wins; see task_finished */

	struct progress *progress;	/* may be NULL when progress is disabled */
	int nr_finished;	/* tasks completed (success or failure) */
	int nr_installed;	/* packs successfully finalized into the ODB */
};

/*
 * run_processes_parallel() callback: hand out the next extracted
 * packfile as a "git index-pack" child process.
 *
 * Returns 1 after configuring `cp` for the next entry, or 0 when
 * every entry has been dispatched.
 */
static int prefetch_get_next_task(struct child_process *cp,
				  struct strbuf *out UNUSED,
				  void *pp_cb,
				  void **pp_task_cb)
{
	struct prefetch_parallel_ctx *ctx = pp_cb;
	struct prefetch_entry *pe;

	if (ctx->next >= ctx->np)
		return 0;	/* no work left */

	pe = &ctx->entries[ctx->next++];
	*pp_task_cb = pe;	/* handed back to us in task_finished */

	cp->git_cmd = 1;
	cp->no_stdin = 1;
	strvec_pushl(&cp->args, "index-pack", "--no-rev-index",
		     "-o", pe->temp_path_idx.buf,
		     pe->temp_path_pack.buf, NULL);

	return 1;
}

/*
 * run_processes_parallel() callback: invoked (on the main thread)
 * when an index-pack child exits.  On success, finalize the
 * .pack/.idx pair into the ODB; on failure, discard the temp files
 * and record the error in `status` (first error wins).
 *
 * Always returns 0, so one failed pack does not stop the callbacks
 * for the remaining workers.
 */
static int prefetch_task_finished(int result,
				  struct strbuf *out UNUSED,
				  void *pp_cb,
				  void *pp_task_cb)
{
	struct prefetch_parallel_ctx *ctx = pp_cb;
	struct prefetch_entry *entry = pp_task_cb;

	/* Progress counts finished tasks, successful or not. */
	ctx->nr_finished++;
	display_progress(ctx->progress, ctx->nr_finished);

	if (result) {
		/* index-pack failed: drop its temp files. */
		unlink(entry->temp_path_pack.buf);
		unlink(entry->temp_path_idx.buf);

		/* Keep only the first error message/code. */
		if (ctx->status->ec == GH__ERROR_CODE__OK) {
			strbuf_addf(&ctx->status->error_message,
				    "index-pack failed on '%s'",
				    entry->temp_path_pack.buf);
			ctx->status->ec = GH__ERROR_CODE__INDEX_PACK_FAILED;
		}
		return 0;
	}

	/*
	 * NOTE(review): we finalize this pack even if a previous task
	 * already set status->ec != OK; only the nr_installed count is
	 * guarded.  Confirm installing later packs after an earlier
	 * failure is intended (phase 1 stops at the first error).
	 */
	finalize_prefetch_packfile(ctx->params, ctx->status, entry);
	if (ctx->status->ec == GH__ERROR_CODE__OK)
		ctx->nr_installed++;

	return 0;
}

struct keep_files_data {
timestamp_t max_timestamp;
int pos_of_max;
Expand Down Expand Up @@ -2450,6 +2549,8 @@ static void install_prefetch(struct gh__request_params *params,
int fd = -1;
int nr_installed = 0;

struct prefetch_entry *entries = NULL;

struct strbuf temp_path_mp = STRBUF_INIT;

/*
Expand Down Expand Up @@ -2485,22 +2586,103 @@ static void install_prefetch(struct gh__request_params *params,
trace2_data_intmax(TR2_CAT, NULL,
"prefetch/packfile_count", np);

if (!np)
goto cleanup;

CALLOC_ARRAY(entries, np);
for (k = 0; k < np; k++) {
struct prefetch_entry pe = PREFETCH_ENTRY_INIT;
entries[k] = pe;
}

/*
* Phase 1: extract all packfiles from the multipack into
* individual temp files. This must be sequential because
* the multipack is a single byte stream.
*/
if (gh__cmd_opts.show_progress)
params->progress = start_progress(the_repository, "Installing prefetch packfiles", np);
params->progress = start_progress(
the_repository, "Extracting prefetch packfiles", np);

for (k = 0; k < np; k++) {
extract_packfile_from_multipack(params, status, fd, k);
extract_packfile_from_multipack(status, fd, k, &entries[k]);
display_progress(params->progress, k + 1);
if (status->ec != GH__ERROR_CODE__OK)
break;
nr_installed++;
}
stop_progress(&params->progress);

/* The multipack fd is no longer needed after extraction. */
close(fd);
fd = -1;

if (status->ec != GH__ERROR_CODE__OK)
goto cleanup;

/*
* Phase 2: run index-pack on the extracted packfiles in
* parallel and finalize each into the ODB.
*
* Use up to PREFETCH_MAX_WORKERS concurrent index-pack
* processes. The entries are already in timestamp order
* (oldest first), so the largest pack—the one that takes
* the longest—starts immediately while the remaining
* workers cycle through the smaller daily/hourly packs.
*
* When there is only one packfile there is no benefit from
* the parallel infrastructure, so fall through to a simple
* sequential index-pack + finalize.
*/
if (np == 1) {
my_run_index_pack(params, status,
&entries[0].temp_path_pack,
&entries[0].temp_path_idx,
NULL);
if (status->ec == GH__ERROR_CODE__OK) {
finalize_prefetch_packfile(params, status, &entries[0]);
if (status->ec == GH__ERROR_CODE__OK)
nr_installed++;
}
} else {
struct prefetch_parallel_ctx pctx = {
.entries = entries,
.np = np,
.next = 0,
.params = params,
.status = status,
.nr_finished = 0,
.nr_installed = 0,
};
struct run_process_parallel_opts pp_opts = {
.tr2_category = TR2_CAT,
.tr2_label = "prefetch/index-pack",
.processes = MY_MIN(np, PREFETCH_MAX_WORKERS),
.get_next_task = prefetch_get_next_task,
.task_finished = prefetch_task_finished,
.data = &pctx,
};

if (gh__cmd_opts.show_progress)
pctx.progress = start_progress(
the_repository,
"Installing prefetch packfiles", np);

run_processes_parallel(&pp_opts);

stop_progress(&pctx.progress);
nr_installed = pctx.nr_installed;
}

if (nr_installed)
delete_stale_keep_files(params, status);

cleanup:
if (entries) {
for (k = 0; k < np; k++)
prefetch_entry_release(&entries[k]);
free(entries);
}

if (fd != -1)
close(fd);

Expand Down
Loading