diff --git a/Cargo.lock b/Cargo.lock index ca80b3d..599a4d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytecount" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" + [[package]] name = "byteorder" version = "1.5.0" @@ -1957,6 +1963,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.8.0" @@ -2312,6 +2324,17 @@ dependencies = [ "sha2", ] +[[package]] +name = "papergrid" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6978128c8b51d8f4080631ceb2302ab51e32cc6e8615f735ee2f83fd269ae3f1" +dependencies = [ + "bytecount", + "fnv", + "unicode-width", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -2496,6 +2519,28 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -2803,12 +2848,14 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.26.4", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.6", ] @@ -2843,6 +2890,7 @@ version = "0.0.0" dependencies = [ "assert_cmd", "better-panic", + "chrono", "clap", "clap-verbosity-flag", "clap_complete", @@ -2857,6 +2905,7 @@ dependencies = [ "iso8601-timestamp", "libc", "log", + "md5", "oauth2", "openidconnect", "openssl", @@ -2867,10 +2916,13 @@ dependencies = [ "serde", "serde_json", "snafu", + "tabled", "tokio", + "tokio-util", "toml 0.8.23", "url", "vergen", + "walkdir", ] [[package]] @@ -3542,6 +3594,30 @@ dependencies = [ "libc", ] +[[package]] +name = "tabled" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e39a2ee1fbcd360805a771e1b300f78cc88fec7b8d3e2f71cd37bbf23e725c7d" +dependencies = [ + "papergrid", + "tabled_derive", + "testing_table", +] + +[[package]] +name = "tabled_derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea5d1b13ca6cff1f9231ffd62f15eefd72543dab5e468735f1a456728a02846" +dependencies = [ + "heck", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tar" version = "0.4.44" @@ -3582,6 +3658,15 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "testing_table" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f8daae29995a24f65619e19d8d31dea5b389f3d853d8bf297bbf607cd0014cc" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -4232,6 +4317,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/Cargo.toml b/Cargo.toml index 3d6a6bf..ac47d42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,16 @@ openssl = { version = "0.10.75", optional = true } reqwest = { version = "0.12.28", default-features = false, features = [ "json", "multipart", + "stream", ] } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" snafu = { version = "0.8.5" } tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +walkdir = "2" +tabled = "0.20.0" +chrono = "0.4.43" futures = { version = "0.3" } regex = { version = "1.12.3" } iso8601-timestamp = { version = "0.2.17" } @@ -54,13 +59,14 @@ self_update = { version = "0.43.1", features = [ "archive-zip", "compression-flate2", ] } +md5 = "0.8.0" [features] default = ["reqwest/default-tls"] # link against system library rustls = [ "reqwest/rustls-tls", "openidconnect/rustls-tls", - "self_update/rustls" + "self_update/rustls", ] # include rustls, ssl library written in rust vendored-openssl = ["openssl/vendored"] # include compiled openssl library vendored-zlib = [ diff --git a/src/cli.rs b/src/cli.rs index c01876d..17b8f12 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -25,6 +25,8 @@ pub async fn execute_cmd(opts: MainOpts) -> Result<(), CmdError> { #[cfg(feature = "user-doc")] SubCommand::UserDoc(input) => input.exec(ctx).await?, + + SubCommand::Dataset(input) => input.exec(ctx).await?, }; Ok(()) } diff --git a/src/cli/cmd.rs b/src/cli/cmd.rs index 59b15d3..0750340 100644 --- a/src/cli/cmd.rs +++ b/src/cli/cmd.rs @@ -1,3 +1,4 @@ +pub mod dataset; pub mod login; pub mod project; pub mod update; @@ -109,6 +110,9 @@ pub enum CmdError { #[cfg(feature = "user-doc")] #[snafu(display("UserDoc - {}", source))] UserDoc { source: userdoc::Error }, + + #[snafu(display("Dataset - {}", source))] + Dataset { source: dataset::Error }, } impl From for CmdError { @@ -140,3 +144,9 @@ impl From for CmdError { CmdError::Login { source } } } + +impl From for CmdError { + fn from(source: dataset::Error) -> Self { + CmdError::Dataset { source } + } +} diff --git a/src/cli/cmd/dataset.rs b/src/cli/cmd/dataset.rs new file mode 100644 index 0000000..4c6d36a --- /dev/null +++ b/src/cli/cmd/dataset.rs @@ -0,0 +1,80 @@ +pub mod deposit; +pub mod zenodo; +mod zenodo_api; + +use super::Context as ParentContext; +use clap::{Parser, ValueEnum}; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Clone, ValueEnum)] +pub enum Provider { + Zenodo, +} + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error with deposit: {}", source))] + Deposit { source: deposit::Error }, +} + +/// Sub command for managing datasets +#[derive(Parser, Debug)] +pub struct Input { + #[command(subcommand)] + pub subcmd: DatasetCommand, +} + +#[derive(Parser, Debug)] +pub enum DatasetCommand { + Deposit { + #[arg(long, default_value = "zenodo")] + provider: Provider, + #[command(subcommand)] + cmd: DepositCommand, + }, +} + +#[derive(Parser, Debug)] +pub enum DepositCommand { + #[command(name = "cp")] + CopyFiles(deposit::CopyInput), + #[command(name = "ls")] + ListDeposits(deposit::ListInput), + #[command(name = "lsf")] + ListFiles(deposit::ListFiles), +} + +pub struct Context { + pub parent: ParentContext, + pub provider: Provider, +} + +impl Context { + pub fn new(ctx: ParentContext, provider: Provider) -> Context { + Context { + parent: ctx, + provider, + } + } +} + +impl Input { + pub async fn exec(&self, ctx: ParentContext) -> Result<(), Error> { + match &self.subcmd { + DatasetCommand::Deposit { provider, cmd } => match cmd { + DepositCommand::CopyFiles(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + DepositCommand::ListDeposits(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + DepositCommand::ListFiles(input) => input + .exec(Context::new(ctx, provider.clone())) + .await + .context(DepositSnafu), + }, + } + } +} diff --git a/src/cli/cmd/dataset/deposit.rs b/src/cli/cmd/dataset/deposit.rs new file mode 100644 index 0000000..960dd09 --- /dev/null +++ b/src/cli/cmd/dataset/deposit.rs @@ -0,0 +1,123 @@ +use super::zenodo; +use super::{Context, Provider}; +use clap::Parser; +use snafu::{ResultExt, Snafu}; +use std::env::VarError; +use std::path::PathBuf; +use tabled::builder::Builder; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("A dataset deposit error occured: {}", source))] + Zenodo { source: zenodo::Error }, + + #[snafu(display("Env variable error: {}", source))] + EnvVarMissing { source: VarError }, +} + +/// Copies the data from a location into a data deposit +#[derive(Parser, Debug)] +pub struct CopyInput { + /// The source directory where the files to be copied can be found. + #[arg()] + pub source_dir: PathBuf, + + /// The id of the deposit the data should be copied to. + #[arg()] + pub deposit_id: String, +} + +impl CopyInput { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); + clnt.upload_files(&self.deposit_id, &self.source_dir) + .await + .context(ZenodoSnafu) + } + } + } +} + +/// List all depositions for the specific provider +#[derive(Parser, Debug)] +pub struct ListInput {} + +impl ListInput { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); + let deps = clnt.get_depositions().await.context(ZenodoSnafu)?; + let mut table = Builder::default(); + table.push_record(["ID", "Title", "State", "Created at"]); + for d in deps { + table.push_record([ + d.id.to_string(), + d.title, + d.state, + d.created + .to_rfc3339_opts(chrono::SecondsFormat::Secs, false), + ]); + } + println!("{}", table.build()); + Ok(()) + } + } + } +} + +/// List all files in a specific deposit +#[derive(Parser, Debug)] +pub struct ListFiles { + deposit_id: String, +} + +impl ListFiles { + pub async fn exec(&self, ctx: Context) -> Result<(), Error> { + match ctx.provider { + Provider::Zenodo => { + let token = std::env::var("ZENODO_API_KEY").context(EnvVarMissingSnafu)?; + let clnt = zenodo::ZenodoClient::new( + token, + ctx.parent + .opts + .verbosity + .log_level() + .unwrap_or(log::Level::Warn) + > log::Level::Info, + ); + let files = clnt + .list_files(&self.deposit_id) + .await + .context(ZenodoSnafu)?; + let mut table = Builder::default(); + table.push_record(["Filename", "Size", "Checksum"]); + for f in files { + table.push_record([f.filename, f.filesize.to_string(), f.checksum]); + } + println!("{}", table.build()); + Ok(()) + } + } + } +} diff --git a/src/cli/cmd/dataset/zenodo.rs b/src/cli/cmd/dataset/zenodo.rs new file mode 100644 index 0000000..557db5a --- /dev/null +++ b/src/cli/cmd/dataset/zenodo.rs @@ -0,0 +1,261 @@ +use futures::stream::StreamExt; +use md5::Context as Md5Context; +use reqwest::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE, HeaderMap}; +use reqwest::{self, Body, Response, StatusCode}; +use serde::de::DeserializeOwned; +use snafu::{ResultExt, Snafu}; +use std::path::{Path, PathBuf, StripPrefixError}; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; +use tokio_util::io::ReaderStream; +use url::{ParseError, Url}; +use walkdir::{DirEntry, WalkDir}; + +use crate::cli::cmd::dataset::zenodo_api::{FileResponse, FileUploadResponse}; + +use super::zenodo_api::DepositionResponse; +use std::io; +use std::sync::LazyLock; + +pub struct ZenodoClient { + http_client: reqwest::Client, + base_url: Url, + token: String, + debug: bool, +} + +static BASE_URL: LazyLock = LazyLock::new(|| { + Url::parse(&std::env::var("ZENODO_URL").unwrap_or("https://zenodo.org".to_string())) + .expect("Invalid Base URL config") +}); + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("A zendoo client error occured: {}", source))] + Reqwest { source: reqwest::Error }, + + #[snafu(display("A directory listing error occured: {}", source))] + DirWalk { source: walkdir::Error }, + + #[snafu(display("An error occured reading the response: {}", source))] + DeserializeResp { source: reqwest::Error }, + + #[snafu(display("An error occured reading the file: {}", source))] + FileReading { source: io::Error }, + + #[snafu(display("An error occured parsing the file path: {}", fp.to_string_lossy()))] + FileParsing { fp: PathBuf }, + + #[snafu(display("An error occured parsing the url {}", source))] + UrlParse { source: ParseError }, + + #[snafu(display("Stripping file prefix failed {}", source))] + StripPathPrefix { source: StripPrefixError }, + + #[snafu(display("An error occured desearializing the response to json: {}", source))] + DeserializeJson { source: serde_json::Error }, + + #[snafu(display( + "The request to zenodo at {} resulted in an unexpected status code {}", + url, + status_code, + ))] + FailedRequest { url: Url, status_code: StatusCode }, + + #[snafu(display("Directories like {} cannot be uploaded in Zenodo, please zip the directory and then upload the archive.", fp.display()))] + DirUpload { fp: PathBuf }, +} + +impl ZenodoClient { + pub fn new(token: String, debug: bool) -> ZenodoClient { + let mut headers = HeaderMap::new(); + headers.insert(ACCEPT, "application/json".parse().unwrap()); + let http_client = reqwest::Client::builder() + .user_agent("SDSC/renku") // Zenodo rejects the reqwest User Agent + .build() + .unwrap(); + ZenodoClient { + http_client, + base_url: BASE_URL.clone(), + token, + debug, + } + } + + fn make_url(&self, path: &str) -> Result { + self.base_url.join(path).context(UrlParseSnafu) + } + + pub async fn get_depositions(&self) -> Result, Error> { + let endpoint = self.make_url("/api/deposit/depositions")?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + Self::json_parse(res, self.debug).await + } + + pub async fn get_deposition(&self, deposition_id: &str) -> Result { + let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}"))?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + Self::json_parse(res, self.debug).await + } + + pub async fn upload_files(&self, deposition_id: &str, source_path: &Path) -> Result<(), Error> { + let dep = self.get_deposition(deposition_id).await?; + if dep.links.bucket.is_none() { + println!("Could not find the expected bucket link in the deposit"); + return Ok(()); + } + let bucket = &dep.links.bucket.unwrap(); + fn is_hidden(entry: &DirEntry) -> bool { + entry + .file_name() + .to_str() + .map(|s| s.starts_with(".")) + .unwrap_or(false) + } + let walker = WalkDir::new(source_path).into_iter(); + for (f_ind, f) in walker.filter_entry(|e| !is_hidden(e)).enumerate() { + let path = f.context(DirWalkSnafu)?; + let path_std = path.path(); + if f_ind == 0 && path_std.is_dir() { + // If the source path is a dir the first entry of the walk is the same dir that was + // passed + continue; + } + if path_std.is_dir() { + // If a directory is encountered we fail because zenodo does not support + // uploading directories + return Err(Error::DirUpload { + fp: path_std.to_path_buf(), + }); + } + let remote_path = path_std + .file_name() + .ok_or(Error::FileParsing { + fp: path_std.to_path_buf(), + })? + .to_str() + .ok_or(Error::FileParsing { + fp: path_std.to_path_buf(), + })?; + log::info!("uploading file {} -> {}", path_std.display(), remote_path); + let existing_files = self.list_files(deposition_id).await?; + self.upload_file(bucket, path_std, remote_path, existing_files) + .await?; + } + Ok(()) + } + + pub async fn list_files(&self, deposition_id: &str) -> Result, Error> { + let endpoint = self.make_url(&format!("/api/deposit/depositions/{deposition_id}/files"))?; + let res = self + .http_client + .get(endpoint) + .bearer_auth(&self.token) + .send() + .await + .context(ReqwestSnafu)?; + Self::json_parse(res, self.debug).await + } + + async fn upload_file( + &self, + bucket_url: &str, + local_file: &Path, + remote_file: &str, + existing_files: Vec, + ) -> Result { + let file = File::open(local_file).await.context(FileReadingSnafu)?; + let metadata = file.metadata().await.context(FileReadingSnafu)?; + let file_size = metadata.len(); + let file_may_exist = existing_files + .iter() + .find(|f| f.filename == remote_file && f.filesize == file_size as f64); + if let Some(existing_file) = file_may_exist { + let hash = Self::md5_hash(local_file).await?; + if hash == file_may_exist.unwrap().checksum { + log::info!( + "File {} exists already and hash matches, skipping.", + remote_file + ); + return Ok(FileUploadResponse { + key: existing_file.filename.to_owned(), + mimetype: "".to_owned(), + checksum: existing_file.checksum.to_owned(), + size: existing_file.filesize as u64, + }); + } + } + let stream = FramedRead::new(file, BytesCodec::new()); + let file_body = Body::wrap_stream(stream); + let res = self + .http_client + .put(format!( + "{}/{}", + bucket_url.strip_suffix("/").unwrap_or(bucket_url), + remote_file.strip_prefix("/").unwrap_or(remote_file) + )) + .header(CONTENT_TYPE, "application/octet-stream") + .header(CONTENT_LENGTH, file_size) + .bearer_auth(&self.token) + .body(file_body) + .send() + .await + .context(ReqwestSnafu)?; + Self::json_parse(res, self.debug).await + } + + async fn json_parse(resp: Response, debug: bool) -> Result { + let url = resp.url().to_owned(); + let status = resp.status(); + if !resp.status().is_success() { + let body = resp.text().await.context(DeserializeRespSnafu).unwrap(); + log::debug!( + "The request at {} failed with status code {}, body {}", + url, + status, + body, + ); + return Err(Error::FailedRequest { + url: url.to_owned(), + status_code: status, + }); + } + if debug { + let body = resp.text().await.context(DeserializeRespSnafu).unwrap(); + log::debug!( + "Zenodo client request at {} responded with code {} and body {}", + url, + status, + body + ); + serde_json::from_str::(&body).context(DeserializeJsonSnafu) + } else { + resp.json::().await.context(DeserializeRespSnafu) + } + } + + async fn md5_hash(file_path: &Path) -> Result { + let file = File::open(file_path).await.context(FileReadingSnafu)?; + let mut reader = ReaderStream::new(file); + let mut hasher = Md5Context::new(); + + while let Some(chunk) = reader.next().await { + let chunk = chunk.context(FileReadingSnafu)?; // Handle IO error + hasher.consume(&chunk); + } + + Ok(format!("{:x}", hasher.finalize())) + } +} diff --git a/src/cli/cmd/dataset/zenodo_api.rs b/src/cli/cmd/dataset/zenodo_api.rs new file mode 100644 index 0000000..e0ea285 --- /dev/null +++ b/src/cli/cmd/dataset/zenodo_api.rs @@ -0,0 +1,31 @@ +use chrono::{DateTime, Local}; +use serde::Deserialize; + +#[derive(Deserialize, Debug)] +pub struct ZenodoLinks { + pub bucket: Option, +} + +#[derive(Deserialize, Debug)] +pub struct DepositionResponse { + pub id: u32, + pub title: String, + pub state: String, + pub created: DateTime, + pub links: ZenodoLinks, +} + +#[derive(Deserialize, Debug)] +pub struct FileResponse { + pub filename: String, + pub filesize: f64, + pub checksum: String, +} + +#[derive(Deserialize, Debug)] +pub struct FileUploadResponse { + pub key: String, + pub mimetype: String, + pub checksum: String, + pub size: u64, +} diff --git a/src/cli/cmd/login.rs b/src/cli/cmd/login.rs index 88231c5..d372c87 100644 --- a/src/cli/cmd/login.rs +++ b/src/cli/cmd/login.rs @@ -68,7 +68,7 @@ enum Steps<'a> { Complete, } impl Input { - fn get_steps(&self) -> Steps<'_> { + fn get_steps(&'_ self) -> Steps<'_> { if let Some(p) = &self.continue_from { Steps::Continue(p) } else if self.user_code_only { diff --git a/src/cli/cmd/version.rs b/src/cli/cmd/version.rs index a9fdee0..917df2d 100644 --- a/src/cli/cmd/version.rs +++ b/src/cli/cmd/version.rs @@ -58,7 +58,7 @@ pub struct Versions<'a> { pub renku_url: &'a str, } impl Versions<'_> { - pub fn create(server: VersionInfo, renku_url: &str) -> Versions<'_> { + pub fn create(server: VersionInfo, renku_url: &'_ str) -> Versions<'_> { Versions { client: BuildInfo::default(), server, diff --git a/src/cli/opts.rs b/src/cli/opts.rs index a77a554..9b87f16 100644 --- a/src/cli/opts.rs +++ b/src/cli/opts.rs @@ -63,6 +63,9 @@ pub enum SubCommand { #[cfg(feature = "user-doc")] UserDoc(userdoc::Input), + + #[command()] + Dataset(dataset::Input), } /// This is the command line interface to the Renku platform. Main diff --git a/src/httpclient.rs b/src/httpclient.rs index 0db5487..cf23c87 100644 --- a/src/httpclient.rs +++ b/src/httpclient.rs @@ -44,6 +44,7 @@ use std::path::PathBuf; const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); #[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] pub enum Error { #[snafu(display("An error was received from {}: {}", url, source))] Http { source: reqwest::Error, url: String },