Asked by: cangozpi  Asked: 11/10/2023  Updated: 11/10/2023  Views: 28
Torch distributed binding too many ports and hindering the creation of 128 processes
Q:
I am trying to do distributed CPU training using the torch distributed package. In my code, I initialize and create the process groups by calling ddp_setup(), shown below:
import os

import torch
import torch.distributed as dist
from torch.distributed import init_process_group


def get_dist_info():
    GLOBAL_WORLD_SIZE = int(os.environ["WORLD_SIZE"])
    GLOBAL_RANK = int(os.environ["RANK"])
    LOCAL_WORLD_SIZE = int(os.environ["LOCAL_WORLD_SIZE"])
    LOCAL_RANK = int(os.environ["LOCAL_RANK"])
    return GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK


def ddp_setup(logger, use_cuda):
    """
    Sets up torch.distributed and creates process groups for training.
    In every node, 1 process (the one with local_rank == 0) is assigned to the agents_group; the remaining
    processes are assigned to the env_workers_group. Check out the example below to get a better picture.
    agents_group processes hold an instance of RNDAgent and perform the optimization.
    env_workers_group processes hold an instance of the environment and interact with it.

    Example:
        Available from torchrun:
            nnodes: number of nodes = 3
            nproc_per_node: number of processes per node = 4
        ---
        ************** NODE 0:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************
        ...
        ************** NODE 1:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************
        ...
        ************** NODE 2:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************

        -node0-    -node1-    -node2-
        0,1,2,3    4,5,6,7    8,9,10,11   || agents_group_ranks=[0,4,8], env_workers_group_ranks=[remaining ranks]
        *          *          *
    """
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    if use_cuda:
        assert torch.cuda.is_available() == True, "use_cuda:True is passed but cuda is not available!"

    if torch.cuda.is_available() and use_cuda:
        gpu_id = "cuda:" + str(LOCAL_RANK % torch.cuda.device_count())
        backend = "nccl"
    else:
        gpu_id = "cpu"
        backend = "gloo"

    if torch.cuda.is_available() and use_cuda and is_agents_group_member():
        # GPU should only be used by the agents_group processes and each process should have a unique cuda device
        # (otherwise see the error at: https://github.com/pytorch/torchrec/issues/328)
        torch.cuda.set_device(gpu_id)

    init_process_group(backend="gloo")

    agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks = create_process_groups_for_training(agents_group_backend=backend, env_workers_group_backend="gloo")

    logger.log_msg_to_both_console_and_file(f'Initializing process with global_rank: {GLOBAL_RANK}, local_rank: {LOCAL_RANK}, local_world_size: {LOCAL_WORLD_SIZE}, global_world_size: {GLOBAL_WORLD_SIZE}, [{gpu_id}], group: {"agents_group" if GLOBAL_RANK in agents_group_global_ranks else "env_workers_group"}, group_backend: {backend if GLOBAL_RANK in agents_group_global_ranks else "gloo"}, default_backend: {backend}')

    return GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK, gpu_id, agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks


def is_agents_group_member():
    """
    Returns True if the process belongs to the agents_group, False otherwise.
    """
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()
    return GLOBAL_RANK % LOCAL_WORLD_SIZE == 0


def create_process_groups_for_training(agents_group_backend="nccl", env_workers_group_backend="gloo"):
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    # Calculate ranks for the groups:
    agents_group_global_ranks = list(filter(lambda x: x != None, [rank if rank % LOCAL_WORLD_SIZE == 0 else None for rank in range(GLOBAL_WORLD_SIZE)]))
    env_workers_group_global_ranks = list(filter(lambda x: x != None, [rank if rank % LOCAL_WORLD_SIZE != 0 else None for rank in range(GLOBAL_WORLD_SIZE)]))
    env_workers_group_per_node_global_ranks = [rank for rank in range((GLOBAL_RANK // LOCAL_WORLD_SIZE) * LOCAL_WORLD_SIZE, ((GLOBAL_RANK // LOCAL_WORLD_SIZE) + 1) * LOCAL_WORLD_SIZE)][1:]

    # Create groups from the calculated ranks:
    agents_group = dist.new_group(
        ranks=agents_group_global_ranks,
        backend=agents_group_backend
    )  # group for agent processes across nodes
    env_workers_group = dist.new_group(
        ranks=env_workers_group_global_ranks,
        backend=env_workers_group_backend
    )  # group for env_worker processes across nodes
    env_workers_group_per_node = dist.new_group(
        ranks=env_workers_group_per_node_global_ranks,
        backend=env_workers_group_backend
    )  # group for env_worker processes per node
    assert len(env_workers_group_per_node_global_ranks) == (LOCAL_WORLD_SIZE - 1)

    return agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks
and I launch it with (assuming 3 processes, to make debugging easier):
torchrun --nnodes 1 --nproc_per_node 3 main.py
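For reference, torchrun populates the environment variables that get_dist_info() reads. Below is a minimal sketch that just prints them in each of the 3 spawned workers (these variable names are the standard ones torchrun sets; the file name print_env.py is only illustrative):

# print_env.py -- run with: torchrun --nnodes 1 --nproc_per_node 3 print_env.py
import os

# Each worker prints its own view of the rendezvous/rank information.
for key in ("WORLD_SIZE", "RANK", "LOCAL_WORLD_SIZE", "LOCAL_RANK",
            "MASTER_ADDR", "MASTER_PORT"):
    print(key, "=", os.environ.get(key))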
When I run the code and check the port usage via lsof and netstat, I see some discrepancies. First, netstat shows me two python processes bound to ports (29400 and 50015); however, lsof shows more than 2 python processes, bound to different ports that also differ from what netstat shows. I don't understand why the output of these two tools differs. Second, I don't understand why lsof lists so many processes. I expected 3 distinct python processes, one for each of the 3 processes spawned by torchrun, but why are they bound to different ports?
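For what it's worth, this is roughly how I cross-check the lsof/netstat output from Python. A minimal sketch assuming psutil is installed; it only lists the listening TCP sockets of processes whose name contains "python":

import psutil

for proc in psutil.process_iter(["pid", "name"]):
    if "python" not in (proc.info["name"] or ""):
        continue
    try:
        # May raise AccessDenied for processes owned by other users.
        conns = proc.connections(kind="tcp")
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        continue
    listening = [c.laddr for c in conns if c.status == psutil.CONN_LISTEN]
    if listening:
        print(proc.info["pid"], listening)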
Finally, I tried running my code on a single node with 128 processes. Whenever I try this, I get errors about ports already being in use. I believe this is because torch distributed tries to bind too many ports and there aren't enough free ones available. I would like to know whether there is a way around this. Also, with torch.multiprocessing we can communicate over pipes, which do not bind any ports; I don't understand why torch distributed doesn't support something similar. As it stands, I don't think it is a viable option for multiprocessing with more than 128 processes (e.g., running parallel environments in deep reinforcement learning). Am I right?
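For concreteness, this is the kind of pipe-based communication I mean. A minimal sketch using torch.multiprocessing (a drop-in wrapper around the standard multiprocessing module) with a hypothetical env_worker function; the two processes talk over a Pipe and no TCP port is bound for the communication:

import torch
import torch.multiprocessing as mp


def env_worker(conn):
    # Hypothetical worker: receive a tensor, do some work, send the result back.
    x = conn.recv()
    conn.send(x * 2)
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=env_worker, args=(child_conn,))
    p.start()
    parent_conn.send(torch.ones(3))
    print(parent_conn.recv())  # tensor([2., 2., 2.])
    p.join()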
Thanks. I would really appreciate any help.
A: No answers yet.