torch.distributed binding too many ports and hindering creation of 128 processes

Asked by: cangozpi · Asked: 11/10/2023 · Updated: 11/10/2023 · Views: 28

Q:

I am trying to do distributed CPU training with the torch.distributed package. In my code I initialize torch.distributed and create the process groups by calling ddp_setup(), shown below:

import os

import torch
import torch.distributed as dist
from torch.distributed import init_process_group


def get_dist_info():
    GLOBAL_WORLD_SIZE = int(os.environ["WORLD_SIZE"])
    GLOBAL_RANK = int(os.environ["RANK"])
    LOCAL_WORLD_SIZE = int(os.environ["LOCAL_WORLD_SIZE"])
    LOCAL_RANK = int(os.environ["LOCAL_RANK"])
    return GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK


def ddp_setup(logger, use_cuda):
    """
    Setups torch.distributed and creates p_groups for training.
    In every node, 1 process (process with local_rank == 0) is assigned to the agents_group, the remaining processes are
    assigned to the env_workers_group. To get a better understanding check out the example below.
    agents_group processes have an instance of RNDAgent and perform optimizations.
    env_workers_group processes have an instance of the environment and perform interactions with it.


    Example:
        Available from torchrun:
            nnodes: number of nodes = 3
            nproc_per_node: number of processes per node = 4
        ---

        ************** NODE 0:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************
        ...

        ************** NODE: 1:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************
        ...

        ************** NODE: 2:
        LOCAL_RANK 0: GPUs --> agents_group
        LOCAL_RANK != 0: CPUs --> env_workers_group
        **************

        -node0-  -node1-   -node2-
        0,1,2,3  4,5,6,7  8,9,10,11    ||    agents_group_ranks=[0,4,8], env_workers_group_rank=[remaining ranks]
        *        *        *

    """

    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    if use_cuda:
        assert torch.cuda.is_available() == True, "use_cuda:True is passed but cuda is not available !"

    if torch.cuda.is_available() and use_cuda:
        gpu_id = "cuda:" + str(LOCAL_RANK % torch.cuda.device_count())
        backend = "nccl"
    else:
        gpu_id = "cpu"
        backend = "gloo"

    if torch.cuda.is_available() and use_cuda and is_agents_group_member():
        torch.cuda.set_device(gpu_id) # GPU should only be used by the agents_group processes and each process should have a unique cuda device (otherwise see the error at: https://github.com/pytorch/torchrec/issues/328)
    init_process_group(backend="gloo") # the default (global) process group uses gloo; the agents_group created below may use nccl

    agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks = create_process_groups_for_training(agents_group_backend=backend, env_workers_group_backend="gloo")

    logger.log_msg_to_both_console_and_file(f'Initializing process with global_rank: {GLOBAL_RANK}, local_rank: {LOCAL_RANK}, local_world_size: {LOCAL_WORLD_SIZE}, global_world_size: {GLOBAL_WORLD_SIZE}, [{gpu_id}], group: {"agents_group" if GLOBAL_RANK in agents_group_global_ranks else "env_workers_group"}, group_backend: {backend if GLOBAL_RANK in agents_group_global_ranks else "gloo"}, default_backend: {backend}')
    
    return GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK, gpu_id, agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks

def is_agents_group_member():
    """
    returns True if the process belongs to the agents_group, False otherwise.
    """
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()
    return GLOBAL_RANK % LOCAL_WORLD_SIZE == 0


def create_process_groups_for_training(agents_group_backend="nccl", env_workers_group_backend="gloo"):
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    # Calculate ranks for the groups:
    agents_group_global_ranks = [rank for rank in range(GLOBAL_WORLD_SIZE) if rank % LOCAL_WORLD_SIZE == 0]
    env_workers_group_global_ranks = [rank for rank in range(GLOBAL_WORLD_SIZE) if rank % LOCAL_WORLD_SIZE != 0]
    env_workers_group_per_node_global_ranks = [rank for rank in range((GLOBAL_RANK//LOCAL_WORLD_SIZE) * LOCAL_WORLD_SIZE, ((GLOBAL_RANK//LOCAL_WORLD_SIZE) + 1) * LOCAL_WORLD_SIZE)][1:]

    # Create groups from the calculated ranks:
    agents_group = dist.new_group(
        ranks=agents_group_global_ranks, 
        backend=agents_group_backend
        ) # group for agent processes across nodes

    env_workers_group = dist.new_group(
        ranks=env_workers_group_global_ranks, 
        backend=env_workers_group_backend
        ) # group for env_worker processes across nodes

    env_workers_group_per_node = dist.new_group(
        ranks=env_workers_group_per_node_global_ranks, 
        backend=env_workers_group_backend
        ) # group for env_worker processes per node

    assert len(env_workers_group_per_node_global_ranks) == (LOCAL_WORLD_SIZE - 1)

    return agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks, env_workers_group_per_node_global_ranks
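
As a quick sanity check of the rank bookkeeping, this standalone snippet (not part of my training code) evaluates the same expressions for the 3-node x 4-process layout from the docstring:

# Standalone check of the rank grouping for the docstring's example layout:
# 3 nodes x 4 processes per node => GLOBAL_WORLD_SIZE = 12, LOCAL_WORLD_SIZE = 4.
GLOBAL_WORLD_SIZE, LOCAL_WORLD_SIZE = 12, 4

agents_group_global_ranks = [r for r in range(GLOBAL_WORLD_SIZE) if r % LOCAL_WORLD_SIZE == 0]
env_workers_group_global_ranks = [r for r in range(GLOBAL_WORLD_SIZE) if r % LOCAL_WORLD_SIZE != 0]

print(agents_group_global_ranks)       # [0, 4, 8]
print(env_workers_group_global_ranks)  # [1, 2, 3, 5, 6, 7, 9, 10, 11]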

I launch it with the following command (using 3 processes to make debugging easier):

torchrun --nnodes 1 --nproc_per_node 3 main.py

When I run the code and check the port usage with lsof and netstat, I see some discrepancies. First, netstat shows me two python processes bound to ports (29400 and 50015); however, lsof shows more than 2 python processes, bound to different ports which also differ from what netstat reports. I don't understand why the outputs of the two tools disagree. Second, I don't understand why lsof lists so many processes. I expected 3 distinct python processes, one for each of the 3 processes spawned by torchrun, so why do they have different port bindings? [screenshot of the lsof and netstat output]
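
For reference, a standalone way to cross-check which ports the python/torchrun processes are listening on (this sketch assumes the third-party psutil package, which is not part of my setup; lsof and netstat are what I actually used):

import psutil

# Print every listening inet socket that belongs to a python/torchrun process,
# as a cross-check against the lsof and netstat output.
for conn in psutil.net_connections(kind="inet"):
    if conn.status != psutil.CONN_LISTEN or conn.pid is None:
        continue
    try:
        name = psutil.Process(conn.pid).name()
    except psutil.NoSuchProcess:
        continue  # process exited between the two calls
    if "python" in name or "torchrun" in name:
        print(conn.pid, name, conn.laddr)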

Finally, I tried running my code on a single node with 128 processes. Whenever I do, I get errors about ports already being in use. I believe this is because torch.distributed tries to bind too many ports and there aren't enough free ones available. Is there a way around this? Also, with torch.multiprocessing we can communicate over pipes, which don't bind to ports; I don't understand why torch.distributed doesn't support something similar. To me this makes it a non-viable option for multiprocessing with more than 128 processes (e.g., running parallel environments in deep reinforcement learning). Am I right about this?
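
For reference, this is the kind of pipe-based communication I mean; a minimal, self-contained sketch with torch.multiprocessing (not part of my training code), which exchanges a CPU tensor between two processes without binding any port:

import torch
import torch.multiprocessing as mp


def worker(conn):
    # Receive a CPU tensor over the pipe, double it, and send the result back.
    x = conn.recv()
    conn.send(x * 2)
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send(torch.ones(3))
    print(parent_conn.recv())  # tensor([2., 2., 2.])
    p.join()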

Thanks. I would really appreciate any help.

deep-learning pytorch python-multiprocessing torch

A: No answers yet.