Crash-Safe Quantization and Disk Space Estimation #548
nkanu17 wants to merge 4 commits into feat/index-migrator-v0-lc from feat/index-migrator-v0-lc-checkpoint-backup
Conversation
- Add reliability.py: idempotent dtype detection, checkpoint persistence, BGSAVE safety net, bounded undo buffer with async rollback
- Add DiskSpaceEstimate/VectorFieldEstimate models and estimate_disk_space()
- Wire reliability into both sync and async executors (_quantize_vectors)
- Add --resume flag to rvl migrate apply for checkpoint-based resume
- Add rvl migrate estimate subcommand for pre-migration cost analysis
- Update progress labels to 6 steps (enumerate, bgsave, drop, quantize, create, re-index)
- Planner returns dims in datatype change metadata for idempotent detection
- 39 new unit tests (90 total migration tests passing)
@codex review
🛡️ Jit Security Scan Results: ✅ No security findings were detected in this PR
Security scan by Jit
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bf3773b992
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Pull request overview
This PR extends the RedisVL index migrator with (1) a pre-migration disk space estimator and (2) crash-safe, resumable vector dtype re-encoding during drop/recreate migrations.
Changes:
- Add a pure disk-space estimation API (`estimate_disk_space`) plus CLI command `rvl migrate estimate` and pre-apply estimate output.
- Introduce crash-safety utilities (dtype detection, undo buffer, checkpointing, BGSAVE safety net) and wire them into sync/async executors with a new `--resume` checkpoint flag.
- Update schema validation comparison to exclude non-structural FT.INFO attributes via `strip_excluded`.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `tests/unit/test_async_migration_executor.py` | Adds unit tests for disk space estimation and reliability helpers (dtype detection, checkpointing, undo buffer, BGSAVE). |
| `redisvl/migration/validation.py` | Switches schema validation to use `strip_excluded` in schema comparison. |
| `redisvl/migration/async_validation.py` | Async equivalent of the `strip_excluded` schema validation change. |
| `redisvl/migration/utils.py` | Renames/expands schema canonicalization flags and adds the `estimate_disk_space()` implementation. |
| `redisvl/migration/models.py` | Adds `DiskSpaceEstimate` (+ helpers/constants) and attaches it to `MigrationReport`. |
| `redisvl/migration/reliability.py` | New reliability module: dtype detection, checkpoint model, BGSAVE waiters, and bounded undo buffer. |
| `redisvl/migration/planner.py` | Planner now includes `dims` in datatype-change output for idempotent detection. |
| `redisvl/migration/executor.py` | Sync executor: adds disk estimate, BGSAVE safety snapshot, checkpoint/resume support, and per-batch rollback for quantization. |
| `redisvl/migration/async_executor.py` | Async executor: mirrors sync changes (estimate, BGSAVE, checkpoint/resume, rollback). |
| `redisvl/migration/__init__.py` | Exposes `DiskSpaceEstimate` in the migration package exports. |
| `redisvl/cli/migrate.py` | Adds `estimate` command and `--resume` flag; updates progress labels; prints disk estimate before `apply`. |
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bf3773b992
- Fix bfloat16/uint8 idempotent detection using dtype byte-width families so float16<->bfloat16 and int8<->uint8 are treated as equivalent
- Validate checkpoint index_name matches source index before resuming
- Force checkpoint_path to the load path, not the stored value
- Record all batch keys in checkpoint (including skipped) to avoid re-scanning on resume
- Fix misleading AOF wording when aof_enabled is not set
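Because dtype detection works from raw vector byte length alone, dtypes that share an element width form a family the check cannot tell apart. A sketch of that byte-width grouping (the table and helper names here are assumptions for illustration, not the PR's code):

```python
import struct

# Hypothetical byte-width table: detection from a vector blob's length can
# only recover the element width, so dtypes sharing a width form a family
# that a byte-length check cannot distinguish.
DTYPE_BYTE_WIDTH = {
    "float64": 8,
    "float32": 4,
    "float16": 2,
    "bfloat16": 2,  # same width as float16, different encoding
    "int8": 1,
    "uint8": 1,     # same width as int8
}

def detect_width(vector_bytes: bytes, dims: int):
    """Return the element byte width implied by the blob length, if any."""
    if dims <= 0 or len(vector_bytes) % dims:
        return None
    width = len(vector_bytes) // dims
    return width if width in set(DTYPE_BYTE_WIDTH.values()) else None

def same_width_family(a: str, b: str) -> bool:
    """True when two dtypes are indistinguishable by byte length alone."""
    return DTYPE_BYTE_WIDTH.get(a) == DTYPE_BYTE_WIDTH.get(b)

# A float32 vector of 4 dims is 16 bytes -> element width 4.
blob = struct.pack("<4f", 0.1, 0.2, 0.3, 0.4)
assert detect_width(blob, 4) == 4
assert same_width_family("float16", "bfloat16")
```

Note the later commit in this PR goes the other way for conversions: same-width pairs like float16->bfloat16 must not be skipped as "already quantized", precisely because the byte length cannot prove the encoding matches.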
@codex review
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 30cc6c1052
…uracy
- Fix #2: docs_processed increments by full batch size (including skipped) so progress reaches 100% even when vectors are already quantized
- Fix #4: is_already_quantized prevents skipping same-width dtype conversions (e.g. float16->bfloat16) since encodings differ
- Fix #5: apply() detects checkpoint on resume and bypasses index validation, BGSAVE, field renames, drop, and key renames (all already done pre-crash); enumerates keys via SCAN with plan prefix instead
- Add IM-16 (auto-detect AOF) and IM-17 (compact checkpoint) to backlog
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e8181a142d
```python
checkpoint = QuantizationCheckpoint(
    index_name=source_index.name,
    total_keys=total_keys,
    checkpoint_path=checkpoint_path,
)
```
Save checkpoint before first quantization batch
When --resume starts a fresh run, this creates an in-memory QuantizationCheckpoint but does not persist it until after a batch succeeds (or an in-loop Exception occurs). If the process is interrupted after the source index is dropped but before that first save (for example SIGKILL/node crash), no checkpoint file exists, the next run won’t enter resume mode, and recovery can fail because the source index is already gone; persisting an initial checkpoint immediately on creation avoids this gap (the async quantization path has the same issue).
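The suggested ordering can be sketched with a simplified stand-in checkpoint model (the real `QuantizationCheckpoint` is described elsewhere in this PR as a YAML-dumped model; JSON and a plain dataclass are used here only to keep the sketch self-contained):

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field

@dataclass
class QuantizationCheckpoint:
    """Simplified stand-in for the PR's checkpoint model (names assumed)."""
    index_name: str
    total_keys: int
    checkpoint_path: str
    processed_keys: list = field(default_factory=list)
    status: str = "in_progress"

    def save(self) -> None:
        # Write to a temp file, then os.replace() for an atomic swap, so a
        # crash mid-write never leaves a torn checkpoint on disk.
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(self.checkpoint_path) or ".")
        with os.fdopen(fd, "w") as f:
            json.dump(asdict(self), f)
        os.replace(tmp, self.checkpoint_path)

path = os.path.join(tempfile.gettempdir(), "quantization_checkpoint.json")
checkpoint = QuantizationCheckpoint(index_name="idx", total_keys=100,
                                    checkpoint_path=path)
# Persist immediately on creation: if the process dies between the index
# drop and the first successful batch, a resumable checkpoint already exists.
checkpoint.save()
```

The key point is the `save()` call right after construction, closing the window the comment describes.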
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.
```python
prefix = prefixes[0] if prefixes else ""
# If a prefix change was part of the migration, keys
# were already renamed before the crash, so scan with
# the new prefix instead.
if has_prefix_change and rename_ops.change_prefix:
    prefix = rename_ops.change_prefix
cursor: int = 0
while True:
    cursor, scanned = client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )
```
Resume path enumerates keys via SCAN using match=f"{prefix}*", which can (a) include unrelated keys when the index prefix in plans doesn’t include the key separator (e.g. test vs test:) and (b) return duplicate keys (SCAN is allowed to repeat elements). That can cause quantization to touch keys outside the index or over-count progress/checkpointing. Consider building the SCAN pattern using plan.source.keyspace.key_separator (same logic as planner sampling) and de-duping keys before processing.
Suggested change:

```python
scan_prefix = prefixes[0] if prefixes else ""
# If a prefix change was part of the migration, keys
# were already renamed before the crash, so scan with
# the new prefix instead.
if has_prefix_change and rename_ops.change_prefix:
    scan_prefix = rename_ops.change_prefix
key_separator = getattr(plan.source.keyspace, "key_separator", "")
if scan_prefix:
    # Build SCAN pattern using the key separator to avoid
    # matching unrelated keys that merely share a prefix.
    if key_separator and not scan_prefix.endswith(key_separator):
        match_pattern = f"{scan_prefix}{key_separator}*"
    else:
        match_pattern = f"{scan_prefix}*"
else:
    match_pattern = "*"
cursor: int = 0
seen_keys = set()
while True:
    cursor, scanned = client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=match_pattern,
        count=1000,
    )
    for k in scanned:
        key_str = k.decode() if isinstance(k, bytes) else str(k)
        if key_str in seen_keys:
            continue
        seen_keys.add(key_str)
        keys_to_process.append(key_str)
```
```python
cursor: int = 0
while True:
    cursor, scanned = await client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )
```
Resume path enumerates keys via SCAN using match=f"{prefix}*", which can (a) include unrelated keys when the index prefix in plans doesn’t include the key separator and (b) return duplicate keys (SCAN is allowed to repeat elements). That can cause quantization to touch keys outside the index or over-count progress/checkpointing. Consider building the SCAN pattern using plan.source.keyspace.key_separator and de-duping keys before processing.
Suggested change:

```python
key_separator = plan.source.keyspace.key_separator
cursor: int = 0
# Track seen keys to avoid duplicates from SCAN.
seen_keys: set[str] = set()
while True:
    cursor, scanned = await client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}{key_separator}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        key_str = k.decode() if isinstance(k, bytes) else str(k)
        if key_str in seen_keys:
            continue
        seen_keys.add(key_str)
        keys_to_process.append(key_str)
```
```python
for field_name, change in datatype_changes.items():
    field_data: bytes | None = client.hget(key, field_name)  # type: ignore[misc,assignment]
    if not field_data:
        continue
```
Quantization reads/writes use HGET/HSET unconditionally. For storage_type == "json" migrations, vectors won’t be fetched/updated (they live in JSON paths), but the code will still mark keys as processed/checkpointed and then recreate the index with the new dtype. Quantization needs a JSON branch (JSON.GET/JSON.SET + rollback) analogous to the field-rename logic.
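A rough sketch of such a branch (the JSONPath default, helper name, and `reencode` callback here are assumptions for illustration, not the PR's actual code):

```python
import json

def quantize_vector_field(client, key, field_name, storage_type, reencode,
                          json_path="$.embedding"):
    """Dispatch the vector read/write on storage type.

    For JSON storage the vector lives as a number array at a JSONPath and
    must go through JSON.GET/JSON.SET; for hash storage it is raw bytes in
    a hash field, handled via HGET/HSET as the executors already do.
    Returns True only when a vector was actually rewritten.
    """
    if storage_type == "json":
        raw = client.execute_command("JSON.GET", key, json_path)
        if raw is None:
            return False  # no vector at this path: skip
        matches = json.loads(raw)  # a JSONPath query returns a list of matches
        client.execute_command(
            "JSON.SET", key, json_path, json.dumps(reencode(matches[0]))
        )
        return True
    data = client.hget(key, field_name)
    if not data:
        return False
    client.hset(key, field_name, reencode(data))
    return True
```

Rollback for the JSON branch would symmetrically store the original array and restore it via `JSON.SET`, mirroring the hash undo buffer.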
```python
for key in batch:
    for field_name, change in datatype_changes.items():
        field_data: bytes | None = await client.hget(key, field_name)  # type: ignore[misc,assignment]
        if not field_data:
            continue
```
Quantization reads/writes use HGET/HSET unconditionally. For storage_type == "json" migrations, vectors won’t be fetched/updated (they live in JSON paths), but keys will still be checkpointed as processed. Quantization needs a JSON branch (JSON.GET/JSON.SET + rollback) when plan.source.keyspace.storage_type == "json".
```python
index_name: str
total_keys: int
completed_keys: int = 0
completed_batches: int = 0
last_batch_keys: List[str] = Field(default_factory=list)
processed_keys: List[str] = Field(default_factory=list)
status: str = "in_progress"
```
QuantizationCheckpoint stores every processed key in processed_keys, and save() YAML-dumps the entire growing list after every batch. For large indexes this becomes very slow (O(n^2) total serialization), creates huge checkpoint files, and can dominate migration time. Consider a more compact checkpoint format (e.g., append-only log of completed batches/keys, periodic compaction, or tracking completed batch indices when key ordering is stable) to keep per-batch checkpoint writes bounded.
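One possible compact format, sketched under the comment's assumption that key ordering is stable across runs (the class and record layout are hypothetical):

```python
import os

class AppendOnlyCheckpoint:
    """Sketch of a bounded-cost checkpoint: append one record per completed
    batch rather than re-serializing the whole processed-key list, making
    each per-batch checkpoint write O(1) instead of O(n)."""

    def __init__(self, path: str):
        self.path = path

    def record_batch(self, batch_index: int, keys: list) -> None:
        # Append-only write with fsync so a crash cannot lose the record.
        with open(self.path, "a") as f:
            f.write(f"{batch_index}\t{len(keys)}\n")
            f.flush()
            os.fsync(f.fileno())

    def completed_batches(self) -> set:
        """Read back which batch indices finished before a crash."""
        if not os.path.exists(self.path):
            return set()
        with open(self.path) as f:
            return {int(line.split("\t")[0]) for line in f if line.strip()}
```

On resume, completed batch indices are simply skipped; the trade-off is that this only works when key enumeration order is stable between runs, which is exactly the caveat the comment raises.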
```python
# Compute disk space estimate before any mutations
disk_estimate = estimate_disk_space(plan)
if disk_estimate.has_quantization:
    logger.info(
        "Disk space estimate: RDB ~%d bytes, AOF ~%d bytes, total ~%d bytes",
        disk_estimate.rdb_snapshot_disk_bytes,
        disk_estimate.aof_growth_bytes,
        disk_estimate.total_new_disk_bytes,
    )
report.disk_space_estimate = disk_estimate
```
estimate_disk_space() supports aof_enabled, but executors always call it with the default False, so the estimate attached to MigrationReport never includes AOF growth even when running against a live Redis with AOF enabled. If the goal is to inform operators up-front, consider detecting AOF status from the live connection (e.g., CONFIG GET/INFO) and passing the real value, with a graceful fallback when not permitted.
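A best-effort detection helper might look like this sketch, using standard redis-py calls (`CONFIG GET appendonly` with an `INFO persistence` fallback); whether this matches the planned IM-16 work is an assumption:

```python
def detect_aof_enabled(client) -> bool:
    """Best-effort AOF detection; returns False when CONFIG/INFO is blocked.

    Tries CONFIG GET first (authoritative), then falls back to the
    aof_enabled field of INFO persistence, and finally assumes no AOF so
    the estimate stays conservative rather than failing the migration.
    """
    try:
        value = client.config_get("appendonly").get("appendonly", "no")
        if isinstance(value, bytes):
            value = value.decode()
        return value == "yes"
    except Exception:
        pass
    try:
        return bool(client.info("persistence").get("aof_enabled", 0))
    except Exception:
        return False  # graceful fallback when both commands are restricted
```

The result would then be passed as `estimate_disk_space(plan, aof_enabled=...)` instead of relying on the default.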
```python
# Compute disk space estimate before any mutations
disk_estimate = estimate_disk_space(plan)
if disk_estimate.has_quantization:
    logger.info(
        "Disk space estimate: RDB ~%d bytes, AOF ~%d bytes, total ~%d bytes",
        disk_estimate.rdb_snapshot_disk_bytes,
        disk_estimate.aof_growth_bytes,
        disk_estimate.total_new_disk_bytes,
    )
report.disk_space_estimate = disk_estimate
```
estimate_disk_space() supports aof_enabled, but the async executor always calls it with the default False, so the estimate never includes AOF growth even when running against a live Redis with AOF enabled. Consider detecting AOF status from the live connection and passing the real value (with a fallback if CONFIG/INFO is blocked).
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```python
# Count all keys in the batch as processed (including skipped),
# since they are recorded in the checkpoint and won't be retried.
docs_processed += len(batch)
```
Quantize return value counts all keys, not converted ones
Medium Severity
`docs_processed += len(batch)` counts every key in the batch, including keys with no vector data and keys skipped by `is_already_quantized`. The old code used `docs_processed += len(keys_updated_in_batch)`, counting only keys where a vector field was actually re-encoded. The return value feeds into `report.warnings` as "Re-encoded {docs_quantized} documents" and the log message "Quantized {docs_processed} documents", both of which now report inflated numbers. The `keys_updated_in_batch` set is still computed but only used for the `pipe.execute()` gate, not for counting.
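The distinction can be sketched as follows, using the variable names from the finding above (the `convert` callback is a stand-in for the per-key re-encode logic, not the PR's code):

```python
def quantize_batch(batch, convert):
    """Return (docs_processed, docs_quantized) for one batch.

    `convert(key)` returns True only when a vector field was actually
    re-encoded; skipped keys (no vector data, or already quantized) still
    count toward checkpoint/progress but not toward the quantized total.
    """
    keys_updated_in_batch = {key for key in batch if convert(key)}
    docs_processed = len(batch)                   # checkpoint/progress count
    docs_quantized = len(keys_updated_in_batch)   # user-facing conversion count
    return docs_processed, docs_quantized

# Three keys in the batch, one already quantized -> 3 processed, 2 converted.
processed, quantized = quantize_batch(
    ["k1", "k2", "k3"], lambda k: k != "k2"
)
assert (processed, quantized) == (3, 2)
```

Keeping both counters lets the checkpoint record every key while the report stays accurate about how many documents were actually re-encoded.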


Note: merging feat/index-migrator-v0-lc-checkpoint-backup into feat/index-migrator-v0-lc.
Summary
Adds two capabilities to the index migrator: pre-migration disk space estimation and crash-safe vector re-encoding. Together these let operators understand resource costs upfront and safely resume interrupted quantization without data loss.
Feature Details
1. Pre-Migration Disk Space Estimation
Pure calculation from the migration plan (no Redis mutations). Shows peak memory, RDB delta, and AOF growth before the operator commits to a migration.
Output example:
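A sketch of the arithmetic behind the estimate (this is not the CLI output; the function, field names, and the snapshot/AOF model here are assumptions, not the actual `estimate_disk_space()` internals):

```python
# Bytes per element for supported vector dtypes (assumed table).
DTYPE_BYTES = {"float64": 8, "float32": 4, "float16": 2,
               "bfloat16": 2, "int8": 1, "uint8": 1}

def estimate_quantization_bytes(num_docs: int, dims: int,
                                source_dtype: str, target_dtype: str) -> dict:
    """Pure calculation from plan metadata: no Redis calls, no mutations."""
    src = num_docs * dims * DTYPE_BYTES[source_dtype]
    dst = num_docs * dims * DTYPE_BYTES[target_dtype]
    return {
        # BGSAVE before the drop snapshots roughly the source vector data.
        "rdb_snapshot_bytes": src,
        # Re-encoded vectors written back are what an AOF would record.
        "aof_growth_bytes": dst,
        # Post-migration memory savings from the narrower dtype.
        "memory_savings_bytes": src - dst,
    }
```

Under this model, 1,000,000 docs with 768-dim float32 vectors quantized to int8 would snapshot about 3.1 GB and save about 2.3 GB of memory.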
2. Crash-Safe Quantization
Four reliability mechanisms protect vector re-encoding during migration:
Idempotent Re-Encoding -- `is_already_quantized()` inspects vector byte length against expected dimensions. If the vector already matches the target dtype, the key is skipped. This makes partial reruns safe without double-converting data.

Bounded Undo Buffer -- Tracks only the current batch's original bytes in memory. On batch failure, rolls back via Redis pipeline, then clears. Never holds more than one batch (500 keys) worth of data.

Atomic Checkpoints -- Uses `os.replace()` for crash-safe writes. The checkpoint records all processed keys so `get_remaining_keys()` filters them out on resume. Saved after every successful batch and on failure.

BGSAVE Safety Net -- Triggers `BGSAVE` and polls `rdb_bgsave_in_progress` until the snapshot completes before the index drop. This gives operators a recovery point if something goes wrong during the migration.

3. CLI Changes
New `--resume` flag on `rvl migrate apply`.

Updated progress labels (was 5 steps, now 6): enumerate, bgsave, drop, quantize, create, re-index.
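The BGSAVE safety net described above can be sketched as a polling loop like the following (a simplified stand-in for the waiters in `redisvl/migration/reliability.py`, not their actual code; the timeout values are assumptions):

```python
import time

def wait_for_bgsave(client, timeout: float = 300.0, interval: float = 0.5) -> None:
    """Trigger BGSAVE and block until the snapshot finishes.

    Polls the rdb_bgsave_in_progress flag from INFO persistence; raises on
    timeout so the migration never drops the index without a recovery point.
    """
    client.bgsave()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        info = client.info("persistence")
        if not info.get("rdb_bgsave_in_progress", 0):
            return  # snapshot complete: safe to drop the index
        time.sleep(interval)
    raise TimeoutError("BGSAVE did not complete before timeout")
```

The async variant would be identical in shape, awaiting the client calls and using `asyncio.sleep` for the poll interval.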
Design Decisions
dims in planner output -- The planner now returns dimension count alongside source/target dtype. This is required for `is_already_quantized()` to calculate expected byte lengths without an extra Redis call per key.

Separate async_rollback method -- The sync `rollback(pipe)` calls `pipe.execute()`, which returns a coroutine in async contexts. Rather than making `rollback()` async-aware, a separate `async_rollback(pipe)` properly awaits the pipeline. This avoids changing the sync API.

Checkpoint lives outside the plan -- The checkpoint file is user-specified via `--resume` rather than embedded in the migration plan. This keeps plans stateless and reusable.

No wizard changes -- The wizard only builds plans. All reliability logic lives in the execution path.
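The sync/async rollback split might look like the following sketch (an illustration of the design decision, with an assumed internal buffer layout, not the PR's exact code):

```python
class UndoBuffer:
    """Bounded undo buffer sketch: holds only one batch's original bytes."""

    def __init__(self):
        self._original = {}  # key -> {field: original bytes}

    def store(self, key: str, field: str, data: bytes) -> None:
        self._original.setdefault(key, {})[field] = data

    def clear(self) -> None:
        self._original.clear()

    def _queue_restores(self, pipe) -> None:
        # Queue HSETs that restore every field to its pre-batch bytes.
        for key, fields in self._original.items():
            for field, data in fields.items():
                pipe.hset(key, field, data)

    def rollback(self, pipe) -> None:
        # Sync path: pipe.execute() runs the queued commands immediately.
        self._queue_restores(pipe)
        pipe.execute()
        self.clear()

    async def async_rollback(self, pipe) -> None:
        # Async path: pipe.execute() returns a coroutine that must be
        # awaited, so a separate method keeps the sync API unchanged.
        self._queue_restores(pipe)
        await pipe.execute()
        self.clear()
```

Both methods share the queuing logic; only the execute/await step differs, which is the whole point of keeping them separate.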
Tests
39 new unit tests (no Redis required):
Test coverage:
- test_estimate_* (7 tests) -- byte calculations, AOF, JSON storage, format helper
- test_detect_dtype_* (5 tests) -- float64/32/16, int8, unknown length
- test_is_already_quantized_* (3 tests) -- match, mismatch, source-matches-current
- test_checkpoint_* (4 tests) -- round-trip, remaining keys, mark complete, missing file
- test_bgsave_* (2 tests) -- immediate completion, polling loop
- test_undo_buffer_* (5 tests) -- store/rollback, clear, empty rollback, size
- test_async_rollback_* (2 tests) -- awaited execution, empty buffer

Diff Stats
Architecture Diagram
Note
High Risk
High risk because it changes migration execution for quantization (adds BGSAVE, checkpoint/resume, rollback) and can affect production data mutation and recovery paths.
Overview
Adds a new `rvl migrate estimate` command and `estimate_disk_space()` helper that computes RDB snapshot, optional AOF growth, and post-migration memory savings for datatype-changing (quantization) migration plans, and records the estimate on `MigrationReport`.

Makes vector re-encoding crash-safe by introducing `redisvl/migration/reliability.py` (idempotent dtype detection, atomic checkpointing, bounded per-batch undo/rollback, and sync/async `BGSAVE` wait) and wiring it into both sync and async executors via `--resume` checkpoint support (including resume behavior when the source index was already dropped).

Tightens schema validation comparisons by renaming "unreliable" attrs to explicitly excluded attrs and switching validators to `strip_excluded=True`, and extends planner datatype-change output to include `dims` for idempotent detection; adds extensive unit tests for estimation and reliability utilities.

Written by Cursor Bugbot for commit e8181a1.