Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 78 additions & 75 deletions core/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions core/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ md-5 = "0.10"
mea = { workspace = true }
percent-encoding = "2"
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign-core = { version = "3.0.0", default-features = false }
reqwest = { version = "0.12.24", features = [
"stream",
], default-features = false }
Expand Down
44 changes: 43 additions & 1 deletion core/core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqw
/// HttpFetcher is a type erased [`HttpFetch`].
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;

/// A HTTP client instance for OpenDAL's services.
/// An HTTP client instance for OpenDAL's services.
///
/// # Notes
///
Expand All @@ -62,13 +62,33 @@ pub struct HttpClient {
fetcher: HttpFetcher,
}

/// A reqsign `HttpSend` implementation that always forwards requests to the
/// current http client stored inside [`AccessorInfo`].
#[derive(Clone)]
pub struct AccessorInfoHttpSend {
info: Arc<AccessorInfo>,
}

impl AccessorInfoHttpSend {
/// Create a new [`AccessorInfoHttpSend`].
pub fn new(info: Arc<AccessorInfo>) -> Self {
Self { info }
}
}

/// We don't want users to know details about our clients.
impl Debug for HttpClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HttpClient").finish()
}
}

impl Debug for AccessorInfoHttpSend {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AccessorInfoHttpSend").finish()
}
}

impl Default for HttpClient {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -109,6 +129,28 @@ impl HttpClient {
}
}

impl reqsign_core::HttpSend for HttpClient {
async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
let req = req.map(Buffer::from);
let resp = self.send(req).await.map_err(|err| {
let retryable = err.is_temporary();
reqsign_core::Error::unexpected("send request via OpenDAL HttpClient")
.with_source(err)
.set_retryable(retryable)
})?;

let (parts, body) = resp.into_parts();
Ok(Response::from_parts(parts, body.to_bytes()))
}
}

impl reqsign_core::HttpSend for AccessorInfoHttpSend {
async fn http_send(&self, req: Request<Bytes>) -> reqsign_core::Result<Response<Bytes>> {
let client = self.info.http_client();
reqsign_core::HttpSend::http_send(&client, req).await
}
}

/// HttpFetch is the trait to fetch a request in async way.
/// User should implement this trait to provide their own http client.
pub trait HttpFetch: Send + Sync + Unpin + 'static {
Expand Down
1 change: 1 addition & 0 deletions core/core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! it easier to develop services and layers outside opendal.

mod client;
pub use client::AccessorInfoHttpSend;
/// temporary client used by several features
#[allow(unused_imports)]
pub use client::GLOBAL_REQWEST_CLIENT;
Expand Down
7 changes: 3 additions & 4 deletions core/services/azblob/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign-azure-storage = { version = "2.0.2", default-features = false }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqsign-azure-storage = { version = "3.0.0", default-features = false }
reqsign-core = { version = "3.0.0", default-features = false }
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
Expand Down
10 changes: 5 additions & 5 deletions core/services/azblob/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_core::StaticEnv;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;
use sha2::Digest;
use sha2::Sha256;

Expand Down Expand Up @@ -369,9 +368,11 @@ impl Builder for AzblobBuilder {
}
};

let info = Arc::new(AccessorInfo::default());

