Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
370 changes: 370 additions & 0 deletions nitin_docs/index_migrator/99_tickets.md

Large diffs are not rendered by default.

79 changes: 64 additions & 15 deletions redisvl/cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
MigrationValidator,
)
from redisvl.migration.utils import (
estimate_disk_space,
list_indexes,
load_migration_plan,
load_yaml,
Expand All @@ -36,6 +37,7 @@ class Migrate:
"\tplan Generate a migration plan for a document-preserving drop/recreate migration",
"\twizard Interactively build a migration plan and schema patch",
"\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
"\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
"\tvalidate Validate a completed migration plan against the live index",
"",
"Batch Commands:",
Expand Down Expand Up @@ -212,7 +214,8 @@ def apply(self):
parser = argparse.ArgumentParser(
usage=(
"rvl migrate apply --plan <migration_plan.yaml> "
"[--async] [--report-out <migration_report.yaml>]"
"[--async] [--resume <checkpoint.yaml>] "
"[--report-out <migration_report.yaml>]"
)
)
parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
Expand All @@ -222,6 +225,12 @@ def apply(self):
help="Use async executor (recommended for large migrations with quantization)",
action="store_true",
)
parser.add_argument(
"--resume",
dest="checkpoint_path",
help="Path to quantization checkpoint file for crash-safe resume",
default=None,
)
parser.add_argument(
"--report-out",
help="Path to write migration_report.yaml",
Expand All @@ -243,31 +252,61 @@ def apply(self):
redis_url = create_redis_url(args)
plan = load_migration_plan(args.plan)

# Print disk space estimate for quantization migrations
disk_estimate = estimate_disk_space(plan)
if disk_estimate.has_quantization:
print(f"\n{disk_estimate.summary()}\n")

checkpoint_path = args.checkpoint_path
if args.use_async:
report = asyncio.run(
self._apply_async(plan, redis_url, args.query_check_file)
self._apply_async(
plan, redis_url, args.query_check_file, checkpoint_path
)
)
else:
report = self._apply_sync(plan, redis_url, args.query_check_file)
report = self._apply_sync(
plan, redis_url, args.query_check_file, checkpoint_path
)

write_migration_report(report, args.report_out)
if args.benchmark_out:
write_benchmark_report(report, args.benchmark_out)
self._print_report_summary(args.report_out, report, args.benchmark_out)

def _apply_sync(self, plan, redis_url: str, query_check_file: Optional[str]):
def estimate(self):
    """Print the disk space estimate for a migration plan.

    Reads ``--plan`` from the command-line arguments that follow the
    subcommand (``sys.argv[3:]``), loads that plan file, computes the
    estimate, and prints its summary to stdout. This method does not build
    a Redis URL or invoke an executor itself.
    """
    cli = argparse.ArgumentParser(
        usage="rvl migrate estimate --plan <migration_plan.yaml>"
    )
    cli.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
    # argv[0..2] are the program, command group, and subcommand; parse the rest.
    options = cli.parse_args(sys.argv[3:])

    # Load the plan and hand it straight to the estimator.
    space_report = estimate_disk_space(load_migration_plan(options.plan))
    print(space_report.summary())

def _apply_sync(
self,
plan,
redis_url: str,
query_check_file: Optional[str],
checkpoint_path: Optional[str] = None,
):
"""Execute migration synchronously."""
executor = MigrationExecutor()

print(f"\nApplying migration to '{plan.source.index_name}'...")

def progress_callback(step: str, detail: Optional[str]) -> None:
step_labels = {
"drop": "[1/5] Drop index",
"quantize": "[2/5] Quantize vectors",
"create": "[3/5] Create index",
"index": "[4/5] Re-indexing",
"validate": "[5/5] Validate",
"enumerate": "[1/6] Enumerate keys",
"bgsave": "[2/6] BGSAVE snapshot",
"drop": "[3/6] Drop index",
"quantize": "[4/6] Quantize vectors",
"create": "[5/6] Create index",
"index": "[6/6] Re-indexing",
"validate": "Validate",
}
label = step_labels.get(step, step)
if detail and not detail.startswith("done"):
Expand All @@ -280,24 +319,33 @@ def progress_callback(step: str, detail: Optional[str]) -> None:
redis_url=redis_url,
query_check_file=query_check_file,
progress_callback=progress_callback,
checkpoint_path=checkpoint_path,
)

self._print_apply_result(report)
return report

async def _apply_async(self, plan, redis_url: str, query_check_file: Optional[str]):
async def _apply_async(
self,
plan,
redis_url: str,
query_check_file: Optional[str],
checkpoint_path: Optional[str] = None,
):
"""Execute migration asynchronously (non-blocking for large quantization jobs)."""
executor = AsyncMigrationExecutor()

print(f"\nApplying migration to '{plan.source.index_name}' (async mode)...")

def progress_callback(step: str, detail: Optional[str]) -> None:
step_labels = {
"drop": "[1/5] Drop index",
"quantize": "[2/5] Quantize vectors",
"create": "[3/5] Create index",
"index": "[4/5] Re-indexing",
"validate": "[5/5] Validate",
"enumerate": "[1/6] Enumerate keys",
"bgsave": "[2/6] BGSAVE snapshot",
"drop": "[3/6] Drop index",
"quantize": "[4/6] Quantize vectors",
"create": "[5/6] Create index",
"index": "[6/6] Re-indexing",
"validate": "Validate",
}
label = step_labels.get(step, step)
if detail and not detail.startswith("done"):
Expand All @@ -310,6 +358,7 @@ def progress_callback(step: str, detail: Optional[str]) -> None:
redis_url=redis_url,
query_check_file=query_check_file,
progress_callback=progress_callback,
checkpoint_path=checkpoint_path,
)

self._print_apply_result(report)
Expand Down
2 changes: 2 additions & 0 deletions redisvl/migration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
BatchPlan,
BatchReport,
BatchState,
DiskSpaceEstimate,
FieldRename,
MigrationPlan,
MigrationReport,
Expand All @@ -25,6 +26,7 @@

__all__ = [
# Sync
"DiskSpaceEstimate",
"MigrationExecutor",
"MigrationPlan",
"MigrationPlanner",
Expand Down
Loading
Loading