⏰ Combine 调度器

CI

一些调度器,使 Combine 的使用更易于测试和更通用。

动机

Combine 框架提供了 Scheduler 协议,这是一个强大的抽象,用于描述工作单元如何以及何时执行。它统一了许多不同的工作执行方式,例如 DispatchQueueRunLoopOperationQueue

然而,一旦你在响应式代码中使用任何这些调度器,你就会立即将发布者变成异步的,因此更难测试,迫使你使用 expectation 并等待时间过去,以便你的发布者执行。

这个库提供了新的调度器,允许你将任何异步发布者转换为同步发布者,以便于测试和调试。

了解更多

这个库是在 Point-Free 的许多剧集中设计的,这是一个由 Brandon WilliamsStephen Celis 主持的探索函数式编程和 Swift 的视频系列。

你可以在这里观看所有剧集。

video poster image

AnyScheduler

AnySchedulerScheduler 协议提供了一个类型擦除包装器,这对于在多种类型的调度器上保持泛型非常有用,而无需在你的代码中实际引入泛型。Combine 框架附带了许多类型擦除包装器,例如 AnySubscriberAnyPublisherAnyCancellable,但出于某种原因,没有附带 AnyScheduler

当你想要能够从外部自定义某些代码中使用的调度器,但又不想引入泛型使其可自定义时,此类型非常有用。例如,假设你有一个 ObservableObject 视图模型,它在调用方法时执行 API 请求

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient

  init(apiClient: ApiClient) {
    self.apiClient = apiClient
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: DispatchQueue.main)
      .assign(to: &self.$episode)
  }
}

请注意,我们在 reloadButtonTapped 方法中使用了 DispatchQueue.main,因为 fetchEpisode 端点很可能在其输出在后台线程上交付(URLSession 就是这种情况)。

这段代码看起来很普通,但是 .receive(on: DispatchQueue.main) 的存在使得这段代码更难测试,因为你必须使用 XCTest expectation 来显式等待少量时间才能让队列执行。这可能会导致测试不稳定,并使测试套件的执行时间比必要的更长。

解决此测试问题的一种方法是使用 “immediate” 调度器 而不是 DispatchQueue.main,这将使 fetchEpisode 尽可能快地交付其输出,而无需线程跳转。为了允许这样做,我们需要将调度器注入到我们的视图模型中,以便我们可以从外部控制它

class EpisodeViewModel<S: Scheduler>: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let scheduler: S

  init(apiClient: ApiClient, scheduler: S) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.scheduler)
      .assign(to: &self.$episode)
  }
}

现在我们可以在生产环境中使用 DispatchQueue.main 初始化这个视图模型,我们可以在测试中使用 DispatchQueue.immediate 初始化它。听起来不错!

然而,将此泛型引入我们的视图模型非常繁琐,因为它向外界大声宣布此类型使用调度器,更糟糕的是,它最终会感染任何接触此视图模型的代码,而这些代码也希望是可测试的。例如,任何使用此视图模型的视图都需要引入泛型,如果它也想要能够控制调度器,这将很有用,如果我们想要编写 快照测试

与其引入泛型来允许替换不同的调度器,我们可以使用 AnyScheduler。它允许我们在某种程度上是调度器泛型的,但实际上并没有引入泛型。

与其在我们的视图模型中持有泛型调度器,我们可以说我们只想要一个关联类型与 DispatchQueue 的类型匹配的调度器

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let scheduler: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.scheduler)
      .assign(to: &self.$episode)
  }
}

然后,在生产环境中,我们可以创建一个使用实时 DispatchQueue 的视图模型,但我们只需要先擦除其类型

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: DispatchQueue.main.eraseToAnyScheduler()
)

对于常见的调度器,例如 DispatchQueueOperationQueueRunLoopAnyScheduler 上甚至有一个静态助手,可以进一步简化这一点

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .main
)

然后在测试中,我们可以使用 immediate 调度器

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .immediate
)

因此,总的来说,AnyScheduler 非常适合允许人们控制在类、函数等中使用哪个调度器,而无需引入泛型,这可以帮助简化代码并减少实现细节的泄漏。

TestScheduler

一个调度器,其当前时间和执行可以以确定性的方式控制。此调度器对于测试时间流逝如何影响使用异步运算符(例如 debouncethrottledelaytimeoutreceive(on:)subscribe(on:) 等)的发布者非常有用。

例如,考虑以下 race 运算符,它并行运行两个 future,但仅发出第一个完成的 future

func race<Output, Failure: Error>(
  _ first: Future<Output, Failure>,
  _ second: Future<Output, Failure>
) -> AnyPublisher<Output, Failure> {
  first
    .merge(with: second)
    .prefix(1)
    .eraseToAnyPublisher()
}

虽然这个发布者非常简单,但我们可能仍然想为其编写一些测试。

