diff --git a/NEWS b/NEWS
index 1f0608f2..8ded1516 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,8 @@
+ * xbps-install: add support for parallel downloads when fetching
+   binary packages. The number of parallel jobs can be configured
+   with the --parallel-download option or the fetch_jobs setting
+   in the xbps.d(5) configuration files.
+
 xbps-0.60.7 (2026-02-09):
 
  * xbps-query(1): fix off-by-one error in list ellipsis. [@kkmisiaszek]
diff --git a/bin/xbps-install/main.c b/bin/xbps-install/main.c
index 8f8be816..f577eb04 100644
--- a/bin/xbps-install/main.c
+++ b/bin/xbps-install/main.c
@@ -57,6 +57,7 @@ usage(bool fail)
	" -M, --memory-sync        Remote repository data is fetched and stored\n"
	"                          in memory, ignoring on-disk repodata archives\n"
	" -n, --dry-run            Dry-run mode\n"
+	" -p, --parallel-download  Number of parallel fetch jobs\n"
	" -R, --repository         Add repository to the top of the list\n"
	"                          This option can be specified multiple times\n"
	" -r, --rootdir            Full path to rootdir\n"
@@ -97,7 +98,7 @@ repo_import_key_cb(struct xbps_repo *repo, void *arg UNUSED, bool *done UNUSED)
 int
 main(int argc, char **argv)
 {
-	const char *shortopts = "AC:c:DdfhIiMnR:r:SuUVvy";
+	const char *shortopts = "AC:c:DdfhIiMnp:R:r:SuUVvy";
	const struct option longopts[] = {
		{ "automatic", no_argument, NULL, 'A' },
		{ "config", required_argument, NULL, 'C' },
@@ -110,6 +111,7 @@ main(int argc, char **argv)
		{ "ignore-file-conflicts", no_argument, NULL, 'I' },
		{ "memory-sync", no_argument, NULL, 'M' },
		{ "dry-run", no_argument, NULL, 'n' },
+		{ "parallel-download", required_argument, NULL, 'p' },
		{ "repository", required_argument, NULL, 'R' },
		{ "rootdir", required_argument, NULL, 'r' },
		{ "sync", no_argument, NULL, 'S' },
@@ -134,6 +136,7 @@ main(int argc, char **argv)
	syncf = yes = force = drun = update = false;
 
	memset(&xh, 0, sizeof(xh));
+	xh.fetch_jobs = 1;
 
	while ((c = getopt_long(argc, argv, shortopts, longopts, NULL)) != -1) {
		switch (c) {
@@ -179,6 +182,12 @@ main(int argc, char **argv)
		case 'n':
			drun = true;
			break;
+		case 'p': {
+			/* fetch_jobs is unsigned; parse into a signed temp so
+			 * negative input is caught before the assignment wraps. */
+			int jobs = atoi(optarg);
+			xh.fetch_jobs = jobs > 0 ? (unsigned int)jobs : 1;
+			break;
+		}
		case 'R':
			xbps_repo_store(&xh, optarg);
			break;
diff --git a/bin/xbps-install/xbps-install.1 b/bin/xbps-install/xbps-install.1
index 020edff6..581e3403 100644
--- a/bin/xbps-install/xbps-install.1
+++ b/bin/xbps-install/xbps-install.1
@@ -138,6 +138,12 @@ for in memory sync.
 .Pp
 The output will be a line for each action in the following format:
 .D1
+.It Fl p , Fl -parallel-download Ar jobs
+Set the number of parallel downloads when fetching binary packages
+from remote repositories. The default value is 1, which performs
+downloads sequentially. Increasing this value allows multiple
+packages to be downloaded concurrently and may improve performance
+on high bandwidth or high latency connections.
 .It Fl R , Fl -repository Ar url
 Appends the specified repository to the top of the list.
 The
diff --git a/data/_xbps b/data/_xbps
index bbd00f77..7faabb20 100644
--- a/data/_xbps
+++ b/data/_xbps
@@ -143,7 +143,8 @@ _xbps_install() {
		{-U,--unpack-only}'[Unpack packages without configuring]' \
		{-M,--memory-sync}'[Keep remote repository data in memory only]' \
		{-n,--dry-run}'[Dry-run mode]' \
-		'*'{-R,--repository=-}'[Add repository to the top of the list]:repository url:_files -/' \
+		{-p,--parallel-download=-}'[Number of parallel downloads]:jobs:(1 2 3 4 5 6 8 12 16)' \
+		'*'{-R,--repository=-}'[Add repository to the top of the list]:repository url:_files -/' \
		{-S,--sync}'[Sync remote repository index]' \
		--reproducible'[Enable reproducible mode in pkgdb]' \
		--staging'[Enable use of staged packages]' \
diff --git a/data/xbps.d.5 b/data/xbps.d.5
index 3c5101bf..9b3cd1a9 100644
--- a/data/xbps.d.5
+++ b/data/xbps.d.5
@@ -66,6 +66,14 @@ Sets the default cache directory to store downloaded binary packages
 from remote repositories, as well as its signatures. If path starts
 with '/' it's an absolute path, otherwise it will be relative to
 .Ar rootdir .
+.It Sy fetch_jobs=number
+Specifies the number of parallel downloads when fetching binary
+packages from remote repositories.
+The default value is 1, which performs downloads sequentially.
+Increasing this value allows multiple packages to be downloaded
+concurrently and may improve performance on fast network
+connections or high latency networks.
+This option may be overridden by command line options.
 .It Sy ignorepkg=pkgname
 Declares an ignored package.
 If a package depends on an ignored package the dependency is always satisfied,
diff --git a/include/xbps.h.in b/include/xbps.h.in
index 6a35eebb..3d03b636 100644
--- a/include/xbps.h.in
+++ b/include/xbps.h.in
@@ -691,6 +691,10 @@ struct xbps_handle {
	 * - XBPS_FLAG_* (see above)
	 */
	int flags;
+	/**
+	 * Number of parallel fetch jobs.
+	 */
+	unsigned int fetch_jobs;
 };
 
 /**
diff --git a/lib/cb_util.c b/lib/cb_util.c
index a867fb63..a14a43da 100644
--- a/lib/cb_util.c
+++ b/lib/cb_util.c
@@ -23,6 +23,7 @@
  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <pthread.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -31,6 +32,8 @@
 
 #include "xbps_api_impl.h"
 
+/* Serializes fetch progress callbacks when workers download in parallel. */
+static pthread_mutex_t fetch_cb_lock = PTHREAD_MUTEX_INITIALIZER;
+
 void HIDDEN
 xbps_set_cb_fetch(struct xbps_handle *xhp,
		off_t file_size,
@@ -54,7 +57,10 @@ xbps_set_cb_fetch(struct xbps_handle *xhp,
	xfcd.cb_start = cb_start;
	xfcd.cb_update = cb_update;
	xfcd.cb_end = cb_end;
+
+	pthread_mutex_lock(&fetch_cb_lock);
	(*xhp->fetch_cb)(&xfcd, xhp->fetch_cb_data);
+	pthread_mutex_unlock(&fetch_cb_lock);
 }
 
 int HIDDEN PRINTF_LIKE(5, 6)
diff --git a/lib/conf.c b/lib/conf.c
index 4ac1dab9..960fc7cc 100644
--- a/lib/conf.c
+++ b/lib/conf.c
@@ -204,6 +204,7 @@ enum {
	KEY_ARCHITECTURE,
	KEY_BESTMATCHING,
	KEY_CACHEDIR,
+	KEY_FETCHJOBS,
	KEY_IGNOREPKG,
	KEY_INCLUDE,
	KEY_NOEXTRACT,
@@ -224,6 +225,7 @@ static const struct key {
	{ "architecture", 12, KEY_ARCHITECTURE },
	{ "bestmatching", 12, KEY_BESTMATCHING },
	{ "cachedir", 8, KEY_CACHEDIR },
+	{ "fetch_jobs", 10, KEY_FETCHJOBS },
	{ "ignorepkg", 9, KEY_IGNOREPKG },
	{ "include", 7, KEY_INCLUDE },
	{ "keepconf", 8, KEY_KEEPCONF },
@@ -461,6 +463,12 @@ parse_file(struct xbps_handle *xhp, const char *path, bool nested)
			rv = parse_files_glob(xhp, NULL, dirname(dir), val, true);
			free(dir);
			break;
+		case KEY_FETCHJOBS:
+			xhp->fetch_jobs = strtoul(val, NULL, 10);
+			if (xhp->fetch_jobs == 0)
+				xhp->fetch_jobs = 1;
+			xbps_dbg_printf("%s: fetch_jobs set to %u\n", path, xhp->fetch_jobs);
+			break;
		}
	}
	free(linebuf);
diff --git a/lib/transaction_fetch.c b/lib/transaction_fetch.c
index 8d484415..fae951a0 100644
--- a/lib/transaction_fetch.c
+++ b/lib/transaction_fetch.c
@@ -26,12 +26,23 @@
 #include <stdio.h>
 #include <stdlib.h>
+#include <errno.h>
+#include <pthread.h>
 #include <string.h>
 #include <unistd.h>
 
 #include "xbps_api_impl.h"
 #include "fetch.h"
 
+/* Shared work queue for the parallel download workers. */
+struct fetch_task {
+	struct xbps_handle *xhp;
+	xbps_array_t fetch;	/* array of pkg dictionaries to download */
+	unsigned int index;	/* next array slot to claim; guarded by lock */
+	unsigned int count;	/* total number of packages */
+	pthread_mutex_t lock;
+	int rv;			/* first error seen by any worker */
+};
+
 static int
 verify_binpkg(struct xbps_handle *xhp, xbps_dictionary_t pkgd)
 {
@@ -193,6 +204,38 @@ download_binpkg(struct xbps_handle *xhp, xbps_dictionary_t repo_pkgd)
	return rv;
 }
 
+/*
+ * Worker thread: repeatedly claims the next package from the shared
+ * queue and downloads it, until the queue is empty or an error occurs.
+ */
+static void *
+fetch_worker(void *arg)
+{
+	struct fetch_task *task = arg;
+
+	for (;;) {
+		xbps_dictionary_t pkgd;
+		int r;
+
+		pthread_mutex_lock(&task->lock);
+		if (task->index >= task->count) {
+			pthread_mutex_unlock(&task->lock);
+			break;
+		}
+		pkgd = xbps_array_get(task->fetch, task->index);
+		task->index++;
+		pthread_mutex_unlock(&task->lock);
+
+		r = download_binpkg(task->xhp, pkgd);
+		if (r != 0) {
+			pthread_mutex_lock(&task->lock);
+			if (task->rv == 0)
+				task->rv = r;
+			/* Drain the queue so remaining workers exit promptly. */
+			task->index = task->count;
+			pthread_mutex_unlock(&task->lock);
+			break;
+		}
+	}
+	return NULL;
+}
+
 int
 xbps_transaction_fetch(struct xbps_handle *xhp, xbps_object_iterator_t iter)
 {
@@ -203,6 +246,10 @@ xbps_transaction_fetch(struct xbps_handle *xhp, xbps_object_iterator_t iter)
	int rv = 0;
	unsigned int i, n;
 
+	long cpus = sysconf(_SC_NPROCESSORS_ONLN);
+	unsigned int jobs;
+
	xbps_object_iterator_reset(iter);
 
	while ((obj = xbps_object_iterator_next(iter)) != NULL) {
@@ -242,15 +289,67 @@ xbps_transaction_fetch(struct xbps_handle *xhp, xbps_object_iterator_t iter)
	 * Download binary packages (if they come from a remote repository)
	 * and don't exist already.
	 */
+	if (fetch == NULL)
+		goto out;
+
	n = xbps_array_count(fetch);
	if (n) {
		xbps_set_cb_state(xhp, XBPS_STATE_TRANS_DOWNLOAD, 0, NULL, NULL);
		xbps_dbg_printf("[trans] downloading %d packages.\n", n);
	}
-	for (i = 0; i < n; i++) {
-		if ((rv = download_binpkg(xhp, xbps_array_get(fetch, i))) != 0) {
-			xbps_dbg_printf("[trans] failed to download binpkgs: "
-			    "%s\n", strerror(rv));
+
+	jobs = xhp->fetch_jobs;
+	/* Don't spawn more workers than packages */
+	if (jobs > n)
+		jobs = n;
+	if (cpus < 1)
+		cpus = 1;
+	/* Limit workers to a reasonable multiple of CPU cores */
+	if (jobs > (unsigned int)cpus * 2)
+		jobs = (unsigned int)cpus * 2;
+
+	/* Fallback to serial download when parallelism is unnecessary */
+	if (jobs <= 1 || n <= 1) {
+		for (i = 0; i < n; i++) {
+			if ((rv = download_binpkg(xhp, xbps_array_get(fetch, i))) != 0) {
+				xbps_dbg_printf("[trans] failed to download binpkgs: %s\n",
+				    strerror(rv));
+				goto out;
+			}
+		}
+	} else {
+		pthread_t *th;
+		struct fetch_task task;
+		unsigned int created = 0;
+
+		th = calloc(jobs, sizeof(*th));
+		if (th == NULL) {
+			rv = ENOMEM;
+			goto out;
+		}
+		task.xhp = xhp;
+		task.fetch = fetch;
+		task.index = 0;
+		task.count = n;
+		task.rv = 0;
+		/* pthread functions return the error code; they do not set errno */
+		if ((rv = pthread_mutex_init(&task.lock, NULL)) != 0) {
+			free(th);
+			goto out;
+		}
+		for (i = 0; i < jobs; i++) {
+			if ((rv = pthread_create(&th[i], NULL, fetch_worker, &task)) != 0)
+				break;
+			created++;
+		}
+		for (i = 0; i < created; i++)
+			pthread_join(th[i], NULL);
+		pthread_mutex_destroy(&task.lock);
+		free(th);
+		if (task.rv != 0) {
+			rv = task.rv;
+			xbps_dbg_printf("[trans] failed to download binpkgs: %s\n",
+			    strerror(rv));
			goto out;
		}
	}