ReactiveKit

Platform Build Status Twitter

ReactiveKit 是一个轻量级的 Swift 框架,用于响应式和函数式响应式编程,使您能够立即进入响应式世界。

该框架与所有 Apple 平台和 Linux 兼容。如果您正在开发 iOS 或 macOS 应用程序,请务必查看 Bond 框架,该框架提供 UIKit 和 AppKit 绑定、响应式委托和数据源。

ReactiveKit 目前正在与 Apple 的 Combine 框架进行 API 对齐。类型和函数正在被重命名,在适用的情况下,以匹配 Combine 的类型和函数。重要的是要注意,ReactiveKit 不会成为 Combine 的直接替代品。目标是使互操作性和转换平滑。所有工作都以向后兼容的方式完成,并将通过多个版本逐步完成。查看发行说明以跟踪该过程。

本文档将通过介绍框架的实现来介绍该框架。到最后,您应该对框架的实现方式以及使用它的最佳方式有一个相当好的理解。

要快速入门,请克隆该项目并探索工作区 Playground 中提供的教程!

摘要

简介

考虑当用户输入其姓名时,文本字段的文本如何更改。每个输入的字母都为我们提供了一个新状态。

---[J]---[Ji]---[Jim]--->

我们可以将这些状态更改视为事件序列。它与数组或列表非常相似,但不同之处在于事件是随着时间的推移而生成的,而不是一次性全部保存在内存中。

响应式编程背后的想法是,一切都可以表示为一个序列。让我们考虑另一个例子——一个网络请求。

---[Response]--->

网络请求的结果是一个响应。虽然我们只有一个响应,但我们仍然可以将其视为一个序列。一个包含一个元素的数组仍然是一个数组。

数组是有限的,因此它们具有一个我们称为 size 的属性。它是衡量数组占用多少内存的指标。当我们谈论随时间推移的序列时,我们不知道它们在其生命周期内会生成多少事件。我们不知道用户会输入多少个字母。但是,我们仍然想知道序列何时完成生成事件。

为了获得该信息,我们可以引入一种特殊的事件——完成事件。它是一个标记序列结束的事件。完成事件之后不应有任何事件。

我们将用一个垂直条直观地表示完成事件。

---[J]---[Ji]---[Jim]---|--->

完成事件很重要,因为它告诉我们正在发生的事情现在已经结束。我们可以在那时完成工作,并处理可能用于处理序列的任何资源。

不幸的是,宇宙不是由秩序支配,而是由混乱支配。意外的事情会发生,我们必须预料到这一点。例如,网络请求可能会失败,因此我们可能会收到错误而不是响应。

---!Error!--->

为了在我们的序列中表示错误,我们将引入另一种事件。我们将其称为失败事件。当发生意外情况时,将生成失败事件。就像完成事件一样,失败事件也将表示序列的结束。失败事件之后不应有任何事件。

让我们看看 ReactiveKit 中如何定义事件。

extension Signal {

    /// An event of a sequence.
    public enum Event {

        /// An event that carries next element.
        case next(Element)

        /// An event that represents failure. Carries an error.
        case failed(Error)

        /// An event that marks the completion of a sequence.
        case completed
    }
}

它只是我们拥有的三种事件的枚举。序列通常会有零个或多个 .next 事件,后跟 .completed.failed 事件。

序列呢?在 ReactiveKit 中,它们被称为 *signals*(信号)。这是定义它们的协议。

/// Represents a sequence of events.
public protocol SignalProtocol {

  /// The type of elements generated by the signal.
  associatedtype Element

  /// The type of error that can terminate the signal.
  associatedtype Error: Swift.Error

  /// Register the given observer.
  /// - Parameter observer: A function that will receive events.
  /// - Returns: A disposable that can be used to cancel the observation.
  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable
}

信号表示事件序列。您可以在序列上执行的最重要的事情是观察它生成的事件。事件由 *observer*(观察者)接收。观察者只是一个接受事件的函数。

/// Represents a type that receives events.
public typealias Observer<Element, Error: Swift.Error> = (Signal<Element, Error>.Event) -> Void

Signals(信号)

我们已经看到了定义信号的协议,但是实现呢?让我们实现一个基本的信号类型!

public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

  private let producer: (Observer<Element, Error>) -> Void

  public init(producer: @escaping (Observer<Element, Error>) -> Void) {
    self.producer = producer
  }

  public func observe(with observer: @escaping Observer<Element, Error>) {
    producer(observer)
  }
}

我们将信号定义为一个属性的结构体——一个生产者。正如您所看到的,producer 只是一个将观察者作为参数的函数。当我们开始观察信号时,我们所做的基本上是用给定的观察者执行生产者。这就是信号如此简单!

ReactiveKit 中的信号的实现几乎与我们在这里展示的相同。它有一些附加功能,可以为我们提供一些保证,我们将在后面讨论。

让我们创建一个信号实例,该实例首先向观察者发送三个正整数,然后完成。

直观上,它看起来像

---[1]---[2]---[3]---|--->

而在代码中,我们将这样做

let counter = Signal<Int, Never> { observer in

  // send first three positive integers
  observer(.next(1))
  observer(.next(2))
  observer(.next(3))

  // send completed event
  observer(.completed)
}

由于观察者只是一个接收事件的函数,因此只要我们想发送新事件,就可以使用该事件执行它。我们总是通过发送 .completed.failed 事件来完成序列,以便接收者知道事件生成何时结束。

ReactiveKit 将观察者包装到一个具有各种辅助方法的结构体中,以使其更容易发送事件。这是一个定义它的协议。

/// Represents a type that receives events.
public protocol ObserverProtocol {
    
    /// Type of elements being received.
    associatedtype Element
    
    /// Type of error that can be received.
    associatedtype Error: Swift.Error

    /// Send the event to the observer.
    func on(_ event: Signal<Element, Error>.Event)
}

