-
Notifications
You must be signed in to change notification settings - Fork 80
Symmetric memory pytorch backends #6023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
14fd212
5646c03
14816aa
6996d05
49d669c
8962475
62c6945
67181c8
eea57d8
a9ddffd
f9cac71
8e62ccc
1be0134
3596301
9b05915
6147139
b5a2418
af128e4
828573d
294a867
6a5d3c3
2908e70
aa4f9c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| #include <numeric> | ||
|
|
||
| #ifdef NVFUSER_DISTRIBUTED | ||
| #include <torch/csrc/distributed/c10d/GroupRegistry.hpp> | ||
| #include <torch/csrc/distributed/c10d/PrefixStore.hpp> | ||
| #include <torch/csrc/distributed/c10d/exception.h> | ||
| #ifdef USE_C10D_NCCL | ||
|
|
@@ -121,7 +122,8 @@ bool parseEnv( | |
| } | ||
|
|
||
| // retrieves master port | ||
| if ((env = std::getenv("NVFUSER_MASTER_PORT")) != nullptr) { | ||
| env = std::getenv("NVFUSER_MASTER_PORT"); | ||
| if (env != nullptr) { | ||
| master_port = std::atoi(env); | ||
| } else { | ||
| LOG(INFO) << "The environment variable NVFUSER_MASTER_PORT has not been " | ||
|
|
@@ -248,10 +250,10 @@ void waitForDebuggerAtRanks( | |
| std::cerr << "Process " << pid | ||
| << " is waiting for the debugger. To continue debugging, " | ||
| << "start gdb, `attach " << pid | ||
| << "`, `set var waiting=false`, and `fini`." << std::endl; | ||
| << "`, `set var waiting=false`, and `fini`.\n"; | ||
| while (waiting) { // Please change `waiting` in the debugger. | ||
| } | ||
| std::cerr << "Process " << getpid() << " finished waiting." << std::endl; | ||
| std::cerr << "Process " << getpid() << " finished waiting.\n"; | ||
| } | ||
|
|
||
| if (communicator->is_available()) { | ||
|
|
@@ -331,6 +333,13 @@ Communicator& Communicator::getInstance() { | |
| return *communicator; | ||
| } | ||
|
|
||
| void Communicator::registerProcessGroup( | ||
| const std::string& name, | ||
| const c10::intrusive_ptr<c10d::ProcessGroup>& pg) { | ||
| c10d::register_process_group(name, pg); | ||
| process_groups_[name] = pg; | ||
| } | ||
|
|
||
| void Communicator::cleanup() { | ||
| static bool cleaned_up = false; | ||
| NVF_CHECK( | ||
|
|
@@ -349,19 +358,25 @@ void Communicator::cleanup() { | |
|
|
||
| store_ = nullptr; | ||
|
|
||
| #if defined(NVFUSER_DISTRIBUTED) && defined(USE_C10D_NCCL) | ||
| #if defined(NVFUSER_DISTRIBUTED) | ||
| #if defined(USE_C10D_NCCL) | ||
| // Sort backends to work around a NCCL bug (nvbugs/4889623). Closing backends | ||
| // in different orders between ranks have been causing a hang. | ||
| std::vector<std::pair<std::string, c10::intrusive_ptr<c10d::Backend>>> | ||
| keyed_backends(backends_.begin(), backends_.end()); | ||
| std::sort(keyed_backends.begin(), keyed_backends.end()); | ||
| std::ranges::sort(keyed_backends.begin(), keyed_backends.end()); | ||
| for (auto& [key, backend] : keyed_backends) { | ||
| // Call shutdown before destructing a ProcessGroupNCCL as instructed by | ||
| // https://github.com/pytorch/pytorch/blob/e62073d7997c9e63896cb5289ffd0874a8cc1838/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp#L1164-L1170. | ||
| if (auto* pg_nccl = dynamic_cast<c10d::ProcessGroupNCCL*>(backend.get())) { | ||
| pg_nccl->shutdown(); | ||
| } | ||
| } | ||
| #endif | ||
| for (const auto& entry : process_groups_) { | ||
| c10d::unregister_process_group(entry.first); | ||
| } | ||
| process_groups_.clear(); | ||
| #endif | ||
| backends_.clear(); | ||
| } | ||
|
|
@@ -382,16 +397,16 @@ c10d::Backend* Communicator::getBackendForTeam( | |
| // generate a string key which is unique to the team | ||
| // create the team and cache it | ||
| std::string team_key = prefix + getTeamKey(team, b); | ||
| // check that the caller's rank belongs to the requested team | ||
| auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId()); | ||
| if (rank_it == team.end()) { | ||
| return nullptr; | ||
| } | ||
| // check if backend associated with the team is present in the cache | ||
| if (backends_.find(team_key) == | ||
| backends_.end()) { // create the backend and cache it | ||
| #ifdef NVFUSER_DISTRIBUTED | ||
| backends_[team_key] = [&]() -> c10::intrusive_ptr<c10d::Backend> { | ||
| // check that the caller's rank belongs to the requested team | ||
| auto rank_it = std::find(team.begin(), team.end(), deviceId()); | ||
| if (rank_it == team.end()) { | ||
| return nullptr; | ||
| } | ||
| // retrieve the caller's rank index/position in the team | ||
| RankType team_rank = std::distance(team.begin(), rank_it); | ||
| return createBackend( | ||
|
|
@@ -404,6 +419,26 @@ c10d::Backend* Communicator::getBackendForTeam( | |
| backends_[team_key] = nullptr; | ||
| #endif | ||
| } | ||
| #if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED) | ||
| if (process_groups_.find(team_key) == process_groups_.end()) { | ||
| if (b == CommunicatorBackend::kNccl) { | ||
| RankType team_rank = std::distance(team.begin(), rank_it); | ||
|
|
||
| auto pg = c10::make_intrusive<c10d::ProcessGroup>( | ||
| c10::make_intrusive<c10d::PrefixStore>(team_key, store_), | ||
| team_rank, | ||
| static_cast<int>(team.size())); | ||
| pg->setBackend( | ||
| c10::DeviceType::CUDA, | ||
| c10d::ProcessGroup::BackendType::NCCL, | ||
| backends_[team_key]); | ||
| pg->setDefaultBackend(c10d::ProcessGroup::BackendType::NCCL); | ||
| pg->setGroupName(team_key); | ||
|
|
||
| registerProcessGroup(team_key, pg); | ||
| } | ||
|
saivishal1999 marked this conversation as resolved.
Comment on lines
+422
to
+439
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you explain why we need this change? I am not sure to understand the logic and motivation. It seems like an old artifact --
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added this to keep track of the process groups registered by nvFuser's symmetric memory so that they can be unregistered during cleanup, and also to check whether a group is already registered. In the next commit you'll see that I use this variable's keys (for reads) and during cleanup.
saivishal1999 marked this conversation as resolved.
|
||
| } | ||
| #endif | ||
| return backends_.at(team_key).get(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,8 +11,9 @@ | |
| #include <ATen/core/ivalue.h> | ||
| #include <c10/util/intrusive_ptr.h> | ||
|
|
||
| #ifdef NVFUSER_DISTRIBUTED | ||
| #if defined(NVFUSER_DISTRIBUTED) | ||
| #include <torch/csrc/distributed/c10d/Backend.hpp> | ||
| #include <torch/csrc/distributed/c10d/ProcessGroup.hpp> | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this header should always be present, no?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added this header when I added the process_groups_ variable, so the same guard is used. It wasn't needed before my changes |
||
| #include <torch/csrc/distributed/c10d/TCPStore.hpp> | ||
| #include <torch/csrc/distributed/c10d/Work.hpp> | ||
| #else | ||
|
|
@@ -110,6 +111,10 @@ class NVF_API Communicator { | |
| c10d::Backend* getWorld( | ||
| std::optional<CommunicatorBackend> backend = std::nullopt); | ||
|
|
||
| void registerProcessGroup( | ||
| const std::string& name, | ||
| const c10::intrusive_ptr<c10d::ProcessGroup>& pg); | ||
|
|
||
| // returns if a backend is available for creation | ||
| bool isBackendAvailable(CommunicatorBackend backend) const { | ||
| if (backend == CommunicatorBackend::kUcc) { | ||
|
|
@@ -153,6 +158,10 @@ class NVF_API Communicator { | |
| c10::intrusive_ptr<c10d::TCPStore> store_; | ||
| // cache for the created backends. The keys are strings generated from Teams | ||
| std::unordered_map<std::string, c10::intrusive_ptr<c10d::Backend>> backends_; | ||
| // c10d process-group wrappers registered for symmetric-memory rendezvous. | ||
| // Keeps track of the process groups created for the rendezvous. | ||
| std::unordered_map<std::string, c10::intrusive_ptr<c10d::ProcessGroup>> | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please make sure Also, can you explain (and add a comment in the code) why we need ProcessGroup here? |
||
| process_groups_; | ||
|
saivishal1999 marked this conversation as resolved.
|
||
| }; | ||
|
|
||
| } // namespace nvfuser | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`process_groups_` cleanup guard mismatch — compile error when `NVFUSER_DISTRIBUTED` is set without `USE_DISTRIBUTED`. `process_groups_` is declared in `communicator.h` under `#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)`, but the cleanup loop here lives under the broader `#if defined(NVFUSER_DISTRIBUTED)` (without the `USE_DISTRIBUTED` guard). When a build defines `NVFUSER_DISTRIBUTED` but not `USE_DISTRIBUTED`, `process_groups_` does not exist as a member, yet this code tries to iterate over it — a hard compile error. `c10d::unregister_process_group` (from `GroupRegistry.hpp`) is already included under `#ifdef NVFUSER_DISTRIBUTED`, so fixing just the guard on these lines is sufficient. (The surrounding `#if defined(NVFUSER_DISTRIBUTED)` / `#endif` already provides the outer distributed guard.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is relevant — but the real fix is to replace `USE_DISTRIBUTED` with `NVFUSER_DISTRIBUTED` wherever it appears (we don't use `USE_DISTRIBUTED` in the codebase).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. You're right — `USE_DISTRIBUTED` should be replaced with `NVFUSER_DISTRIBUTED` everywhere. The codebase currently has `process_groups_` guarded by both macros, but the cleanup code only checks one, which causes the mismatch. `USE_DISTRIBUTED` appears in 4 files:
- `csrc/multidevice/communicator.h` (line 14, line 162) — the header where `process_groups_` is declared
- `csrc/multidevice/communicator.cpp` (line 414) — where `process_groups_` is populated
- `csrc/multidevice/symmetric_tensor.cpp` (multiple locations) — where symmetric memory uses `USE_DISTRIBUTED`
- `csrc/multidevice/symmetric_tensor.h` (line 88) — where `torch_symm_handle_` is declared

All of these `#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)` guards should become `#if defined(NVFUSER_DISTRIBUTED)`. The Python files (`python/utils.py`, `python/tools/gen_nvfuser_version.py`) and `CMakeLists.txt` also reference `USE_DISTRIBUTED` for build configuration, so those would need updating too depending on your build system changes.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a build "build_image_ARM_igpu_20_TNVF" which was failing - it was setting NVFUSER_DISTRIBUTED to ON but USE_DISTRIBUTED to False during cmake configuration. So I had to explicitly add USE_DISTRIBUTED checks. I'll add the missing guard here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I didnt know
cc @wujingyue to make sure you are aware of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for tagging me.
That sounds right.
I'm not sure why NVFUSER_DISTRIBUTED was created in the first place -- it was there before I joined the project. We could use USE_DISTRIBUTED throughout the codebase. I guess NVFUSER_DISTRIBUTED gives an extra control of turning nvFuser distributed off even when pytorch distributed is on. But I'm not sure that use case is relevant.
Are you sure about this given
Fuser/CMakeLists.txt
Line 47 in 771eddd
Also note
Fuser/csrc/multidevice/c10d_mock.h
Lines 12 to 19 in 771eddd
— it avoids the `#if` guards by defining a mock.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wujingyue I saw that `USE_DISTRIBUTED` was false and `NVFUSER_DISTRIBUTED` was on in the logs here: https://gitlab-master.nvidia.com/dl/pytorch/fuser-gh-mirror/-/jobs/287606349/raw.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@saivishal1999 The other guard still looks missing here? Did you attempt the mock approach -- that should ideally work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm adding mocks and removing these guards in future commits, builds/tests are passing.