Conversation
Pull request overview
This PR integrates DeepEP-based Expert Parallelism (EP) over the NCCL backend into TurboMind, wiring EP initialization into runtime context creation and extending LLaMA MoE execution to support EP token routing/dispatch/combine.
Changes:
- Add a DeepEP/NCCL EP backend (`NcclCommImpl::InitializeEp/Dispatch/Combine`) and build it as a new `deepep` static library.
- Extend TurboMind engine/model parameters for EP (`ep_size`, `ep_rank`, `ll_max_tokens_per_rank`) and initialize EP in `TurboMind::Impl::CreateContext`.
- Update the LLaMA unified decoder and MoE FFN to support EP routing, and add a fused RMSNorm path that supports EP token partitioning (ReduceScatterV/AllGatherV).
Reviewed changes
Copilot reviewed 41 out of 42 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| src/turbomind/turbomind.cc | Parse EP/LL params and initialize EP in device communicator during context setup |
| src/turbomind/models/llama/unified_decoder.{h,cc} | Add EP-aware hidden-state layout + fused RMSNorm integration + partial-token FFN execution |
| src/turbomind/models/llama/moe_ffn_layer.{h,cc} | Add EP routing/dispatch/combine implementation and EP-mode state |
| src/turbomind/models/llama/llama_params.h | Add EP + LL threshold parameters to engine/moe config |
| src/turbomind/models/llama/LlamaDenseWeight.{h,cc} | Shard MoE expert weights by ep_size/ep_rank |
| src/turbomind/models/llama/LlamaDecoderLayerWeight.{h,cc} | Thread EP params into MoE weight construction; adjust MLP TP handling for EP |
| src/turbomind/models/llama/FusedRMSNormLayer.h | New TP/EP fused RMSNorm abstraction with EP ReduceScatterV/AllGatherV |
| src/turbomind/kernels/gemm/moe_ep_utils.{h,cu} | New kernels/utilities for EP gating and (LL/HT) combine helpers |
| src/turbomind/comm/device_comm.h | Extend device-comm interface with ReduceScatterV/AllGatherV and EP APIs |
| src/turbomind/comm/nccl/{nccl_comm.h,nccl.cu,nccl_ep.cu} | Refactor NCCL comm impl into header + add DeepEP EP ops |
| src/turbomind/comm/nccl/deep_ep/* | Vendored DeepEP implementation and kernels |
| src/turbomind/comm/nccl/CMakeLists.txt | Build/link deepep and include EP source in nccl_comm |
| lmdeploy/turbomind/turbomind.py | Add EP parallel-config derivation in Python front-end |
| lmdeploy/turbomind/deploy/{config.py,converter.py,module.py} | Plumb ep_size into deploy config and TP sizing for EP |
| lmdeploy/messages.py | Add ep to TurbomindEngineConfig |
| lmdeploy/cli/serve.py | Add CLI wiring to pass --ep into engine config |
| src/turbomind/models/llama/llama_utils.cu | Add Compare<int64_t> instantiation |
```cpp
*moe_recv_counter = -1;
MoeLLDispatchRoutingMapKernel<<<num_local_experts, threads, 0, st>>>(moe_recv_counter_mapped, f2n, f2E, offsets);
sync_check_cuda_error();
core::Context::stream().Sync();

while (*moe_recv_counter < 0) {};
out = Tensor({*moe_recv_counter, hidden}, packed_recv_x.dtype(), packed_recv_x.device());
TM_CHECK_EQ(hidden * byte_size(packed_recv_x.dtype()) % sizeof(int4), 0LL);
```
`invokeMoeLLDispatchPostprocess` spins in a tight busy-wait loop (`while (*moe_recv_counter < 0) {}`) with no timeout or backoff. This can peg a CPU core and hang indefinitely if the counter is never updated or never becomes visible. Prefer a synchronization primitive (e.g., `cudaStreamSynchronize` plus a device-side `__threadfence_system()` before writing the mapped counter) and/or add a bounded timeout with an error message.
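The bounded-wait pattern the comment suggests can be sketched as follows (a minimal host-side model in Python, not the PR's C++ code; `read_counter`, the timeout, and the backoff interval are illustrative choices):

```python
import time


def wait_for_recv_counter(read_counter, timeout_s=10.0, backoff_s=1e-4):
    """Poll a mapped counter until it becomes non-negative, with a
    bounded timeout and a small sleep instead of an unbounded hot spin."""
    deadline = time.monotonic() + timeout_s
    while (value := read_counter()) < 0:
        if time.monotonic() > deadline:
            # Fail loudly instead of hanging forever if the dispatch
            # kernel never publishes the counter.
            raise TimeoutError('moe_recv_counter was never updated')
        time.sleep(backoff_s)  # yield the core between polls
    return value
```

The same shape translates directly to the C++ call site with `std::chrono::steady_clock` and `std::this_thread::yield()`.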
```cpp
              int          group,
              cudaStream_t stream)
{
    NCCLCHECK(ncclBroadcast(recvbuff, recvbuff, count, to_nccl_dtype(type), root, groups_.at(group), stream));
```
`Broadcast` ignores the `sendbuff` argument and always passes `recvbuff` as both the NCCL send and receive buffers. This breaks the API contract if callers ever provide distinct send/recv buffers (the root will broadcast uninitialized or incorrect data). Use `sendbuff` for the NCCL send pointer (and allow `sendbuff == recvbuff` as an optimization).
```diff
- NCCLCHECK(ncclBroadcast(recvbuff, recvbuff, count, to_nccl_dtype(type), root, groups_.at(group), stream));
+ NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, to_nccl_dtype(type), root, groups_.at(group), stream));
```
```python
if not complete_parallel_config(cfg) and cfg.ep > 1:
    if cfg.communicator in ['cuda-ipc', 'native']:
        assert cfg.nnodes == 1, 'TurboMind does not support multi-node with ep > 1'
    total = cfg.dp * cfg.ep
    if not cfg.device_num:
        count = torch.cuda.device_count() * cfg.nnodes
        if total < count:
            count = total
        cfg.device_num = count
    assert total % cfg.device_num == 0
    overlap = total // cfg.device_num
    attn_dp_size = overlap
    inner_tp_size = cfg.ep // overlap
    cfg.outer_dp_size = cfg.dp // overlap
    cfg.attn_dp_size = overlap // cfg.nnodes
    cfg.attn_tp_size = inner_tp_size // cfg.cp
    cfg.attn_cp_size = cfg.cp
    cfg.mlp_dp_size = 1
    cfg.mlp_tp_size = cfg.attn_dp_size * cfg.attn_tp_size * cfg.attn_cp_size
```
The EP path can compute `attn_dp_size = overlap // cfg.nnodes`, which becomes 0 in common multi-node cases (e.g., `overlap == 1` and `nnodes > 1`), violating later invariants and producing an invalid parallel config. Since `device_num` already accounts for `nnodes`, avoid dividing `overlap` by `nnodes` here (or otherwise ensure `attn_dp_size >= 1` with a correct derivation).
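One way to avoid the zero-size pitfall is to derive `attn_dp_size` directly from `overlap`. A hedged sketch (function name and exact invariants are assumptions, not the PR's code; only the arithmetic mirrors the snippet under review):

```python
def derive_attn_dp_size(dp, ep, device_num, nnodes):
    """Derive attention DP size for the EP path.

    device_num already counts GPUs across *all* nodes, so overlap
    (engine replicas per device) needs no further division by nnodes.
    """
    total = dp * ep
    assert total % device_num == 0, 'dp * ep must be divisible by device_num'
    overlap = total // device_num
    attn_dp_size = overlap  # not overlap // nnodes, which can round to 0
    assert attn_dp_size >= 1
    return attn_dp_size
```

With `overlap == 1` and `nnodes == 2`, this yields 1 where the reviewed code would yield 0.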
```cpp
void SetWarpup(ForwardParam& p);

void ForwardNative(ForwardParam& p);

void ForwardFused(ForwardParam& p);

void RouteTP(ForwardParam& p, Tensor_<float>& logits);

void RouteEP(ForwardParam& p, Tensor_<float>& logits);

void CombineTP(ForwardParam& p);

void CombineEP(ForwardParam& p);
```
Typo in the new private helper name `SetWarpup` (should be `SetWarmup`). Keeping the misspelling makes call sites harder to discover and search, and is inconsistent with the existing `is_warm_up_` naming.
```cpp
struct EpCombineInput {
    EpMode&                     mode;
    core::Tensor&               x;
    std::vector<core::Tensor>&  handle;
    std::optional<core::Tensor> topk_weights;
    std::optional<core::Tensor> topk_idx;
};
```
`EpCombineInput` uses `std::optional`, but this header doesn't include `<optional>`, which will cause compilation errors depending on include order. Add `#include <optional>` (and keep headers self-contained).
```cpp
int comm_nranks_ = -1;  // Number of ranks in NCCL communicator

ncclComm_t nccl_comm_;

ncclDevComm_t dev_ht_comm_{};
ncclDevComm_t dev_ll_comm_{};

std::unordered_map<void*, ncclWindow_t> wins_;
std::unordered_map<void*, size_t>       buffers_;

// GIN signal management
```
This header declares `std::unordered_map` members (`wins_`, `buffers_`) but doesn't include `<unordered_map>`, which will fail to compile in translation units that include this header first. Add the missing include (and any other required STL headers) to keep the header self-contained.
```diff
@@ -596,7 +600,7 @@ MoeFfnWeight::MoeFfnWeight(int layer_id,
                            group_size,
                            act_type,
                            fuse_silu_act});
-    register_module("experts", *experts.back(), i);
+    register_module("experts", *experts.back(), i + expert_offset);
 }
```
`local_expert_num` is computed via integer division (`expert_num / ep_size`) without validating divisibility. If `expert_num` isn't a multiple of `ep_size`, this will silently drop experts and mis-register or mis-load weights. Add a `TM_CHECK_EQ(expert_num % ep_size, 0)` (and ideally validate `ep_rank < ep_size`) before computing `local_expert_num`/`expert_offset`.
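The suggested validation can be sketched as follows (a Python model of the intended invariant; the helper name is hypothetical, and the C++ version would use `TM_CHECK_EQ` instead of `assert`):

```python
def shard_experts(expert_num, ep_size, ep_rank):
    """Compute one EP rank's local expert count and offset, validating
    divisibility so no experts are silently dropped."""
    assert ep_size > 0 and 0 <= ep_rank < ep_size, 'invalid ep_rank/ep_size'
    assert expert_num % ep_size == 0, (
        f'expert_num ({expert_num}) must be divisible by ep_size ({ep_size})')
    local_expert_num = expert_num // ep_size
    expert_offset = ep_rank * local_expert_num
    return local_expert_num, expert_offset
```

For example, 64 experts over 8 EP ranks gives each rank 8 experts, with rank 3 owning the contiguous slice starting at expert 24.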
Related link: deepseek-ai/DeepEP#521