我们之前介绍的观察者基本上是 on(_:) 方法。ReactiveKit 还在观察者上提供了此扩展

public extension ObserverProtocol {

    /// Convenience method to send `.next` event.
    public func receive(_ element: Element) {
        on(.next(element))
    }

    /// Convenience method to send `.failed` or `.completed` event.
    public func receive(completion: Subscribers.Completion<Error>) {
        switch completion {
        case .finished:
            on(.completed)
        case .failure(let error):
            on(.failed(error))
        }
    }

    /// Convenience method to send `.next` event followed by a `.completed` event.
    public func receive(lastElement element: Element) {
        receive(element)
        receive(completion: .finished)
    }
}

因此,使用 ReactiveKit,我们可以像这样实现前面的示例

let counter = Signal<Int, Never> { observer in

  // send first three positive integers
  observer.receive(1)
  observer.receive(2)
  observer.receive(3)

  // send completed event
  observer.receive(completion: .finished)
}

当我们观察到这样的信号时会发生什么?请记住,观察者是一个接收事件的函数,因此我们可以将闭包传递给我们的观察方法。

counter.observe(with: { event in
  print(event)
})

当然,我们会得到打印出来的三个事件。

next(1)
next(2)
next(3)
completed

将异步调用包装到信号中

由于我们实现 Signal 类型的方式,我们可以轻松地将异步调用包装到信号中。假设我们有一个异步函数,用于获取用户。

func getUser(completion: (Result<User, ClientError>) -> Void) -> URLSessionTask

该函数通过完成闭包和一个 Result 类型传递获取结果,该类型的实例将包含用户或错误。要将其包装到信号中,我们需要做的就是在信号初始化程序的生产者闭包中调用该函数,并根据发生的情况发送相关事件。

func getUser() -> Signal<User, ClientError> {
  return Signal { observer in
    getUser(completion: { result in
      switch result {
      case .success(let user):
        observer.receive(user)
        observer.receive(completion: .finished)
      case .failure(let error):
        observer.receive(completion: .failure(error))
    })
    // return disposable, continue reading
  }
}

如果我们现在观察这个信号,我们将得到一个用户和一个完成事件

---[User]---|--->

或一个错误

---!ClientError!--->

在代码中,获取用户看起来像

let user = getUser()

user.observe { event in
  print(event) // prints ".next(user), .completed" in case of successful response
}

在这里让我问你一个重要的问题。何时执行获取用户的请求,即何时调用异步函数 getUser(completion:)?想想看。

我们在传递给信号初始化程序的生产者闭包中调用 getUser(completion:)。但是,创建信号时不会执行该闭包。这意味着代码 let user = getUser() 不会触发请求。它只是创建一个知道如何执行请求的信号。

当我们调用 observe(with:) 方法时,会发出请求,因为那是我们的生产者闭包被执行的点。这也意味着,如果我们多次调用 observe(with:) 方法,我们将多次调用生产者,因此我们将多次执行请求。这是信号的一个非常强大的方面,稍后当我们谈论 共享事件序列 时,我们将回到它。现在只需记住,每次调用 observe(with:) 意味着事件会被再次生成。

处理信号

我们的示例函数 getUser(completion:) 返回一个 URLSessionTask 对象。我们经常不会考虑它,但是可以取消 HTTP 请求。当屏幕被关闭时,我们可能应该取消任何正在进行的请求。一种方法是在用于发出请求的 URLSessionTask 上调用 cancel()。我们如何使用信号处理这个问题?

如果您一直在仔细阅读代码示例,您可能已经注意到我们没有正确地使我们的 Signal 符合 SignalProtocol。该协议指定 observe(with:) 方法返回一个名为 Disposable 的东西。Disposable 是一个可以取消信号观察和任何基础任务的对象。

让我给你 ReactiveKit 中 Disposable 的定义。

public protocol Disposable {

  /// Cancel the signal observation and any underlying tasks.
  func dispose()

  /// Returns `true` if already disposed.
  var isDisposed: Bool { get }
}

它有一个取消观察的方法和一个可以告诉我们它是否已被处理的属性。取消观察也称为 *处理信号*。

有各种 Disposable 的实现,但让我们关注信号创建中最常用的那个。当信号被处理时,我们通常希望执行某些操作来清理资源或停止基础任务。有什么比在信号被处理时执行闭包更好的方法呢?让我们实现一个在处理信号时执行给定闭包的 Disposable。我们将它称为 BlockDisposable

public final class BlockDisposable: Disposable {

  private var handler: (() -> Void)?

  public var isDisposed: Bool {
    return handler == nil
  }

  public init(_ handler: @escaping () -> Void) {
    self.handler = handler
  }

  public func dispose() {
    handler?()
    handler = nil
  }
}

很简单。它只是在调用 dispose() 方法时执行给定的闭包。我们如何使用这样的 Disposable?好吧,我们需要改进我们的信号实现。

谁应该创建 Disposable?由于 Disposable 表示一种传达信号取消的方式,因此显然是创建信号的人也应该提供一个可以取消信号的 Disposable。为此,我们将重构信号生产者以返回一个 Disposable。此外,我们将从 observe(with:) 方法返回该 Disposable,以便任何观察信号的人都可以取消观察。

public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

  private let producer: (Observer<Element, Error>) -> Disposable

  public init(producer: @escaping (Observer<Element, Error>) -> Disposable) {
    self.producer = producer
  }

  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    return producer(observer)
  }
}

这意味着当我们在创建信号时,我们还必须提供一个 Disposable。让我们重构我们的异步函数包装信号以提供一个 Disposable。

