-
Notifications
You must be signed in to change notification settings - Fork 1
feat(storage): add Bundle API client support #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| import { describe, expect, it, vi, afterEach } from 'vitest'; | ||
| import { bundle } from './bundle'; | ||
|
|
||
| // Unit tests for `bundle`: argument validation, request construction (URL, method, | ||
| // body, headers), both authentication modes, and error reporting. Network access is | ||
| // replaced by a stubbed global `fetch` that records what it was called with. | ||
| describe('bundle', () => { | ||
|   const originalFetch = globalThis.fetch; | ||
|
|
||
|   /** Access-key credentials shared by every test that exercises the signed path. */ | ||
|   const accessKeyConfig = { | ||
|     endpoint: 'https://test.endpoint.dev', | ||
|     accessKeyId: 'test-key', | ||
|     secretAccessKey: 'test-secret', | ||
|   }; | ||
|
|
||
|   /** What the stubbed fetch saw on its most recent call. */ | ||
|   type CapturedRequest = { url: string; init: RequestInit | undefined }; | ||
|
|
||
|   /** | ||
|    * Replace the global fetch with a stub that records its arguments and answers | ||
|    * with a fresh response from `makeResponse`. The returned object is filled in | ||
|    * when the stub is invoked. | ||
|    */ | ||
|   const stubFetch = (makeResponse: () => Response): CapturedRequest => { | ||
|     const captured: CapturedRequest = { url: '', init: undefined }; | ||
|     globalThis.fetch = vi.fn(async (url: string | URL, init?: RequestInit) => { | ||
|       captured.url = url.toString(); | ||
|       captured.init = init; | ||
|       return makeResponse(); | ||
|     }) as typeof fetch; | ||
|     return captured; | ||
|   }; | ||
|
|
||
|   /** A 200 response carrying an empty streaming body and the given content type. */ | ||
|   const streamingResponse = (contentType: string): Response => | ||
|     new Response(new ReadableStream(), { | ||
|       status: 200, | ||
|       headers: { 'content-type': contentType }, | ||
|     }); | ||
|
|
||
|   /** Headers of the captured request, viewed as a plain string map. */ | ||
|   const sentHeaders = (captured: CapturedRequest) => | ||
|     captured.init?.headers as Record<string, string>; | ||
|
|
||
|   afterEach(() => { | ||
|     // Undo any fetch stub so tests cannot leak into each other. | ||
|     globalThis.fetch = originalFetch; | ||
|     vi.restoreAllMocks(); | ||
|   }); | ||
|
|
||
|   it('returns error for empty bucket name', async () => { | ||
|     const result = await bundle('', ['key1']); | ||
|     expect(result.error).toBeDefined(); | ||
|     expect(result.error?.message).toContain('Bucket name is required'); | ||
|   }); | ||
|
|
||
|   it('returns error for empty keys', async () => { | ||
|     const result = await bundle('my-bucket', []); | ||
|     expect(result.error).toBeDefined(); | ||
|     expect(result.error?.message).toContain('At least one key is required'); | ||
|   }); | ||
|
|
||
|   it('returns error for undefined keys', async () => { | ||
|     // Deliberately bypass the type system to simulate an untyped JavaScript caller. | ||
|     const result = await bundle('my-bucket', undefined as unknown as string[]); | ||
|     expect(result.error).toBeDefined(); | ||
|     expect(result.error?.message).toContain('At least one key is required'); | ||
|   }); | ||
|
|
||
|   it('sends correct request with defaults', async () => { | ||
|     const captured = stubFetch(() => streamingResponse('application/x-tar')); | ||
|
|
||
|     const result = await bundle('my-bucket', ['a.jpg', 'b.jpg'], { | ||
|       config: accessKeyConfig, | ||
|     }); | ||
|
|
||
|     expect(result.error).toBeUndefined(); | ||
|     expect(result.data).toBeDefined(); | ||
|
|
||
|     // Verify URL. | ||
|     expect(captured.url).toContain('/my-bucket'); | ||
|     expect(captured.url).toContain('bundle='); | ||
|
|
||
|     // Verify method. | ||
|     expect(captured.init?.method).toBe('POST'); | ||
|
|
||
|     // Verify body contains keys. | ||
|     const body = JSON.parse(captured.init?.body as string); | ||
|     expect(body.keys).toEqual(['a.jpg', 'b.jpg']); | ||
|
|
||
|     // Verify headers. | ||
|     const headers = sentHeaders(captured); | ||
|     expect(headers['X-Tigris-Bundle-Format']).toBe('tar'); | ||
|     expect(headers['X-Tigris-Bundle-Compression']).toBe('none'); | ||
|     expect(headers['X-Tigris-Bundle-On-Error']).toBe('skip'); | ||
|     expect(headers['Content-Type']).toBe('application/json'); | ||
|
|
||
|     // SigV4 should have added Authorization header. | ||
|     expect(headers['authorization']).toBeDefined(); | ||
|
|
||
|     expect(result.data?.contentType).toBe('application/x-tar'); | ||
|   }); | ||
|
|
||
|   it('sends custom compression and error mode', async () => { | ||
|     const captured = stubFetch(() => streamingResponse('application/gzip')); | ||
|
|
||
|     const result = await bundle('my-bucket', ['x.txt'], { | ||
|       compression: 'gzip', | ||
|       onError: 'fail', | ||
|       config: accessKeyConfig, | ||
|     }); | ||
|
|
||
|     expect(result.error).toBeUndefined(); | ||
|
|
||
|     const headers = sentHeaders(captured); | ||
|     expect(headers['X-Tigris-Bundle-Compression']).toBe('gzip'); | ||
|     expect(headers['X-Tigris-Bundle-On-Error']).toBe('fail'); | ||
|     expect(result.data?.contentType).toBe('application/gzip'); | ||
|   }); | ||
|
|
||
|   it('returns error on HTTP failure', async () => { | ||
|     stubFetch( | ||
|       () => | ||
|         new Response('<Error><Code>InvalidArgument</Code></Error>', { | ||
|           status: 400, | ||
|           statusText: 'Bad Request', | ||
|         }) | ||
|     ); | ||
|
|
||
|     const result = await bundle('my-bucket', ['a.jpg'], { | ||
|       config: accessKeyConfig, | ||
|     }); | ||
|
|
||
|     expect(result.error).toBeDefined(); | ||
|     expect(result.error?.message).toContain('HTTP 400'); | ||
|     expect(result.data).toBeUndefined(); | ||
|   }); | ||
|
|
||
|   it('returns error when response has no body', async () => { | ||
|     stubFetch( | ||
|       () => | ||
|         new Response(null, { | ||
|           status: 200, | ||
|           headers: { 'content-type': 'application/x-tar' }, | ||
|         }) | ||
|     ); | ||
|
|
||
|     const result = await bundle('my-bucket', ['a.jpg'], { | ||
|       config: accessKeyConfig, | ||
|     }); | ||
|
|
||
|     expect(result.error).toBeDefined(); | ||
|     expect(result.error?.message).toContain('No body'); | ||
|   }); | ||
|
|
||
|   it('uses session token auth when provided', async () => { | ||
|     const captured = stubFetch(() => streamingResponse('application/x-tar')); | ||
|
|
||
|     const result = await bundle('my-bucket', ['a.jpg'], { | ||
|       config: { | ||
|         endpoint: 'https://test.endpoint.dev', | ||
|         sessionToken: 'my-session-token', | ||
|         organizationId: 'my-org', | ||
|       }, | ||
|     }); | ||
|
|
||
|     // Make sure the header checks below describe a request that actually succeeded. | ||
|     expect(result.error).toBeUndefined(); | ||
|
|
||
|     const headers = sentHeaders(captured); | ||
|     expect(headers['x-amz-security-token']).toBe('my-session-token'); | ||
|     expect(headers['X-Tigris-Namespace']).toBe('my-org'); | ||
|     // Should NOT have SigV4 Authorization. | ||
|     expect(headers['authorization']).toBeUndefined(); | ||
|   }); | ||
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| import { generateSignatureHeaders, TigrisHeaders } from '@shared/index'; | ||
| import { config } from '../config'; | ||
| import type { TigrisStorageConfig, TigrisStorageResponse } from '../types'; | ||
|
|
||
| /** Options accepted by `bundle`. Every field is optional. */ | ||
| export type BundleOptions = { | ||
| /** | ||
| * Per-call overrides for endpoint, access key pair, session token and organization id. | ||
| * Any field left unset falls back to the module-level `config`; the endpoint | ||
| * finally defaults to `https://t3.storage.dev`. | ||
| */ | ||
| config?: TigrisStorageConfig; | ||
| /** | ||
| * Compression algorithm for the response. | ||
| * - `"none"` (default): No compression. Best for already-compressed data (JPEG, PNG). | ||
| * - `"gzip"`: Gzip compression. | ||
| * - `"zstd"`: Zstd compression. Best ratio + speed for large bundles. | ||
| */ | ||
| compression?: 'none' | 'gzip' | 'zstd'; | ||
| /** | ||
| * Error handling mode for missing objects. | ||
| * - `"skip"` (default): Omit missing objects and append `__bundle_errors.json` to the tar. | ||
| * - `"fail"`: Return an error before streaming if any object is missing. | ||
| */ | ||
| onError?: 'skip' | 'fail'; | ||
| }; | ||
|
|
||
| /** Successful result of `bundle`: the archive stream together with its content type. */ | ||
| export type BundleResponse = { | ||
| /** The streaming tar archive body, passed through unread from the HTTP response. Caller is responsible for consuming and closing. */ | ||
| body: ReadableStream<Uint8Array>; | ||
| /** The response Content-Type (e.g. "application/x-tar", "application/gzip"). Reported as "application/x-tar" when the response carries no Content-Type header. */ | ||
| contentType: string; | ||
| }; | ||
|
|
||
| /** | ||
|  * Fetch multiple objects from a bucket as a streaming tar archive in a single request. | ||
|  * | ||
|  * This is a Tigris extension to the S3 API, designed for ML training workloads | ||
|  * that need to fetch thousands of objects per batch without per-object HTTP overhead. | ||
|  * | ||
|  * Authentication: when an access key pair is configured and no session token is | ||
|  * present, the request is signed with `generateSignatureHeaders`. Otherwise the | ||
|  * session token and organization id, when set, are sent as plain headers. | ||
|  * | ||
|  * Failures are reported through the returned `error` rather than thrown: invalid | ||
|  * arguments, non-2xx responses, a response without a body, and any exception | ||
|  * raised while building or sending the request. | ||
|  * | ||
|  * @example | ||
|  * ```ts | ||
|  * const result = await bundle("my-bucket", ["img_001.jpg", "img_002.jpg"]); | ||
|  * if (result.error) throw result.error; | ||
|  * | ||
|  * // Pipe to file, or process with a tar library | ||
|  * const reader = result.data.body.getReader(); | ||
|  * ``` | ||
|  * | ||
|  * @param bucketName - The bucket containing the objects. It is percent-encoded before | ||
|  *   being placed in the request path. | ||
|  * @param keys - Array of object keys to include in the bundle (max 5,000). | ||
|  * @param options - Optional configuration for compression and error handling. | ||
|  * @returns `{ data }` with the response stream and content type on success, | ||
|  *   or `{ error }` describing what went wrong. | ||
|  */ | ||
| export async function bundle( | ||
|   bucketName: string, | ||
|   keys: string[], | ||
|   options?: BundleOptions | ||
| ): Promise<TigrisStorageResponse<BundleResponse, Error>> { | ||
|   if (!bucketName) { | ||
|     return { error: new Error('Bucket name is required') }; | ||
|   } | ||
|
|
||
|   // `!keys` guards untyped callers that pass undefined/null despite the signature. | ||
|   if (!keys || keys.length === 0) { | ||
|     return { error: new Error('At least one key is required') }; | ||
|   } | ||
|
|
||
|   // Per-call settings win over the module-level config. | ||
|   const endpoint = | ||
|     options?.config?.endpoint ?? config.endpoint ?? 'https://t3.storage.dev'; | ||
|   const accessKeyId = options?.config?.accessKeyId ?? config.accessKeyId; | ||
|   const secretAccessKey = | ||
|     options?.config?.secretAccessKey ?? config.secretAccessKey; | ||
|   const sessionToken = options?.config?.sessionToken ?? config.sessionToken; | ||
|   const organizationId = | ||
|     options?.config?.organizationId ?? config.organizationId; | ||
|
|
||
|   const compression = options?.compression ?? 'none'; | ||
|   const onError = options?.onError ?? 'skip'; | ||
|
|
||
|   try { | ||
|     // Encode the bucket so characters such as `/`, `?` or `#` cannot change the | ||
|     // host, path or query of the request. Unencoded, a value like "/evil.example" | ||
|     // would form the scheme-relative URL "//evil.example" and send the request, | ||
|     // including its auth headers, to a different host. | ||
|     const url = new URL(`/${encodeURIComponent(bucketName)}`, endpoint); | ||
|     url.searchParams.set('bundle', ''); | ||
|
|
||
|     const bodyString = JSON.stringify({ keys }); | ||
|
|
||
|     const headers: Record<string, string> = { | ||
|       'Content-Type': 'application/json', | ||
|       [TigrisHeaders.BUNDLE_FORMAT]: 'tar', | ||
|       [TigrisHeaders.BUNDLE_COMPRESSION]: compression, | ||
|       [TigrisHeaders.BUNDLE_ON_ERROR]: onError, | ||
|     }; | ||
|
|
||
|     if (accessKeyId && secretAccessKey && !sessionToken) { | ||
|       // Signing runs after all other headers are set and receives the exact body string that is sent. | ||
|       const signedHeaders = await generateSignatureHeaders( | ||
|         'POST', | ||
|         url, | ||
|         headers, | ||
|         bodyString, | ||
|         accessKeyId, | ||
|         secretAccessKey | ||
|       ); | ||
|       Object.assign(headers, signedHeaders); | ||
|     } else { | ||
|       if (sessionToken) { | ||
|         headers[TigrisHeaders.SESSION_TOKEN] = sessionToken; | ||
|       } | ||
|       if (organizationId) { | ||
|         headers[TigrisHeaders.NAMESPACE] = organizationId; | ||
|       } | ||
|     } | ||
|
|
||
|     const response = await fetch(url.toString(), { | ||
|       method: 'POST', | ||
|       headers, | ||
|       body: bodyString, | ||
|     }); | ||
|
|
||
|     if (!response.ok) { | ||
|       // Prefer the response text for diagnostics; fall back to the status text | ||
|       // when the body is empty or cannot be read. | ||
|       let errorMessage: string; | ||
|       try { | ||
|         const text = await response.text(); | ||
|         errorMessage = text || response.statusText; | ||
|       } catch { | ||
|         errorMessage = response.statusText; | ||
|       } | ||
|       return { | ||
|         error: new Error( | ||
|           `Bundle request failed (HTTP ${response.status}): ${errorMessage}` | ||
|         ), | ||
|       }; | ||
|     } | ||
|
|
||
|     if (!response.body) { | ||
|       return { error: new Error('No body returned from bundle request') }; | ||
|     } | ||
|
|
||
|     // Hand the stream to the caller unread; nothing is buffered here. | ||
|     return { | ||
|       data: { | ||
|         body: response.body, | ||
|         contentType: | ||
|           response.headers.get('content-type') ?? 'application/x-tar', | ||
|       }, | ||
|     }; | ||
|   } catch (error) { | ||
|     // Anything can be thrown, including null or a bare string. Narrow before | ||
|     // reading `.message` so this handler itself can never throw. | ||
|     let reason = 'Unknown error'; | ||
|     if (error instanceof Error) { | ||
|       reason = error.message; | ||
|     } else if (error != null) { | ||
|       reason = String(error); | ||
|     } | ||
|     return { | ||
|       error: new Error(`Bundle request failed: ${reason}`), | ||
|     }; | ||
|   } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.