Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ dist

# Plans
.plans
docs/plans

# CF Workers
node_modules
.dev.vars
*.pyc
34 changes: 27 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[workspace]
members = [
"crates/core",
"crates/cf-workers",
"crates/metering",
"crates/path-mapping",
"crates/static-config",
"crates/sts",
"crates/oidc-provider",
Expand All @@ -12,6 +14,7 @@ members = [
default-members = [
"crates/core",
"crates/metering",
"crates/path-mapping",
"crates/static-config",
"crates/sts",
"crates/oidc-provider",
Expand Down Expand Up @@ -100,4 +103,6 @@ multistore = { path = "crates/core" }
multistore-static-config = { path = "crates/static-config" }
multistore-sts = { path = "crates/sts" }
multistore-metering = { path = "crates/metering" }
multistore-cf-workers = { path = "crates/cf-workers" }
multistore-oidc-provider = { path = "crates/oidc-provider" }
multistore-path-mapping = { path = "crates/path-mapping" }
39 changes: 39 additions & 0 deletions crates/cf-workers/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[package]
name = "multistore-cf-workers"
version.workspace = true
edition.workspace = true
license.workspace = true
description = "Cloudflare Workers runtime adapters for the multistore S3 proxy gateway"

[features]
default = []
azure = ["multistore/azure"]
gcp = ["multistore/gcp"]

[dependencies]
multistore.workspace = true
bytes.workspace = true
http.workspace = true
tracing.workspace = true
object_store.workspace = true
futures.workspace = true
http-body.workspace = true
http-body-util.workspace = true
async-trait.workspace = true

# Cloudflare Workers SDK
worker.workspace = true
wasm-bindgen.workspace = true
wasm-bindgen-futures.workspace = true
js-sys.workspace = true
web-sys.workspace = true

[dependencies.getrandom_v02]
package = "getrandom"
version = "0.2"
features = ["js"]

[dependencies.getrandom_v03]
package = "getrandom"
version = "0.3"
features = ["wasm_js"]
111 changes: 80 additions & 31 deletions examples/cf-workers/src/client.rs → crates/cf-workers/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
//! Backend client and HTTP helpers for the Cloudflare Workers runtime.
//! Backend client for the Cloudflare Workers runtime.
//!
//! Contains:
//! - `WorkerBackend` — implements `ProxyBackend` using the Fetch API + FetchConnector
//! - `FetchHttpExchange` — implements `HttpExchange` for OIDC token exchange
//! Contains `WorkerBackend` which implements `ProxyBackend` using the Fetch API
//! and `FetchConnector` for `object_store` HTTP requests.

use crate::body::JsBody;
use crate::fetch_connector::FetchConnector;
use bytes::Bytes;
use http::HeaderMap;
use multistore::backend::ForwardResponse;
use multistore::backend::{build_signer, create_builder, ProxyBackend, RawResponse, StoreBuilder};
use multistore::error::ProxyError;
use multistore::route_handler::ForwardRequest;
use multistore::route_handler::RESPONSE_HEADER_ALLOWLIST;
use multistore::types::BucketConfig;
use multistore_oidc_provider::{HttpExchange, OidcProviderError};
use object_store::list::PaginatedListStore;
use object_store::signer::Signer;
use std::sync::Arc;
Expand All @@ -24,12 +26,85 @@ use worker::Fetch;
pub struct WorkerBackend;

impl ProxyBackend for WorkerBackend {
type ResponseBody = web_sys::Response;

async fn forward<Body: 'static>(
&self,
request: ForwardRequest,
body: Body,
) -> Result<ForwardResponse<Self::ResponseBody>, ProxyError> {
// Downcast to the concrete JsBody type used by the Workers runtime.
let any_body: Box<dyn std::any::Any> = Box::new(body);
let js_body = any_body
.downcast::<JsBody>()
.map_err(|_| ProxyError::Internal("unexpected body type".into()))?;

// Build web_sys::Headers from the forwarding headers.
let ws_headers = web_sys::Headers::new()
.map_err(|e| ProxyError::Internal(format!("failed to create Headers: {:?}", e)))?;
for (key, value) in request.headers.iter() {
if let Ok(v) = value.to_str() {
let _ = ws_headers.set(key.as_str(), v);
}
}

// Build web_sys::RequestInit.
let init = web_sys::RequestInit::new();
init.set_method(request.method.as_str());
init.set_headers(&ws_headers.into());

// Bypass Cloudflare's subrequest cache for Range requests.
if request.headers.contains_key(http::header::RANGE) {
init.set_cache(web_sys::RequestCache::NoStore);
}

// For PUT: attach the original ReadableStream directly (zero-copy!).
if request.method == http::Method::PUT {
if let Some(ref stream) = js_body.0 {
init.set_body(stream);
}
}

// Build the outgoing request.
let ws_request = web_sys::Request::new_with_str_and_init(request.url.as_str(), &init)
.map_err(|e| ProxyError::Internal(format!("failed to create request: {:?}", e)))?;

// Fetch via the worker crate's Fetch API.
let worker_req: worker::Request = ws_request.into();
let worker_resp = worker::Fetch::Request(worker_req)
.send()
.await
.map_err(|e| ProxyError::BackendError(format!("fetch failed: {}", e)))?;

// Convert to web_sys::Response to access the body stream.
let backend_ws: web_sys::Response = worker_resp.into();
let status = backend_ws.status();

// Build filtered response headers using the existing allowlist.
let headers = extract_response_headers(&backend_ws.headers());
let content_length = headers
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok());

Ok(ForwardResponse {
status,
headers,
body: backend_ws,
content_length,
})
}

fn create_paginated_store(
&self,
config: &BucketConfig,
) -> Result<Box<dyn PaginatedListStore>, ProxyError> {
let builder = match create_builder(config)? {
StoreBuilder::S3(s) => StoreBuilder::S3(s.with_http_connector(FetchConnector)),
#[cfg(feature = "azure")]
StoreBuilder::Azure(a) => StoreBuilder::Azure(a.with_http_connector(FetchConnector)),
#[cfg(feature = "gcp")]
StoreBuilder::Gcs(g) => StoreBuilder::Gcs(g.with_http_connector(FetchConnector)),
};
builder.build()
}
Expand Down Expand Up @@ -102,8 +177,6 @@ impl ProxyBackend for WorkerBackend {
}
}

use multistore::route_handler::RESPONSE_HEADER_ALLOWLIST;

/// Extract response headers from a `web_sys::Headers` using an allowlist.
pub fn extract_response_headers(ws_headers: &web_sys::Headers) -> HeaderMap {
let mut resp_headers = HeaderMap::new();
Expand All @@ -116,27 +189,3 @@ pub fn extract_response_headers(ws_headers: &web_sys::Headers) -> HeaderMap {
}
resp_headers
}

/// [`HttpExchange`] implementation using reqwest on WASM (wraps `web_sys::fetch`).
#[derive(Clone)]
pub struct FetchHttpExchange;

impl HttpExchange for FetchHttpExchange {
async fn post_form(
&self,
url: &str,
form: &[(&str, &str)],
) -> Result<String, OidcProviderError> {
let client = reqwest::Client::new();
let resp = client
.post(url)
.form(form)
.send()
.await
.map_err(|e| OidcProviderError::HttpError(e.to_string()))?;

resp.text()
.await
.map_err(|e| OidcProviderError::HttpError(e.to_string()))
}
}
37 changes: 37 additions & 0 deletions crates/cf-workers/src/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//! Zero-copy body wrapper for Cloudflare Workers.
//!
//! Holds the raw `ReadableStream` from an incoming request so it can be
//! forwarded to the backend without copying through WASM memory.

use bytes::Bytes;

/// Zero-copy body wrapper. Holds the raw `ReadableStream` from the incoming
/// request, passing it through the Gateway untouched for Forward requests.
pub struct JsBody(pub Option<web_sys::ReadableStream>);

// SAFETY: Workers is single-threaded; these are required by Gateway's generic bounds.
unsafe impl Send for JsBody {}
unsafe impl Sync for JsBody {}

/// Materialize a `JsBody` into `Bytes` for the NeedsBody path.
///
/// Uses the `Response::arrayBuffer()` JS trick: wrap the stream in a
/// `web_sys::Response`, call `.array_buffer()`, and convert via `Uint8Array`.
/// This is only used for small multipart payloads.
pub async fn collect_js_body(body: JsBody) -> std::result::Result<Bytes, String> {
match body.0 {
None => Ok(Bytes::new()),
Some(stream) => {
let resp = web_sys::Response::new_with_opt_readable_stream(Some(&stream))
.map_err(|e| format!("Response::new failed: {:?}", e))?;
let promise = resp
.array_buffer()
.map_err(|e| format!("arrayBuffer() failed: {:?}", e))?;
let buf = wasm_bindgen_futures::JsFuture::from(promise)
.await
.map_err(|e| format!("arrayBuffer await failed: {:?}", e))?;
let uint8 = js_sys::Uint8Array::new(&buf);
Ok(Bytes::from(uint8.to_vec()))
}
}
}
28 changes: 28 additions & 0 deletions crates/cf-workers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Cloudflare Workers runtime adapters for the multistore S3 proxy gateway.
//!
//! This crate provides reusable runtime primitives for running a multistore
//! proxy on Cloudflare Workers:
//!
//! - [`FetchConnector`] — `object_store::client::HttpConnector` using the Fetch API
//! - [`JsBody`] — zero-copy body wrapper around `web_sys::ReadableStream`
//! - [`WorkerBackend`] — `ProxyBackend` implementation using the Fetch API
//! - [`WorkerSubscriber`] — `tracing::Subscriber` routing to `console.log`
//! - [`NoopCredentialRegistry`] — anonymous-only credential registry
//! - Response helpers for building `web_sys::Response` from proxy results

pub mod backend;
pub mod body;
pub mod fetch_connector;
pub mod noop_creds;
pub mod response;
pub mod tracing_layer;

pub use backend::WorkerBackend;
pub use body::{collect_js_body, JsBody};
pub use fetch_connector::FetchConnector;
pub use noop_creds::NoopCredentialRegistry;
pub use response::{
convert_ws_headers, forward_response_to_ws, http_headermap_to_ws_headers,
proxy_result_to_ws_response, ws_error_response, ws_xml_response,
};
pub use tracing_layer::WorkerSubscriber;
24 changes: 24 additions & 0 deletions crates/cf-workers/src/noop_creds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! No-op credential registry for anonymous-only access.

use multistore::error::ProxyError;
use multistore::registry::CredentialRegistry;
use multistore::types::{RoleConfig, StoredCredential};

/// Credential registry that always returns `None`.
///
/// Used for anonymous-only deployments where no authentication is supported.
#[derive(Clone)]
pub struct NoopCredentialRegistry;

impl CredentialRegistry for NoopCredentialRegistry {
async fn get_credential(
&self,
_access_key_id: &str,
) -> Result<Option<StoredCredential>, ProxyError> {
Ok(None)
}

async fn get_role(&self, _role_id: &str) -> Result<Option<RoleConfig>, ProxyError> {
Ok(None)
}
}
Loading
Loading