数据结构的锁定安全性 [已关闭]

Locking safety of datastructure [closed]

提问人:Aedoro 提问时间:11/10/2023 最后编辑:Aedoro 更新时间:11/15/2023 访问量:182

问:


想改进这个问题吗?通过编辑这篇文章添加详细信息并澄清问题。

7天前关闭。

我有一个数据结构:

  • 多个项目“A”。
  • 每个都有多个项目。每个都恰好链接到 1 。ABBA
  • 每个都有多个项目。每个都恰好链接到 1 。BCCB

对于本地搜索算法,我需要从多个线程修改此数据结构。 物品的数量非常大。目标是让每个线程锁定两个项目,修改它们,然后解锁它们。AA

一个问题是,这些运算符的选择是基于项目的。这样就需要锁定两个项目对应于项目。CAC

我目前的方法是:

  • 选择(偏置)两个项目和 .Cc1c2
  • 存储它们所属的 、 和 (间接通过其链接项)Aa1a2B
  • Lock 和 .a1a2
  • 检查是否 和 仍在计划中 和 。如果是这样,并且可以变异。c1c2a1a2a1a2

这种方法是否要求此数据结构的任何部分是原子的或任何其他额外的同步?我在下面做了一个玩具示例。


编辑

在突变期间,每个线程必须始终指向一个,而一个必须始终指向被突变线程锁定的线程。CBA

突变可以在此约束范围内改变 B.a 和 C.b。为了简单起见,可以忽略 A 和 B 的创建/销毁。

该示例添加了突变函数和一些控制回路。


编辑 2

似乎 b->a 和 c->b 必须是原子的,以确保可见性。否则,不能保证变量更改在检查时可见。



struct B;
struct C;

struct A {
    std::mutex mtx; // this lock is for A including its children
    std::vector<B*> b;
};
struct B {
    A* a;
    std::vector<C*> c;
};
struct C {
    B* b;
};

struct Instance
{
    // assume a & b have reserved space enough to never reallocate
    std::vector<A> a;
    std::vector<B> b;
    std::vector<C> c;
    std::vector<std::pair<C*, C*>> c_pairs;
};

struct Lock
{
    std::unique_lock<std::mutex> a1, a2;
    std::pair<C*, C*> c_pair;
};

Lock try_get_locked_pair(const Instance& instance)
{
    const auto c_pair = instance.c_pairs[0 /* random number */];

    auto* a1 = c_pair.first->b->a;
    auto* a2 = c_pair.second->b->a;

    if (a1 == a2)
    {
        return { .c_pair = {nullptr, nullptr} };
    }

    if (std::unique_lock lk_a1{ a1->mtx, std::defer_lock }; lk_a1.try_lock())
    {
        // any threads that modify c_pair.first->b->a needs to have a lock on BOTH
        // the old and new value of b->a
        // we currently have a lock on 'a1'
        // thus no thread can change it to or from 'a1'
        // so if we get that b->a is 'a1', it cannot be changed to or from 
        // this value by another thread since the moment we got the lock
        // aka: any other thread modifying b->a while we also compare it true to a1, 
        // implies that thread must have the lock on a1 => contradition
        if (a1 == c_pair.first->b->a)
        {
            if (std::unique_lock lk_a2{ a2->mtx, std::defer_lock }; lk_a2.try_lock())
            {
                if (a2 == c_pair.second->b->a)
                {
                    return { std::move(lk_a1), std::move(lk_a2), c_pair };
                }
            }
        }
    }
    return { .c_pair = {nullptr, nullptr} };
}


void attempt_some_mutator(const Instance& instance)
{
    const auto lock = try_get_locked_pair(instance);
    if (!lock.c_pair.first)
    {
        return;
    }

    auto* a1 = lock.c_pair.first->b->a;
    auto* a2 = lock.c_pair.second->b->a;

    std::vector<C*> all_c;
    const auto collect_all_c_and_clear_c = [&](A* a)
    {
        for (B* b : a->b)
        {
            for (C* c : b->c)
            {
                all_c.push_back(c);
            }
            b->c.clear();
        }
    };
    collect_all_c_and_clear_c(a1);
    collect_all_c_and_clear_c(a2);

    const auto reconstruct_c_on_b = [&]() {
        for (auto* c : all_c)
        {
            c->b->c.push_back(c);
        }
    };

    if (all_c.size() <= 2)
    {
        reconstruct_c_on_b();
        return;
    }

    const auto evaluate_insert = [&](A* a, C* c) -> std::pair<B*, int>
    {
        for (auto* b : a->b)
        {
            /* ... code that evaluate the feasability and cost delta for adding c to b ... */
            const bool feasible = true;
            const int cost = 0;
            if (feasible)
            {
                return { b, cost };
            }
        }
        return { nullptr, 0 };
    };

    for (auto* c : all_c)
    {
        auto [b1, cost1] = evaluate_insert(a1, c);
        auto [b2, cost2] = evaluate_insert(a2, c);

        if (b1 && (!b2 || cost1 < cost2))
        {
            b1->c.push_back(c);
        }
        else if (b2)
        {
            b2->c.push_back(c);
        }
        else // infeasible
        {
            reconstruct_c_on_b();
            return;
        }
    }

    auto apply = [&](A* a)
    {
        for (B* b : a->b)
        {
            for (C* c : b->c)
            {
                c->b = b;
            }
        }
    };

    apply(a1);
    apply(a2);
}

void loop(const Instance& instance)
{
    // ... not really local search, but tests the mutator datastructure ...
    std::vector<std::future<void>> futures;
    for (int i = 0, n = std::thread::hardware_concurrency(); i < n; ++i)
    {
        futures.push_back(std::async(i == 0 ? std::launch::deferred : std::launch::async, [&](){
            for (int i = 0; i < 1000; ++i)
            {
                attempt_some_mutator(instance);
            }
        }));
    }
    for (auto& token : futures)
    {
        token.get();
    }
}
C++ 多线程 数据结构 互斥锁

评论

1赞 Mooing Duck 11/11/2023
我们可以假设对象永远不会被添加或删除吗?否则,你最终会得到悬空的指针。Instance
1赞 Den-Jason 11/11/2023
“原子只是一种比锁更快的同步方式。”您仍然会调用内存屏障惩罚。如果需要以原子方式更改 1 个以上的值,可能会发现最好使用互斥锁。(在某处的会议演讲中看到它的基准)
1赞 Mooing Duck 11/11/2023
@Den-Jason:更明确地说:原子速度更快,但只能同步单个值。如果需要同步多个值,那么原子就不是一个真正的函数模型,必须使用锁。
3赞 Mooing Duck 11/11/2023
我们缺少太多细节,无法告诉您您需要什么。
2赞 Andrew Henle 11/11/2023
@Aedoro 这无关紧要。同样,如果线程可以更改任何值,则所有计算都是潜在的竞争条件。在 中,两者都容易受到竞争条件的影响 锁定 根本没有提供任何保护。b->ab->atry_get_locked_pair()auto* a1 = c_pair.first->b->a;auto* a2 = c_pair.second->b->a;ab->a

答: 暂无答案