func getUser() -> Signal<User, ClientError> {
  return Signal { observer in
    let task = getUser(completion: { result in
      switch result {
      case .success(let user):
        observer.receive(user)
        observer.receive(completion: .finished)
      case .failure(let error):
        observer.receive(completion: .failure(error))
    })

    return BlockDisposable {
      task.cancel()
    }
  }
}

我们只是返回一个 BlockDisposable 实例,该实例在处理时取消任务。然后,我们可以在观察信号时获得该 Disposable。

let disposable = getUser().observe { event in
  print(event)
}

当我们不再对信号事件感兴趣时,我们可以处理该 Disposable。它将取消观察并取消网络任务。

disposable.dispose()

对于 ReactiveKit 中 Signal 的实际实现,还有其他机制可以防止在处理信号时发送事件,因此可以保证在处理信号后不会收到任何事件。在处理信号后从生产者发送的任何事件都将被忽略。

在 ReactiveKit 中,当信号以 .completed.failed 事件终止时,它们会自动被处理。

转换信号

这一切都很好,但我们为什么要这样做?有什么好处?这就是响应式编程最有趣的方面——信号运算符。

运算符是将一个或多个信号转换为其他信号的函数(即方法)。信号上的基本操作之一是过滤。假设我们有一个城市名称的信号,但我们只想要以字母 *P* 开头的名称。

filter(
---[Berlin]---[Paris]---[London]---[Porto]---|--->
)

--------------[Paris]--------------[Porto]---|--->

我们如何实现这样的运算符?非常容易。

extension SignalProtocol {

  /// Emit only elements that pass `isIncluded` test.
  public func filter(_ isIncluded: @escaping (Element) -> Bool) -> Signal<Element, Error> {
    return Signal { observer in
      return self.observe { event in
        switch event {
        case .next(let element):
          if isIncluded(element) {
            observer.receive(element)
          }
        default:
          observer(event)
        }
      }
    }
  }
}

我们在 SignalProtocol 上编写了一个扩展方法,我们在其中创建一个信号。在创建的信号的生产者中,我们观察 *self* - 我们正在过滤的信号 - 并传播通过测试的 .next 事件。我们还在 default 情况中传播完成和失败事件。

我们通过在信号上调用运算符来使用它。

cities.filter { $0.hasPrefix("P") }.observe { event in
  print(event) // prints .next("Paris"), .next("Porto"), .completed
}

信号有很多操作符。ReactiveKit 基本上是信号操作符的集合。让我们看看另一个常见的操作符。

在观察信号时,我们通常不关心终端事件,我们只关心 .next 事件中的元素。我们可以编写一个只给出这些元素的操作符。

extension SignalProtocol {

  /// Register an observer that will receive elements from `.next` events of the signal.
  public func observeNext(with observer: @escaping (Element) -> Void) -> Disposable {
    return observe { event in
      if case .next(let element) = event {
        observer(element)
      }
    }
  }
}

这应该很简单 - 只需从 .next 事件传播元素,忽略其他所有内容。现在我们可以这样做:

cities.filter { $0.hasPrefix("P") }.observeNext { name in
  print(name) // prints "Paris", "Porto"
}

当您只对这些事件感兴趣时,ReactiveKit 还提供 observeFailedobserveCompleted 操作符。

编写信号操作符就像编写扩展方法一样简单。当您需要框架未提供的功能时,只需自己编写即可!编写 ReactiveKit 的目的是使其易于理解。无论何时遇到困难,只需查看实现

更多关于错误

我们已经看到信号可能会因错误而终止。在我们的 getUser 示例中,当网络请求失败时,我们会发送 .failed 事件。因此,我们的 Signal 类型是通用的,既针对它发送的元素,也针对它可能失败的错误。但是,在某些情况下,可以保证信号不会失败,即它们永远不会发送错误。我们该如何定义呢?

ReactiveKit 提供以下类型:

/// An error type that cannot be instantiated. Used to make signals non-failable.
public enum Never: Error {
}

一个没有 case 且符合 Swift.Error 协议的枚举。由于它没有 case,我们永远无法创建它的实例。我们将使用这个技巧来获得信号不会失败的编译时保证。

例如,如果我们尝试:

let signal = Signal<Int, Never> { observer in
  ...
  observer.failed(/* What do I send here? */)
  ...
}

我们会碰壁,因为我们无法创建 Never 的实例,因此我们无法发送 .failed 事件。这是一个非常强大且重要的功能,因为每当您看到错误的类型专门针对 Never 类型的信号时,您可以安全地假设该信号不会失败 - 因为它不能。

绑定只适用于安全(非失败)信号。

创建简单信号

您经常需要一个只发出一个元素然后完成的信号。要创建它,请使用静态方法 just

let signal = Signal<Int, Never>.just(5)

这将为您提供以下信号:

---5-|--->

如果您需要一个触发多个元素然后完成的信号,您可以将任何 Sequence 转换为具有静态方法 sequence 的信号。

let signal = Signal<Int, Never>.sequence([1, 2, 3])
---1-2-3-|--->

要创建一个只完成而不发送任何元素的信号,请执行以下操作:

let signal = Signal<Int, Never>.completed()
---|--->

要创建一个只失败的信号,请执行以下操作:

let signal = Signal<Int, MyError>.failed(MyError.someError)
---!someError!--->

您还可以创建一个从不发送任何事件的信号(即从不终止的信号)。

let signal = Signal<Int, Never>.never()
------>

有时您需要一个在经过一定时间后发送特定元素的信号:

let signal = Signal<Int, Never>(just: 5, after: 60)
---/60 seconds/---5-|-->

最后,当您需要一个每隔 interval 秒发送一个整数的信号时,请执行以下操作:

let signal = Signal<Int, Never>(sequence: 0..., interval: 5)
---0---1---2---3---...>

在一个包中处理

在进行多次观察时,处理 disposables 可能会很麻烦。为了简化它,ReactiveKit 提供了一种名为 DisposeBag 的类型。它是一个容器,您可以将您的 disposables 放入其中。当 bag 被释放时,它将处理所有放入其中的 disposables。

class Example {

  let bag = DisposeBag()

  init() {
    ...
    someSignal
      .observe { ... }
      .dispose(in: bag)

    anotherSignal
      .observe { ... }
      .dispose(in: bag)
    ...
  }
}

在示例中,我们没有处理 disposables,而是通过在 disposable 上调用 dispose(in:) 方法将它们放入 bag 中。然后,当 bag 被释放时,disposables 将自动被处理。请注意,您也可以在 bag 上调用 dispose() 以随意处理其内容。

ReactiveKit 为 NSObject 及其子类提供了开箱即用的 bag。如果您正在进行 iOS 或 macOS 开发,您将在您的视图控制器和其他 UIKit 对象上获得一个免费的 bag,因为它们都是 NSObject 子类。

extension NSObject {
  public var bag: DisposeBag { get }
}

如果您像我一样不想担心释放,请查看绑定

线程

默认情况下,观察者在发送事件的线程或队列上接收事件。

例如,如果我们有一个像这样创建的信号:

let someImage = Signal<UIImage, Never> { observer in
  ...
  DispatchQueue.global(qos: .background).async {
    observer.receive(someImage)
  }
  ...
}

如果我们使用它来更新图像视图:

someImage
  .observeNext { image in
    imageView.image = image // called on background queue
  }
  .dispose(in: bag)

我们将最终得到一种奇怪的行为。我们将在一个非线程安全的 UIImageView 实例上从后台队列设置图像 - 就像其余的 UIKit 一样。

我们可以通过另一个异步调度到主队列来设置图像,但有一种更好的方法。只需将操作符 receive(on:) 与您希望调用观察者的队列一起使用即可。

someImage
  .receive(on: ExecutionContext.main)
  .observeNext { image in
    imageView.image = image // called on main queue
  }
  .dispose(in: bag)

还有另一方面。您可能有一个信号,该信号在观察它的任何线程或队列上进行一些缓慢的同步工作。

let someData = Signal<Data, Never> { observer in
  ...
  let data = // synchronously load large file
  observer.receive(data)
  ...
}

但是,我们不希望观察该信号会阻塞 UI。

someData
  .observeNext { data in // blocks current thread
    display(data)
  }
  .dispose(in: bag)

我们希望在另一个队列上进行加载。我们可以异步调度加载,但是如果我们无法更改信号生成器闭包(因为它是在框架中定义的,或者有其他原因导致我们无法更改它)该怎么办?这就是操作符 subscribe(on:) 拯救世界的时候。

someData
  .subscribe(on: ExecutionContext.global(qos: .background))
  .receive(on: ExecutionContext.main)
  .observeNext { data in // does not block current thread
    display(data)
  }
  .dispose(in: bag)

通过应用 subscribe(on:),我们定义了信号生成器在哪里执行。我们通常将其与 receive(on:) 结合使用,以定义观察者在哪里接收事件。

请注意,这些操作符与执行上下文一起使用。执行上下文是对线程或队列的简单抽象。您可以在此处查看它的实现方式。

绑定

绑定是具有特权的观察。大多数时候,您应该能够用绑定替换观察。考虑以下示例。假设我们有一个用户信号:

let presentUserProfile: Signal<User, Never> = ...

并且我们想在信号上发送用户时显示一个个人资料屏幕。通常我们会做这样的事情:

presentUserProfile.receive(on: ExecutionContext.main).observeNext { [weak self] user in
  let profileViewController = ProfileViewController(user: user)
  self?.present(profileViewController, animated: true)
}.dispose(in: bag)

但这太丑陋了!我们必须将所有内容调度到主队列,小心不要创建 retain 循环,并确保处理从观察中获得的 disposable。

幸运的是,有一种更好的方法。我们可以创建内联绑定而不是观察。只需执行以下操作:

presentUserProfile.bind(to: self) { me, user in
  let profileViewController = ProfileViewController(user: user)
  me.present(profileViewController, animated: true)
}

并且不再担心线程、retain 循环和释放,因为绑定会自动处理所有这些。只需将信号绑定到负责执行副作用的目标(在我们的示例中,绑定到负责显示个人资料视图控制器的视图控制器)。每当信号发出带有目标和已发送元素作为参数的元素时,都会调用您提供的闭包。

绑定目标

您可以绑定到符合 DeallocatableBindingExecutionContextProvider 协议的目标。

实际上,您可以只绑定到符合 Deallocatable 协议的目标,但是您必须通过调用 bind(to:context:setter) 来传递在其中更新目标的执行上下文。

符合 Deallocatable 的对象提供了一个信号,可以告诉我们对象何时被释放。

public protocol Deallocatable: class {

  /// A signal that fires `completed` event when the receiver is deallocated.
  var deallocated: Signal<Void, Never> { get }
}

ReactiveKit 为 NSObject 及其子类提供了开箱即用的此协议的符合性。

您如何符合 Deallocatable?最简单的方法是符合 DisposeBagProvider

/// A type that provides a dispose bag.
/// `DisposeBagProvider` conforms to `Deallocatable` out of the box.
public protocol DisposeBagProvider: Deallocatable {

  /// A `DisposeBag` that can be used to dispose observations and bindings.
  var bag: DisposeBag { get }
}

extension DisposeBagProvider {

  public var deallocated: Signal<Void, Never> {
    return bag.deallocated
  }
}

如您所见,DisposeBagProvider 继承了 Deallocatable 并通过从 bag 中获取已释放的信号来实现它。因此,您需要做的就是在您的类型上提供一个 bag 属性。

BindingExecutionContextProvider 协议提供对象应在其中更新的执行上下文。执行上下文只是对调度队列或线程的包装器。您可以在此处查看它的实现方式。

public protocol BindingExecutionContextProvider {

  /// An execution context used to deliver binding events.
  var bindingExecutionContext: ExecutionContext { get }
}

Bond 框架为各种 UIKit 对象提供了 BindingExecutionContextProvider 符合性,因此它们可以无缝绑定到,同时确保主线程。

您可以通过提供执行上下文来符合此协议。

extension MyViewModel: BindingExecutionContextProvider {

  public var bindingExecutionContext: ExecutionContext {
    return .immediateOnMain
  }
}

如果当前线程是主线程,ExecutionContext.immediateOnMain 同步执行,否则它会异步调度到主队列。如果要在后台队列上绑定,您可以返回 .global(qos: .background)

请注意,更新 UIKit 或 AppKit 对象必须始终从主线程或队列进行。

现在我们可以深入了解绑定实现。

extension SignalProtocol where Error == Never {

  @discardableResult
  public func bind<Target: Deallocatable>(to target: Target, setter: @escaping (Target, Element) -> Void) -> Disposable
  where Target: BindingExecutionContextProvider
  {
    return take(until: target.deallocated)
      .observeIn(target.bindingExecutionContext)
      .observeNext { [weak target] element in
        if let target = target {
          setter(target, element)
        }
      }
  }
}

首先,请注意 @discardableResult 注释。它在那里是因为我们可以安全地忽略返回的 disposable。当目标被释放时,绑定将自动被释放。这是由 take(until:) 操作符确保的。它从自身传播事件,直到给定信号完成 - 在我们的例子中,直到 target.deallocated 信号完成。然后,我们只需在正确的上下文中观察,并在每个下一个元素上使用提供的 setter 闭包更新目标。

另请注意,绑定仅在非失败信号上实现。

绑定到属性

给定一个字符串信号 name,我们知道我们可以通过以下方式将其绑定到标签:

name.bind(to: label) { label, name in
  label.text = name
}

但是如果我们能让它成为一行代码,岂不是很好?使用 Swift 4 键路径,我们可以!只需执行以下操作:

name.bind(to: label, keyPath: \.text)

其中 target 与上一个示例中的目标相同,keyPath 是应该使用在信号上发送的每个新元素更新的属性的键路径!

如果您选择使用 Bond 框架,事情会变得更简单:

name.bind(to: label.reactive.text)

Bond 提供了一种名为 Bond 的类型,该类型充当绑定目标,我们可以使用它为各种属性创建反应式扩展。查看其文档以获取更多信息。

共享事件序列

每当我们观察一个信号时,我们都会执行它的生成器。考虑以下信号:

let user = Signal { observer in
  print("Fetching user...")
  ...
}

如果现在我们这样做:

user.observe { ... } // prints: Fetching user...
user.observe { ... } // prints: Fetching user...

生成器将被调用两次,并且用户将被获取两次。相同的行为可能会在代码中悄悄地被忽略,例如:

user.map { $0.name }.observe { ... } // prints: Fetching user...
user.map { $0.email }.observe { ... } // prints: Fetching user...

您可以将每个信号观察视为一个独立的过程。通常,这种行为正是我们所需要的,但有时我们可以通过将一个序列共享给多个观察者来优化我们的代码。为了实现这一点,我们只需要应用操作符 shareReplay(limit:)

let user = user.shareReplay(limit: 1)

user.map { $0.name }.observe { ... } // prints: Fetching user...
user.map { $0.email }.observe { ... } // Does not print anything, but still gets the user :)

参数 limit 指定应将多少个元素(.next 事件)重播给观察者。终端事件始终重播。一个元素通常就是我们所需要的。操作符 shareReplay(limit:) 是两个操作符的组合。为了理解它,我们将介绍两个有趣的概念:subjects 和 connectable signals。

Subjects(主题)

在文档的开头,我们使用 SignalProtocol 协议定义了信号。然后,我们实现了一个具体的 Signal 类型,该类型通过为每个观察执行生成器闭包来符合该协议。生成器会将事件发送到提供给 observe(with:) 方法的观察者。

我们可以用不同的方式实现信号吗?让我们尝试制作另一种信号 - 也是观察者的一种。我们将它称为 Subject。以下是 ReactiveKit 提供的 Subject 的简化实现。

open class Subject<Element, Error: Swift.Error>: SignalProtocol, ObserverProtocol {

  private var observers: [Observer<Element, Error>] = []

  open func on(_ event: Signal<Element, Error>.Event) {
    observers.forEach { $0(event) }
  }

  open func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    observers.append(observer)
    return /* a disposable that removes the observer from the array */
  }
}

