From 42deaf7cbe745d9858fffb671b0259f55de876a3 Mon Sep 17 00:00:00 2001 From: zoooo0820 Date: Tue, 7 Apr 2026 17:49:16 +0800 Subject: [PATCH 1/3] support moe-topk use topk_reduce_func --- fastdeploy/model_executor/layers/moe/ep.py | 2 + .../layers/moe/fused_moe_cutlass_backend.py | 1 + .../layers/moe/fused_moe_deepgemm_backend.py | 110 ++---------------- fastdeploy/model_executor/layers/moe/moe.py | 17 +++ fastdeploy/model_executor/models/glm4_moe.py | 1 + 5 files changed, 32 insertions(+), 99 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 4489f5ec089..ab65ed024ef 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -508,6 +508,7 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): expert_in_rank_num_list=expert_in_rank_num_list, tokens_per_expert_stats_list=tokens_per_expert_stats_list, redundant_ep_rank_num_plus_one=layer.fd_config.eplb_config.redundant_experts_num + 1, + topk_reduce_func=layer.topk_reduce_func, ) else: topk_idx, topk_weights = fastdeploy.model_executor.ops.gpu.moe_redundant_topk_select( @@ -533,6 +534,7 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): layer.routed_scaling_factor, layer.gate_correction_bias, getattr(layer, "renormalize", True), + topk_reduce_func=layer.topk_reduce_func, ) else: topk_idx, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py index a179d1c80af..bcc342d5114 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py @@ -301,6 +301,7 @@ def apply_tp( layer.routed_scaling_factor, layer.gate_correction_bias, getattr(layer, "renormalize", True), + topk_reduce_func=getattr(layer, "topk_reduce_func", None), ) if current_platform.is_iluvatar(): ( diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index b09cea5f1a4..7664da3376c 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -147,67 +147,6 @@ def m_grouped_fp8_gemm_nt_contiguous_custom_python_op( return ffn_out -def moe_topk_select( - gating_output: paddle.Tensor, - n_group: int, - topk_group: int, - top_k: int, - routed_scaling_factor: float, - e_score_correction_bias: paddle.Tensor, - renormalize: bool = False, -): - """ - Topk selection using paddle PHI topk API. - - Args: - gating_output: gate output logits, shape [seq_len, n_experts] - n_group: number of expert groups - topk_group: number of top-k groups to select - top_k: number of top experts per token - routed_scaling_factor: scaling factor for routed experts - e_score_correction_bias: bias for expert selection - renormalize: whether to renormalize topk probabilities - - Returns: - topk_weights: normalized topk probabilities, shape [seq_len, top_k] - topk_ids: topk expert indices, shape [seq_len, top_k] - """ - # compute gate probs via sigmoid - gate_probs = paddle.nn.functional.sigmoid(gating_output) - # probs_for_choice includes correction bias for topk selection - probs_for_choice = gate_probs + e_score_correction_bias if e_score_correction_bias is not None else gate_probs - # group-based topk selection - n_group = n_group if n_group > 0 else 1 - topk_group = topk_group if topk_group > 0 else 1 - if n_group > 1 and topk_group < n_group: - seq_length, n_experts = probs_for_choice.shape - group_scores = ( - probs_for_choice.reshape([seq_length, n_group, -1]).topk(2, axis=-1)[0].sum(axis=-1) - ) # [seq_len, n_group] - group_idx = paddle.topk(group_scores, k=topk_group, axis=-1, sorted=True)[1] # [seq_len, topk_group] - group_mask = paddle.sum( - paddle.nn.functional.one_hot(group_idx, num_classes=n_group).cast(group_scores.dtype), - axis=1, # Sum over topk_group dimension -> [seq_len, n_group] - ) - score_mask = ( - group_mask.unsqueeze(-1).expand([seq_length, n_group, n_experts // n_group]).reshape([seq_length, -1]) - ) # [seq_len, n_experts] - probs_for_choice = probs_for_choice.masked_fill(~score_mask.astype(paddle.bool), float("-inf")) - - _, topk_ids = paddle.topk(probs_for_choice, top_k, axis=-1) - topk_weights = paddle.index_sample(gate_probs, topk_ids) - - # normalize combine weights - if renormalize: - topk_weights = topk_weights / paddle.clip(topk_weights.sum(-1, keepdim=True), min=1e-12) - - # apply routed scaling factor - if routed_scaling_factor: - topk_weights = topk_weights * routed_scaling_factor - - return topk_weights, topk_ids - - class DeepGemmFusedMoeMethod(MoEMethodBase): """ DeepGemmFusedMoeMethod is a class that implements the MoEMethodBase interface for DeepGemm backend. @@ -329,22 +268,7 @@ def apply_ep_prefill( hidden_size = x.shape[1] # 1. Select topk experts and weights - if ( - fastdeploy.envs.FD_USE_PHI_MOE_TOPK - and layer.redundant_table_manger is None - and layer.topk_method == "noaux_tc" - ): - topk_weights, topk_idx = moe_topk_select( - gate_out, - layer.n_group, - layer.topk_group, - layer.top_k, - layer.routed_scaling_factor, - layer.gate_correction_bias, - getattr(layer, "renormalize", True), - ) - else: - topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) + topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) if topk_ids_hookfunc is not None: topk_ids_hookfunc(topk_ids=topk_idx) @@ -619,28 +543,16 @@ def apply_tp( gate_out = gate(x.cast("float32")) if layer.topk_method == "noaux_tc": - - if not fastdeploy.envs.FD_USE_PHI_MOE_TOPK: - _, topk_weights, topk_ids = fastdeploy.model_executor.layers.moe.moe.get_moe_scores( - gate_out, - layer.n_group, - layer.topk_group, - layer.top_k, - layer.routed_scaling_factor, - layer.gate_correction_bias, - getattr(layer, "renormalize", True), - ) - else: - topk_weights, topk_ids = moe_topk_select( - gate_out, - layer.n_group, - layer.topk_group, - layer.top_k, - layer.routed_scaling_factor, - layer.gate_correction_bias, - getattr(layer, "renormalize", True), - ) - + _, topk_weights, topk_ids = fastdeploy.model_executor.layers.moe.moe.get_moe_scores( + gate_out, + layer.n_group, + layer.topk_group, + layer.top_k, + layer.routed_scaling_factor, + layer.gate_correction_bias, + getattr(layer, "renormalize", True), + topk_reduce_func=getattr(layer, "topk_reduce_func", None), + ) else: topk_ids, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( gate_out, diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 12964ef25e0..91230bf6e67 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -86,6 +86,7 @@ def get_moe_scores( expert_in_rank_num_list: paddle.Tensor = None, tokens_per_expert_stats_list: paddle.Tensor = None, redundant_ep_rank_num_plus_one: int = 1, + topk_reduce_func: Callable = None, ) -> paddle.Tensor: """ compute moe scores using e_score_correction_bias. @@ -93,6 +94,14 @@ def get_moe_scores( scores = paddle.nn.functional.sigmoid(gating_output) assert e_score_correction_bias is not None, "e_score_correction_bias is none!" scores_with_bias = scores + e_score_correction_bias + + if envs.FD_USE_PHI_MOE_TOPK: + # calculate renormalize and routed_scaling_factor value outside the noaux_tc + original_renormalize = renormalize + original_routed_scaling_factor = routed_scaling_factor + renormalize = False + routed_scaling_factor = 1.0 + if expert_id_to_ep_rank_array is None: scores, topk_values, topk_idx = noaux_tc( scores, @@ -119,6 +128,12 @@ def get_moe_scores( routed_scaling_factor, redundant_ep_rank_num_plus_one, ) + if envs.FD_USE_PHI_MOE_TOPK: + if topk_reduce_func is not None and original_renormalize: + topk_values = topk_values / topk_reduce_func(topk_values) + + if original_routed_scaling_factor != 1.0: + topk_values *= original_routed_scaling_factor return scores, topk_values, topk_idx @@ -148,6 +163,7 @@ def __init__( with_bias: bool = False, activation="swiglu", model_format: Optional[str] = None, + topk_reduce_func: Callable = None, # only used when FD_USE_PHI_MOE_TOPK=1 ): """ Initialize the Moe layer with given parameters. @@ -193,6 +209,7 @@ def __init__( self.moe_tag = moe_tag self.with_bias = with_bias self.activation = activation + self.topk_reduce_func = topk_reduce_func if self.ep_size > 1: expert_id_offset = expert_id_offset + self.ep_rank * self.num_local_experts diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py index 20d86fbaaf8..3f74ae226af 100644 --- a/fastdeploy/model_executor/models/glm4_moe.py +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -170,6 +170,7 @@ def __init__( layer_idx=layer_id, gate_correction_bias=self.gate.e_score_correction_bias, weight_key_map=weight_key_map, + topk_reduce_func=lambda x: x.sum(axis=-1, keepdim=True) + 1e-20, ) if self.n_shared_experts > 0: From c93ba724e6f2424014a352b126c1e80cd748897c Mon Sep 17 00:00:00 2001 From: zoooo0820 Date: Tue, 7 Apr 2026 20:48:03 +0800 Subject: [PATCH 2/3] fix ep error --- fastdeploy/model_executor/layers/moe/ep.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index ab65ed024ef..13c55132fde 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -508,7 +508,7 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): expert_in_rank_num_list=expert_in_rank_num_list, tokens_per_expert_stats_list=tokens_per_expert_stats_list, redundant_ep_rank_num_plus_one=layer.fd_config.eplb_config.redundant_experts_num + 1, - topk_reduce_func=layer.topk_reduce_func, + topk_reduce_func=getattr(layer, "topk_reduce_func", None), ) else: topk_idx, topk_weights = fastdeploy.model_executor.ops.gpu.moe_redundant_topk_select( @@ -534,7 +534,7 @@ def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): layer.routed_scaling_factor, layer.gate_correction_bias, getattr(layer, "renormalize", True), - topk_reduce_func=layer.topk_reduce_func, + topk_reduce_func=getattr(layer, "topk_reduce_func", None), ) else: topk_idx, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( From 36be4ac72387a8da813b32fd85dea0c99e322f15 Mon Sep 17 00:00:00 2001 From: zoooo0820 Date: Wed, 8 Apr 2026 10:52:27 +0800 Subject: [PATCH 3/3] fix ut --- fastdeploy/model_executor/layers/moe/moe.py | 13 ++++--- tests/operators/test_noaux_tc_redundant.py | 38 ++++++++++++++------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 91230bf6e67..0b1d4a5308f 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -86,7 +86,7 @@ def get_moe_scores( expert_in_rank_num_list: paddle.Tensor = None, tokens_per_expert_stats_list: paddle.Tensor = None, redundant_ep_rank_num_plus_one: int = 1, - topk_reduce_func: Callable = None, + topk_reduce_func: Callable = lambda x: x.sum(axis=-1, keepdim=True) + 1e-20, ) -> paddle.Tensor: """ compute moe scores using e_score_correction_bias. @@ -129,8 +129,12 @@ def get_moe_scores( redundant_ep_rank_num_plus_one, ) if envs.FD_USE_PHI_MOE_TOPK: - if topk_reduce_func is not None and original_renormalize: - topk_values = topk_values / topk_reduce_func(topk_values) + if original_renormalize: + if topk_reduce_func is not None: + topk_values = topk_values / topk_reduce_func(topk_values) + else: + # 使用默认的 sum + epsilon + topk_values = topk_values / (topk_values.sum(axis=-1, keepdim=True) + 1e-20) if original_routed_scaling_factor != 1.0: topk_values *= original_routed_scaling_factor @@ -163,7 +167,8 @@ def __init__( with_bias: bool = False, activation="swiglu", model_format: Optional[str] = None, - topk_reduce_func: Callable = None, # only used when FD_USE_PHI_MOE_TOPK=1 + topk_reduce_func: Callable = lambda x: x.sum(axis=-1, keepdim=True) + + 1e-20, # only used when FD_USE_PHI_MOE_TOPK=1, default is same as noaux_tc kernel ): """ Initialize the Moe layer with given parameters. diff --git a/tests/operators/test_noaux_tc_redundant.py b/tests/operators/test_noaux_tc_redundant.py index 60d1aad2a22..f5289e0ab3c 100644 --- a/tests/operators/test_noaux_tc_redundant.py +++ b/tests/operators/test_noaux_tc_redundant.py @@ -1,10 +1,22 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import unittest +from unittest import mock import paddle -from fastdeploy.model_executor.layers.moe.fused_moe_deepgemm_backend import ( - moe_topk_select, -) from fastdeploy.model_executor.layers.moe.moe import get_moe_scores @@ -135,15 +147,17 @@ def test_group_topk_using_phi_topk(self): e_score_correction_bias=e_score_correction_bias, ) - topk_values, topk_idx = moe_topk_select( - gating_output=gating_output, - n_group=n_group, - topk_group=topk_group, - top_k=top_k, - routed_scaling_factor=routed_scaling_factor, - e_score_correction_bias=e_score_correction_bias, - renormalize=renormalize, - ) + with mock.patch.dict("os.environ", {"FD_USE_PHI_MOE_TOPK": "1"}): + new_score, topk_values, topk_idx = get_moe_scores( + gating_output=gating_output, + n_group=n_group, + topk_group=topk_group, + top_k=top_k, + routed_scaling_factor=routed_scaling_factor, + e_score_correction_bias=e_score_correction_bias, + renormalize=renormalize, + topk_reduce_func=lambda x: x.sum(axis=-1, keepdim=True) + 1e-20, + ) equal_topk_value = paddle.allclose(topk_values, ref_topk_values, atol=1e-03, rtol=1e-03).item() equal_topk_ids = paddle.allclose(