From 701d4d42703662f5775c847bb55ff1baa6f2a5e7 Mon Sep 17 00:00:00 2001 From: Tasko Olevski <16360283+olevski@users.noreply.github.com> Date: Wed, 11 Feb 2026 11:50:18 +0100 Subject: [PATCH 1/4] chore: initial implementation chore: fix linting --- Cargo.lock | 20 ++++++ Cargo.toml | 2 + src/cli.rs | 2 + src/cli/cmd.rs | 10 +++ src/cli/cmd/dataset.rs | 44 ++++++++++++ src/cli/cmd/dataset/deposit.rs | 50 +++++++++++++ src/cli/cmd/dataset/zenodo.rs | 126 +++++++++++++++++++++++++++++++++ src/cli/cmd/login.rs | 2 +- src/cli/cmd/version.rs | 2 +- src/cli/opts.rs | 3 + src/httpclient.rs | 1 + 11 files changed, 260 insertions(+), 2 deletions(-) create mode 100644 src/cli/cmd/dataset.rs create mode 100644 src/cli/cmd/dataset/deposit.rs create mode 100644 src/cli/cmd/dataset/zenodo.rs diff --git a/Cargo.lock b/Cargo.lock index ca80b3d..2dec6a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,6 +506,7 @@ checksum = "d64e8af5551369d19cf50138de61f1c42074ab970f74e99be916646777f8fc87" dependencies = [ "encode_unicode", "libc", + "once_cell", "unicode-width", "windows-sys 0.61.2", ] @@ -2809,6 +2810,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.6", ] @@ -2871,6 +2873,7 @@ dependencies = [ "toml 0.8.23", "url", "vergen", + "walkdir", ] [[package]] @@ -4422,6 +4425,23 @@ dependencies = [ "windows-link 0.1.3", ] +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-result" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index 3d6a6bf..06a5c03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" snafu = { version = "0.8.5" } tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +walkdir = "2" futures = { version = "0.3" } regex = { version = "1.12.3" } iso8601-timestamp = { version = "0.2.17" } diff --git a/src/cli.rs b/src/cli.rs index c01876d..17b8f12 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -25,6 +25,8 @@ pub async fn execute_cmd(opts: MainOpts) -> Result<(), CmdError> { #[cfg(feature = "user-doc")] SubCommand::UserDoc(input) => input.exec(ctx).await?, + + SubCommand::Dataset(input) => input.exec(ctx).await?, }; Ok(()) } diff --git a/src/cli/cmd.rs b/src/cli/cmd.rs index 59b15d3..0750340 100644 --- a/src/cli/cmd.rs +++ b/src/cli/cmd.rs @@ -1,3 +1,4 @@ +pub mod dataset; pub mod login; pub mod project; pub mod update; @@ -109,6 +110,9 @@ pub enum CmdError { #[cfg(feature = "user-doc")] #[snafu(display("UserDoc - {}", source))] UserDoc { source: userdoc::Error }, + + #[snafu(display("Dataset - {}", source))] + Dataset { source: dataset::Error }, } impl From for CmdError { @@ -140,3 +144,9 @@ impl From for CmdError { CmdError::Login { source } } } + +impl From for CmdError { + fn from(source: dataset::Error) -> Self { + CmdError::Dataset { source } + } +} diff --git a/src/cli/cmd/dataset.rs b/src/cli/cmd/dataset.rs new file mode 100644 index 0000000..35c5e01 --- /dev/null +++ b/src/cli/cmd/dataset.rs @@ -0,0 +1,44 @@ +pub mod deposit; +pub mod zenodo; + +use super::Context; +use clap::Parser; +use snafu::Snafu; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error with deposit: {}", source))] + Deposit { source: deposit::Error }, +} + +/// Sub command for managing datasets +#[derive(Parser, Debug)] +pub struct Input { + #[command(subcommand)] + pub subcmd: DatasetCommand, +} + +#[derive(Parser, Debug)] +pub enum DatasetCommand { + Deposit { + #[command(subcommand)] + cmd: DepositCommand, + }, +} + +#[derive(Parser, Debug)] +pub enum DepositCommand { + #[command()] + CopyFiles(deposit::CopyInput), +} + +impl Input { + pub async fn exec(&self, _ctx: Context) -> Result<(), Error> { + match self.subcmd { + DatasetCommand::Deposit { cmd: _ } => { + print!("Hi"); + Ok(()) + } + } + } +} diff --git a/src/cli/cmd/dataset/deposit.rs b/src/cli/cmd/dataset/deposit.rs new file mode 100644 index 0000000..17cbdd3 --- /dev/null +++ b/src/cli/cmd/dataset/deposit.rs @@ -0,0 +1,50 @@ +use super::zenodo; +use super::Context; +use clap::{Parser, ValueEnum}; +use snafu::{ResultExt, Snafu}; +use std::env::VarError; +use std::path::PathBuf; + +#[derive(Debug, Clone, ValueEnum)] +pub enum Provider { + Zenodo, +} + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("A dataset deposit error occured: {}", source))] + Zenodo { source: zenodo::Error }, + + #[snafu(display("Env variable error: {}", source))] + EnvVarMissing { source: VarError }, +} + +/// Copies the data from a location into a data deposit +#[derive(Parser, Debug)] +pub struct CopyInput { + /// The id of the deposit the data should be copied to. + #[arg()] + pub deposit_id: String, + + /// The provider for the dataset + #[arg()] + pub provider: Provider, + + /// The source directory where the files to be copied can be found. + #[arg()] + pub source_dir: PathBuf, +} + +impl CopyInput { + pub async fn exec(&self, _ctx: Context) -> Result<(), Error> { + match self.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new(token); + clnt.upload_files(&self.deposit_id, &self.source_dir) + .await + .context(ZenodoSnafu) + } + } + } +} diff --git a/src/cli/cmd/dataset/zenodo.rs b/src/cli/cmd/dataset/zenodo.rs new file mode 100644 index 0000000..3289786 --- /dev/null +++ b/src/cli/cmd/dataset/zenodo.rs @@ -0,0 +1,126 @@ +use mime_guess::from_path; +use reqwest; +use serde::de::DeserializeOwned; +use snafu::{ResultExt, Snafu}; +use std::path::{Path, PathBuf}; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; +use url::{ParseError, Url}; +use walkdir::WalkDir; + +use std::io; +use std::sync::LazyLock; + +pub struct ZenodoClient { + http_client: reqwest::Client, + base_url: Url, + token: String, +} + +static BASE_URL: LazyLock = + LazyLock::new(|| Url::parse("https://zenodo.org").expect("Invalid Base URL config")); + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("A zendoo client error occured: {}", source))] + Reqwest { source: reqwest::Error }, + #[snafu(display("A directory listing error occured: {}", source))] + DirWalk { source: walkdir::Error }, + #[snafu(display("An error occured reading the response: {}", source))] + DeserializeResp { source: reqwest::Error }, + #[snafu(display("An error occured reading the response: {}", source))] + FileReading { source: io::Error }, + #[snafu(display("An error occured parsing the file path: {}", fp.to_string_lossy()))] + FileParsing { fp: PathBuf }, + #[snafu(display("An error occured parsing the url {}", source))] + UrlParse { source: ParseError }, +} + +impl ZenodoClient { + pub fn new(token: String) -> ZenodoClient { + let http_client = reqwest::Client::new(); + ZenodoClient { + http_client, + base_url: BASE_URL.clone(), + token, + } + } + + fn make_url(&self, path: &str) -> Result { + self.base_url.join(path).context(UrlParseSnafu) + } + + pub async fn get_deposition( + &self, + deposition_id: &str, + ) -> Result { + let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}"))?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + // res.error_for_status().context(ReqwestSnafu)?; + res.json::().await.context(DeserializeRespSnafu) + } + + pub async fn upload_files(&self, deposition_id: &str, source_path: &Path) -> Result<(), Error> { + for f in WalkDir::new(source_path) { + self.upload_file::<()>(deposition_id, f.context(DirWalkSnafu)?.path()) + .await?; + } + Ok(()) + } + + pub async fn list_files(&self, deposition_id: &str) -> Result { + let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}/files"))?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + res.json::().await.context(DeserializeRespSnafu) + } + + async fn upload_file( + &self, + deposition_id: &str, + file_path: &Path, + ) -> Result { + let file = File::open(file_path).await.context(FileReadingSnafu)?; + let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}/files"))?; + let stream = FramedRead::new(file, BytesCodec::new()); + let file_name = file_path + .file_name() + .ok_or(Error::FileParsing { + fp: file_path.to_path_buf(), + })? + .to_str() + .ok_or(Error::FileParsing { + fp: file_path.to_path_buf(), + })?; + let mime_type = from_path(file_path).first_or_octet_stream(); + let form = reqwest::multipart::Form::new() + .text("name", file_name.to_owned()) + .part( + "file", + reqwest::multipart::Part::stream(reqwest::Body::wrap_stream(stream)) + .file_name(file_name.to_owned()) + .mime_str(mime_type.as_ref()) + .context(ReqwestSnafu)?, + ); + let res = self + .http_client + .post(endpoint) + .bearer_auth(&self.token) + .multipart(form) + .send() + .await + .context(ReqwestSnafu)?; + res.json::().await.context(DeserializeRespSnafu) + } +} diff --git a/src/cli/cmd/login.rs b/src/cli/cmd/login.rs index 88231c5..d372c87 100644 --- a/src/cli/cmd/login.rs +++ b/src/cli/cmd/login.rs @@ -68,7 +68,7 @@ enum Steps<'a> { Complete, } impl Input { - fn get_steps(&self) -> Steps<'_> { + fn get_steps(&'_ self) -> Steps<'_> { if let Some(p) = &self.continue_from { Steps::Continue(p) } else if self.user_code_only { diff --git a/src/cli/cmd/version.rs b/src/cli/cmd/version.rs index a9fdee0..917df2d 100644 --- a/src/cli/cmd/version.rs +++ b/src/cli/cmd/version.rs @@ -58,7 +58,7 @@ pub struct Versions<'a> { pub renku_url: &'a str, } impl Versions<'_> { - pub fn create(server: VersionInfo, renku_url: &str) -> Versions<'_> { + pub fn create(server: VersionInfo, renku_url: &'_ str) -> Versions<'_> { Versions { client: BuildInfo::default(), server, diff --git a/src/cli/opts.rs b/src/cli/opts.rs index a77a554..9b87f16 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -63,6 +63,9 @@ pub enum SubCommand { #[cfg(feature = "user-doc")] UserDoc(userdoc::Input), + + #[command()] + Dataset(dataset::Input), } /// This is the command line interface to the Renku platform. Main diff --git a/src/httpclient.rs b/src/httpclient.rs index 0db5487..cf23c87 100644 --- a/src/httpclient.rs +++ b/src/httpclient.rs @@ -44,6 +44,7 @@ use std::path::PathBuf; const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); #[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] pub enum Error { #[snafu(display("An error was received from {}: {}", url, source))] Http { source: reqwest::Error, url: String }, From e26800357f74f939890f406e10f0b59449c9b16e Mon Sep 17 00:00:00 2001 From: Tasko Olevski <16360283+olevski@users.noreply.github.com> Date: Fri, 13 Feb 2026 23:25:22 +0100 Subject: [PATCH 2/4] feat: add zenodo dataset commands --- Cargo.lock | 55 ++++++++ Cargo.toml | 3 + src/cli/cmd/dataset.rs | 56 ++++++-- src/cli/cmd/dataset/deposit.rs | 85 +++++++++--- src/cli/cmd/dataset/zenodo.rs | 223 ++++++++++++++++++++++++------ src/cli/cmd/dataset/zenodo_api.rs | 31 +++++ 6 files changed, 380 insertions(+), 73 deletions(-) create mode 100644 src/cli/cmd/dataset/zenodo_api.rs diff --git a/Cargo.lock b/Cargo.lock index 2dec6a8..68762cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,6 +274,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "bytecount" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" + [[package]] name = "bytes" version = "1.11.1" @@ -1958,6 +1964,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.8.0" @@ -2313,6 +2325,17 @@ dependencies = [ "sha2", ] +[[package]] +name = "papergrid" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6978128c8b51d8f4080631ceb2302ab51e32cc6e8615f735ee2f83fd269ae3f1" +dependencies = [ + "bytecount", + "fnv", + "unicode-width", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -2497,6 +2520,28 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -2869,6 +2914,7 @@ dependencies = [ "serde", "serde_json", "snafu", + "tabled", "tokio", "toml 0.8.23", "url", @@ -3585,6 +3631,15 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "testing_table" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.69" diff --git a/Cargo.toml b/Cargo.toml index 06a5c03..af1c379 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ snafu = { version = "0.8.5" } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["codec"] } walkdir = "2" +tabled = "0.20.0" +chrono = "0.4.43" futures = { version = "0.3" } regex = { version = "1.12.3" } iso8601-timestamp = { version = "0.2.17" } @@ -56,6 +58,7 @@ self_update = { version = "0.43.1", features = [ "archive-zip", "compression-flate2", ] } +md5 = "0.8.0" [features] default = ["reqwest/default-tls"] # link against system library diff --git a/src/cli/cmd/dataset.rs b/src/cli/cmd/dataset.rs index 35c5e01..4c6d36a 100644 --- a/src/cli/cmd/dataset.rs +++ b/src/cli/cmd/dataset.rs @@ -1,9 +1,15 @@ pub mod deposit; pub mod zenodo; +mod zenodo_api; -use super::Context; -use clap::Parser; -use snafu::Snafu; +use super::Context as ParentContext; +use clap::{Parser, ValueEnum}; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Clone, ValueEnum)] +pub enum Provider { + Zenodo, +} #[derive(Debug, Snafu)] pub enum Error { @@ -21,6 +27,8 @@ pub struct Input { #[derive(Parser, Debug)] pub enum DatasetCommand { Deposit { + #[arg(long, default_value = "zenodo")] + provider: Provider, #[command(subcommand)] cmd: DepositCommand, }, @@ -28,17 +36,45 @@ pub enum DatasetCommand { #[derive(Parser, Debug)] pub enum DepositCommand { - #[command()] + #[command(name = "cp")] CopyFiles(deposit::CopyInput), + #[command(name = "ls")] + ListDeposits(deposit::ListInput), + #[command(name = "lsf")] + ListFiles(deposit::ListFiles), +} + +pub struct Context { + pub parent: ParentContext, + pub provider: Provider, +} + +impl Context { + pub fn new(ctx: ParentContext, provider: Provider) -> Context { + Context { + parent: ctx, + provider, + } + } } impl Input { - pub async fn exec(&self, _ctx: Context) -> Result<(), Error> { - match self.subcmd { - DatasetCommand::Deposit { cmd: _ } => { - print!("Hi"); - Ok(()) - } + pub async fn exec(&self, ctx: ParentContext) -> Result<(), Error> { + match &self.subcmd { + DatasetCommand::Deposit { provider, cmd } => match cmd { + DepositCommand::CopyFiles(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + DepositCommand::ListDeposits(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + DepositCommand::ListFiles(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + }, } } } diff --git a/src/cli/cmd/dataset/deposit.rs b/src/cli/cmd/dataset/deposit.rs index 17cbdd3..92117ce 100644 --- a/src/cli/cmd/dataset/deposit.rs +++ b/src/cli/cmd/dataset/deposit.rs @@ -1,14 +1,10 @@ use super::zenodo; -use super::Context; -use clap::{Parser, ValueEnum}; +use super::{Context, Provider}; +use clap::Parser; use snafu::{ResultExt, Snafu}; use std::env::VarError; use std::path::PathBuf; - -#[derive(Debug, Clone, ValueEnum)] -pub enum Provider { - Zenodo, -} +use tabled::builder::Builder; #[derive(Debug, Snafu)] pub enum Error { @@ -22,25 +18,21 @@ pub enum Error { /// Copies the data from a location into a data deposit #[derive(Parser, Debug)] pub struct CopyInput { - /// The id of the deposit the data should be copied to. - #[arg()] - pub deposit_id: String, - - /// The provider for the dataset - #[arg()] - pub provider: Provider, - /// The source directory where the files to be copied can be found. #[arg()] pub source_dir: PathBuf, + + /// The id of the deposit the data should be copied to. + #[arg()] + pub deposit_id: String, } impl CopyInput { - pub async fn exec(&self, _ctx: Context) -> Result<(), Error> { - match self.provider { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { Provider::Zenodo => { let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; - let clnt = zenodo::ZenodoClient::new(token); + let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); clnt.upload_files(&self.deposit_id, &self.source_dir) .await .context(ZenodoSnafu) @@ -48,3 +40,60 @@ impl CopyInput { } } } + +/// List all depositions for the specific provider +#[derive(Parser, Debug)] +pub struct ListInput {} + +impl ListInput { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); + let deps = clnt.get_depositions().await.context(ZenodoSnafu)?; + let mut table = Builder::default(); + table.push_record(["ID", "Title", "State", "Created at"]); + for d in deps { + table.push_record([ + d.id.to_string(), + d.title, + d.state, + d.created + .to_rfc3339_opts(chrono::SecondsFormat::Secs, false), + ]); + } + println!("{}", table.build()); + Ok(()) + } + } + } +} + +/// List all files in a specific deposit +#[derive(Parser, Debug)] +pub struct ListFiles { + deposit_id: String, +} + +impl ListFiles { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); + let files = clnt + .list_files(&self.deposit_id) + .await + .context(ZenodoSnafu)?; + let mut table = Builder::default(); + table.push_record(["Filename", "Size", "Checksum"]); + for f in files { + table.push_record([f.filename, f.filesize.to_string(), f.checksum]); + } + println!("{}", table.build()); + Ok(()) + } + } + } +} diff --git a/src/cli/cmd/dataset/zenodo.rs b/src/cli/cmd/dataset/zenodo.rs index 3289786..a769db3 100644 --- a/src/cli/cmd/dataset/zenodo.rs +++ b/src/cli/cmd/dataset/zenodo.rs @@ -1,13 +1,19 @@ -use mime_guess::from_path; -use reqwest; +use futures::stream::StreamExt; +use md5::Context as Md5Context; +use reqwest::header::{HeaderMap, ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; +use reqwest::{self, Body, Response, StatusCode}; use serde::de::DeserializeOwned; use snafu::{ResultExt, Snafu}; -use std::path::{Path, PathBuf}; +use std::path::{Path, PathBuf, StripPrefixError}; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; +use tokio_util::io::ReaderStream; use url::{ParseError, Url}; -use walkdir::WalkDir; +use walkdir::{DirEntry, WalkDir}; +use crate::cli::cmd::dataset::zenodo_api::{FileResponse, FileUploadResponse}; + +use super::zenodo_api::DepositionResponse; use std::io; use std::sync::LazyLock; @@ -15,6 +21,7 @@ pub struct ZenodoClient { http_client: reqwest::Client, base_url: Url, token: String, + debug: bool, } static BASE_URL: LazyLock = @@ -24,25 +31,52 @@ static BASE_URL: LazyLock = pub enum Error { #[snafu(display("A zendoo client error occured: {}", source))] Reqwest { source: reqwest::Error }, + #[snafu(display("A directory listing error occured: {}", source))] DirWalk { source: walkdir::Error }, + #[snafu(display("An error occured reading the response: {}", source))] DeserializeResp { source: reqwest::Error }, - #[snafu(display("An error occured reading the response: {}", source))] + + #[snafu(display("An error occured reading the file: {}", source))] FileReading { source: io::Error }, + #[snafu(display("An error occured parsing the file path: {}", fp.to_string_lossy()))] FileParsing { fp: PathBuf }, + #[snafu(display("An error occured parsing the url {}", source))] UrlParse { source: ParseError }, + + #[snafu(display("Stripping file prefix failed {}", source))] + StripPathPrefix { source: StripPrefixError }, + + #[snafu(display("An error occured desearializing the response to json: {}", source))] + DeserializeJson { source: serde_json::Error }, + + #[snafu(display( + "The request to zenodo at {} resulted in an unexpected status code {}", + url, + status_code, + ))] + FailedRequest { url: Url, status_code: StatusCode }, + + #[snafu(display("Directories like {} cannot be uploaded in Zenodo, please zip the directory and then upload the archive.", fp.display()))] + DirUpload { fp: PathBuf }, } impl ZenodoClient { - pub fn new(token: String) -> ZenodoClient { - let http_client = reqwest::Client::new(); + pub fn new(token: String, debug: bool) -> ZenodoClient { + let mut headers = HeaderMap::new(); + headers.insert(ACCEPT, "application/json".parse().unwrap()); + let http_client = reqwest::Client::builder() + .user_agent("SDSC/renku") // Zenodo rejects the reqwest User Agent + .build() + .unwrap(); ZenodoClient { http_client, base_url: BASE_URL.clone(), token, + debug, } } @@ -50,10 +84,19 @@ impl ZenodoClient { self.base_url.join(path).context(UrlParseSnafu) } - pub async fn get_deposition( - &self, - deposition_id: &str, - ) -> Result { + pub async fn get_depositions(&self) -> Result, Error> { + let endpoint = self.make_url("/api/deposit/depositions")?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + Self::json_parse(res, self.debug).await + } + + pub async fn get_deposition(&self, deposition_id: &str) -> Result { let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}"))?; let res = self .http_client @@ -62,19 +105,57 @@ impl ZenodoClient { .send() .await .context(ReqwestSnafu)?; - // res.error_for_status().context(ReqwestSnafu)?; - res.json::().await.context(DeserializeRespSnafu) + Self::json_parse(res, self.debug).await } pub async fn upload_files(&self, deposition_id: &str, source_path: &Path) -> Result<(), Error> { - for f in WalkDir::new(source_path) { - self.upload_file::<()>(deposition_id, f.context(DirWalkSnafu)?.path()) + let dep = self.get_deposition(deposition_id).await?; + if dep.links.bucket.is_none() { + println!("Could not find the expected bucket link in the deposit"); + return Ok(()); + } + let bucket = &dep.links.bucket.unwrap(); + fn is_hidden(entry: &DirEntry) -> bool { + entry + .file_name() + .to_str() + .map(|s| s.starts_with(".")) + .unwrap_or(false) + } + let walker = WalkDir::new(source_path).into_iter(); + for (f_ind, f) in walker.filter_entry(|e| !is_hidden(e)).enumerate() { + let path = f.context(DirWalkSnafu)?; + let path_std = path.path(); + if f_ind == 0 && path_std.is_dir() { + // If the source path is a dir the first entry of the walk is the same dir that was + // passed + continue; + } + if path_std.is_dir() { + // If a directory is encountered we fail because zenodo does not support + // uploading directories + return Err(Error::DirUpload { + fp: path_std.to_path_buf(), + }); + } + let remote_path = path_std + .file_name() + .ok_or(Error::FileParsing { + fp: path_std.to_path_buf(), + })? + .to_str() + .ok_or(Error::FileParsing { + fp: path_std.to_path_buf(), + })?; + log::info!("uploading file {} -> {}", path_std.display(), remote_path); + let existing_files = self.list_files(deposition_id).await?; + self.upload_file(bucket, path_std, remote_path, existing_files) .await?; } Ok(()) } - pub async fn list_files(&self, deposition_id: &str) -> Result { + pub async fn list_files(&self, deposition_id: &str) -> Result, Error> { let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}/files"))?; let res = self .http_client @@ -83,44 +164,96 @@ impl ZenodoClient { .send() .await .context(ReqwestSnafu)?; - res.json::().await.context(DeserializeRespSnafu) + Self::json_parse(res, self.debug).await } - async fn upload_file( + async fn upload_file( &self, - deposition_id: &str, - file_path: &Path, - ) -> Result { - let file = File::open(file_path).await.context(FileReadingSnafu)?; - let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}/files"))?; + bucket_url: &str, + local_file: &Path, + remote_file: &str, + existing_files: Vec, + ) -> Result { + let file = File::open(local_file).await.context(FileReadingSnafu)?; + let metadata = file.metadata().await.context(FileReadingSnafu)?; + let file_size = metadata.len(); + let file_may_exist = existing_files + .iter() + .find(|f| f.filename == remote_file && f.filesize == file_size as f64); + if let Some(existing_file) = file_may_exist { + let hash = Self::md5_hash(local_file).await?; + if hash == file_may_exist.unwrap().checksum { + log::info!( + "File {} exists already and hash matches, skipping.", + remote_file + ); + return Ok(FileUploadResponse { + key: existing_file.filename.to_owned(), + mimetype: "".to_owned(), + checksum: existing_file.checksum.to_owned(), + size: existing_file.filesize as u64, + }); + } + } let stream = FramedRead::new(file, BytesCodec::new()); - let file_name = file_path - .file_name() - .ok_or(Error::FileParsing { - fp: file_path.to_path_buf(), - })? - .to_str() - .ok_or(Error::FileParsing { - fp: file_path.to_path_buf(), - })?; - let mime_type = from_path(file_path).first_or_octet_stream(); - let form = reqwest::multipart::Form::new() - .text("name", file_name.to_owned()) - .part( - "file", - reqwest::multipart::Part::stream(reqwest::Body::wrap_stream(stream)) - .file_name(file_name.to_owned()) - .mime_str(mime_type.as_ref()) - .context(ReqwestSnafu)?, - ); + let file_body = Body::wrap_stream(stream); let res = self .http_client - .post(endpoint) + .put(format!( + "{}/{}", + bucket_url.strip_suffix("/").unwrap_or(bucket_url), + remote_file.strip_prefix("/").unwrap_or(remote_file) + )) + .header(CONTENT_TYPE, "application/octet-stream") + .header(CONTENT_LENGTH, file_size) .bearer_auth(&self.token) - .multipart(form) + .body(file_body) .send() .await .context(ReqwestSnafu)?; - res.json::().await.context(DeserializeRespSnafu) + Self::json_parse(res, self.debug).await + } + + async fn json_parse(resp: Response, debug: bool) -> Result { + let url = resp.url().to_owned(); + let status = resp.status(); + if !resp.status().is_success() { + let body = resp.text().await.context(DeserializeRespSnafu).unwrap(); + log::debug!( + "The request at {} failed with status code {}, body {}", + url, + status, + body, + ); + return Err(Error::FailedRequest { + url: url.to_owned(), + status_code: status, + }); + } + if debug { + let body = resp.text().await.context(DeserializeRespSnafu).unwrap(); + log::debug!( + "Zenodo client request at {} responded with code {} and body {}", + url, + status, + body + ); + serde_json::from_str::(&body).context(DeserializeJsonSnafu) + } else { + resp.json::().await.context(DeserializeRespSnafu) + } + } + + async fn md5_hash(file_path: &Path) -> Result { + let file = File::open(file_path).await.context(FileReadingSnafu)?; + let mut reader = ReaderStream::new(file); + let mut hasher = Md5Context::new(); + + while let Some(chunk) = reader.next().await { + let chunk = chunk.context(FileReadingSnafu)?; // Handle IO error + hasher.consume(&chunk); + } + + Ok(format!("{:x}", hasher.finalize())) } } diff --git a/src/cli/cmd/dataset/zenodo_api.rs b/src/cli/cmd/dataset/zenodo_api.rs new file mode 100644 index 0000000..e0ea285 --- /dev/null +++ b/src/cli/cmd/dataset/zenodo_api.rs @@ -0,0 +1,31 @@ +use chrono::{DateTime, Local}; +use serde::Deserialize; + +#[derive(Deserialize, Debug)] +pub struct ZenodoLinks { + pub bucket: Option, +} + +#[derive(Deserialize, Debug)] +pub struct DepositionResponse { + pub id: u32, + pub title: String, + pub state: String, + pub created: DateTime, + pub links: ZenodoLinks, +} + +#[derive(Deserialize, Debug)] +pub struct FileResponse { + pub filename: String, + pub filesize: f64, + pub checksum: String, +} + +#[derive(Deserialize, Debug)] +pub struct FileUploadResponse { + pub key: String, + pub mimetype: String, + pub checksum: String, + pub size: u64, +} From 5082d9d5d01de72b0af4baa1b2b7b0ec06e5ac6b Mon Sep 17 00:00:00 2001 From: Tasko Olevski <16360283+olevski@users.noreply.github.com> Date: Mon, 16 Mar 2026 23:02:43 +0100 Subject: [PATCH 3/4] chore: add zenodo URL env var --- src/cli/cmd/dataset/zenodo.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/cli/cmd/dataset/zenodo.rs b/src/cli/cmd/dataset/zenodo.rs index a769db3..58f1498 100644 --- a/src/cli/cmd/dataset/zenodo.rs +++ b/src/cli/cmd/dataset/zenodo.rs @@ -24,8 +24,10 @@ pub struct ZenodoClient { debug: bool, } -static BASE_URL: LazyLock = - LazyLock::new(|| Url::parse("https://zenodo.org").expect("Invalid Base URL config")); +static BASE_URL: LazyLock = LazyLock::new(|| { + Url::parse(&std::env::var("ZENODO_URL").unwrap_or("https://zenodo.org".to_string())) + .expect("Invalid Base URL config") +}); #[derive(Debug, Snafu)] pub enum Error { From 45f53a2cbdb1b5ee46f30cc462d02f8dbccbb7bb Mon Sep 17 00:00:00 2001 From: Tasko Olevski <16360283+olevski@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:18:01 +0100 Subject: [PATCH 4/4] squashme: minor fixes --- Cargo.lock | 71 ++++++++++++++++++++++------------ Cargo.toml | 3 +- src/cli/cmd/dataset/deposit.rs | 30 ++++++++++++-- src/cli/cmd/dataset/zenodo.rs | 2 +- 4 files changed, 77 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68762cc..599a4d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,18 +268,18 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytecount" version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -512,7 +512,6 @@ checksum = "d64e8af5551369d19cf50138de61f1c42074ab970f74e99be916646777f8fc87" dependencies = [ "encode_unicode", "libc", - "once_cell", "unicode-width", "windows-sys 0.61.2", ] @@ -2849,6 +2848,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.26.4", + "tokio-util", "tower", "tower-http", "tower-service", @@ -2890,6 +2890,7 @@ version = "0.0.0" dependencies = [ "assert_cmd", "better-panic", + "chrono", "clap", "clap-verbosity-flag", "clap_complete", @@ -2904,6 +2905,7 @@ dependencies = [ "iso8601-timestamp", "libc", "log", + "md5", "oauth2", "openidconnect", "openssl", @@ -2916,6 +2918,7 @@ dependencies = [ "snafu", "tabled", "tokio", + "tokio-util", "toml 0.8.23", "url", "vergen", @@ -3591,6 +3594,30 @@ dependencies = [ "libc", ] +[[package]] +name = "tabled" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e39a2ee1fbcd360805a771e1b300f78cc88fec7b8d3e2f71cd37bbf23e725c7d" +dependencies = [ + "papergrid", + "tabled_derive", + "testing_table", +] + +[[package]] +name = "tabled_derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846" +dependencies = [ + "heck", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tar" version = "0.4.44" @@ -4290,6 +4317,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" @@ -4480,23 +4520,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-interface" -version = "0.59.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "windows-link" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" - [[package]] name = "windows-result" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index af1c379..ac47d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ openssl = { version = "0.10.75", optional = true } reqwest = { version = "0.12.28", default-features = false, features = [ "json", "multipart", + "stream", ] } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" @@ -65,7 +66,7 @@ default = ["reqwest/default-tls"] # link against system library rustls = [ "reqwest/rustls-tls", "openidconnect/rustls-tls", - "self_update/rustls" + "self_update/rustls", ] # include rustls, ssl library written in rust vendored-openssl = ["openssl/vendored"] # include compiled openssl library vendored-zlib = [ diff --git a/src/cli/cmd/dataset/deposit.rs b/src/cli/cmd/dataset/deposit.rs index 92117ce..960dd09 100644 --- a/src/cli/cmd/dataset/deposit.rs +++ b/src/cli/cmd/dataset/deposit.rs @@ -32,7 +32,15 @@ impl CopyInput { match ctx.provider { Provider::Zenodo => { let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; - let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); clnt.upload_files(&self.deposit_id, &self.source_dir) .await .context(ZenodoSnafu) @@ -50,7 +58,15 @@ impl ListInput { match ctx.provider { Provider::Zenodo => { let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; - let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); let deps = clnt.get_depositions().await.context(ZenodoSnafu)?; let mut table = Builder::default(); table.push_record(["ID", "Title", "State", "Created at"]); @@ -81,7 +97,15 @@ impl ListFiles { match ctx.provider { Provider::Zenodo => { let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; - let clnt = zenodo::ZenodoClient::new(token, ctx.parent.opts.verbose > 1); + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); let files = clnt .list_files(&self.deposit_id) .await diff --git a/src/cli/cmd/dataset/zenodo.rs b/src/cli/cmd/dataset/zenodo.rs index 58f1498..557db5a 100644 --- a/src/cli/cmd/dataset/zenodo.rs +++ b/src/cli/cmd/dataset/zenodo.rs @@ -1,6 +1,6 @@ use futures::stream::StreamExt; use md5::Context as Md5Context; -use reqwest::header::{HeaderMap, ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; +use reqwest::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE, HeaderMap}; use reqwest::{self, Body, Response, StatusCode}; use serde::de::DeserializeOwned; use snafu::{ResultExt, Snafu};