我们的新型信号 subject 本身就是一个观察者,它持有自己观察者的数组。当 subject 接收到一个事件(当调用 on(_:) 方法时),该事件只是传播到所有注册的观察者。观察此 subject 意味着将给定的观察者添加到观察者数组中。

我们如何使用这样的 subject?

let name = Subject<String, Never>()

name.observeNext { name in print("Hi \(name)!") }

name.on(.next("Jim")) // prints: Hi Jim!

// ReactiveKit provides few extension toon the ObserverProtocol so we can also do:
name.send("Kathryn") // prints: Hi Kathryn!

name.send(completion: .finished)

注意:使用 ReactiveKit 时,您实际上应该使用 PassthroughSubject。它具有与我们在此定义的 Subject 相同的行为和接口 - 只是一个不同的名称,以便与 ReactiveX API 保持一致。

如您所见,我们没有生成器闭包,而是将事件发送到 subject 本身。然后,subject 将这些事件传播到自己的观察者。

当我们需要将命令式世界中的操作转换为反应式世界中的信号时,Subjects 非常有用。例如,假设我们需要将视图控制器外观事件作为信号。我们可以创建一个 subject 属性,然后从 viewDidAppear 重写中将事件发送给它。这样的 subject 将表示视图控制器外观事件的信号。