为此,我们可以创建一个测试调度器并创建两个 future,一个在一秒后发出,另一个在两秒后发出

let scheduler = DispatchQueue.test

let first = Future<Int, Never> { callback in
  scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) }
}
let second = Future<Int, Never> { callback in
  scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) }
}

然后我们可以 race 这些 future 并将其发射收集到一个数组中

var output: [Int] = []
let cancellable = race(first, second).sink { output.append($0) }

然后我们可以确定性地在调度器中向前移动时间,以查看发布者如何发出。我们可以从将时间向前移动一秒开始

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

这证明我们从发布者那里获得了第一次发射,因为已经过去了一秒钟的时间。如果我们再向前推进一秒钟,我们可以证明我们没有获得更多的发射

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

这是一个非常简单的示例,说明如何使用测试调度器控制时间流逝,但此技术可用于测试任何涉及 Combine 异步操作的发布者。

ImmediateScheduler

Combine 框架自带一个 ImmediateScheduler 类型,但它为 SchedulerTimeTypeSchedulerOptions 的关联类型定义了所有新类型。这意味着你无法轻松地在实时 DispatchQueue 和同步执行工作的 “immediate” DispatchQueue 之间切换。唯一的方法是在任何使用该调度器的代码中引入泛型,这可能会变得笨拙。

因此,相反,此库的 ImmediateScheduler 使用与现有调度器相同的关联类型,这意味着你可以使用 DispatchQueue.immediate 来拥有一个看起来像 dispatch 队列但立即执行其工作的调度器。类似地,你可以构造 RunLoop.immediateOperationQueue.immediate

此调度器对于针对使用异步运算符(例如 receive(on:)subscribe(on:) 等)的发布者编写测试非常有用,因为它强制发布者立即发射,而无需等待线程跳转或使用 XCTestExpectation 的延迟。

此调度器与 TestScheduler 的不同之处在于,你不能显式控制时间如何流经你的发布者,而是你正在立即将时间折叠成一个点。

作为一个基本示例,假设你有一个视图模型,它在按钮被点击后等待 10 秒钟加载一些数据

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

  let apiClient: ApiClient

  init(apiClient: ApiClient) {
    self.apiClient = apiClient
  }

  func reloadButtonTapped() {
    Just(())
      .delay(for: .seconds(10), scheduler: DispatchQueue.main)
      .flatMap { apiClient.fetchEpisodes() }
      .assign(to: &self.$episodes)
  }
}

为了测试这段代码,你实际上需要等待 10 秒钟才能让发布者发射

func testViewModel() {
  let viewModel = HomeViewModel(apiClient: .mock)

  viewModel.reloadButtonTapped()

  _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10)

  XCTAssert(viewModel.episodes, [Episode(id: 42)])
}

或者,我们可以显式地将调度器传递到视图模型初始化器中,以便可以从外部控制它

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

  let apiClient: ApiClient
  let scheduler: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.scheduler = scheduler
  }

  func reloadButtonTapped() {
    Just(())
      .delay(for: .seconds(10), scheduler: self.scheduler)
      .flatMap { self.apiClient.fetchEpisodes() }
      .assign(to: &self.$episodes)
  }
}

然后在测试中使用 immediate 调度器

func testViewModel() {
  let viewModel = HomeViewModel(
    apiClient: .mock,
    scheduler: .immediate
  )

  viewModel.reloadButtonTapped()

  // No more waiting...

  XCTAssert(viewModel.episodes, [Episode(id: 42)])
}

动画调度器

CombineSchedulers 附带助手,可帮助在 SwiftUI 和 UIKit 中进行异步动画。

如果 SwiftUI 状态突变应该是动画的,你可以调用 animationtransaction 方法将现有调度器转换为一个使用动画或在事务中调度其操作的调度器。这些 API 镜像了 SwiftUI 的 withAnimationwithTransaction 函数,这些函数由动画调度器调用。

例如,要在你的视图模型中为 API 响应制作动画,你可以指定接收此状态的调度器应该是动画的

self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animation())
  .assign(to: &self.$episode)

如果你正在使用 Combine 为 UIKit 功能提供支持,则可以使用 .animate 方法,该方法镜像了 UIView.animate

self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animate(withDuration: 0.3))
  .assign(to: &self.$episode)

UnimplementedScheduler

一个调度器,如果被使用会导致测试失败。

此调度器可以提供额外的确定性层,即经过测试的代码路径不需要使用调度器。

随着视图模型变得越来越复杂,只有部分逻辑可能需要调度器。在为任何需要调度器的逻辑编写单元测试时,应提供一个 unimplemented 调度器。这直接在测试中记录了该功能不使用调度器。如果它使用了,或者将来使用了,测试将会失败。

