Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions packages/storage/src/lib/object/bundle.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import { describe, expect, it, vi, afterEach } from 'vitest';
import { bundle } from './bundle';

describe('bundle', () => {
  // Keep a handle on the real fetch so each test can swap in its own stub.
  const realFetch = globalThis.fetch;

  // Access-key credentials used by the tests that go through SigV4 signing.
  const keyPairConfig = {
    endpoint: 'https://test.endpoint.dev',
    accessKeyId: 'test-key',
    secretAccessKey: 'test-secret',
  };

  // What the stubbed fetch saw on its most recent invocation.
  type RecordedCall = { url: string; init: RequestInit | undefined };

  // Replaces global fetch with a mock that records its arguments and answers
  // with whatever `respond` produces.
  const stubFetch = (respond: () => Response): RecordedCall => {
    const recorded: RecordedCall = { url: '', init: undefined };
    globalThis.fetch = vi.fn(async (url: string | URL, init?: RequestInit) => {
      recorded.url = url.toString();
      recorded.init = init;
      return respond();
    }) as typeof fetch;
    return recorded;
  };

  // A 200 response with a stream body and the given content type.
  const streamingResponse = (contentType: string): Response =>
    new Response(new ReadableStream(), {
      status: 200,
      headers: { 'content-type': contentType },
    });

  // The request headers as the plain object that bundle() passes to fetch.
  const sentHeaders = (call: RecordedCall): Record<string, string> =>
    call.init?.headers as Record<string, string>;

  afterEach(() => {
    globalThis.fetch = realFetch;
    vi.restoreAllMocks();
  });

  it('returns error for empty bucket name', async () => {
    const outcome = await bundle('', ['key1']);

    expect(outcome.error).toBeDefined();
    expect(outcome.error?.message).toContain('Bucket name is required');
  });

  it('returns error for empty keys', async () => {
    const outcome = await bundle('my-bucket', []);

    expect(outcome.error).toBeDefined();
    expect(outcome.error?.message).toContain('At least one key is required');
  });

  it('returns error for undefined keys', async () => {
    const outcome = await bundle(
      'my-bucket',
      undefined as unknown as string[]
    );

    expect(outcome.error).toBeDefined();
    expect(outcome.error?.message).toContain('At least one key is required');
  });

  it('sends correct request with defaults', async () => {
    const call = stubFetch(() => streamingResponse('application/x-tar'));

    const outcome = await bundle('my-bucket', ['a.jpg', 'b.jpg'], {
      config: keyPairConfig,
    });

    expect(outcome.error).toBeUndefined();
    expect(outcome.data).toBeDefined();

    // Target URL: bucket path plus the bundle query flag.
    expect(call.url).toContain('/my-bucket');
    expect(call.url).toContain('bundle=');

    // HTTP verb.
    expect(call.init?.method).toBe('POST');

    // JSON payload carries the requested keys.
    const payload = JSON.parse(call.init?.body as string);
    expect(payload.keys).toEqual(['a.jpg', 'b.jpg']);

    // Bundle headers fall back to their defaults.
    const headers = sentHeaders(call);
    expect(headers['X-Tigris-Bundle-Format']).toBe('tar');
    expect(headers['X-Tigris-Bundle-Compression']).toBe('none');
    expect(headers['X-Tigris-Bundle-On-Error']).toBe('skip');
    expect(headers['Content-Type']).toBe('application/json');

    // Signing with the access key pair must have produced an Authorization header.
    expect(headers['authorization']).toBeDefined();

    expect(outcome.data?.contentType).toBe('application/x-tar');
  });

  it('sends custom compression and error mode', async () => {
    const call = stubFetch(() => streamingResponse('application/gzip'));

    const outcome = await bundle('my-bucket', ['x.txt'], {
      compression: 'gzip',
      onError: 'fail',
      config: keyPairConfig,
    });

    expect(outcome.error).toBeUndefined();

    const headers = sentHeaders(call);
    expect(headers['X-Tigris-Bundle-Compression']).toBe('gzip');
    expect(headers['X-Tigris-Bundle-On-Error']).toBe('fail');
    expect(outcome.data?.contentType).toBe('application/gzip');
  });

  it('returns error on HTTP failure', async () => {
    stubFetch(
      () =>
        new Response('<Error><Code>InvalidArgument</Code></Error>', {
          status: 400,
          statusText: 'Bad Request',
        })
    );

    const outcome = await bundle('my-bucket', ['a.jpg'], {
      config: keyPairConfig,
    });

    expect(outcome.error).toBeDefined();
    expect(outcome.error?.message).toContain('HTTP 400');
    expect(outcome.data).toBeUndefined();
  });

  it('returns error when response has no body', async () => {
    stubFetch(
      () =>
        new Response(null, {
          status: 200,
          headers: { 'content-type': 'application/x-tar' },
        })
    );

    const outcome = await bundle('my-bucket', ['a.jpg'], {
      config: keyPairConfig,
    });

    expect(outcome.error).toBeDefined();
    expect(outcome.error?.message).toContain('No body');
  });

  it('uses session token auth when provided', async () => {
    const call = stubFetch(() => streamingResponse('application/x-tar'));

    await bundle('my-bucket', ['a.jpg'], {
      config: {
        endpoint: 'https://test.endpoint.dev',
        sessionToken: 'my-session-token',
        organizationId: 'my-org',
      },
    });

    const headers = sentHeaders(call);
    expect(headers['x-amz-security-token']).toBe('my-session-token');
    expect(headers['X-Tigris-Namespace']).toBe('my-org');
    // Session-token requests must not carry a SigV4 Authorization header.
    expect(headers['authorization']).toBeUndefined();
  });
});
143 changes: 143 additions & 0 deletions packages/storage/src/lib/object/bundle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { generateSignatureHeaders, TigrisHeaders } from '@shared/index';
import { config } from '../config';
import type { TigrisStorageConfig, TigrisStorageResponse } from '../types';

