提问人:WolfLink 提问时间:10/13/2023 更新时间:10/14/2023 访问量:147
如何在 Swift 并发中实现异步队列?
How to implement an asynchronous queue in Swift concurrency?
问:
背景:我熟悉使用 Locks 和 Semaphores 的并发,并且正在学习 Swift 的新并发功能。
下面是 Swift 中使用 DispatchSemaphore 的异步队列的简化示例:
class AsynchronousQueue {
var data: [MyDataType] = []
var semaphore = DispatchSemaphore(value: 0)
func push(data: MyData) {
data.append(data)
semaphore.signal()
}
func pop() -> MyData {
semaphore.wait()
return data.popLast()
}
}
请注意,这有点过于简单了 - 我还希望限制数据数组的编辑,使其不会同时被两个线程突变,这可以通过锁或 Actor 来实现。我可能还想发送某种“取消”信号,这也是一个相对较小的更改。
我不确定如何使用 Swift Concurrency 完成的部分是信号量在这里扮演的角色。如果数据可用,我想立即返回,或者如果数据不可用,我想无限期地等待。pop()
我遇到过很多不鼓励在现代 Swift 代码中使用信号量的帖子,但我还没有看到一个如何使用 Swift 并发功能 () 进行这种等待的例子,这并不比使用信号量复杂得多。await/async/actor
请注意,不能在函数中使用,因此似乎很难将新功能与基于 的代码一起使用,这就是我问的原因。DispatchSemaphore
async
await/async/actor
DispatchSemaphore
答:
从技术上讲,您描述的是一个堆栈(或后进先出队列)。
这是我的实现。
它支持 、 阻塞和非阻塞以及 和 。push
pop
peek
cancel
isEmpty
和属性可用。stackDepth
actor AsyncStack<Element> {
private var storage = [Element]()
private var awaiters = [CheckedContinuation<Element,Error>]()
/// Push a new element onto the stack
/// - Parameter newElement: The element to push
/// - Returns: Void
public func push(_ newElement: Element) async-> Void {
if !awaiters.isEmpty {
let awaiter = awaiters.removeFirst()
awaiter.resume(returning: newElement)
} else {
storage.append(newElement)
}
}
/// Pop the element at the top of the stack or wait until an element becomes available
/// - Returns: The popped element
/// - Throws: An AsyncQueueError if the waiting pop is cancelled
public func popOrWait() async throws -> Element {
if let element = storage.popLast() {
return element
}
return try await withCheckedThrowingContinuation { continuation in
awaiters.append(continuation)
}
}
/// Pop an element from the top of the stack if possible
/// - Returns: An element or nil if the stack is empty
public func pop() async -> Element? {
return storage.popLast()
}
/// Return the element at the top of the stack, if any, without removing it
/// - Returns: The element a the top of the stack or nil
public func peek() async -> Element? {
return storage.last
}
/// True if the stack is empty
public var isEmpty: Bool {
get async {
return storage.isEmpty
}
}
/// Current stack depth
public var stackDepth: Int {
get async {
return storage.count
}
}
/// Cancel all pending popOrWait operations.
/// Pending operations will throw `AsyncQueue.OperationCancelled`
/// - Returns: Void
public func cancel() async -> Void {
for awaiter in awaiters {
awaiter.resume(throwing: AsyncQueueError.OperationCancelled)
}
awaiters.removeAll()
}
public enum AsyncQueueError: Error {
case OperationCancelled
}
}
如果堆栈为空,我使用 CheckedContinuation
来阻止该操作。延续保存在它们自己的数组中,以允许多个被阻塞的“poppers”。该操作将取消所有未完成的操作。取消的通话将 .pop
cancel
popOrWait
throw
被阻止的操作按 FIFO 顺序完成。pop
评论
cancel
semaphore.wait
您可以通过创建一个globalActor
@globalActor
struct MySpecialQueue {
static var shared: MySpecialActor = .init()
typealias ActorType = MySpecialActor
actor MySpecialActor {
}
}
然后,您可以在任何想要“安排”某些事情时使用。@MySpecialQueue
class MyService {
private (set) var data: [String] = []
@MySpecialQueue func push(data: String) {
print(#function)
for _ in 0...10000000 {
//im busy
}
self.data.append(data)
}
@MySpecialQueue func pop() -> String? {
print(#function)
return data.popLast()
}
}
请注意,在 Actor 内部执行操作时,Actor 被释放以执行下一件事。所以,如果你把await
@MySpecialQueue func push(data: String) {
print(#function)
for _ in 0...10000000 {
//im busy
}
self.data.append(data)
}
对于类似的东西
@MySpecialQueue func push(data: String) async {
print(#function)
try? await Task.sleep(for: .seconds(10))
self.data.append(data)
}
演员将在 es 完成之前。pop
push
评论
actor
评论
class
actor
global actor
actor
Dispatch