diff --git a/gvfs-helper.c b/gvfs-helper.c index 0b865dc3ecc0db..b622af7d0724b9 100644 --- a/gvfs-helper.c +++ b/gvfs-helper.c @@ -2250,39 +2250,56 @@ struct ph { }; /* - * Extract the next packfile from the multipack. - * Install {.pack, .idx, .keep} set. + * Per-packfile metadata collected during the extraction phase + * of prefetch installation. After all packfiles are extracted + * from the multipack, each entry is handed to index-pack. + */ +struct prefetch_entry { + struct strbuf temp_path_pack; + struct strbuf temp_path_idx; + char hex_checksum[GIT_MAX_HEXSZ + 1]; + timestamp_t timestamp; +}; + +#define PREFETCH_ENTRY_INIT { \ + .temp_path_pack = STRBUF_INIT, \ + .temp_path_idx = STRBUF_INIT, \ + .hex_checksum = {0}, \ + .timestamp = 0, \ +} + +static void prefetch_entry_release(struct prefetch_entry *pe) +{ + strbuf_release(&pe->temp_path_pack); + strbuf_release(&pe->temp_path_idx); +} + +/* + * Extract the next packfile from the multipack into a temp file. + * Populate `entry` with the temp path and checksum, then advance + * the fd past any trailing .idx data. * - * Mark each successfully installed prefetch pack as .keep it as installed - * in case we have errors decoding/indexing later packs within the received - * multipart file. (A later pass can delete the unnecessary .keep files - * from this and any previous invocations.) + * This is the I/O-bound phase that must run sequentially because + * the multipack is a single stream. 
*/ static void extract_packfile_from_multipack( - struct gh__request_params *params, struct gh__response_status *status, int fd_multipack, - unsigned short k) + unsigned short k, + struct prefetch_entry *entry) { struct ph ph; struct tempfile *tempfile_pack = NULL; int result = -1; int b_no_idx_in_multipack; struct object_id packfile_checksum; - char hex_checksum[GIT_MAX_HEXSZ + 1]; - struct strbuf buf_timestamp = STRBUF_INIT; - struct strbuf temp_path_pack = STRBUF_INIT; - struct strbuf temp_path_idx = STRBUF_INIT; - struct strbuf final_path_pack = STRBUF_INIT; - struct strbuf final_path_idx = STRBUF_INIT; - struct strbuf final_filename = STRBUF_INIT; if (xread(fd_multipack, &ph, sizeof(ph)) != sizeof(ph)) { strbuf_addf(&status->error_message, "could not read header for packfile[%d] in multipack", k); status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH; - goto done; + return; } ph.timestamp = my_get_le64(ph.timestamp); @@ -2293,7 +2310,7 @@ static void extract_packfile_from_multipack( strbuf_addf(&status->error_message, "packfile[%d]: zero length packfile?", k); status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH; - goto done; + return; } b_no_idx_in_multipack = (ph.idx_len == maximum_unsigned_value_of_type(uint64_t) || @@ -2306,7 +2323,7 @@ static void extract_packfile_from_multipack( */ my_create_tempfile(status, 0, "pack", &tempfile_pack, NULL, NULL); if (!tempfile_pack) - goto done; + return; /* * Copy the current packfile from the open stream and capture @@ -2322,29 +2339,23 @@ static void extract_packfile_from_multipack( GIT_SHA1_RAWSZ); packfile_checksum.algo = GIT_HASH_SHA1; - if (result < 0){ + if (result < 0) { strbuf_addf(&status->error_message, "could not extract packfile[%d] from multipack", k); - goto done; + delete_tempfile(&tempfile_pack); + return; } - strbuf_addstr(&temp_path_pack, get_tempfile_path(tempfile_pack)); + strbuf_addstr(&entry->temp_path_pack, get_tempfile_path(tempfile_pack)); close_tempfile_gently(tempfile_pack); - 
oid_to_hex_r(hex_checksum, &packfile_checksum); - - /* - * Always compute the .idx file from the .pack file. - */ - strbuf_addbuf(&temp_path_idx, &temp_path_pack); - strbuf_strip_suffix(&temp_path_idx, ".pack"); - strbuf_addstr(&temp_path_idx, ".idx"); + oid_to_hex_r(entry->hex_checksum, &packfile_checksum); + entry->timestamp = (timestamp_t)ph.timestamp; - my_run_index_pack(params, status, - &temp_path_pack, &temp_path_idx, - NULL); - if (status->ec != GH__ERROR_CODE__OK) - goto done; + /* Derive the .idx temp path from the .pack temp path. */ + strbuf_addbuf(&entry->temp_path_idx, &entry->temp_path_pack); + strbuf_strip_suffix(&entry->temp_path_idx, ".pack"); + strbuf_addstr(&entry->temp_path_idx, ".idx"); if (!b_no_idx_in_multipack) { /* @@ -2356,30 +2367,118 @@ static void extract_packfile_from_multipack( "could not skip index[%d] in multipack", k); status->ec = GH__ERROR_CODE__COULD_NOT_INSTALL_PREFETCH; - goto done; + return; } } +} + +/* + * Finalize a prefetch packfile after index-pack has already run: + * compute final pathnames and move .pack/.idx/.keep into the ODB. 
+ */ +static void finalize_prefetch_packfile(struct gh__request_params *params, + struct gh__response_status *status, + struct prefetch_entry *entry) +{ + struct strbuf buf_timestamp = STRBUF_INIT; + struct strbuf final_path_pack = STRBUF_INIT; + struct strbuf final_path_idx = STRBUF_INIT; + struct strbuf final_filename = STRBUF_INIT; - strbuf_addf(&buf_timestamp, "%u", (unsigned int)ph.timestamp); - create_final_packfile_pathnames("prefetch", buf_timestamp.buf, hex_checksum, + strbuf_addf(&buf_timestamp, "%u", (unsigned int)entry->timestamp); + create_final_packfile_pathnames("prefetch", buf_timestamp.buf, + entry->hex_checksum, &final_path_pack, &final_path_idx, &final_filename); - strbuf_release(&buf_timestamp); my_finalize_packfile(params, status, 1, - &temp_path_pack, &temp_path_idx, + &entry->temp_path_pack, &entry->temp_path_idx, &final_path_pack, &final_path_idx, &final_filename); -done: - delete_tempfile(&tempfile_pack); - strbuf_release(&temp_path_pack); - strbuf_release(&temp_path_idx); + strbuf_release(&buf_timestamp); strbuf_release(&final_path_pack); strbuf_release(&final_path_idx); strbuf_release(&final_filename); } +#define PREFETCH_MAX_WORKERS 4 + +/* + * Context for parallel index-pack execution. + * + * The run_processes_parallel() callbacks are always called from + * the main thread, so no locking is needed for these fields. 
+ */ +struct prefetch_parallel_ctx { + struct prefetch_entry *entries; + unsigned short np; + unsigned short next; + + struct gh__request_params *params; + struct gh__response_status *status; + + struct progress *progress; + int nr_finished; + int nr_installed; +}; + +static int prefetch_get_next_task(struct child_process *cp, + struct strbuf *out UNUSED, + void *pp_cb, + void **pp_task_cb) +{ + struct prefetch_parallel_ctx *ctx = pp_cb; + struct prefetch_entry *entry; + + if (ctx->next >= ctx->np) + return 0; + + entry = &ctx->entries[ctx->next]; + *pp_task_cb = entry; + ctx->next++; + + cp->git_cmd = 1; + strvec_push(&cp->args, "index-pack"); + strvec_push(&cp->args, "--no-rev-index"); + strvec_pushl(&cp->args, "-o", entry->temp_path_idx.buf, NULL); + strvec_push(&cp->args, entry->temp_path_pack.buf); + cp->no_stdin = 1; + + return 1; +} + +static int prefetch_task_finished(int result, + struct strbuf *out UNUSED, + void *pp_cb, + void *pp_task_cb) +{ + struct prefetch_parallel_ctx *ctx = pp_cb; + struct prefetch_entry *entry = pp_task_cb; + + ctx->nr_finished++; + display_progress(ctx->progress, ctx->nr_finished); + + if (result) { + unlink(entry->temp_path_pack.buf); + unlink(entry->temp_path_idx.buf); + + if (ctx->status->ec == GH__ERROR_CODE__OK) { + strbuf_addf(&ctx->status->error_message, + "index-pack failed on '%s'", + entry->temp_path_pack.buf); + ctx->status->ec = GH__ERROR_CODE__INDEX_PACK_FAILED; + } + return 0; + } + + finalize_prefetch_packfile(ctx->params, ctx->status, entry); + if (ctx->status->ec == GH__ERROR_CODE__OK) + ctx->nr_installed++; + + return 0; +} + struct keep_files_data { timestamp_t max_timestamp; int pos_of_max; @@ -2450,6 +2549,8 @@ static void install_prefetch(struct gh__request_params *params, int fd = -1; int nr_installed = 0; + struct prefetch_entry *entries = NULL; + struct strbuf temp_path_mp = STRBUF_INIT; /* @@ -2485,22 +2586,103 @@ static void install_prefetch(struct gh__request_params *params, trace2_data_intmax(TR2_CAT, 
NULL, "prefetch/packfile_count", np); + if (!np) + goto cleanup; + + CALLOC_ARRAY(entries, np); + for (k = 0; k < np; k++) { + struct prefetch_entry pe = PREFETCH_ENTRY_INIT; + entries[k] = pe; + } + + /* + * Phase 1: extract all packfiles from the multipack into + * individual temp files. This must be sequential because + * the multipack is a single byte stream. + */ if (gh__cmd_opts.show_progress) - params->progress = start_progress(the_repository, "Installing prefetch packfiles", np); + params->progress = start_progress( + the_repository, "Extracting prefetch packfiles", np); for (k = 0; k < np; k++) { - extract_packfile_from_multipack(params, status, fd, k); + extract_packfile_from_multipack(status, fd, k, &entries[k]); display_progress(params->progress, k + 1); if (status->ec != GH__ERROR_CODE__OK) break; - nr_installed++; } stop_progress(&params->progress); + /* The multipack fd is no longer needed after extraction. */ + close(fd); + fd = -1; + + if (status->ec != GH__ERROR_CODE__OK) + goto cleanup; + + /* + * Phase 2: run index-pack on the extracted packfiles in + * parallel and finalize each into the ODB. + * + * Use up to PREFETCH_MAX_WORKERS concurrent index-pack + * processes. The entries are already in timestamp order + * (oldest first), so the largest pack—the one that takes + * the longest—starts immediately while the remaining + * workers cycle through the smaller daily/hourly packs. + * + * When there is only one packfile there is no benefit from + * the parallel infrastructure, so fall through to a simple + * sequential index-pack + finalize. 
+ */ + if (np == 1) { + my_run_index_pack(params, status, + &entries[0].temp_path_pack, + &entries[0].temp_path_idx, + NULL); + if (status->ec == GH__ERROR_CODE__OK) { + finalize_prefetch_packfile(params, status, &entries[0]); + if (status->ec == GH__ERROR_CODE__OK) + nr_installed++; + } + } else { + struct prefetch_parallel_ctx pctx = { + .entries = entries, + .np = np, + .next = 0, + .params = params, + .status = status, + .nr_finished = 0, + .nr_installed = 0, + }; + struct run_process_parallel_opts pp_opts = { + .tr2_category = TR2_CAT, + .tr2_label = "prefetch/index-pack", + .processes = MY_MIN(np, PREFETCH_MAX_WORKERS), + .get_next_task = prefetch_get_next_task, + .task_finished = prefetch_task_finished, + .data = &pctx, + }; + + if (gh__cmd_opts.show_progress) + pctx.progress = start_progress( + the_repository, + "Installing prefetch packfiles", np); + + run_processes_parallel(&pp_opts); + + stop_progress(&pctx.progress); + nr_installed = pctx.nr_installed; + } + if (nr_installed) delete_stale_keep_files(params, status); cleanup: + if (entries) { + for (k = 0; k < np; k++) + prefetch_entry_release(&entries[k]); + free(entries); + } + if (fd != -1) close(fd);