class MyViewController: UIViewController {

  fileprivate let _viewDidAppear = PassthroughSubject<Void, Never>()

  override viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    _viewDidAppear.send()
  }
}

我们可以公开 subject,但是这样任何人都可以向其发送事件。更好的方法是像我们一样将其设为 fileprivate,然后将其公开为信号。建议将所有反应式扩展放入 ReactiveKit 提供的 ReactiveExtensions 类型的扩展中。这是你的做法:

extension ReactiveExtensions where Base: MyViewController {

  var viewDidAppear: Signal<Void, Never> {
    return base._viewDidAppear.toSignal() // convert Subject to Signal
  }
}

然后我们可以像这样使用我们的信号:

myViewController.reactive.viewDidAppear.observeNext {
  print("view did appear")
}

Subjects 代表的是被称为热信号的信号类型。 它们之所以被称为热信号,是因为它们会“发送”事件,而不管是否有观察者注册。 另一方面,Signal 类型代表的是被称为冷信号的信号类型。 这种类型的信号不会产生事件,直到我们给它们一个观察者来接收事件。

正如你可能从实现中推断出的那样,观察一个 Subject 只能得到观察者注册后发送的事件。 任何在观察者注册之前可能已发送的事件都不会被观察者接收。 有什么方法可以解决这个问题吗? 嗯,我们可以缓冲接收到的事件,然后将它们重播给新的观察者。 让我们在一个子类中这样做。

