From a524f53ae19f36e10a0c16031be4aa55d59d09bd Mon Sep 17 00:00:00 2001 From: A Ibrahim Date: Mon, 13 Apr 2026 19:15:41 +0200 Subject: [PATCH 1/4] feat: add source option in list and ls --- src/lib/ls.ts | 46 +++++++++++++++++++++-------------------- src/lib/objects/list.ts | 2 ++ src/specs.yaml | 6 ++++++ src/utils/messages.ts | 2 +- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git a/src/lib/ls.ts b/src/lib/ls.ts index 2bfb789..34a581e 100644 --- a/src/lib/ls.ts +++ b/src/lib/ls.ts @@ -14,6 +14,7 @@ export default async function ls(options: Record) { 'snapshot', ]); const format = getFormat(options); + const source = getOption<'tigris' | 'shadow'>(options, ['source']); const { limit, pageToken } = getPaginationOptions(options); if (!pathString) { @@ -72,7 +73,9 @@ export default async function ls(options: Record) { const { data, error } = await list({ prefix, + delimiter: '/', ...(snapshotVersion ? { snapshotVersion } : {}), + ...(source ? { source } : {}), ...(limit !== undefined ? { limit } : {}), ...(pageToken ? { paginationToken: pageToken } : {}), config: { @@ -85,28 +88,27 @@ export default async function ls(options: Record) { exitWithError(error); } - const objects = (data.items || []) - .map((item) => { - // Strip the prefix from the name for cleaner display - const name = prefix ? item.name.slice(prefix.length) : item.name; - - // For immediate children only: if name contains /, only show up to first / - const firstSlash = name.indexOf('/'); - const displayName = - firstSlash === -1 ? name : name.slice(0, firstSlash + 1); - const isFolder = displayName.endsWith('/'); - - return { - key: displayName, - size: isFolder ? 
'-' : formatSize(item.size), - modified: item.lastModified, - }; - }) - // Filter out empty keys and deduplicate folders - .filter( - (item, index, arr) => - item.key !== '' && arr.findIndex((i) => i.key === item.key) === index - ); + // Common prefixes are "folders" returned by S3 when using a delimiter + const folders = (data.commonPrefixes || []).map((p) => { + const displayName = prefix ? p.slice(prefix.length) : p; + return { + key: displayName, + size: '-', + modified: '', + }; + }); + + // Items are files at this level + const files = (data.items || []).map((item) => { + const displayName = prefix ? item.name.slice(prefix.length) : item.name; + return { + key: displayName, + size: formatSize(item.size), + modified: item.lastModified, + }; + }); + + const objects = [...folders, ...files]; const columns = [ { key: 'key', header: 'Key' }, diff --git a/src/lib/objects/list.ts b/src/lib/objects/list.ts index f9ff4b0..812a51a 100644 --- a/src/lib/objects/list.ts +++ b/src/lib/objects/list.ts @@ -25,6 +25,7 @@ export default async function listObjects(options: Record) { 'snapshotVersion', 'snapshot', ]); + const source = getOption<'tigris' | 'shadow'>(options, ['source']); const { limit, pageToken } = getPaginationOptions(options); if (!bucketArg) { @@ -40,6 +41,7 @@ export default async function listObjects(options: Record) { const { data, error } = await list({ prefix, ...(snapshotVersion ? { snapshotVersion } : {}), + ...(source ? { source } : {}), ...(limit !== undefined ? { limit } : {}), ...(pageToken ? 
{ paginationToken: pageToken } : {}), config: { diff --git a/src/specs.yaml b/src/specs.yaml index b0cd6d4..fc2c9b6 100644 --- a/src/specs.yaml +++ b/src/specs.yaml @@ -297,6 +297,9 @@ commands: - name: page-token description: Pagination token from a previous request to fetch the next page alias: pt + - name: source + description: List objects from a specific storage source on buckets with shadow migration enabled + options: [tigris, shadow] # mk - name: mk @@ -1236,6 +1239,9 @@ commands: - name: page-token description: Pagination token from a previous request to fetch the next page alias: pt + - name: source + description: List objects from a specific storage source on buckets with shadow migration enabled + options: [tigris, shadow] # get - name: get description: Download an object by key. Prints to stdout by default, or saves to a file with --output diff --git a/src/utils/messages.ts b/src/utils/messages.ts index 5c9359d..e785d4e 100644 --- a/src/utils/messages.ts +++ b/src/utils/messages.ts @@ -174,7 +174,7 @@ export function printDeprecated(message: string): void { */ export function printPaginationHint(paginationToken?: string): void { if (!paginationToken) return; - console.error(`\nNext page: --page-token ${paginationToken}`); + console.error(`\nNext page: --page-token "${paginationToken}"`); } /** From faacf0592d0dd7ece5d96869b0579ee700778407 Mon Sep 17 00:00:00 2001 From: A Ibrahim Date: Tue, 14 Apr 2026 13:06:01 +0200 Subject: [PATCH 2/4] chore: update storage package --- package-lock.json | 8 ++++---- package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 5a54e20..5a07a52 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@aws-sdk/credential-providers": "^3.1024.0", "@smithy/shared-ini-file-loader": "^4.4.7", "@tigrisdata/iam": "^2.1.0", - "@tigrisdata/storage": "^3.0.0", + "@tigrisdata/storage": "^3.1.0", "commander": "^14.0.3", "enquirer": "^2.4.1", "jose": 
"^6.2.2", @@ -4015,9 +4015,9 @@ } }, "node_modules/@tigrisdata/storage": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/@tigrisdata/storage/-/storage-3.0.0.tgz", - "integrity": "sha512-Rhw+aEOpl2bcgDhIymAguX2m178TYdco+lmX+zxYHw+P9jX8v4euwnZwRSb/+YwqmEawhBeapdNkCgIsBIVZ8g==", + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@tigrisdata/storage/-/storage-3.1.0.tgz", + "integrity": "sha512-oRqFafHt8blowowXZdG6fs2+ud/CA23Q9KOOJBqLfL2vOuJBvwpbcnY3oaRD9tXm8X7JmNtMKKXd1btjvCBcwg==", "license": "MIT", "dependencies": { "@aws-crypto/sha256-js": "^5.2.0", diff --git a/package.json b/package.json index 3229460..00cce0b 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,7 @@ "@aws-sdk/credential-providers": "^3.1024.0", "@smithy/shared-ini-file-loader": "^4.4.7", "@tigrisdata/iam": "^2.1.0", - "@tigrisdata/storage": "^3.0.0", + "@tigrisdata/storage": "^3.1.0", "commander": "^14.0.3", "enquirer": "^2.4.1", "jose": "^6.2.2", From 89cafef481a31fadfcaca90a245a8e54ed48d4b8 Mon Sep 17 00:00:00 2001 From: A Ibrahim Date: Tue, 14 Apr 2026 13:11:25 +0200 Subject: [PATCH 3/4] feat: bucket migrate command --- src/lib/buckets/migrate.ts | 472 +++++++++++++++++++++++++++++++++++++ src/specs.yaml | 20 ++ 2 files changed, 492 insertions(+) create mode 100644 src/lib/buckets/migrate.ts diff --git a/src/lib/buckets/migrate.ts b/src/lib/buckets/migrate.ts new file mode 100644 index 0000000..cae6792 --- /dev/null +++ b/src/lib/buckets/migrate.ts @@ -0,0 +1,472 @@ +import { getStorageConfig } from '@auth/provider.js'; +import type { ListItem } from '@tigrisdata/storage'; +import { + isMigrated, + list, + migrate as scheduleMigration, +} from '@tigrisdata/storage'; +import { executeWithConcurrency } from '@utils/concurrency.js'; +import { failWithError } from '@utils/exit.js'; +import { formatSize } from '@utils/format.js'; +import { msg, printFailure } from '@utils/messages.js'; +import { getOption } from '@utils/options.js'; +import { parseAnyPath } from 
'@utils/path.js'; + +const context = msg('buckets', 'migrate'); + +/** Max total bytes of in-flight (scheduled but not confirmed) migrations */ +const MAX_IN_FLIGHT_BYTES = 10 * 1024 * 1024 * 1024; // 10 GB + +/** Max concurrent migrate() or isMigrated() calls */ +const CONCURRENCY = 50; + +/** Milliseconds to wait between isMigrated polling rounds */ +const CHECK_INTERVAL_MS = 5_000; + +/** Batch size for scheduling migrate() calls before checking throttle */ +const SCHEDULE_BATCH_SIZE = 50; + +/** Max consecutive isMigrated failures before marking item as failed */ +const MAX_CHECK_FAILURES = 3; + +interface MigrationItem { + name: string; + size: number; +} + +interface InFlightItem extends MigrationItem { + checkFailures: number; +} + +interface MigrationState { + total: number; + totalBytes: number; + scheduled: number; + confirmed: number; + confirmedBytes: number; + failed: number; + inFlight: InFlightItem[]; + inFlightBytes: number; + errors: Array<{ name: string; error: string }>; + startTime: number; +} + +// --------------------------------------------------------------------------- +// PaginatedCursor: wraps list() with source-based pagination +// --------------------------------------------------------------------------- + +class PaginatedCursor { + private buffer: ListItem[] = []; + private index = 0; + private token: string | undefined; + private _done = false; + private fetched = false; + + constructor( + private bucket: string, + private source: 'tigris' | 'shadow', + private prefix: string | undefined, + private config: Record + ) {} + + get done(): boolean { + return this._done && this.index >= this.buffer.length; + } + + async current(): Promise { + if (this.index < this.buffer.length) { + return this.buffer[this.index]; + } + if (this._done) return null; + await this.fetchPage(); + return this.index < this.buffer.length ?
this.buffer[this.index] : null; + } + + advance(): void { + this.index++; + } + + /** Number of items seen so far across all pages */ + get itemsSeen(): number { + return this.fetched ? this.index : 0; + } + + private async fetchPage(): Promise { + if (this._done) return; + + const { data, error } = await list({ + prefix: this.prefix, + source: this.source, + ...(this.token ? { paginationToken: this.token } : {}), + config: { + ...this.config, + bucket: this.bucket, + }, + }); + + if (error) { + throw error; + } + + this.buffer = data.items ?? []; + this.index = 0; + this.token = data.paginationToken; + this.fetched = true; + + if (!data.paginationToken && !data.hasMore) { + this._done = true; + } + } +} + +// --------------------------------------------------------------------------- +// Discovery: sorted merge-diff +// --------------------------------------------------------------------------- + +async function discoverDiff( + bucket: string, + prefix: string | undefined, + config: Record +): Promise { + const shadow = new PaginatedCursor(bucket, 'shadow', prefix, config); + const tigris = new PaginatedCursor(bucket, 'tigris', prefix, config); + + const diff: MigrationItem[] = []; + + let shadowItem = await shadow.current(); + let tigrisItem = await tigris.current(); + + while (shadowItem !== null) { + if (tigrisItem === null) { + // Tigris exhausted — all remaining shadow items need migration + diff.push({ name: shadowItem.name, size: shadowItem.size }); + shadow.advance(); + shadowItem = await shadow.current(); + continue; + } + + if (shadowItem.name < tigrisItem.name) { + // In shadow but not in tigris + diff.push({ name: shadowItem.name, size: shadowItem.size }); + shadow.advance(); + shadowItem = await shadow.current(); + } else if (shadowItem.name > tigrisItem.name) { + // In tigris but not in shadow — skip + tigris.advance(); + tigrisItem = await tigris.current(); + } else { + // In both — already migrated + shadow.advance(); + tigris.advance(); + 
shadowItem = await shadow.current(); + tigrisItem = await tigris.current(); + } + } + + return diff; +} + +// --------------------------------------------------------------------------- +// Migration loop +// --------------------------------------------------------------------------- + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function formatElapsed(ms: number): string { + const totalSeconds = Math.floor(ms / 1000); + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + if (minutes === 0) return `${seconds}s`; + return `${minutes}m ${seconds}s`; +} + +function printProgress(state: MigrationState, bucket: string): void { + if (!process.stderr.isTTY || globalThis.__TIGRIS_JSON_MODE) return; + + const elapsed = formatElapsed(Date.now() - state.startTime); + const line = + `\rMigrating ${bucket}: ` + + `${state.confirmed.toLocaleString()} / ${state.total.toLocaleString()} objects | ` + + `${formatSize(state.confirmedBytes)} / ${formatSize(state.totalBytes)} | ` + + `In-flight: ${formatSize(state.inFlightBytes)} | ` + + `${elapsed}`; + + process.stderr.write(line + ' '.repeat(Math.max(0, 100 - line.length))); +} + +function clearProgress(): void { + if (!process.stderr.isTTY || globalThis.__TIGRIS_JSON_MODE) return; + process.stderr.write('\r' + ' '.repeat(100) + '\r'); +} + +async function flushScheduleBatch( + batch: MigrationItem[], + state: MigrationState, + config: Record, + bucket: string +): Promise { + const results = await executeWithConcurrency( + batch.map( + (item) => () => + scheduleMigration(item.name, { + config: { ...config, bucket }, + }) + ), + CONCURRENCY + ); + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + const item = batch[i]; + + if (result.error) { + state.failed++; + state.errors.push({ + name: item.name, + error: result.error.message, + }); + } else { + state.inFlight.push({ ...item, checkFailures: 0 }); + 
state.inFlightBytes += item.size; + state.scheduled++; + } + } +} + +async function drainCompleted( + state: MigrationState, + config: Record, + bucket: string +): Promise { + if (state.inFlight.length === 0) return; + + // Check oldest items first (FIFO), up to CONCURRENCY at a time + const toCheck = state.inFlight.slice(0, CONCURRENCY); + + const results = await executeWithConcurrency( + toCheck.map( + (item) => () => + isMigrated(item.name, { + config: { ...config, bucket }, + }) + ), + CONCURRENCY + ); + + const completedKeys = new Set(); + for (let i = 0; i < results.length; i++) { + const result = results[i]; + const item = toCheck[i]; + + if (result.error) { + item.checkFailures++; + if (item.checkFailures >= MAX_CHECK_FAILURES) { + completedKeys.add(item.name); + state.failed++; + state.inFlightBytes -= item.size; + state.errors.push({ + name: item.name, + error: `Failed to verify migration status after ${MAX_CHECK_FAILURES} attempts`, + }); + } + } else if (result.data) { + completedKeys.add(item.name); + state.confirmed++; + state.confirmedBytes += item.size; + state.inFlightBytes -= item.size; + } + } + + if (completedKeys.size > 0) { + state.inFlight = state.inFlight.filter( + (item) => !completedKeys.has(item.name) + ); + } + + // If nothing completed, wait before next check + if (completedKeys.size === 0) { + await sleep(CHECK_INTERVAL_MS); + } +} + +// --------------------------------------------------------------------------- +// Main command +// --------------------------------------------------------------------------- + +export default async function migrate( + options: Record +): Promise { + const pathString = getOption(options, ['path']); + + if (!pathString) { + failWithError(context, 'Bucket name or path is required'); + } + + const { bucket, path: prefix } = parseAnyPath(pathString); + + if (!bucket) { + failWithError(context, 'Invalid path'); + } + + const config = await getStorageConfig(); + + // Handle SIGINT gracefully + let interrupted 
= false; + const sigintHandler = () => { + interrupted = true; + }; + process.on('SIGINT', sigintHandler); + + try { + // Phase 1: Discovery + if (process.stderr.isTTY && !globalThis.__TIGRIS_JSON_MODE) { + process.stderr.write('Discovering objects to migrate...'); + } + + let diff: MigrationItem[]; + try { + diff = await discoverDiff(bucket, prefix, config); + } catch (err) { + clearProgress(); + failWithError(context, err); + } + + clearProgress(); + + if (diff.length === 0) { + if (process.stderr.isTTY && !globalThis.__TIGRIS_JSON_MODE) { + console.error('All objects are already migrated.'); + } + if (globalThis.__TIGRIS_JSON_MODE) { + console.log( + JSON.stringify({ + action: 'migrate', + bucket, + toMigrate: 0, + confirmed: 0, + failed: 0, + }) + ); + } + return; + } + + const totalBytes = diff.reduce((sum, item) => sum + item.size, 0); + + if (process.stderr.isTTY && !globalThis.__TIGRIS_JSON_MODE) { + console.error( + `Found ${diff.length.toLocaleString()} objects to migrate (${formatSize(totalBytes)})` + ); + } + + // Phase 2: Migration loop + const state: MigrationState = { + total: diff.length, + totalBytes, + scheduled: 0, + confirmed: 0, + confirmedBytes: 0, + failed: 0, + inFlight: [], + inFlightBytes: 0, + errors: [], + startTime: Date.now(), + }; + + let batch: MigrationItem[] = []; + + for (const item of diff) { + if (interrupted) break; + + // Throttle: wait until capacity is available + while ( + state.inFlightBytes + item.size > MAX_IN_FLIGHT_BYTES && + state.inFlight.length > 0 && + !interrupted + ) { + await drainCompleted(state, config, bucket); + printProgress(state, bucket); + } + + if (interrupted) break; + + batch.push(item); + + if (batch.length >= SCHEDULE_BATCH_SIZE) { + await flushScheduleBatch(batch, state, config, bucket); + batch = []; + printProgress(state, bucket); + } + } + + // Flush remaining batch + if (batch.length > 0 && !interrupted) { + await flushScheduleBatch(batch, state, config, bucket); + printProgress(state, bucket); 
+ } + + // Phase 3: Drain all remaining in-flight items + while (state.inFlight.length > 0 && !interrupted) { + await drainCompleted(state, config, bucket); + printProgress(state, bucket); + } + + clearProgress(); + + // Summary + const elapsed = formatElapsed(Date.now() - state.startTime); + + if (globalThis.__TIGRIS_JSON_MODE) { + console.log( + JSON.stringify({ + action: 'migrate', + bucket, + toMigrate: state.total, + scheduled: state.scheduled, + confirmed: state.confirmed, + failed: state.failed, + elapsed, + ...(state.errors.length > 0 + ? { errors: state.errors.slice(0, 20) } + : {}), + }) + ); + } + + if (interrupted) { + console.error( + `\nInterrupted. ${state.confirmed} confirmed, ${state.inFlight.length} still in-flight, ${state.total - state.scheduled} not yet scheduled.` + ); + process.exit(1); + } + + if (state.failed > 0) { + printFailure( + context, + `${state.failed} object(s) failed to migrate. ${state.confirmed} migrated successfully in ${elapsed}.` + ); + if ( + process.stderr.isTTY && + !globalThis.__TIGRIS_JSON_MODE && + state.errors.length > 0 + ) { + const shown = state.errors.slice(0, 10); + for (const err of shown) { + console.error(` ${err.name}: ${err.error}`); + } + if (state.errors.length > 10) { + console.error(` ... 
and ${state.errors.length - 10} more`); + } + } + process.exit(1); + } + + console.error( + `\nMigration complete: ${state.confirmed.toLocaleString()} object(s) migrated (${formatSize(state.confirmedBytes)}) in ${elapsed}` + ); + } finally { + process.removeListener('SIGINT', sigintHandler); + } +} diff --git a/src/specs.yaml b/src/specs.yaml index fc2c9b6..1205e57 100644 --- a/src/specs.yaml +++ b/src/specs.yaml @@ -943,6 +943,26 @@ commands: - name: disable description: Disable migration and clear all migration settings type: flag + # migrate + - name: migrate + description: Actively migrate all objects from a shadow bucket to Tigris by scheduling server-side migration for unmigrated objects + examples: + - "tigris buckets migrate my-bucket" + - "tigris buckets migrate my-bucket/images/" + - "tigris buckets migrate t3://my-bucket/prefix/" + messages: + onStart: '' + onSuccess: '' + onFailure: 'Migration failed' + arguments: + - name: path + description: Bucket name or path with optional prefix. Supports t3:// and tigris:// prefixes + type: positional + required: true + examples: + - my-bucket + - my-bucket/images/ + - t3://my-bucket/prefix/ # set-transition - name: set-transition description: Configure a lifecycle transition rule on a bucket. 
Automatically move objects to a different storage class after a number of days or on a specific date From 62ba1e0d64541ca03a37c48b93a9c0e8bc5e68d1 Mon Sep 17 00:00:00 2001 From: A Ibrahim Date: Tue, 14 Apr 2026 13:35:47 +0200 Subject: [PATCH 4/4] fix: address PR review feedback - Remove unused itemsSeen getter and fetched field from PaginatedCursor - Filter out empty keys in ls from S3 folder marker objects Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lib/buckets/migrate.ts | 7 ------- src/lib/ls.ts | 20 +++++++++++--------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/lib/buckets/migrate.ts b/src/lib/buckets/migrate.ts index cae6792..c04db27 100644 --- a/src/lib/buckets/migrate.ts +++ b/src/lib/buckets/migrate.ts @@ -60,7 +60,6 @@ class PaginatedCursor { private index = 0; private token: string | undefined; private _done = false; - private fetched = false; constructor( private bucket: string, @@ -86,11 +85,6 @@ class PaginatedCursor { this.index++; } - /** Number of items seen so far across all pages */ - get itemsSeen(): number { - return this.fetched ? this.index : 0; - } - private async fetchPage(): Promise { if (this._done) return; @@ -111,7 +105,6 @@ class PaginatedCursor { this.buffer = data.items ?? []; this.index = 0; this.token = data.paginationToken; - this.fetched = true; if (!data.paginationToken && !data.hasMore) { this._done = true; diff --git a/src/lib/ls.ts b/src/lib/ls.ts index 34a581e..90cf3ec 100644 --- a/src/lib/ls.ts +++ b/src/lib/ls.ts @@ -98,15 +98,17 @@ export default async function ls(options: Record) { }; }); - // Items are files at this level - const files = (data.items || []).map((item) => { - const displayName = prefix ? 
item.name.slice(prefix.length) : item.name; - return { - key: displayName, - size: formatSize(item.size), - modified: item.lastModified, - }; - }); + // Items are files at this level (filter out empty keys from folder marker objects) + const files = (data.items || []) + .map((item) => { + const displayName = prefix ? item.name.slice(prefix.length) : item.name; + return { + key: displayName, + size: formatSize(item.size), + modified: item.lastModified, + }; + }) + .filter((item) => item.key !== ''); const objects = [...folders, ...files];