Crash-Safe Quantization and Disk Space Estimation#548

Open
nkanu17 wants to merge 4 commits into feat/index-migrator-v0-lc from feat/index-migrator-v0-lc-checkpoint-backup

Conversation

nkanu17 (Collaborator) commented Mar 31, 2026

Note: merges feat/index-migrator-v0-lc-checkpoint-backup into feat/index-migrator-v0-lc

Summary

Adds two capabilities to the index migrator: pre-migration disk space estimation and crash-safe vector re-encoding. Together these let operators understand resource costs upfront and safely resume interrupted quantization without data loss.

Feature Details

1. Pre-Migration Disk Space Estimation

A pure calculation from the migration plan (no Redis calls or mutations). It shows peak memory during migration, the RDB size delta, and AOF growth before the operator commits to a migration.

rvl migrate estimate --plan migration_plan.yaml

Output example:

Disk Space Estimate for 'my_index' (100,000 docs)
  embedding: FLOAT32 -> FLOAT16 (12,288 -> 6,144 bytes/doc)
  Peak memory during migration: 1.14 GB
  Final memory savings: 585.94 MB (50.0%)
  RDB size delta: -556.64 MB
  AOF growth (if enabled): 596.05 MB
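
The per-document arithmetic behind those numbers can be sketched directly from dtype byte widths. This is an illustrative standalone sketch, not the real estimate_disk_space() implementation; DTYPE_BYTES and vector_bytes are hypothetical names:

```python
# Byte width per vector component for each supported dtype (illustrative table).
DTYPE_BYTES = {"float64": 8, "float32": 4, "float16": 2, "int8": 1}

def vector_bytes(dims: int, dtype: str) -> int:
    """Size of one stored vector: component count times component width."""
    return dims * DTYPE_BYTES[dtype]

docs = 100_000
dims = 3072                              # 12,288 bytes / 4 bytes per float32 value
src = vector_bytes(dims, "float32")      # 12288 bytes/doc
dst = vector_bytes(dims, "float16")      # 6144 bytes/doc
savings_mb = (src - dst) * docs / (1024 ** 2)
print(f"{src} -> {dst} bytes/doc, final savings {savings_mb:.2f} MB ({1 - dst / src:.0%})")
# prints: 12288 -> 6144 bytes/doc, final savings 585.94 MB (50%)
```

This reproduces the 12,288 -> 6,144 bytes/doc and 585.94 MB (50.0%) figures above; the peak-memory and AOF numbers additionally account for transient and command-log overhead not modeled here.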

2. Crash-Safe Quantization

Four reliability mechanisms protect vector re-encoding during migration:

Idempotent Re-Encoding -- is_already_quantized() compares a stored vector's byte length against the length expected from the field's dimension count and dtype width. If the vector already matches the target dtype, the key is skipped. This makes partial reruns safe without double-converting data.
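
A minimal sketch of that byte-length check (illustrative implementation, assumed width table; note that length alone cannot distinguish same-width dtypes such as float16 vs bfloat16, which is why same-width conversions are never skipped):

```python
import struct

# Illustrative per-component byte widths; not the real module's table.
DTYPE_WIDTHS = {"float64": 8, "float32": 4, "float16": 2, "int8": 1}

def is_already_quantized(raw: bytes, dims: int, target_dtype: str) -> bool:
    # A vector already at the target dtype has exactly dims * width bytes.
    return len(raw) == dims * DTYPE_WIDTHS[target_dtype]

vec_f32 = struct.pack("<4f", 1.0, 2.0, 3.0, 4.0)        # 16 bytes: 4 dims of float32
print(is_already_quantized(vec_f32, 4, "float16"))      # False: still needs conversion
print(is_already_quantized(b"\x00" * 8, 4, "float16"))  # True: skip on rerun
```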

Bounded Undo Buffer -- Tracks only the current batch's original bytes in memory. On batch failure, rolls back via Redis pipeline, then clears. Never holds more than one batch (500 keys) worth of data.
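
The bounded buffer can be sketched as below, assuming a redis-py-style pipeline exposing hset()/execute(); the class name matches the description but method signatures are assumptions:

```python
class BatchUndoBuffer:
    """Holds original field bytes for the current batch only."""

    def __init__(self) -> None:
        self._originals: dict = {}  # (key, field) -> original bytes

    def store(self, key: str, field: str, original: bytes) -> None:
        self._originals[(key, field)] = original

    def rollback(self, pipe) -> None:
        # Re-write every original value through the pipeline, then forget them.
        for (key, field), data in self._originals.items():
            pipe.hset(key, field, data)
        pipe.execute()
        self.clear()

    def clear(self) -> None:
        self._originals.clear()

    def __len__(self) -> int:
        return len(self._originals)
```

Calling clear() after each successful batch is what keeps memory bounded to one batch of keys regardless of index size.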

Atomic Checkpoints -- Uses os.replace() for crash-safe writes. The checkpoint records all processed keys so get_remaining_keys() filters them out on resume. Saved after every successful batch and on failure.
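
The write-then-rename pattern looks roughly like this (a sketch: JSON stands in for the YAML checkpoint format, and the function name is illustrative):

```python
import json
import os
import tempfile

def save_checkpoint(state: dict, path: str) -> None:
    """Persist checkpoint state so a crash never leaves a torn file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temp file in the same directory so the rename stays atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())    # data reaches disk before the rename
        os.replace(tmp_path, path)  # atomic swap: readers see old or new, never partial
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```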

BGSAVE Safety Net -- Triggers BGSAVE and polls rdb_bgsave_in_progress until the snapshot completes before the index drop. This gives operators a recovery point if something goes wrong during the migration.
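
A sketch of the safety net, assuming a redis-py-style client exposing bgsave() and info(); the timeout and polling interval are illustrative values:

```python
import time

def trigger_bgsave_and_wait(client, timeout: float = 300.0, poll_interval: float = 0.5) -> None:
    """Start a background RDB snapshot and block until it completes."""
    client.bgsave()
    deadline = time.monotonic() + timeout
    # INFO persistence reports rdb_bgsave_in_progress=1 while the fork is saving.
    while client.info("persistence").get("rdb_bgsave_in_progress"):
        if time.monotonic() > deadline:
            raise TimeoutError("BGSAVE did not complete before the timeout")
        time.sleep(poll_interval)
```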

3. CLI Changes

New --resume flag on rvl migrate apply:

# First run (creates checkpoint automatically)
rvl migrate apply --plan migration_plan.yaml --resume quantize_checkpoint.yaml

# After crash or interrupt (same command resumes from where it left off)
rvl migrate apply --plan migration_plan.yaml --resume quantize_checkpoint.yaml

Updated progress labels (was 5 steps, now 6):

  [1/6] Enumerate keys: 100,000 keys
  [2/6] BGSAVE snapshot: done
  [3/6] Drop index: done
  [4/6] Quantize vectors: 50,000/100,000 docs
  [5/6] Create index: done
  [6/6] Re-indexing: done

Design Decisions

  1. dims in planner output -- The planner now returns dimension count alongside source/target dtype. This is required for is_already_quantized() to calculate expected byte lengths without an extra Redis call per key.

  2. Separate async_rollback method -- The sync rollback(pipe) calls pipe.execute(), which returns a coroutine in async contexts. Rather than making rollback() async-aware, a separate async_rollback(pipe) properly awaits the pipeline. This avoids changing the sync API.

  3. Checkpoint lives outside the plan -- The checkpoint file is user-specified via --resume rather than embedded in the migration plan. This keeps plans stateless and reusable.

  4. No wizard changes -- The wizard only builds plans. All reliability logic lives in the execution path.
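
Design decision 2 above can be sketched as follows (a minimal illustration with assumed pipeline shapes, not the real class):

```python
class RollbackSketch:
    def __init__(self) -> None:
        # Pretend one field was converted and its original bytes were saved.
        self._originals = {("doc:1", "embedding"): b"\x00\x01"}

    def rollback(self, pipe) -> None:
        for (key, field), data in self._originals.items():
            pipe.hset(key, field, data)
        pipe.execute()           # sync pipeline runs immediately
        self._originals.clear()

    async def async_rollback(self, pipe) -> None:
        for (key, field), data in self._originals.items():
            pipe.hset(key, field, data)
        await pipe.execute()     # async pipeline: execute() returns a coroutine
        self._originals.clear()
```

Keeping two methods means sync callers never see a coroutine, and async callers never block the event loop on a pipeline flush.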

Tests

39 new unit tests (no Redis required):

# Run all migration tests
python -m pytest tests/unit/test_async_migration_executor.py \
  tests/unit/test_migration_planner.py \
  tests/unit/test_batch_migration.py \
  tests/unit/test_async_migration_planner.py -v

# 90 passed

Test coverage:

  • test_estimate_* (7 tests) -- byte calculations, AOF, JSON storage, format helper
  • test_detect_dtype_* (5 tests) -- float64/32/16, int8, unknown length
  • test_is_already_quantized_* (3 tests) -- match, mismatch, source-matches-current
  • test_checkpoint_* (4 tests) -- round-trip, remaining keys, mark complete, missing file
  • test_bgsave_* (2 tests) -- immediate completion, polling loop
  • test_undo_buffer_* (5 tests) -- store/rollback, clear, empty rollback, size
  • test_async_rollback_* (2 tests) -- awaited execution, empty buffer

Diff Stats

9 files changed, 1096 insertions(+), 53 deletions(-)
1 new file (reliability.py)

Architecture Diagram

                          rvl migrate apply --plan plan.yaml --resume ckpt.yaml
                                              |
                                              v
                    +-----------------------------------------------+
                    |              MigrationExecutor.apply()         |
                    +-----------------------------------------------+
                                              |
              +-------------------------------+-------------------------------+
              |               |               |               |               |
              v               v               v               v               v
        +-----------+   +-----------+   +-----------+   +-----------+   +-----------+
        | Enumerate |   | Estimate  |   |  BGSAVE   |   |   Drop    |   |  Create   |
        |   Keys    |   |   Disk    |   |  Safety   |   |   Index   |   |   Index   |
        |           |   |   Space   |   |    Net    |   |           |   |           |
        | FT.AGGR   |   | models.py |   | reliab.py |   | FT.DROP   |   | FT.CREATE |
        |  EGATE    |   | utils.py  |   |           |   |  IF EXISTS|   |           |
        +-----------+   +-----------+   +-----------+   +-----------+   +-----------+
              |               |               |               |               |
              |               |               v               v               |
              |               |         +------------------+  |               |
              |               |         | Poll INFO until  |  |               |
              |               |         | bgsave_in_prog=0 |  |               |
              |               |         +------------------+  |               |
              |               |                               |               |
              +---> keys[]    +---> log summary               |               |
                      |                                       |               |
                      v                                       |               |
        +------------------------------------------+          |               |
        |         _quantize_vectors()              |<---------+               |
        |                                          |                          |
        |  +------------------------------------+  |                          |
        |  | Load Checkpoint (if --resume)      |  |                          |
        |  | get_remaining_keys() to filter     |  |                          |
        |  +------------------------------------+  |                          |
        |                   |                      |                          |
        |                   v                      |                          |
        |  +------------------------------------+  |                          |
        |  | For each batch (500 keys):         |  |                          |
        |  |                                    |  |                          |
        |  |   +------------------------------+ |  |                          |
        |  |   | For each key, each field:     | |  |                          |
        |  |   |                               | |  |                          |
        |  |   | is_already_quantized()?       | |  |                          |
        |  |   |   YES --> skip                | |  |                          |
        |  |   |   NO  --> undo.store(orig)    | |  |                          |
        |  |   |           convert + HSET pipe | |  |                          |
        |  |   +------------------------------+ |  |                          |
        |  |                                    |  |                          |
        |  |   pipe.execute()                   |  |                          |
        |  |     |                              |  |                          |
        |  |     +---> SUCCESS                  |  |                          |
        |  |     |       undo.clear()           |  |                          |
        |  |     |       checkpoint.record()    |  |                          |
        |  |     |       checkpoint.save()      |  |                          |
        |  |     |                              |  |                          |
        |  |     +---> FAILURE                  |  |                          |
        |  |             undo.rollback(pipe)    |  |                          |
        |  |             checkpoint.save()      |  |                          |
        |  |             raise                  |  |                          |
        |  +------------------------------------+  |                          |
        |                   |                      |                          |
        |                   v                      |                          |
        |  checkpoint.mark_complete()              |                          |
        +------------------------------------------+                          |
                                                                              |
                      +-------------------------------------------------------+
                      v
              +--------------+
              | Wait for     |
              | Re-indexing  |
              | + Validate   |
              +--------------+


        Component Responsibilities:

        +-------------------+--------------------------------------------------+
        | Component         | What it does                                     |
        +-------------------+--------------------------------------------------+
        | detect_vector_    | Inspects byte length to infer stored dtype       |
        | dtype()           | (float64/32/16, int8)                            |
        +-------------------+--------------------------------------------------+
        | is_already_       | Returns True if detected dtype matches target    |
        | quantized()       | so the key can be skipped                        |
        +-------------------+--------------------------------------------------+
        | Quantization      | Tracks processed keys on disk with atomic        |
        | Checkpoint        | os.replace() writes. Enables resume via          |
        |                   | get_remaining_keys()                             |
        +-------------------+--------------------------------------------------+
        | BatchUndoBuffer   | Stores original bytes for current batch only.    |
        |                   | rollback() restores via pipeline on failure.     |
        |                   | async_rollback() for async contexts.             |
        |                   | clear() after each successful batch.             |
        +-------------------+--------------------------------------------------+
        | trigger_bgsave_   | BGSAVE + poll loop before index drop.            |
        | and_wait()        | Provides RDB recovery point.                     |
        +-------------------+--------------------------------------------------+
        | estimate_disk_    | Pure calculation from plan. No Redis calls.      |
        | space()           | Shows peak memory, savings, RDB/AOF impact.      |
        +-------------------+--------------------------------------------------+

        Data flow on crash and resume:

        CRASH during batch N
              |
              v
        checkpoint.yaml has batches 0..N-1 recorded
        batch N was rolled back by undo buffer (or partially written)
              |
              v
        Re-run: rvl migrate apply --plan plan.yaml --resume checkpoint.yaml
              |
              v
        Load checkpoint --> get_remaining_keys() skips batches 0..N-1
        is_already_quantized() catches any keys in batch N that were written
        Resume from batch N onward

Note

High Risk
High risk because it changes migration execution for quantization (adds BGSAVE, checkpoint/resume, rollback) and can affect production data mutation and recovery paths.

Overview
Adds a new rvl migrate estimate command and estimate_disk_space() helper that computes RDB snapshot, optional AOF growth, and post-migration memory savings for datatype-changing (quantization) migration plans, and records the estimate on MigrationReport.

Makes vector re-encoding crash-safe by introducing redisvl/migration/reliability.py (idempotent dtype detection, atomic checkpointing, bounded per-batch undo/rollback, and sync/async BGSAVE wait) and wiring it into both sync and async executors via --resume checkpoint support (including resume behavior when the source index was already dropped).

Tightens schema validation comparisons by renaming “unreliable” attrs to explicitly excluded attrs and switching validators to strip_excluded=True, and extends planner datatype-change output to include dims for idempotent detection; adds extensive unit tests for estimation and reliability utilities.
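
One way to picture the strip_excluded comparison (attribute names here are hypothetical; the real excluded list lives in redisvl/migration/utils.py):

```python
# Hypothetical non-structural attributes that FT.INFO reports but that
# should not cause a schema mismatch.
EXCLUDED_ATTRS = {"num_docs", "percent_indexed"}

def canonicalize(attrs: dict, strip_excluded: bool = True) -> dict:
    """Drop attributes that vary at runtime but don't define index structure."""
    if not strip_excluded:
        return dict(attrs)
    return {k: v for k, v in attrs.items() if k not in EXCLUDED_ATTRS}

live = {"dims": 3072, "dtype": "FLOAT16", "num_docs": 100_000}
plan = {"dims": 3072, "dtype": "FLOAT16", "num_docs": 0}
print(canonicalize(live) == canonicalize(plan))  # True: structurally identical
```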

Written by Cursor Bugbot for commit e8181a1.

nkanu17 added 2 commits March 26, 2026 21:22
- Add reliability.py: idempotent dtype detection, checkpoint persistence,
  BGSAVE safety net, bounded undo buffer with async rollback
- Add DiskSpaceEstimate/VectorFieldEstimate models and estimate_disk_space()
- Wire reliability into both sync and async executors (_quantize_vectors)
- Add --resume flag to rvl migrate apply for checkpoint-based resume
- Add rvl migrate estimate subcommand for pre-migration cost analysis
- Update progress labels to 6 steps (enumerate, bgsave, drop, quantize,
  create, re-index)
- Planner returns dims in datatype change metadata for idempotent detection
- 39 new unit tests (90 total migration tests passing)
Copilot AI review requested due to automatic review settings March 31, 2026 03:15

nkanu17 commented Mar 31, 2026

@codex review


jit-ci bot commented Mar 31, 2026

🛡️ Jit Security Scan Results


✅ No security findings were detected in this PR




@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bf3773b992



Copilot AI left a comment


Pull request overview

This PR extends the RedisVL index migrator with (1) a pre-migration disk space estimator and (2) crash-safe, resumable vector dtype re-encoding during drop/recreate migrations.

Changes:

  • Add a pure disk-space estimation API (estimate_disk_space) plus CLI command rvl migrate estimate and pre-apply estimate output.
  • Introduce crash-safety utilities (dtype detection, undo buffer, checkpointing, BGSAVE safety net) and wire them into sync/async executors with a new --resume checkpoint flag.
  • Update schema validation comparison to exclude non-structural FT.INFO attributes via strip_excluded.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.

Summary per file:

  • tests/unit/test_async_migration_executor.py -- Adds unit tests for disk space estimation and reliability helpers (dtype detection, checkpointing, undo buffer, BGSAVE).
  • redisvl/migration/validation.py -- Switches schema validation to use strip_excluded in schema comparison.
  • redisvl/migration/async_validation.py -- Async equivalent of the strip_excluded schema validation change.
  • redisvl/migration/utils.py -- Renames/expands schema canonicalization flags and adds the estimate_disk_space() implementation.
  • redisvl/migration/models.py -- Adds DiskSpaceEstimate (+ helpers/constants) and attaches it to MigrationReport.
  • redisvl/migration/reliability.py -- New reliability module: dtype detection, checkpoint model, BGSAVE waiters, and bounded undo buffer.
  • redisvl/migration/planner.py -- Planner now includes dims in datatype-change output for idempotent detection.
  • redisvl/migration/executor.py -- Sync executor: adds disk estimate, BGSAVE safety snapshot, checkpoint/resume support, and per-batch rollback for quantization.
  • redisvl/migration/async_executor.py -- Async executor: mirrors sync changes (estimate, BGSAVE, checkpoint/resume, rollback).
  • redisvl/migration/__init__.py -- Exposes DiskSpaceEstimate in the migration package exports.
  • redisvl/cli/migrate.py -- Adds the estimate command and --resume flag; updates progress labels; prints the disk estimate before apply.



nkanu17 commented Mar 31, 2026

@codex review


- Fix bfloat16/uint8 idempotent detection using dtype byte-width families
  so float16<->bfloat16 and int8<->uint8 are treated as equivalent
- Validate checkpoint index_name matches source index before resuming
- Force checkpoint_path to the load path, not the stored value
- Record all batch keys in checkpoint (including skipped) to avoid
  re-scanning on resume
- Fix misleading AOF wording when aof_enabled is not set

nkanu17 commented Mar 31, 2026

@codex review


Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.




@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 30cc6c1052


…uracy

- Fix #2: docs_processed increments by full batch size (including skipped)
  so progress reaches 100% even when vectors are already quantized
- Fix #4: is_already_quantized prevents skipping same-width dtype
  conversions (e.g. float16->bfloat16) since encodings differ
- Fix #5: apply() detects checkpoint on resume and bypasses index
  validation, BGSAVE, field renames, drop, and key renames (all already
  done pre-crash); enumerates keys via SCAN with plan prefix instead
- Add IM-16 (auto-detect AOF) and IM-17 (compact checkpoint) to backlog

nkanu17 commented Mar 31, 2026

@codex review


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e8181a142d


Comment on lines +805 to +809
checkpoint = QuantizationCheckpoint(
    index_name=source_index.name,
    total_keys=total_keys,
    checkpoint_path=checkpoint_path,
)


P1: Save checkpoint before first quantization batch

When --resume starts a fresh run, this creates an in-memory QuantizationCheckpoint but does not persist it until after a batch succeeds (or an in-loop Exception occurs). If the process is interrupted after the source index is dropped but before that first save (for example SIGKILL or a node crash), no checkpoint file exists, the next run won't enter resume mode, and recovery can fail because the source index is already gone. Persisting an initial checkpoint immediately on creation avoids this gap (the async quantization path has the same issue).



Copilot AI left a comment


Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.



Comment on lines +486 to +502
prefix = prefixes[0] if prefixes else ""
# If a prefix change was part of the migration, keys
# were already renamed before the crash, so scan with
# the new prefix instead.
if has_prefix_change and rename_ops.change_prefix:
    prefix = rename_ops.change_prefix
cursor: int = 0
while True:
    cursor, scanned = client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )

Copilot AI Mar 31, 2026


Resume path enumerates keys via SCAN using match=f"{prefix}*", which can (a) include unrelated keys when the index prefix in plans doesn’t include the key separator (e.g. test vs test:) and (b) return duplicate keys (SCAN is allowed to repeat elements). That can cause quantization to touch keys outside the index or over-count progress/checkpointing. Consider building the SCAN pattern using plan.source.keyspace.key_separator (same logic as planner sampling) and de-duping keys before processing.

Suggested change

Before:

prefix = prefixes[0] if prefixes else ""
# If a prefix change was part of the migration, keys
# were already renamed before the crash, so scan with
# the new prefix instead.
if has_prefix_change and rename_ops.change_prefix:
    prefix = rename_ops.change_prefix
cursor: int = 0
while True:
    cursor, scanned = client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )

After:

scan_prefix = prefixes[0] if prefixes else ""
# If a prefix change was part of the migration, keys
# were already renamed before the crash, so scan with
# the new prefix instead.
if has_prefix_change and rename_ops.change_prefix:
    scan_prefix = rename_ops.change_prefix
key_separator = getattr(plan.source.keyspace, "key_separator", "")
if scan_prefix:
    # Build SCAN pattern using the key separator to avoid
    # matching unrelated keys that merely share a prefix.
    if key_separator and not scan_prefix.endswith(key_separator):
        match_pattern = f"{scan_prefix}{key_separator}*"
    else:
        match_pattern = f"{scan_prefix}*"
else:
    match_pattern = "*"
cursor: int = 0
seen_keys = set()
while True:
    cursor, scanned = client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=match_pattern,
        count=1000,
    )
    for k in scanned:
        key_str = k.decode() if isinstance(k, bytes) else str(k)
        if key_str in seen_keys:
            continue
        seen_keys.add(key_str)
        keys_to_process.append(key_str)

Comment on lines +444 to +454
cursor: int = 0
while True:
    cursor, scanned = await client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )

Copilot AI Mar 31, 2026


Resume path enumerates keys via SCAN using match=f"{prefix}*", which can (a) include unrelated keys when the index prefix in plans doesn’t include the key separator and (b) return duplicate keys (SCAN is allowed to repeat elements). That can cause quantization to touch keys outside the index or over-count progress/checkpointing. Consider building the SCAN pattern using plan.source.keyspace.key_separator and de-duping keys before processing.

Suggested change

Before:

cursor: int = 0
while True:
    cursor, scanned = await client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        keys_to_process.append(
            k.decode() if isinstance(k, bytes) else str(k)
        )

After:

key_separator = plan.source.keyspace.key_separator
cursor: int = 0
# Track seen keys to avoid duplicates from SCAN.
seen_keys: set[str] = set()
while True:
    cursor, scanned = await client.scan(  # type: ignore[misc]
        cursor=cursor,
        match=f"{prefix}{key_separator}*" if prefix else "*",
        count=1000,
    )
    for k in scanned:
        key_str = k.decode() if isinstance(k, bytes) else str(k)
        if key_str in seen_keys:
            continue
        seen_keys.add(key_str)
        keys_to_process.append(key_str)

Comment on lines +824 to +828
for field_name, change in datatype_changes.items():
    field_data: bytes | None = client.hget(key, field_name)  # type: ignore[misc,assignment]
    if not field_data:
        continue


Copilot AI Mar 31, 2026


Quantization reads/writes use HGET/HSET unconditionally. For storage_type == "json" migrations, vectors won’t be fetched/updated (they live in JSON paths), but the code will still mark keys as processed/checkpointed and then recreate the index with the new dtype. Quantization needs a JSON branch (JSON.GET/JSON.SET + rollback) analogous to the field-rename logic.

Comment on lines +780 to +784
for key in batch:
    for field_name, change in datatype_changes.items():
        field_data: bytes | None = await client.hget(key, field_name)  # type: ignore[misc,assignment]
        if not field_data:
            continue

Copilot AI Mar 31, 2026


Quantization reads/writes use HGET/HSET unconditionally. For storage_type == "json" migrations, vectors won’t be fetched/updated (they live in JSON paths), but keys will still be checkpointed as processed. Quantization needs a JSON branch (JSON.GET/JSON.SET + rollback) when plan.source.keyspace.storage_type == "json".

Comment on lines +134 to +140
index_name: str
total_keys: int
completed_keys: int = 0
completed_batches: int = 0
last_batch_keys: List[str] = Field(default_factory=list)
processed_keys: List[str] = Field(default_factory=list)
status: str = "in_progress"

Copilot AI Mar 31, 2026


QuantizationCheckpoint stores every processed key in processed_keys, and save() YAML-dumps the entire growing list after every batch. For large indexes this becomes very slow (O(n^2) total serialization), creates huge checkpoint files, and can dominate migration time. Consider a more compact checkpoint format (e.g., append-only log of completed batches/keys, periodic compaction, or tracking completed batch indices when key ordering is stable) to keep per-batch checkpoint writes bounded.
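
One compact alternative along the lines suggested here (a sketch, not the PR's code): append completed keys to a log so each batch write is O(batch) instead of re-serializing the entire processed-key list.

```python
import os

def append_batch(log_path: str, batch_keys) -> None:
    # Each successful batch appends its keys; earlier entries are never rewritten.
    with open(log_path, "a") as f:
        for key in batch_keys:
            f.write(key + "\n")
        f.flush()
        os.fsync(f.fileno())

def load_processed(log_path: str) -> set:
    # On resume, the processed set is the union of all appended lines.
    try:
        with open(log_path) as f:
            return {line.strip() for line in f if line.strip()}
    except FileNotFoundError:
        return set()
```

A torn final line (crash mid-append) simply means at most one batch is re-checked, which the idempotent skip already handles.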

Comment on lines +459 to +468
# Compute disk space estimate before any mutations
disk_estimate = estimate_disk_space(plan)
if disk_estimate.has_quantization:
    logger.info(
        "Disk space estimate: RDB ~%d bytes, AOF ~%d bytes, total ~%d bytes",
        disk_estimate.rdb_snapshot_disk_bytes,
        disk_estimate.aof_growth_bytes,
        disk_estimate.total_new_disk_bytes,
    )
report.disk_space_estimate = disk_estimate

Copilot AI Mar 31, 2026


estimate_disk_space() supports aof_enabled, but executors always call it with the default False, so the estimate attached to MigrationReport never includes AOF growth even when running against a live Redis with AOF enabled. If the goal is to inform operators up-front, consider detecting AOF status from the live connection (e.g., CONFIG GET/INFO) and passing the real value, with a graceful fallback when not permitted.

Comment on lines +412 to +421
# Compute disk space estimate before any mutations
disk_estimate = estimate_disk_space(plan)
if disk_estimate.has_quantization:
    logger.info(
        "Disk space estimate: RDB ~%d bytes, AOF ~%d bytes, total ~%d bytes",
        disk_estimate.rdb_snapshot_disk_bytes,
        disk_estimate.aof_growth_bytes,
        disk_estimate.total_new_disk_bytes,
    )
report.disk_space_estimate = disk_estimate

Copilot AI Mar 31, 2026


estimate_disk_space() supports aof_enabled, but the async executor always calls it with the default False, so the estimate never includes AOF growth even when running against a live Redis with AOF enabled. Consider detecting AOF status from the live connection and passing the real value (with a fallback if CONFIG/INFO is blocked).


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.



# Count all keys in the batch as processed (including skipped),
# since they are recorded in the checkpoint and won't be retried.
docs_processed += len(batch)


Quantize return value counts all keys, not just converted ones

Medium Severity

docs_processed += len(batch) counts every key in the batch, including keys with no vector data and keys skipped by is_already_quantized. The old code used docs_processed += len(keys_updated_in_batch), counting only keys where a vector field was actually re-encoded. The return value feeds into report.warnings as "Re-encoded {docs_quantized} documents" and the log message "Quantized {docs_processed} documents", both of which now report inflated numbers. The keys_updated_in_batch set is still computed but only used for the pipe.execute() gate, not for counting.

