swift-async-queue

CI Status codecov License

一个队列库,用于将有序任务从非隔离上下文发送到异步上下文。

任务排序和 Swift 并发

在 Swift 并发中,从非隔离上下文发送到异步上下文的任务本质上是无序的。 考虑以下测试

func testActorTaskOrdering() async {
    actor Counter {
        func incrementAndAssertCountEquals(_ expectedCount: Int) {
            count += 1
            let incrementedCount = count
            XCTAssertEqual(incrementedCount, expectedCount) // often fails
        }

        private var count = 0
    }

    let counter = Counter()
    var tasks = [Task<Void, Never>]()
    for iteration in 1...100 {
        tasks.append(Task {
            await counter.incrementAndAssertCountEquals(iteration)
        })
    }
    // Wait for all enqueued tasks to finish.
    for task in tasks {
        _ = await task.value
    }
}

因为 Task 是从非隔离的执行上下文中产生的,所以计划的异步工作的顺序无法得到保证。

虽然 actor 非常擅长序列化任务,但在标准 Swift 库中,没有简单的方法可以从非隔离的同步上下文或多个执行上下文向它们发送有序任务。

以 FIFO 顺序执行异步任务

使用 FIFOQueue 以 FIFO 顺序执行从非隔离上下文入队的异步任务。 发送到这些队列之一的任务保证以入队的顺序开始和结束执行。 FIFOQueue 以类似于 DispatchQueue 的方式执行任务:入队的任务以原子方式执行,如果 FIFOQueue 上执行的任务等待它正在执行的队列的结果,程序将死锁。

FIFOQueue 可以轻松地以 FIFO 顺序从非隔离上下文执行异步任务

func testFIFOQueueOrdering() async {
    actor Counter {
        nonisolated
        func incrementAndAssertCountEquals(_ expectedCount: Int) {
            queue.enqueue {
                await self.increment()
                let incrementedCount = await self.count
                XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
            }
        }

        func flushQueue() async {
            await queue.enqueueAndWait { }
        }

        func increment() {
            count += 1
        }

        var count = 0

        private let queue = FIFOQueue()
    }

    let counter = Counter()
    for iteration in 1...100 {
        counter.incrementAndAssertCountEquals(iteration)
    }
    // Wait for all enqueued tasks to finish.
    await counter.flushQueue()
}

FIFO 执行有一个关键缺点:队列必须等待所有先前入队的工作(包括挂起的工作)完成,然后才能开始新的工作。 如果您希望在先前的任务挂起时开始新的工作,请使用 ActorQueue

从非隔离上下文向 Actor 发送有序的异步任务

使用 ActorQueue 从非隔离或同步上下文向 actor 的隔离上下文发送有序的异步任务。 发送到 actor 队列的任务保证以入队的顺序开始执行。 然而,与 FIFOQueue 不同,执行顺序仅保证到入队任务中的第一个挂起点ActorQueue 在其采用的 actor 的隔离上下文中执行任务,导致 ActorQueue 任务执行具有与 actor 代码执行相同的属性:挂起点之间的代码以原子方式执行,并且发送到单个 ActorQueue 的任务可以等待队列的结果而不会死锁。

ActorQueue 的实例设计为由单个 actor 实例使用:发送到 ActorQueue 的任务使用队列采用的 actor 的隔离上下文来序列化任务。 因此,在处理 ActorQueue 时,必须满足以下几个要求

  1. 任何 ActorQueue 的生命周期都不应超过其 actor 的生命周期。 强烈建议 ActorQueue 是采用的 actor 上的 private let 常量。 在其采用的 actor 被释放后,将任务排队到 ActorQueue 实例将导致崩溃。
  2. 使用 ActorQueueactor 应在 actorinit 中将队列的采用的执行上下文设置为 self。 在将工作排队到 ActorQueue 之前未能设置采用的执行上下文将导致崩溃。

ActorQueue 可以轻松地从非隔离上下文按顺序排队在 actor 的隔离上下文中执行的任务

func testActorQueueOrdering() async {
    actor Counter {
        init() {
            // Adopting the execution context in `init` satisfies requirement #2 above.
            queue.adoptExecutionContext(of: self)
        }

        nonisolated
        func incrementAndAssertCountEquals(_ expectedCount: Int) {
            queue.enqueue { myself in
                myself.count += 1
                XCTAssertEqual(expectedCount, myself.count) // always succeeds
            }
        }

        func flushQueue() async {
            await queue.enqueueAndWait { _ in }
        }

        private var count = 0
        // Making the queue a private let constant satisfies requirement #1 above.
        private let queue = ActorQueue<Counter>()
    }

    let counter = Counter()
    for iteration in 1...100 {
        counter.incrementAndAssertCountEquals(iteration)
    }
    // Wait for all enqueued tasks to finish.
    await counter.flushQueue()
}

从非隔离上下文向 @MainActor 发送有序的异步任务

使用 MainActorQueue 从非隔离或同步上下文向 @MainActor 的隔离上下文发送有序的异步任务。 发送到此队列类型的任务保证以入队的顺序开始执行。 与 ActorQueue 类似,执行顺序仅保证到入队任务中的第一个挂起点MainActorQueue 在其采用的 actor 的隔离上下文中执行任务,导致 MainActorQueue 任务执行具有与 @MainActor 的代码执行相同的属性:挂起点之间的代码以原子方式执行,并且发送到单个 MainActorQueue 的任务可以等待队列的结果而不会死锁。

MainActorQueue 可以轻松地以 FIFO 顺序从非隔离上下文执行异步任务

@MainActor
func testMainActorQueueOrdering() async {
    @MainActor
    final class Counter {
        nonisolated
        func incrementAndAssertCountEquals(_ expectedCount: Int) {
            MainActorQueue.shared.enqueue {
                self.increment()
                let incrementedCount = self.count
                XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
            }
        }

        func flushQueue() async {
            await MainActorQueue.shared.enqueueAndWait { }
        }

        func increment() {
            count += 1
        }

        var count = 0
    }

    let counter = Counter()
    for iteration in 1...100 {
        counter.incrementAndAssertCountEquals(iteration)
    }
    // Wait for all enqueued tasks to finish.
    await counter.flushQueue()
}

要求

安装

Swift Package Manager

要使用 Swift Package Manager 在您的项目中安装 swift-async-queue,可以将以下行添加到您的 Package.swift 文件中

dependencies: [
    .package(url: "https://github.com/dfed/swift-async-queue", from: "0.6.0"),
]

CocoaPods

要使用 CocoaPods 在您的项目中安装 swift-async-queue,请将以下内容添加到您的 Podfile

pod 'AsyncQueue', '~> 0.6.0'

贡献

我很高兴您对 swift-async-queue 感兴趣,我很想看看您用它做什么。 在提交 Pull Request 之前,请阅读贡献指南

谢谢,祝您排队愉快!

开发

双击存储库根目录中的 Package.swift 以在 Xcode 中打开项目。