/**
* Optional settings accepted by `bundle`.
*/
export type BundleOptions = {
/**
* Per-call connection and credential settings. `bundle` reads `endpoint`,
* `accessKeyId`, `secretAccessKey`, `sessionToken` and `organizationId` from
* here first and falls back to the module-level `config` for any that are
* not set.
*/
config?: TigrisStorageConfig;
/**
* Compression algorithm for the response, sent to the service as the
* bundle-compression request header.
* - `"none"` (default): No compression. Best for already-compressed data (JPEG, PNG).
* - `"gzip"`: Gzip compression.
* - `"zstd"`: Zstd compression. Best ratio + speed for large bundles.
*/
compression?: 'none' | 'gzip' | 'zstd';
/**
* Error handling mode for missing objects, sent to the service as the
* bundle-on-error request header.
* - `"skip"` (default): Omit missing objects and append `__bundle_errors.json` to the tar.
* - `"fail"`: Return an error before streaming if any object is missing.
*/
onError?: 'skip' | 'fail';
};

/**
* Payload returned by `bundle` on success: the response stream together with
* its media type.
*/
export type BundleResponse = {
/** The streaming tar archive body. Caller is responsible for consuming and closing. */
body: ReadableStream<Uint8Array>;
/**
* The response Content-Type (e.g. "application/x-tar", "application/gzip").
* `bundle` substitutes "application/x-tar" when the response has no
* `content-type` header.
*/
contentType: string;
};

/**
 * Fetch multiple objects from a bucket as a streaming tar archive in a single request.
 *
 * This is a Tigris extension to the S3 API, designed for ML training workloads
 * that need to fetch thousands of objects per batch without per-object HTTP overhead.
 *
 * The request is a `POST` to `/<bucket>?bundle=` on the resolved endpoint with a
 * JSON body of `{ keys }`. Authentication is chosen as follows:
 * - access key id and secret present, no session token: the request is signed
 *   with `generateSignatureHeaders` and the resulting headers are merged in;
 * - otherwise: the session token (if any) and organization id (if any) are
 *   sent as headers and no signature is computed.
 *
 * Failures are reported through the returned `error` field rather than by
 * throwing: invalid arguments, a non-2xx response, a response without a body,
 * and anything thrown while building or sending the request.
 *
 * @example
 * ```ts
 * const result = await bundle("my-bucket", ["img_001.jpg", "img_002.jpg"]);
 * if (result.error) throw result.error;
 *
 * // Pipe to file, or process with a tar library
 * const reader = result.data.body.getReader();
 * ```
 *
 * @param bucketName - The bucket containing the objects. It is percent-encoded
 *   as a single path segment, so it can never alter the request host or query.
 * @param keys - Array of object keys to include in the bundle (documented
 *   service maximum: 5,000; the limit is not enforced client-side).
 * @param options - Optional configuration for compression and error handling.
 * @returns `{ data }` with the response stream and content type on success,
 *   or `{ error }` describing what went wrong.
 */
