[KSM] fix logz when topk #7232
Conversation
Thanks for your contribution!
Pull request overview
This PR preserves/restores the logZ computation (the log of the candidate set's total probability mass) under top_k + top_p truncated sampling, providing the information needed to later renormalize logprobs to the actual truncated distribution; it also adds a field to SamplerOutput to carry this data.
Changes:
- Add a `logz_per_batch` field to `SamplerOutput` to carry the logZ for each batch row.
- Change `_compute_sampling_mask` to return `logz_per_batch` in addition to the sparse sampling mask, and compute and backfill it in the sampling forward path.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| fastdeploy/worker/output.py | Adds the `logz_per_batch` field to `SamplerOutput`, with an explanatory comment |
| fastdeploy/model_executor/layers/sample/sampler.py | Adds the logZ computation to `_compute_sampling_mask` and adjusts call sites to pass it through to `SamplerOutput` |
```python
Tuple of (sparse_indices, logz_per_batch):
    - sparse_indices: List of length num_reqs; element i is a 1-D int64
      numpy array of the retained vocab indices for request i.
    - logz_per_batch: 1-D numpy array of shape [num_reqs] containing
      log(Z_K) where Z_K is the sum of probabilities in the candidate set.
```
The `logz_per_batch` returned by `_compute_sampling_mask` actually has shape `probs.shape[0]`. In speculative decoding the batch dimension here is expanded to `total_accepted_tokens`, so the docstring's "shape [num_reqs]" can mislead later users. Consider a more accurate description (e.g. "shape [B] where B = probs.shape[0]") and explain what B means in each call scenario.
```diff
     logz_per_batch = None
     if sampling_metadata.keep_sampling_mask:
-        sampling_mask = _compute_sampling_mask(
+        sampling_mask, logz_per_batch = _compute_sampling_mask(
             probs,
             sampling_metadata.top_p,
```
Currently `logz_per_batch` is only computed and stored in `SamplerOutput` when `keep_sampling_mask` is enabled, but nothing in the repository consumes or forwards the field (e.g. `_build_stream_transfer_data`, the ZMQ side-channel, and the OpenAI response all omit it). If this PR's goal is to fix logZ in the top_k case for logprobs normalization, the chain that applies logZ to logprobs (or passes logZ up with the output so the caller can renormalize) is still missing; otherwise the change has essentially no externally visible effect.
```python
# Compute sampling mask at accepted token positions.
# Shape: [total_accepted_tokens, vocab_size], bool (CPU).
sampling_mask = None
logz_per_batch = None
if keep_sampling_mask:
    # Expand top_p from [batch, 1] to [total_accepted, 1].
    accept_top_p = sampling_metadata.top_p[:real_bsz].squeeze(1).repeat_interleave(accept_nums).unsqueeze(1)
```
In the speculative decoding path, `_compute_sampling_mask(target_probs, ...)` is called with a `target_probs` whose batch dimension is `total_accepted_tokens`, so the resulting `logz_per_batch` is likewise expanded per accepted token. If logZ is later returned or used per request, consider regrouping/aligning it by `accept_num` in the post-process stage (otherwise it can easily get misaligned with other per-request fields).
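A minimal sketch of such a regroup (pure NumPy; the helper name `regroup_logz_by_request` and the exact shapes are illustrative assumptions, not the repository's API):

```python
import numpy as np

def regroup_logz_by_request(logz_flat: np.ndarray, accept_nums: np.ndarray) -> list:
    """Split a flat [total_accepted_tokens] logZ array into per-request chunks.

    accept_nums[i] is the number of accepted tokens for request i, so the
    returned list has one array of length accept_nums[i] per request.
    """
    # Split points are the cumulative accepted counts (drop the final total).
    split_points = np.cumsum(accept_nums)[:-1]
    return np.split(logz_flat, split_points)

# Example: 3 requests accepting 2, 1, and 3 tokens respectively.
logz_flat = np.array([-0.1, -0.2, -0.3, -0.4, -0.5, -0.6])
chunks = regroup_logz_by_request(logz_flat, np.array([2, 1, 3]))
```

With this layout, `chunks[i]` lines up with request i's other per-request fields, avoiding the misalignment the comment above warns about.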
```diff
     top_p: paddle.Tensor,
     top_k: Optional[paddle.Tensor] = None,
     top_k_list: Optional[list] = None,
-) -> List[np.ndarray]:
+) -> tuple[List[np.ndarray], np.ndarray]:
     """
```
The PR title is currently "[KSM] fix logz when topk" (still with quotes), which does not match the template's [CLASS]Title/tag-list requirement; the Modifications/Usage/Accuracy Tests sections of the PR description are also largely unfilled. Please complete the description per the template and change the title to a tag from the approved list (e.g. [BugFix] ... or another semantically clear tag).
```python
@@ -647,8 +682,9 @@ def forward_cuda(
    # Compute sampling mask BEFORE top_k_top_p_sampling modifies probs.
    # Binary mask [num_reqs, vocab_size]: 1 = retained by top_k/top_p, 0 = truncated.
```
The comment still says "Binary mask [num_reqs, vocab_size] bool tensor", but `_compute_sampling_mask` actually returns sparse indices as a `List[np.ndarray]` (CPU), and in the speculative path the dimension is not simply `num_reqs` either. Please update the comment to match, to avoid future misuse.
```diff
-# Binary mask [num_reqs, vocab_size]: 1 = retained by top_k/top_p, 0 = truncated.
+# `_compute_sampling_mask` returns CPU-side sparse retained-token indices,
+# i.e. a List[np.ndarray], not a dense bool tensor of shape
+# [num_reqs, vocab_size]. In speculative paths, the outer list length also
+# does not necessarily match `num_reqs`.
```
fastdeploy-bot left a comment
📋 Review Summary
PR overview: Fixes the logz computation in the topk case, correctly computing the normalization constant after topk + topp truncation.
Change scope: fastdeploy/model_executor/layers/sample/sampler.py, fastdeploy/worker/output.py
Impact tags: [BugFix] [OP]
📝 PR Convention Check
The PR title uses the unofficial tag [KSM], and as a Cherry-Pick PR it does not follow the convention.
Issues:
- The title tag `[KSM]` is not in the official tag list.
- As a Cherry-Pick PR (kms_2.5 → release/2.5), it does not add the required `[Cherry-Pick]` prefix and the original PR ID.
Suggested title (copy-ready):
[Cherry-Pick][BugFix] fix logz when topk(#7225)
Suggested description:
Motivation
Fix the problem that, with combined topk + topp sampling, the logz computation ignored the topk truncation, making the logprobs normalization inaccurate.
Modifications
- Modify `_compute_sampling_mask` to additionally return `logz_per_batch`.
- When topk is active, correctly compute the two-stage normalization: logZ = log(Z_topk) + log(Z_topp_on_renorm).
- Add the `logz_per_batch` field to the `SamplerOutput` class.
- Update both CUDA sampling-path call sites.
Issues
| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | PR conventions | Title tag nonstandard; Cherry-Pick format missing |
Overall Assessment
The code logic is correct and fully accounts for the normalization when topk and topp sampling are combined. The interface change is backward compatible (Optional field with a default value), and all call sites are correctly adapted. The main issue is that the PR title does not follow the convention.
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```text
@@ Coverage Diff @@
##           release/2.5    #7232   +/-  ##
==============================================
  Coverage             ?   68.99%
==============================================
  Files                ?      390
  Lines                ?    54405
  Branches             ?     8577
==============================================
  Hits                 ?    37535
  Misses               ?    14161
  Partials             ?     2709
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
fastdeploy-bot left a comment
🤖 AI Code Review | 2026-04-08
📋 Review Summary
PR overview: Fixes the logz computation when using top-k, and uses logz to renormalize logprobs.
Change scope: model_executor/layers/sample/sampler.py, pre_and_post_process.py, worker/output.py
Impact tags: [BugFix]
📝 PR Convention Check
The PR title carries the [KSM] tag, and the description explains the motivation for the fix and links related PRs; this follows the convention.
Issues
| Level | File | Summary |
|---|---|---|
| 🔴 Bug | pre_and_post_process.py:531 | Wrong variable name: uses the undefined `valid_mask`; should be `log_valid_mask` |
| 🟡 Suggestion | pre_and_post_process.py:359-377, 527-539 | Code duplication: the logprobs renormalization logic is identical in both functions |
| 🟡 Suggestion | — | Missing unit tests: add test cases for the logz computation in `_compute_sampling_mask` |
Overall Assessment
The PR fixes the core logz computation for the top-k case, and the math is correct. However, a variable-naming error will cause a runtime error and must be fixed. Consider extracting the duplicated renormalization logic into a helper function and adding test coverage.
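As a starting point for the suggested unit test, a pure-NumPy reference computation of the expected logZ for one row (the function name, truncation order, and tie-breaking here are illustrative assumptions based on this review's description, not the repository's actual `_compute_sampling_mask` API):

```python
import numpy as np

def reference_logz(probs: np.ndarray, top_k: int, top_p: float) -> float:
    """Reference logZ for one row: top_k truncation + renorm, then top_p."""
    # Stage 1: keep the top_k most probable tokens and renormalize.
    order = np.argsort(probs)[::-1]
    topk_probs = probs[order[:top_k]]
    z_topk = topk_probs.sum()
    renormed = topk_probs / z_topk  # sorted descending
    # Stage 2: smallest prefix of the renormed probs with mass >= top_p.
    cutoff = np.searchsorted(np.cumsum(renormed), top_p) + 1
    z_topp = renormed[:cutoff].sum()
    # Two-stage normalizer: logZ = log(Z_topk) + log(Z_topp).
    return float(np.log(z_topk) + np.log(z_topp))

probs = np.array([0.4, 0.3, 0.2, 0.05, 0.05])
logz = reference_logz(probs, top_k=3, top_p=0.75)
# Final candidate set keeps {0.4, 0.3}, so logZ == log(0.7).
```

A test could then compare this reference against the `logz_per_batch` returned by the real implementation on the same inputs.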
```python
if sampler_output.logprobs_tensors is not None and sampler_output.logz_per_batch is not None:
    logprobs = sampler_output.logprobs_tensors.logprobs
    logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
    valid_mask = paddle.isfinite(logprobs)
```
🔴 Bug: Wrong variable name causes a runtime exception
In the post_process_specualate function, line 529 defines the variable `log_valid_mask`, but line 531 uses `valid_mask` in `paddle.where`; the undefined variable raises a NameError.
Suggested fix:
Change `valid_mask` on line 531 to `log_valid_mask`.
```python
    model_output.mask_rollback,
)

# Renormalize logprobs to match truncated sampling distribution (when enabled).
```
🟡 Suggestion: Code duplication
The logprobs renormalization logic in post_process_normal (lines 359-377) and post_process_specualate (lines 527-539) is identical.
Suggestion:
Extract it into a shared helper function, such as:

```python
def _renormalize_logprobs(sampler_output: SamplerOutput) -> None:
    """Renormalize logprobs to match truncated sampling distribution."""
    if sampler_output.logprobs_tensors is None or sampler_output.logz_per_batch is None:
        return
    logprobs = sampler_output.logprobs_tensors.logprobs
    logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
    valid_mask = paddle.isfinite(logprobs)
    normalized_logprobs = paddle.where(
        valid_mask,
        logprobs - logz.unsqueeze(1),
        paddle.full_like(logprobs, float("-inf")),
    )
    sampler_output.logprobs_tensors = LogprobsTensors(
        logprob_token_ids=sampler_output.logprobs_tensors.logprob_token_ids,
        logprobs=normalized_logprobs,
        selected_token_ranks=sampler_output.logprobs_tensors.selected_token_ranks,
    )
```

```python
# logprobs_tensors.logprobs: [B, max_num_logprobs + 1]
logprobs = sampler_output.logprobs_tensors.logprobs
# logz_per_batch: [B], log(sum(probs in candidate set K)) for each request
logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
```
logz is created from a NumPy array without specifying place, which can put it on CPU while logprobs is on GPU. In Paddle this commonly causes a device mismatch error when executing logprobs - logz.unsqueeze(1). Create logz on the same place/device as logprobs (e.g., pass place=logprobs.place or otherwise ensure the tensor is moved to the same device) before subtraction.
```diff
-logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
+logz = paddle.to_tensor(
+    sampler_output.logz_per_batch, dtype=logprobs.dtype, place=logprobs.place
+)
```
```python
# Renormalize logprobs to match truncated sampling distribution (when enabled).
if sampler_output.logprobs_tensors is not None and sampler_output.logz_per_batch is not None:
    logprobs = sampler_output.logprobs_tensors.logprobs
    logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
```
Same device-placement issue as the normal path: logz constructed from NumPy may land on CPU, while logprobs may be on GPU, breaking logprobs - logz.unsqueeze(1). Ensure logz is created/moved to logprobs's place/device before use.
```diff
-logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
+logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype, place=logprobs.place)
```
```python
# Renormalize logprobs to match truncated sampling distribution (when enabled).
if sampler_output.logprobs_tensors is not None and sampler_output.logz_per_batch is not None:
    # logprobs_tensors.logprobs: [B, max_num_logprobs + 1]
    logprobs = sampler_output.logprobs_tensors.logprobs
    # logz_per_batch: [B], log(sum(probs in candidate set K)) for each request
    logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
    # Renormalize: log π_masked = log π_full - log Z_K
    # Only normalize valid candidates; padding positions use -inf
    valid_mask = paddle.isfinite(logprobs)
    normalized_logprobs = paddle.where(
        valid_mask,
        logprobs - logz.unsqueeze(1),  # broadcast subtraction
        paddle.full_like(logprobs, float("-inf")),
    )
    # Update logprobs_tensors with normalized values
    sampler_output.logprobs_tensors = LogprobsTensors(
        logprob_token_ids=sampler_output.logprobs_tensors.logprob_token_ids,
        logprobs=normalized_logprobs,
        selected_token_ranks=sampler_output.logprobs_tensors.selected_token_ranks,
    )
```
The renormalization block is duplicated in both post_process_normal and post_process_specualate with near-identical logic. Consider extracting this into a small helper (e.g., _renormalize_logprobs_with_logz(sampler_output)) to reduce duplication and the risk of future drift (especially around masking/device placement).
Suggested change: replace the inline block at each call site with

```python
# Renormalize logprobs to match truncated sampling distribution (when enabled).
_renormalize_logprobs_with_logz(sampler_output)
```

and define the helper once:

```python
def _renormalize_logprobs_with_logz(sampler_output: SamplerOutput):
    """Renormalize logprobs to match the truncated sampling distribution."""
    if sampler_output.logprobs_tensors is None or sampler_output.logz_per_batch is None:
        return
    # logprobs_tensors.logprobs: [B, max_num_logprobs + 1]
    logprobs = sampler_output.logprobs_tensors.logprobs
    # logz_per_batch: [B], log(sum(probs in candidate set K)) for each request
    logz = paddle.to_tensor(sampler_output.logz_per_batch, dtype=logprobs.dtype)
    # Renormalize: log π_masked = log π_full - log Z_K
    # Only normalize valid candidates; padding positions use -inf
    valid_mask = paddle.isfinite(logprobs)
    normalized_logprobs = paddle.where(
        valid_mask,
        logprobs - logz.unsqueeze(1),  # broadcast subtraction
        paddle.full_like(logprobs, float("-inf")),
    )
    sampler_output.logprobs_tensors = LogprobsTensors(
        logprob_token_ids=sampler_output.logprobs_tensors.logprob_token_ids,
        logprobs=normalized_logprobs,
        selected_token_ranks=sampler_output.logprobs_tensors.selected_token_ranks,
    )
```
```python
    logz_per_batch = (log_z_topk + paddle.log(z_topp + 1e-10)).cpu().numpy()  # [B]
else:
    candidate_probs = paddle.where(final_mask, sorted_probs, paddle.zeros_like(sorted_probs))
    z_k = candidate_probs.sum(axis=-1)  # [B]
    logz_per_batch = paddle.log(z_k + 1e-10).cpu().numpy()  # [B]
```
logz_per_batch is computed on GPU, then immediately transferred to CPU (.cpu().numpy()), and later converted back to a Paddle tensor in post-processing for renormalization. This adds synchronization + extra H2D/D2H copies. If logz_per_batch is only used for renormalizing logprobs, consider keeping it as a paddle.Tensor on the same device as logprobs (and only convert to NumPy at the boundary if it must be returned externally).
```diff
-    logz_per_batch = (log_z_topk + paddle.log(z_topp + 1e-10)).cpu().numpy()  # [B]
+    logz_per_batch = log_z_topk + paddle.log(z_topp + 1e-10)  # [B]
 else:
     candidate_probs = paddle.where(final_mask, sorted_probs, paddle.zeros_like(sorted_probs))
     z_k = candidate_probs.sum(axis=-1)  # [B]
-    logz_per_batch = paddle.log(z_k + 1e-10).cpu().numpy()  # [B]
+    logz_per_batch = paddle.log(z_k + 1e-10)  # [B]
```
```python
else:
    candidate_probs = paddle.where(final_mask, sorted_probs, paddle.zeros_like(sorted_probs))
    z_k = candidate_probs.sum(axis=-1)  # [B]
    logz_per_batch = paddle.log(z_k + 1e-10).cpu().numpy()  # [B]
```
logz_per_batch is computed on GPU, then immediately transferred to CPU (.cpu().numpy()), and later converted back to a Paddle tensor in post-processing for renormalization. This adds synchronization + extra H2D/D2H copies. If logz_per_batch is only used for renormalizing logprobs, consider keeping it as a paddle.Tensor on the same device as logprobs (and only convert to NumPy at the boundary if it must be returned externally).
fastdeploy-bot left a comment
🤖 AI Code Review | 2026-04-08
📋 Review Summary
PR overview: Fixes the logz computation logic in topk mode, preserving the logz computation and using logz to renormalize logprobs.
Change scope: model_executor/layers/sample/sampler.py, model_executor/pre_and_post_process.py, worker/output.py
Impact tags: [Models]
📝 PR Convention Check
The PR title carries the [KSM] tag (a Models-type tag), and the description includes Motivation and Modifications; the format follows the convention.
Issues
| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | fastdeploy/worker/output.py:196 | The `logz_per_batch` shape comment is inaccurate under speculative decoding |
Overall Assessment
The implementation is correct: `_compute_sampling_mask` computes logz (the log normalization factor) correctly, and post-process renormalizes logprobs with it. Both the math derivation and the code implementation check out. The only needed improvement is that the comment describing the shape of `logz_per_batch` should distinguish the non-speculative and speculative modes.
📐 Math and Logic Verification
logz computation:
When both top_k and top_p are enabled:
- First truncate to top_k and renormalize: π_topk(k) = π_full(k) / Z_topk
- Then apply top_p on π_topk to obtain the final candidate set K
- Total log normalization factor: logZ = log(Z_topk) + log(Z_topp)
where:
- Z_topk = Σ_{j ∈ topk} π_full(j) = `row_sums`
- Z_topp = Σ_{k ∈ K} π_topk(k) = `z_topp`
The implementation matches this derivation exactly; the logic is correct.
Post-process renormalization:
log π_masked(k) = log π_full(k) - logZ
Implemented as:

```python
normalized_logprobs = logprobs - logz.unsqueeze(1)  # broadcast subtraction
```

This is correct and ensures the returned logprobs correspond to the actual sampling distribution.
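The two-stage factorization can be sanity-checked numerically; this sketch (illustrative toy values only) verifies that log(Z_topk) + log(Z_topp) equals the log of the final candidate set's mass under the full distribution:

```python
import numpy as np

# Full distribution over a toy vocabulary, sorted descending for convenience.
pi_full = np.array([0.5, 0.2, 0.15, 0.1, 0.05])

# Stage 1: top_k truncation (k = 4) and renormalization.
k = 4
z_topk = pi_full[:k].sum()        # Z_topk = mass of the top-k tokens
pi_topk = pi_full[:k] / z_topk    # renormalized top-k distribution

# Stage 2: top_p (p = 0.8) on the renormalized distribution:
# keep the smallest prefix whose cumulative mass reaches p.
p = 0.8
cutoff = np.searchsorted(np.cumsum(pi_topk), p) + 1
z_topp = pi_topk[:cutoff].sum()   # Z_topp = mass kept by top_p

# Combined normalizer vs. direct mass of the final candidate set K.
logz_two_stage = np.log(z_topk) + np.log(z_topp)
logz_direct = np.log(pi_full[:cutoff].sum())
```

Here the final candidate set keeps the first three tokens (mass 0.85), and both routes yield log(0.85), confirming the identity the review relies on.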
```python
# check whether the current path is speculative or non-speculative when
# interpreting the dimension.
sampling_mask: Optional[List[np.ndarray]] = None
# logZ_K for each request: log(sum(probs in candidate set K))
```
🟡 Suggestion: The comment on the shape of `logz_per_batch` should distinguish the two modes.
The current comment says the shape is [num_reqs], which only holds in non-speculative decoding mode.
In speculative decoding mode (`SpeculativeSampler.forward_cuda` calls `_compute_sampling_mask` at line 1068), the input `target_probs` has shape [total_accepted_tokens, vocab_size], so the returned `logz_per_batch` also has shape [total_accepted_tokens], with an independent logz value per accepted token.
Suggested comment:

```python
# logZ_K for logprobs renormalization:
# - Non-speculative: shape [num_reqs], one value per request
# - Speculative: shape [total_accepted_tokens], one per accepted token
```
Motivation
Preserve the logz computation and use logz to renormalize logprobs. See #7225 and #6966.
Modifications
Usage or Command
Accuracy Tests
Checklist
- Add at least one tag in the PR title, from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For a PR targeting a `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.