let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_http_send(AccessorInfoHttpSend::new(info.clone()))
.with_env(StaticEnv {
home_dir: os_env.home_dir(),
envs,
Expand Down Expand Up @@ -401,8 +402,7 @@ impl Builder for AzblobBuilder {
Ok(AzblobBackend {
core: Arc::new(AzblobCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(AZBLOB_SCHEME)
info.set_scheme(AZBLOB_SCHEME)
.set_root(&root)
.set_name(container)
.set_native_capability(Capability {
Expand Down Expand Up @@ -447,7 +447,7 @@ impl Builder for AzblobBuilder {
..Default::default()
});

am.into()
info.clone()
},
root,
endpoint,
Expand Down
7 changes: 3 additions & 4 deletions core/services/azdls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign-azure-storage = { version = "2.0.2", default-features = false }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqsign-azure-storage = { version = "3.0.0", default-features = false }
reqsign-core = { version = "3.0.0", default-features = false }
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

Expand Down
10 changes: 5 additions & 5 deletions core/services/azdls/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_core::StaticEnv;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;

use super::AZDLS_SCHEME;
use super::config::AzdlsConfig;
Expand Down Expand Up @@ -293,9 +292,11 @@ impl Builder for AzdlsBuilder {
}

let os_env = OsEnv;
let info = Arc::new(AccessorInfo::default());

let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_http_send(AccessorInfoHttpSend::new(info.clone()))
.with_env(StaticEnv {
home_dir: os_env.home_dir(),
envs,
Expand All @@ -319,8 +320,7 @@ impl Builder for AzdlsBuilder {
Ok(AzdlsBackend {
core: Arc::new(AzdlsCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(AZDLS_SCHEME)
info.set_scheme(AZDLS_SCHEME)
.set_root(&root)
.set_name(filesystem)
.set_native_capability(Capability {
Expand Down Expand Up @@ -348,7 +348,7 @@ impl Builder for AzdlsBuilder {
..Default::default()
});

am.into()
info.clone()
},
filesystem: self.config.filesystem.clone(),
root,
Expand Down
7 changes: 3 additions & 4 deletions core/services/azfile/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign-azure-storage = { version = "2.0.2", default-features = false }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqsign-azure-storage = { version = "3.0.0", default-features = false }
reqsign-core = { version = "3.0.0", default-features = false }
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
Expand Down
10 changes: 5 additions & 5 deletions core/services/azfile/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_core::StaticEnv;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;

use super::AZFILE_SCHEME;
use super::config::AzfileConfig;
Expand Down Expand Up @@ -209,9 +208,11 @@ impl Builder for AzfileBuilder {
}

let os_env = OsEnv;
let info = Arc::new(AccessorInfo::default());

let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_http_send(AccessorInfoHttpSend::new(info.clone()))
.with_env(StaticEnv {
home_dir: os_env.home_dir(),
envs,
Expand All @@ -232,8 +233,7 @@ impl Builder for AzfileBuilder {
Ok(AzfileBackend {
core: Arc::new(AzfileCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(AZFILE_SCHEME)
info.set_scheme(AZFILE_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,
Expand All @@ -254,7 +254,7 @@ impl Builder for AzfileBuilder {
..Default::default()
});

am.into()
info.clone()
},
root,
endpoint,
Expand Down
10 changes: 3 additions & 7 deletions core/services/cos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,9 @@ http = { workspace = true }
log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqsign-tencent-cos = { version = "2.0.2", default-features = false }
reqwest = { version = "0.12.24", default-features = false, features = [
"stream",
] }
reqsign-core = { version = "3.0.0", default-features = false }
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
reqsign-tencent-cos = { version = "3.0.0", default-features = false }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
Expand Down
17 changes: 7 additions & 10 deletions core/services/cos/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use reqsign_core::Env as _;
use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;
use reqsign_tencent_cos::DefaultCredentialProvider;
use reqsign_tencent_cos::RequestSigner;
use reqsign_tencent_cos::StaticCredentialProvider;
Expand All @@ -43,9 +42,6 @@ use super::lister::CosObjectVersionsLister;
use super::writer::CosWriter;
use super::writer::CosWriters;
use opendal_core::raw::*;
use std::sync::LazyLock;

static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
use opendal_core::*;

/// Tencent-Cloud COS services support.
Expand Down Expand Up @@ -181,17 +177,19 @@ impl Builder for CosBuilder {
let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."), "//");
debug!("backend use endpoint {}", &endpoint);

let info = Arc::new(AccessorInfo::default());

let os_env = OsEnv;
let envs = os_env.vars();
let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_http_send(AccessorInfoHttpSend::new(info.clone()))
.with_env(os_env);

let mut credential = if self.config.disable_config_load {
DefaultCredentialProvider::builder()
.disable_env(true)
.disable_assume_role(true)
.no_env()
.no_web_identity()
.build()
} else {
DefaultCredentialProvider::new()
Expand Down Expand Up @@ -222,8 +220,7 @@ impl Builder for CosBuilder {
Ok(CosBackend {
core: Arc::new(CosCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(COS_SCHEME)
info.set_scheme(COS_SCHEME)
.set_root(&root)
.set_name(&bucket)
.set_native_capability(Capability {
Expand Down Expand Up @@ -282,7 +279,7 @@ impl Builder for CosBuilder {
..Default::default()
});

am.into()
info.clone()
},
bucket: bucket.clone(),
root,
Expand Down
11 changes: 3 additions & 8 deletions core/services/gcs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,9 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
percent-encoding = "2.3"
quick-xml = { workspace = true, features = ["serialize"] }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-google = { version = "2.0.2", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqwest = { version = "0.12.24", default-features = false, features = [
"json",
"stream",
] }
reqsign-core = { version = "3.0.0", default-features = false }
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
reqsign-google = { version = "3.0.0", default-features = false }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
Expand Down
Loading
Loading