export async function bundle(
  bucketName: string,
  keys: string[],
  options?: BundleOptions
): Promise<TigrisStorageResponse<BundleResponse, Error>> {
  if (!bucketName) {
    return { error: new Error('Bucket name is required') };
  }

  if (!keys || keys.length === 0) {
    return { error: new Error('At least one key is required') };
  }

  // Per-call settings win over the module-level config.
  const endpoint =
    options?.config?.endpoint ?? config.endpoint ?? 'https://t3.storage.dev';
  const accessKeyId = options?.config?.accessKeyId ?? config.accessKeyId;
  const secretAccessKey =
    options?.config?.secretAccessKey ?? config.secretAccessKey;
  const sessionToken = options?.config?.sessionToken ?? config.sessionToken;
  const organizationId =
    options?.config?.organizationId ?? config.organizationId;

  const compression = options?.compression ?? 'none';
  const onError = options?.onError ?? 'skip';

  try {
    // Encode the bucket name as one path segment. Interpolating it raw would
    // let a name such as "/other.host" be parsed as a scheme-relative URL and
    // redirect the request (and its credentials) to a different host, and
    // would let "?" or "#" corrupt the query string.
    const url = new URL(`/${encodeURIComponent(bucketName)}`, endpoint);
    url.searchParams.set('bundle', '');

    const bodyString = JSON.stringify({ keys });

    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
      [TigrisHeaders.BUNDLE_FORMAT]: 'tar',
      [TigrisHeaders.BUNDLE_COMPRESSION]: compression,
      [TigrisHeaders.BUNDLE_ON_ERROR]: onError,
    };

    if (accessKeyId && secretAccessKey && !sessionToken) {
      // Access-key auth: sign the exact method, URL, headers and body we send.
      const signedHeaders = await generateSignatureHeaders(
        'POST',
        url,
        headers,
        bodyString,
        accessKeyId,
        secretAccessKey
      );
      Object.assign(headers, signedHeaders);
    } else {
      // Session-token (or anonymous) auth: no signature, just pass-through headers.
      if (sessionToken) {
        headers[TigrisHeaders.SESSION_TOKEN] = sessionToken;
      }
      if (organizationId) {
        headers[TigrisHeaders.NAMESPACE] = organizationId;
      }
    }

    const response = await fetch(url.toString(), {
      method: 'POST',
      headers,
      body: bodyString,
    });

    if (!response.ok) {
      // Prefer the server's error payload; fall back to the status text when
      // the body is empty or cannot be read.
      let errorMessage: string;
      try {
        const text = await response.text();
        errorMessage = text || response.statusText;
      } catch {
        errorMessage = response.statusText;
      }
      return {
        error: new Error(
          `Bundle request failed (HTTP ${response.status}): ${errorMessage}`
        ),
      };
    }

    if (!response.body) {
      return { error: new Error('No body returned from bundle request') };
    }

    return {
      data: {
        body: response.body,
        contentType:
          response.headers.get('content-type') ?? 'application/x-tar',
      },
    };
  } catch (error) {
    // Anything can be thrown, including null or a bare string. Narrow before
    // reading `.message` so this handler itself can never throw.
    const reason =
      error instanceof Error
        ? error.message
        : String(error ?? 'Unknown error');
    return {
      error: new Error(`Bundle request failed: ${reason}`),
    };
  }
}
5 changes: 5 additions & 0 deletions packages/storage/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ export {
handleClientUpload,
type ClientUploadRequest,
} from './lib/upload/server';
export {
bundle,
type BundleOptions,
type BundleResponse,
} from './lib/object/bundle';
export { UploadAction } from './lib/upload/shared';
3 changes: 3 additions & 0 deletions shared/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ export enum TigrisHeaders {
FORK_SOURCE_BUCKET_SNAPSHOT = 'X-Tigris-Fork-Source-Bucket-Snapshot',
RENAME = 'X-Tigris-Rename',
COPY_SOURCE = 'X-Amz-Copy-Source',
BUNDLE_FORMAT = 'X-Tigris-Bundle-Format',
BUNDLE_COMPRESSION = 'X-Tigris-Bundle-Compression',
BUNDLE_ON_ERROR = 'X-Tigris-Bundle-On-Error',
Comment thread
efirs marked this conversation as resolved.
}
1 change: 1 addition & 0 deletions shared/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export { executeWithConcurrency, handleError, toError } from './utils';
export { TigrisHeaders } from './headers';
export {
createTigrisHttpClient,
generateSignatureHeaders,
type HttpClientRequest,
type HttpClientResponse,
type TigrisHttpClient,
Expand Down
Loading