我的代码是线程安全的吗?[Java、CAS、汇款]

Is my code thread-safe? [Java, CAS, money transfer]

提问人:Blednov Roman 提问时间:11/12/2023 更新时间:11/12/2023 访问量:73

问:

我正在解决经典的 java 并发套路:您有一堆帐户,您需要在多线程环境中将资金从一个账户转移到另一个账户。

因此,我已经学习并编写了所有可能的锁定/同步解决方案,现在我正在尝试使用 CAS 和 AtomicReference 以非阻塞方式解决此任务。

问题是,在测试过程中,我很少看到余额的总和不相同 - 这意味着我的解决方案存在缺点。请帮忙)

这是我的帐户代码:

public class AtomicRefAccount implements Account {
    private final int id;
    private final AtomicReference<BigDecimal> atomicAmount;
    private final int MAX_RETRY = 5;
    private final Lock lock = new ReentrantLock();

    public AtomicRefAccount(AtomicReference<BigDecimal> atomicAmount, int id) {
        this.id = id;
        this.atomicAmount = atomicAmount;
    }

    public static AtomicRefAccount of(int amount) {
        return new AtomicRefAccount(new AtomicReference<>(new BigDecimal(amount)), 0);
    }

    @Override
    public int getId() {
        return id;
    }

    @Override
    public BigDecimal getBalance() {
        return atomicAmount.get();
    }

