如何在 Swift 并发中实现异步队列?

How to implement an asynchronous queue in Swift concurrency?

提问人:WolfLink 提问时间:10/13/2023 更新时间:10/14/2023 访问量:147

问:

背景:我熟悉使用 Locks 和 Semaphores 的并发,并且正在学习 Swift 的新并发功能。

下面是 Swift 中使用 DispatchSemaphore 的异步队列的简化示例:

class AsynchronousQueue {
    var data: [MyDataType] = []
    var semaphore = DispatchSemaphore(value: 0)
    
    func push(data: MyData) {
        data.append(data)
        semaphore.signal()
    }

    func pop() -> MyData {
        semaphore.wait()
        return data.popLast()
    }
}

请注意,这有点过于简单了 - 我还希望限制数据数组的编辑,使其不会同时被两个线程突变,这可以通过锁或 Actor 来实现。我可能还想发送某种“取消”信号,这也是一个相对较小的更改。

我不确定如何使用 Swift Concurrency 完成的部分是信号量在这里扮演的角色。如果数据可用,我想立即返回,或者如果数据不可用,我想无限期地等待。pop()

我遇到过很多不鼓励在现代 Swift 代码中使用信号量的帖子,但我还没有看到一个如何使用 Swift 并发功能 () 进行这种等待的例子,这并不比使用信号量复杂得多。await/async/actor

请注意,不能在函数中使用,因此似乎很难将新功能与基于 的代码一起使用,这就是我问的原因。DispatchSemaphoreasyncawait/async/actorDispatchSemaphore

swift async-await 信号 swift 并发 调度信号量

评论

1赞 Sweeper 10/13/2023
你在寻找这样的东西吗?forums.swift.org/t/communicating-between-two-concurrent-tasks/......
1赞 lorem ipsum 10/13/2023
尝试将 or 更改为 or,因为您有一个可变变量,因此不需要信号量,因为默认情况下会这样做。旧工作在线程方面,新并发工作在参与者方面工作,这就是它们不兼容的原因。classactorglobal actoractorDispatch
0赞 WolfLink 10/14/2023
@Sweeper看起来 AsyncStream 在我的示例中是缓冲区的现有实现。很高兴知道它的存在,但我仍然想知道如何使用 Swift 并发原语实现这样的东西。这种缓冲区是信号量的典型例子,但存在更复杂的情况。

答:

2赞 Paulw11 10/13/2023 #1

从技术上讲,您描述的是一个堆栈(或后进先出队列)。

这是我的实现。

它支持 、 阻塞和非阻塞以及 和 。pushpoppeekcancel

isEmpty和属性可用。stackDepth

actor AsyncStack<Element> {
    private var storage = [Element]()
    private var awaiters = [CheckedContinuation<Element,Error>]()
    
    
    /// Push a new element onto the stack
    /// - Parameter newElement: The element to push
    /// - Returns: Void

    public func push(_ newElement: Element) async-> Void {
        if !awaiters.isEmpty {
            let awaiter = awaiters.removeFirst()
            awaiter.resume(returning: newElement)
        } else {
            storage.append(newElement)
        }
    }
    
    /// Pop the  element at the top of the stack or wait until an element becomes available
    /// - Returns: The popped element
    /// - Throws: An AsyncQueueError if the waiting pop is cancelled

    public func popOrWait() async throws -> Element {
        if let element = storage.popLast() {
            return element
        }
        return try await withCheckedThrowingContinuation { continuation in
            awaiters.append(continuation)
        }
    }
    
    /// Pop an element from the top of the stack if possible
    /// - Returns: An element or nil if the stack is empty

    public func pop() async -> Element? {
        return storage.popLast()
    }
    
    /// Return the element at the top of the stack, if any, without removing it
    /// - Returns: The element a the top of the stack or nil
    public func peek() async -> Element? {
        return storage.last
    }
    
    /// True if the stack is empty

    public var isEmpty: Bool {
        get async {
            return storage.isEmpty
        }
    }

    /// Current stack depth

    public var stackDepth: Int {
        get async {
            return storage.count
        }
    }
    
    /// Cancel all pending popOrWait operations.
    /// Pending operations will throw `AsyncQueue.OperationCancelled`
    /// - Returns: Void

    public func cancel() async -> Void {
        for awaiter in awaiters {
            awaiter.resume(throwing: AsyncQueueError.OperationCancelled)
        }
        awaiters.removeAll()
    }
    
    public enum AsyncQueueError: Error {
        case OperationCancelled
    }
}

如果堆栈为空,我使用 CheckedContinuation 来阻止该操作。延续保存在它们自己的数组中,以允许多个被阻塞的“poppers”。该操作将取消所有未完成的操作。取消的通话将 .popcancelpopOrWaitthrow

被阻止的操作按 FIFO 顺序完成。pop

评论

0赞 WolfLink 10/14/2023
这看起来与使用互斥锁的信号量或条件变量的实现非常相似。这应该有效,但相当不令人满意,因为它正在重新发明轮子(重新实现信号量)。
0赞 Paulw11 10/14/2023
我猜演员就像一个互斥锁。延续通常称为承诺。互斥锁和信号量是低级并发工具。Swift 并发提供了更抽象的工具。如果您想使用信号量,请坚持使用 GCD 实现并使用信号量。
0赞 Paulw11 10/14/2023
虽然实现延续队列需要少量代码,但优点是没有阻塞线程的风险,没有死锁的风险,并且易于实现操作。使用 DispatchSemaphore,您将不得不求助于某种超时/检查取消/重试逻辑,这很可能会引入 pop 排序的错误(操作可能会重新排序)。在选择超时值时,还需要进行某种权衡,以便在开销和延迟之间进行选择cancelsemaphore.wait
0赞 Paulw11 10/14/2023
Rob 在这里总结了针对信号量的案例
0赞 WolfLink 10/14/2023
我不确定你说的“没有阻塞线程的风险”是什么意思,因为“阻塞线程”是具体的目标(让线程等到数据可用)。
1赞 lorem ipsum 10/13/2023 #2

您可以通过创建一个globalActor

@globalActor
struct MySpecialQueue {
    static var shared: MySpecialActor = .init()
    
    typealias ActorType = MySpecialActor
    
    actor MySpecialActor {
        
    }
}

然后,您可以在任何想要“安排”某些事情时使用。@MySpecialQueue

class MyService {
    private (set) var data: [String] = []
    
    @MySpecialQueue func push(data: String) {
        print(#function)
        for _ in 0...10000000 {
            //im busy
        }
        
        self.data.append(data)
    }
    
    @MySpecialQueue func pop() -> String? {
        print(#function)
        return data.popLast()
    }
}

请注意,在 Actor 内部执行操作时,Actor 被释放以执行下一件事。所以,如果你把await

    @MySpecialQueue func push(data: String) {
        print(#function)
        for _ in 0...10000000 {
            //im busy
        }
        
        self.data.append(data)
    }

对于类似的东西

    @MySpecialQueue func push(data: String) async {
        print(#function)
        try? await Task.sleep(for: .seconds(10))
        
        self.data.append(data)
    }

演员将在 es 完成之前。poppush

评论

0赞 WolfLink 10/14/2023
如果在队列为空时调用 pop,而不是等待数据,这难道不会简单地抛出错误吗?
0赞 Paulw11 10/14/2023
使用全局执行组件不能很好地扩展,因为数据结构与全局执行组件耦合。我认为最好使用一个,以便堆栈/队列是独立的,您可以拥有任意数量的它们actor
0赞 lorem ipsum 10/14/2023
@WolfLink没有爆破楦是安全的