public final class ReplaySubject<Element, Error: Swift.Error>: Subject<Element, Error> {

  private var buffer: [Signal<Element, Error>.Event] = []

  public override func on(_ event: Signal<Element, Error>.Event) {
    buffer.append(event)
    super.on(event)
  }

  public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
    buffer.forEach { observer($0) }
    return super.observe(with: observer)
  }
}

同样,这是 ReactiveKit 提供的 ReplaySubject 的简化版本,但它包含了说明这个概念所需的一切。 每当接收到一个事件,我们都会把它放到缓冲区中。 当观察者注册时,我们就会重播我们在缓冲区中的所有事件。 任何未来的事件都将像在 Subject 中一样被传播。

注意:ReactiveKit 提供的 ReplaySubject 支持将缓冲区限制为特定大小,因此它不会永远增长。 通常,通过使用 ReplaySubject(bufferSize: 1) 实例化它,将其限制为仅一个事件就足够了。 该缓冲区始终保存最新的事件并删除旧的事件。

此时,你可能已经有了如何实现 shareReplay 操作符的想法。 我们可以用 replay subject 观察原始信号,然后多次观察该 subject。 但是,为了将它实现为一个操作符并对用户不透明,我们需要了解可连接信号。

可连接信号

到目前为止,我们已经看到了两种信号。 一种 Signal 类型的信号,只有在观察者注册后才会产生事件;另一种是 Subject 类型的信号,不管是否有观察者注册都会产生事件。 可连接信号将是我们实现的第三种信号。 只有当我们调用它的 connect() 方法时,它才会开始产生事件。 让我们首先定义一个协议。

/// Represents a signal that is started by calling `connect` on it.
public protocol ConnectableSignalProtocol: SignalProtocol {

  /// Start the signal.
  func connect() -> Disposable
}

我们将构建一个可连接信号,作为对任何其他类型信号的包装器。 我们将利用 Subjects 来实现。

public final class ConnectableSignal<Source: SignalProtocol>: ConnectableSignalProtocol {

  private let source: Source
  private let subject: Subject<Source.Element, Source.Error>

  public init(source: Source, subject: Subject<Source.Element, Source.Error>) {
    self.source = source
    self.subject = subject
  }

  public func connect() -> Disposable {
    return source.observe(with: subject)
  }

  public func observe(with observer: @escaping Observer<Source.Element, Source.Error>) -> Disposable {
    return subject.observe(with: observer)
  }
}

这里我们需要两样东西:一个我们正在包装到可连接信号中的源信号,以及一个将事件从源信号传播到可连接信号观察者的 subject。 我们将在初始化程序中要求它们,并将它们保存为属性。

观察可连接信号实际上意味着观察底层的 subject。 启动信号现在很简单 - 我们需要做的就是用 subject 开始观察源信号(记住 - subject 也是一个观察者)。 这将使事件从源流入注册到 subject 的观察者。

我们现在拥有实现 shareReplay(limit:) 的所有部分。 让我们从 replay(limit:) 开始。

extension SignalProtocol {

  /// Ensure that all observers see the same sequence of elements. Connectable.
  public func replay(_ limit: Int = Int.max) -> ConnectableSignal<Self> {
    return ConnectableSignal(source: self, subject: ReplaySubject(bufferSize: limit))
  }
}

足够简单。 使用 ReplaySubject 创建一个 ConnectableSignal 可确保所有观察者获得相同的事件序列,并且源信号仅被观察一次。 唯一的问题是返回的信号是一个可连接信号,因此我们必须调用它的 connect() 方法才能启动事件。

我们需要以某种方式将可连接信号转换为非可连接信号。 为了做到这一点,我们需要在正确的时间调用 connect,并在正确的时间进行 dispose。 什么时候是正确的时间呢? 只有这样才合理 - 连接的正确时间是在第一次观察时,而 dispose 的正确时间是当最后一个观察被 dispose 时。

为了做到这一点,我们将保持一个引用计数。 每次有新的观察者,计数都会增加,而在每次 dispose 时,计数都会减少。 当计数从 0 变为 1 时,我们将连接;当计数从 1 变为 0 时,我们将 dispose。

public extension ConnectableSignalProtocol {

  /// Convert connectable signal into the ordinary signal by calling `connect`
  /// on the first observation and calling dispose when number of observers goes down to zero.
  public func refCount() -> Signal<Element, Error> {
    // check out: https://github.com/ReactiveKit/ReactiveKit/blob/e781e1d0ce398259ca38cc0d5d0ed6b56d8eab39/Sources/Connectable.swift#L68-L85
   }
}

实现 shareReplay 操作符

现在我们了解了 subjects 和可连接信号,我们可以实现操作符 shareReplay(limit:)。 这非常简单

/// Ensure that all observers see the same sequence of elements.
public func shareReplay(limit: Int = Int.max) -> Signal<Element, Error> {
  return replay(limit).refCount()
}

处理信号错误

你可能会忽略它们并延迟处理,但在某个时候,你需要处理信号可能发生的错误。