例如,以下视图模型有几个职责

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

  let apiClient: ApiClient
  let mainQueue: AnySchedulerOf<DispatchQueue>

  init(apiClient: ApiClient, mainQueue: AnySchedulerOf<DispatchQueue>) {
    self.apiClient = apiClient
    self.mainQueue = mainQueue
  }

  func reloadButtonTapped() {
    self.apiClient.fetchEpisode()
      .receive(on: self.mainQueue)
      .assign(to: &self.$episode)
  }

  func favoriteButtonTapped() {
    self.episode?.isFavorite.toggle()
  }
}

API 客户端在后台队列中交付剧集,因此视图模型必须在其主队列上接收它,然后才能改变其状态。

但是,点击收藏按钮不涉及任何调度。这意味着可以使用 unimplemented 调度器编写测试

func testFavoriteButton() {
  let viewModel = EpisodeViewModel(
    apiClient: .mock,
    mainQueue: .unimplemented
  )
  viewModel.episode = .mock

  viewModel.favoriteButtonTapped()
  XCTAssert(viewModel.episode?.isFavorite == true)

  viewModel.favoriteButtonTapped()
  XCTAssert(viewModel.episode?.isFavorite == false)
}

使用 .unimplemented,此测试强烈声明收藏剧集不需要调度器来完成这项工作,这意味着可以合理地假设该功能很简单,并且不涉及任何异步。

将来,如果收藏剧集触发涉及调度器的 API 请求,则此测试将开始失败,这是一件好事!这将迫使我们解决引入的复杂性。如果我们使用了任何其他调度器,它将悄悄地接收这项额外的工作,并且测试将继续通过。

UIScheduler

一个调度器,它尽快在主队列上执行其工作。此调度器的灵感来自 ReactiveSwift 项目中的等效调度器。

如果从主线程调用 UIScheduler.shared.schedule,则工作单元将立即执行。这与 DispatchQueue.main.schedule 形成对比,后者在执行之前会产生线程跳转,因为它在底层使用了 DispatchQueue.main.async

此调度器对于需要在主线程上尽快执行工作,并且线程跳转会带来问题的情况非常有用,例如在执行动画时。

并发 API

此库提供了异步友好的 API,用于与 Combine 调度器交互。

// Suspend the current task for 1 second
try await scheduler.sleep(for: .seconds(1))

// Perform work every 1 second
for await instant in scheduler.timer(interval: .seconds(1)) {
  ...
}

Publishers.Timer

一个发布者,它在重复的时间间隔内发出调度器的当前时间。

此发布者是 Foundation 的 Timer.publisher 的替代方案,其主要区别在于它允许你为计时器使用任何调度器,而不仅仅是 RunLoop。这很有用,因为 RunLoop 调度器在以下意义上是不可测试的:如果你想针对使用 Timer.publisher 的发布者编写测试,你必须显式等待时间过去才能获得发射。这很可能导致脆弱的测试,并大大增加测试执行的时间。

它可以像 Foundation 的计时器一样使用,除了你指定调度器而不是 run loop

Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main)
  .autoconnect()
  .sink { print("Timer", $0) }

或者,你可以在调度器上调用 timerPublisher 方法,以便在该调度器上派生重复计时器

DispatchQueue.main.timerPublisher(every: .seconds(1))
  .autoconnect()
  .sink { print("Timer", $0) }

但此计时器最好的部分是你可以将其与 TestScheduler 一起使用,以便你编写的任何涉及计时器的 Combine 代码都变得更易于测试。这展示了我们如何轻松地模拟在计时器中将时间向前移动 1,000 秒的想法

let scheduler = DispatchQueue.test
var output: [Int] = []

Publishers.Timer(every: 1, scheduler: scheduler)
  .autoconnect()
  .sink { _ in output.append(output.count) }
  .store(in: &self.cancellables)

XCTAssertEqual(output, [])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0, 1])

scheduler.advance(by: 1_000)
XCTAssertEqual(output, Array(0...1_001))

兼容性

此库与 iOS 13.2 及更高版本兼容。请注意,Combine 框架和 iOS 13.1 及更低版本中存在错误,这些错误会导致在尝试比较 DispatchQueue.SchedulerTimeType 值时崩溃,这是 TestScheduler 依赖的操作。

安装

你可以通过将 CombineSchedulers 添加为包依赖项来将其添加到 Xcode 项目。

  1. 文件菜单中,选择Swift Packages › Add Package Dependency…
  2. 在包存储库 URL 文本字段中输入 “https://github.com/pointfreeco/combine-schedulers
  3. 取决于你的项目结构
    • 如果你有一个需要访问库的单个应用程序目标,则将 CombineSchedulers 直接添加到你的应用程序。
    • 如果你想从多个目标中使用此库,你必须创建一个依赖于 CombineSchedulers 的共享框架,然后从你的其他目标依赖于该框架。

文档

Combine Schedulers API 的最新文档可在此处获得。

其他库

许可证

此库在 MIT 许可证下发布。有关详细信息,请参阅 LICENSE