    @Override
    public void withdrawAmount(BigDecimal amount) {
        for (int i = 0; i < MAX_RETRY; i++) {
            BigDecimal curVal = atomicAmount.get();
            if (curVal.compareTo(amount) == -1) {
                throw new IllegalStateException("[withdrawAmount] insufficient funds");
            } else {
                if (atomicAmount.compareAndSet(curVal, curVal.subtract(amount))) {
                    return;
                }else {
                    Thread.yield();
                }
            }
        }
        System.out.println("[withdrawAmount] retries fired");
        // trying locking now
        lock.lock();
        try {
            if (atomicAmount.get().compareTo(amount) == -1) {
                throw new IllegalStateException("[withdrawAmount] insufficient funds");
            } else {
                atomicAmount.set(atomicAmount.get().subtract(amount));
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void addAmount(BigDecimal amount) {
        for (int i = 0; i < MAX_RETRY; i++) {
            BigDecimal curVal = atomicAmount.get();
            if (atomicAmount.compareAndSet(curVal, curVal.add(amount))) {
                return;
            }else {
                Thread.yield();
            }
        }
        System.out.println("[addAmount] retries fired");
        // trying locking now
        lock.lock();
        try {
            atomicAmount.set(atomicAmount.get().add(amount));
        } finally {
            lock.unlock();
        }
    }
}

这是我的汇款服务代码:

public class DefaultMoneyTransferService implements MoneyTransferService {
    @Override
    public void transfer(Account from, Account to, BigDecimal amountToTransfer) {
        //validation start
        validation(from, to, amountToTransfer);
        //validation end

        from.withdrawAmount(amountToTransfer);
        to.addAmount(amountToTransfer);
    }

    private static void validation(Account from, Account to, BigDecimal amountToTransfer) {
        Objects.requireNonNull(from, "[from] account is null");
        Objects.requireNonNull(to, "[to] account is null");
        Objects.requireNonNull(amountToTransfer, "[amountToTransfer] account is null");
        if (from == to) {
            throw new IllegalArgumentException("[from|to] provided to same accounts");
        }
        if (amountToTransfer.compareTo(BigDecimal.ZERO) != 1) {
            throw new IllegalArgumentException("[amountToTransfer] have to be positive");
        }
        if (from.getBalance().compareTo(amountToTransfer) == -1) {
            throw new IllegalStateException("[from] have insufficient funds");
        }
    }
}

我正在尝试在一致性方面有一个子弹教授解决方案,但使用非阻塞方法。

Java 并发 非阻塞 比较交换

评论

1赞 rzwitserloot 11/12/2023
compareTo返回负数,而不是 .BigDecimal 目前可能总是返回 -1,但下一个 java 版本它可能会开始返回 -2。正确的比较是 ,不是(和 ,不是,你明白了)。-1<0==-1>=0!=-1
1赞 Jorn 11/12/2023
@K.尼古拉斯:为什么?至少解释一下你是否要发表这样的评论
0赞 K.Nicholas 11/13/2023
何苦。这完全是胡说八道。开发人员桥下的巨魔。

答:

3赞 rzwitserloot 11/12/2023 #1

从长远来看,这并不是防弹的,但我认为我们在这里删减了定义。让我们来看看这段代码中的主要失败。

滥用compareTo

您似乎在假设该方法返回 、 或 的假设下工作。根据规范,没有,它没有。它返回 0、正数或负数。您绝不能将 的结果与 0 以外的任何结果进行比较。即唯一有效的运算是 、 、 、 和 、 和 , 分别为 '相等'、'不相等'、'a 大于 b'、'a 小于 b'、'a 等于或小于 b / b 不大于 a'、'a 等于或大于 b / b 不小于 a'。compareTo-101compareTo== 0!= 0> 0< 0<= 0>= 0

锁定回退已损坏

您可以回退到使用锁。这完全被打破了 - 锁是一条双向的街道:它们不会做任何事情,除非每次访问你试图并发防锁的东西。您的代码没有。因此,这种情况是微不足道的:

  • 您的帐户中有 1000 欧元。你设法拿到了 20 张相同的卡片。
  • 你与其他黑客协调:每个人都拿着一张卡走开,然后去ATM,计划同时提取所有现金来扰乱你的代码。
  • 由于这一切,你最终会在代码的基于锁的部分得到一些人(因为他们在重试中失败了),同时在基于 CAS 的循环中也有人。Larry 在基于锁的部分,Carl 在 CAS 部分。
  • Larry 的代码执行,最终为 900 欧元(Larry 从机器中提取 100 欧元)。atomicAmount.get().subtract(amount)
  • 无论出于何种原因,Larry 的线程都被抢占了(嘿,它发生了 - 这就是重点,你无法控制它何时发生。 使它发生得更快,但没有 - 当然,有多个内核,事情可以同时运行。Thread.yield()Thread.unyield()
  • Carl 的线程和 .因此,CAS 调用成功;Carl 的代码也从账户中减去了 100 欧元,Carl 的 ATM 机吐出一张清脆的 100 欧元账单,您的账户现在是 900 欧元,-.curVal.subtract(amount)curVal.compareAndSet
  • Larry 的线程继续,只是覆盖了帐户余额。
  • 拉里的自动取款机也吐出一张清脆的 100 欧元钞票。
  • 您的帐户现在是 €900,-.

多田。银行被骗了 100 欧元,-:一个以前有 1000 欧元的账户现在有 900 欧元,但我们有 2 100 欧元的钞票。

解决方案:你做不到。如果重试失败,您告诉 ATM 告诉用户,无论出于何种原因,他们现在都无法获得现金,对不起,然后将卡吐回。没有回退的锁,时期。如果你要使用锁,你必须始终使用它们。

它只是不是防弹的,时期。

整个方法从根本上被打破了,但这里可能会出现肉馅这个词。

黑客又来了:

他们现在的计划不是从银行偷窃,而是让它们受到监管。他们的计划是让银行看起来像是在从客户那里偷东西。

所以,这是他们的计划:他们将亲自去银行,表现得像某个主要政党的官员。他们收集了一些小东西,使它们看起来合法。比如假护照。不过,这是一个劣质的假货。

他们走进银行,要求将钱从当事人的储蓄账户转移到流动账户。因为这似乎是在单个实体的范围内进行转账,所以银行出纳员不会大惊小怪,也不会认真检查窗口另一边的人是否真的被授权这样做。毕竟,黑客有什么可能的目的将资金从一个实体的储蓄账户转移到同一实体的运行账户,对吧?

因此,出纳员开始转账操作。黑客以某种方式很好地了解了时间,并设法在操作过程中拔掉了计算机上的插头 - 实际上,正好在您和通话之间。出纳员并不聪明,银行也不聪明。但实际情况是,5000欧元已经从政治实体的储蓄账户中消失了。这笔钱没了,还没有添加到他们的运行账户中。from.withdrawAmount(amountToTransfer)to.addAmount(amountToTransfer)

最终,政党弄清楚了这一点,大发雷霆,银行被罚款数亿美元。

解决方案是什么?

日记

如果你和我一样老了,还记得太空球中的这个场景——准备准备吗?

那。正确的方法如下:

  • 在日记日志中写下“即将开始交易 12-34-56:出于 [原因] 将 5000 欧元从账户 12345 转移到 54321 欧元”,由“[运营商]”授权。等待磁盘报告它真的真的保存了它。
  • 在日记日志中写下“[12-34-56]:从 12345 当前的 9500 欧元中减去 5000 欧元。在这里,并且始终等待磁盘报告它已这样做。
  • 从 12345 中减去 5000 欧元 - 现在是 4500 欧元,- -这是您第一次真正做某事,而不仅仅是记录。
  • 在日记日志中写道:“[12-34-56]:从 12345 中减去 5000 欧元;现在是 4500 欧元,-'
  • 你明白了。

该系统的优点是,在启动时,在系统重新联机之前,它可以检查日志的最后几行并“修复”所有半完成的事务,或者至少检查发生了什么。例如,如果日志中的最后一行是“[12-34-56]:从 12345 的当前 9500 欧元中减去 5000 欧元,-”,则检查 12345 的余额。如果是 9500 欧元,您就知道该行为在系统失败之前没有通过。您可以从那里继续事务(第一个日志行),或者您知道没有什么可以还原的,并且可以继续启动过程。

如果帐户余额为 4500 欧元,-,您就知道日志行所说的事情即将发生,确实发生了,但是在黑客拔掉机器上的插头之前,报告它这样做的日志没有成功。您现在可以撤消该交易(将余额设置回 9500 欧元),也可以从该点开始继续交易(即首先在日志中写下它发生了)。

一旦所有事务都“修复”了(没有设计为原子的事务处于半生不熟的状态 - 即所有事务都已完成或恢复),系统将正常启动。

当您无法直接控制知道事情是否发生时,这甚至有效。例如,自动取款机做同样的事情:“准备吐出 100 欧元的钞票。吐出 100 欧元的钞票。记录该用户抢走了账单。记录那扇门现在已经关闭了。

如果自动取款机崩溃了,日记中的最后一行是“吐出 100 欧元的钞票”,如果有必要,您可以要求人工操作员检查视频源并检查钞票是否真的被吐出(并被账户持有人抓住),或者机器是否从未完全设法进入“吐出现金”程序的“门现在打开”阶段。

无法测试的火箭科学的问题

正如你所看到的,这些东西非常困难。这是那些未知的未知事物之一:你怎么知道你已经走过了每一条途径?你不能为你的并发可能失败的方式编写测试,而你不知道。

因此,不要做任何这些,把它留给专业人士。做这些事情的正常方法是获得一个好的数据库,例如 postgresql,将其设置为 SERIALIZABLE 事务级隔离,启动一个事务,在该单个事务中进行传输,然后 COMMIT。

psql实际上或多或少地与上面概述的我的修复程序的代码完全相同:psql不一定使用锁(它是“乐观锁定” - 你可以在维基百科上搜索解释,它是基于CAS的,或多或少你在做什么),并记录它所做的一切,如果你在psql中间被电源线绊倒,则在启动时使用该日志首先撤消任何半面包店,以保存事务永久,在它允许任何传入连接之前

重试失败

要真正弄清楚为什么你不应该手写这些东西,你的代码中有一个严重的错误,你永远不会弄清楚。

有没有遇到过这样的情况:你正要向右走到某人身上,所以你向左转向以避免碰撞,但他们也向左转向?你羞涩地笑了,向右转,但是......他们也转向了,对吧?

这种情况很少见,但它确实发生了。但是,在计算机领域,这是非常普遍的——毕竟,计算机通常是相当确定的。真的是错。

因此,有可能,甚至有可能,同时尝试向同一银行帐户写入更新的 2+ 线程将不断相互干扰。它们会导致彼此的 CAS 失败,然后这些进程重新开始,从而相互干扰,以至于它们都无法完成,并且它们都超出了重试限制。

解决这个问题的方法是掷骰子。不,真的。这个漂亮的发明来自梅特卡夫先生的大脑,他当时非常嘲笑以太网网络解决方案轻松击败了 Token Ring 和所有其他投入大量资金的技术,因为掷骰子的愚蠢想法效果很好。

诀窍是,如果需要重试,请不要立即尝试。相反,掷一些骰子。等待随机时间,然后重试。这避免了“双方在同一方向上同时躲避,因此实际上并没有避免碰撞”的情况。想象一下,在“两个行人即将相撞”的场景中,他们俩不是立即转向以避免碰撞,而是都在脑海中选择一个随机数,等待那么久,然后转向?确实,一个人比另一个人更早转向的可能性非常高,然后一切都很好。

确实是计算机这样做的方式:psql 等确实会掷骰子,而您的以太网电缆确实会检测到冲突并等待随机次数以避免重复冲突。

出于某种原因,以指数方式执行此操作是个好主意。

您的代码需要在重试块的末尾有一行,类似于 .等待随机数量,如果我们已经重试了一段时间,请等待更长的时间(是跟踪我们正在进行的重试的计数器)。Thread.sleep((int) (Math.random() * i * 500));i

你永远不会记得每次都这样做。因此,您需要一个框架。继续努力这个想法,很快你就从头开始重建了一个数据库引擎。跳过该过程,只需使用数据库即可。

评论

0赞 Blednov Roman 11/13/2023
非常感谢!<3,其实我并不是想在某个地方用到这个,我是想在实践中学习一些并发问题。Ofk DB 可以更好地解决这个问题。问题在于,Java 的 leetcode 或 hackerank 上没有太多好的并发挑战,而 DB 的此类挑战实际上为零(您可以在其中使用隔离级别和 opt/pes 锁)。
2赞 Solomon Slow 11/12/2023 #2

这行不通:

for (int i = 0; i < MAX_RETRY; i++) {
    BigDecimal curVal = atomicAmount.get();
    if (atomicAmount.compareAndSet(curVal, curVal.add(amount))) {
        return;
    }
}

lock.lock();
atomicAmount.set(atomicAmount.get().add(amount));
lock.unlock();

如果其他线程能够在不锁定锁的情况下更改,则没有意义。lock.lock()atomicAmount


更新:rzwitserloot的回答更详细地解释了这一点。另外,请阅读 rzwitserloot 所说的关于测试返回值的内容。compareTo

评论

0赞 Blednov Roman 11/13/2023
谢谢你@Solomon慢,关于锁 - 这么愚蠢的错误,现在对我来说很明显,谢谢!比对周围的好捕获!