如果信号有可能通过重试原始生产者来恢复,你可以使用 retry 操作符。

let image /*: Signal<UIImage, NetworkError> */ = getImage().retry(3)

想象一下,在命令式编程范例中,这将需要多少行代码 :)

操作符 retry 只能在某些时候起作用,并且最终会失败。 应用该操作符的结果仍然是一个可能失败的信号。

我们如何将可能失败的信号转换为非失败(安全)信号? 我们必须以某种方式处理这个错误。 一种方法是用默认元素来恢复。

let image /*: Signal<UIImage, Never> */ = getImage().recover(with: .placeholder)

现在我们得到了安全的 Signal,因为转换后的信号永远不会失败。 在原始信号上可能发生的任何 .failed 事件都将被包含默认元素(在我们的示例中是占位符图像)的 .next 事件所替换。

获得安全信号的另一种方法是忽略 - 抑制 - 错误。 如果你真的不在乎这个错误,并且忽略它不会发生任何坏事,你就会这样做。

let image /*: Signal<UIImage, Never> */ = getImage().suppressError(logging: true)

记录错误始终是一个好主意。

如果你需要在发生错误的情况下执行其他逻辑,你可以将其平铺映射到其他信号上。

let image = getImage().flatMapError { error in
  return getAlternativeImage()
}

Property(属性)

Property 将可变状态包装到一个对象中,该对象能够观察该状态。 每当状态改变时,观察者都会收到通知。 就像 PassthroughSubject 一样,它代表了进入命令式世界的桥梁。

要创建一个 property,只需用初始值初始化它即可。

let name = Property("Jim")

nil 是包装可选类型的 property 的有效值。

Properties 就像 Signal 类型的信号一样,也是信号。 它们可以被转换为其他信号,以与信号相同的方式被观察和绑定。

例如,你可以使用 observeobserveNext 方法注册一个观察者。

name.observeNext { value in
  print("Hi \(value)!")
}

当你注册一个观察者时,它会立即用 property 的当前值来调用,因此代码片段将打印 "Hi Jim!"。

要随后更改 property 的值,只需设置 value 属性即可。

name.value = "Jim Kirk" // Prints: Hi Jim Kirk!

加载信号

信号通常代表异步操作,例如网络调用。 任何好的应用程序都会在调用正在进行时向用户显示某种加载指示器,并在调用失败时显示一个错误对话框,其中可能包含重试选项。 为了方便这些用例,ReactiveKit 提供了 LoadingSignalLoadingProperty 类型。

一个 action 或一个 work 可以处于三种状态之一:加载中(loading)、已加载(loaded)、加载失败(loading failed)。 ReactiveKit 使用枚举 LoadingState 定义了这些状态。

/// Represents loading state of an asynchronous action.
public enum LoadingState<LoadingValue, LoadingError: Error>: LoadingStateProtocol {

  /// Value is loading.
  case loading

  /// Value is loaded.
  case loaded(LoadingValue)

  /// Value loading failed with the given error.
  case failed(LoadingError)
}

元素类型为 LoadingState 的信号被类型别名为 LoadingSignal

public typealias LoadingSignal<LoadingValue, LoadingError: Error> = Signal<LoadingState<LoadingValue, LoadingError>, Never>

请注意,加载信号是一个安全信号。 信号本身永远不会失败,但错误可以作为 .failed 加载状态发出。 这意味着该错误不会终止信号 - 在错误之后可以接收到新的事件。

如何将常规信号转换为加载信号? 就像应用 toLoadingSignal 操作符一样简单。 假设我们有一个代表一些资源获取操作的信号

let fetchImage: Signal<UIImage, ApplicationError> = ...

然后,我们可以通过应用 toLoadingSignal 操作符将该信号转换为加载信号。

fetchImage
    .toLoadingSignal()
    .observeNext { loadingState in
        switch loadingState {
        case .loading:
            // display loading indicator
        case .loaded(let image):
            // hide loading indicator
            // display image
        case .failed(let error):
            // hide loading indicator
            // display error message
        }
    }

现在,观察下一个元素会为我们提供信号的加载状态。 我们将在开始观察时立即收到 .loading 状态。 当资源加载完成时,我们将收到处于 .loaded 状态的资源或处于 .failed 状态的错误。

使用加载状态

加载信号看起来很棒,但是手动更新我们正在为其加载数据的每个视图的加载状态并没有什么乐趣。 值得庆幸的是,有一种更好的方法 - LoadingStateListener 协议

/// A consumer of loading state.
public protocol LoadingStateListener: class {

    /// Consume observed loading state.
    func setLoadingState<LoadingValue, LoadingError>(_ state: ObservedLoadingState<LoadingValue, LoadingError>)
}

任何根据其显示数据的加载状态更新其外观的东西都可以实现此协议。 在 iOS 上,UIViewController 或 UIView 将是一个不错的选择。 例如

extension UIViewController: LoadingStateListener {

    public func setLoadingState<LoadingValue, LoadingError>(_ state: ObservedLoadingState<LoadingValue, LoadingError>) {
        switch state {
        case .loading:
            // display loading indicator
        case .reloading:
            // display reloading indicator
        case .loaded(let value):
            // hide loading indicator
            // display value
        case .failed(let error):
            // hide loading indicator
            // display error
        }
    }
}

请注意,LoadingStateListener 获取的是 ObservedLoadingState 而不是 LoadingState。 两者之间的区别在于前者具有一个额外的状态:.reloading。 ReactiveKit 会自动将后续的 .loading 状态转换为 .reloading 状态,以便你可以在这两种情况下采取不同的行动。

现在我们有了一个加载状态侦听器,我们可以通过由侦听器消耗其加载状态,将任何加载信号转换为常规的安全信号

fetchImage
    .toLoadingSignal()
    .consumeLoadingState(by: viewController)
    .bind(to: viewController.imageView) { imageView, image in
        imageView.image = image
    }

令人兴奋! 操作符 consumeLoadingState 接受加载状态侦听器,并在每次加载信号产生一个状态时更新它。 它返回加载值的安全信号,即它从 .loaded 状态中解包底层值。 在我们的示例中,这将是 Signal<UIImage, Never>,然后我们可以将其绑定到我们的图像视图并更新其内容。

转换加载信号

ReactiveKit 提供了许多特定于加载信号的操作符,例如 valuemapValuemapLoadingErrordematerializeLoadingStateflatMapValue。 但是,你可以将作用于其值的常规信号操作符应用于加载信号。 为此,请使用 liftValue 操作符。 例如,要跳过前三个值并将它们延迟一秒钟,请执行以下操作

aLoadingSignal.liftValue {
    $0.skip(first: 3).delay(interval: 1)
}

liftValue 接受一个闭包,该闭包会获得一个常规信号,然后你可以使用常规信号操作符来转换它。

加载属性

我们经常需要一种方法来存储异步操作的结果,以及一种方法来刷新(重新加载)它。为此,我们可以使用 LoadingProperty 类型。它类似于常规的 Property,但不是使用值来初始化它,而是使用一个闭包来初始化它,该闭包提供一个加载信号 - 一个可以执行某些工作的闭包。然后,LoadingProperty 可以像任何其他 LoadingSignal 一样使用。当我们第一次观察(或绑定)它时,它将加载其值,即执行该工作。它还提供了一种通过再次执行该工作来重新加载值的方法。

这是一个我们如何使用 LoadingProperty 来实现一个简单的用户服务的例子。

class UserService {

    let user: LoadingProperty<User, ApplicationError>

    init(_ api: API) {

        user = LoadingProperty {
            api.fetchUser()
        }
    }

    func refresh() -> LoadingSignal<User, ApplicationError> {
        return user.reload()
    }
}

其他常见模式

在 .next 事件上执行操作

假设您有一个按钮,用于(重新)加载应用程序中的照片。 我们将如何在响应式世界中实现这一点? 首先,我们需要一个表示按钮点击的信号。 使用 Bond 框架,您可以像这样获得该信号。

let reload /*: Signal<Void, Never> */ = button.reactive.tap

每当点击按钮时,该信号将发送一个 .next 事件。 我们希望在每次这样的事件上加载照片。 为此,我们将 reload 信号 flat map 到照片请求中。

let photo = reload.flatMapLatest { _ in
  return apiClient().loadPhoto() // returns Signal<UIImage, NetworkError>
}

photo 将是内部信号的任何类型 - 在我们的例子中是 Signal<UIImage, NetworkError>。 然后我们可以将其绑定到图像视图。

photo
  .suppressError(logging: true)  // we can bind only safe signals
  .bind(to: imageView.reactive.image) // using the Bond framework

发生的事情是,每当点击按钮时,都会发出一个新的照片请求,并且图像视图的图像将被更新。

还有两个其他的操作符可以 flat map 信号:flatMapConcatflatMapMerge。 这三者之间的区别在于,当有多个活跃的内部信号时,它们处理来自内部信号的事件传播的方式。 例如,假设用户在之前的请求完成之前点击了重新加载按钮。 会发生什么?

组合多个信号

假设您有用户名和密码信号,并且您想要一个信号来告诉您是否都已输入,以便您可以启用登录按钮。 您可以使用 combineLatest 操作符来实现这一点。

let username = usernameLabel.reactive.text
let password = passwordLabel.reactive.text

let canLogIn = combineLatest(username, password) { username, password in
  return !username.isEmpty && !password.isEmpty
}

canLogIn.bind(to: loginButton.reactive.isEnabled)

您需要向操作符提供的只是信号和一个闭包,该闭包将来自这些信号的最新元素映射到一个新元素。

响应式扩展由 Bond 框架提供。

调试

Timelane

ReactiveKit 内置支持 Timelane Xcode Instrument。 只需下载该工具并开始使用 lane 操作符将信号数据发送到 Timelane Instrument。

mySignal
  .filter { ... }
  .lane("My Signal")
  .map { ... }
  .sink {
    ... 
  }

这是一行代码!

请注意,lane 仅在 macOS 10.14、iOS 12、tvOS 12、watchOS 5 或更高版本上可用。 如果您正在为较旧的系统版本编译,为了方便起见,您可以使用 laneIfAvailable 操作符,但请记住,在较旧的系统版本上进行测试时,事件日志记录将静默失败。

Debug 操作符

您可以通过应用 debug 操作符将信号事件打印到控制台。

mySignal
  .filter { ... }
  .debug("My Signal")
  .map { ... }
  .sink {
    ... 
  }

断点

ReactiveKit 还提供了一个 breakpoint 操作符。 它是基于 Combine 的 breakpoint operator 实现的。

要求

或者

安装

Carthage

github "DeclarativeHub/ReactiveKit"

CocoaPods

pod 'ReactiveKit'

Swift Package Manager

// swift-tools-version:5.0

import PackageDescription

let package = Package(
  name: "MyApp",
  dependencies: [
    .package(url: "https://github.com/DeclarativeHub/ReactiveKit.git", from: "3.10.0")
  ],
  targets: [
    .target(name: "MyApp", dependencies: ["ReactiveKit"])
  ]
)

沟通

其他文档

许可

MIT 许可证(MIT)

版权所有 (c) 2015-2020 Srdan Rasic (@srdanrasic)

特此授予任何人免费获得本软件及其相关文档文件(“软件”)的副本的许可,以处理本软件,不受限制,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售本软件的副本,并允许向其提供本软件的人员受以下条件的约束

上述版权声明和本许可声明应包含在本软件的所有副本或主要部分中。

本软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于对适销性、特定用途的适用性和非侵权性的保证。在任何情况下,作者或版权持有人均不对任何索赔、损害或其他责任承担责任,无论是在合同、侵权行为还是其他方面,因本软件或本软件的使用或其他交易而引起的或与之相关的责任。