CombineExt 为 Combine 提供了一系列操作符、发布者和实用工具,这些工具并非 Apple 提供,但在其他响应式框架和标准中很常见。
最初,添加这些工具的灵感来自于我多年使用 RxSwift 和 ReactiveX 后,对 Combine 的研究。
所有操作符、实用工具和帮助程序都遵守 Combine 的发布者合约,包括背压。
注意:这仍然是 CombineExt 的一个相对早期的版本,还有很多需要完善的地方。 我很乐意接受 PR、想法、意见或改进。 谢谢!:)
将以下行添加到您的 Podfile 中
pod 'CombineExt'
将以下依赖项添加到您的 Package.swift 文件中
.package(url: "https://github.com/CombineCommunity/CombineExt.git", from: "1.0.0")
Carthage 支持作为预构建的二进制文件提供。
将以下内容添加到您的 Cartfile 中
github "CombineCommunity/CombineExt"
本节概述了 CombineExt 提供的一些自定义操作符。
通过将 self
中的每个值与来自其他发布者的最新值(如果有)组合,将最多四个发布者合并为一个发布者。
let taps = PassthroughSubject<Void, Never>()
let values = CurrentValueSubject<String, Never>("Hello")
taps
.withLatestFrom(values)
.sink(receiveValue: { print("withLatestFrom: \($0)") })
taps.send()
taps.send()
values.send("World!")
taps.send()
withLatestFrom: Hello
withLatestFrom: Hello
withLatestFrom: World!
将输出值转换为新的发布者,并展平来自这些多个上游发布者的事件流,使其看起来好像来自单个事件流。
映射到新的发布者将取消对前一个发布者的订阅,仅保留一个活动的订阅及其事件发射。
注意:flatMapLatest
是 map
和 switchToLatest
的组合。
let trigger = PassthroughSubject<Void, Never>()
trigger
.flatMapLatest { performNetworkRequest() }
trigger.send()
trigger.send() // cancels previous request
trigger.send() // cancels previous request
CombineExt 提供了 assign(to:on:)
的自定义重载,可让您同时将发布者绑定到多个 keypath 目标。
var label1: UILabel
var label2: UILabel
var text: UITextField
["hey", "there", "friend"]
.publisher
.assign(to: \.text, on: label1,
and: \.text, on: label2,
and: \.text, on: text)
CombineExt 提供了一个额外的重载 — assign(to:on:ownership)
— 这使您可以指定您希望 assign 操作拥有的所有权类型:strong
、weak
或 unowned
。
// Retain `self` strongly
subscription = subject.assign(to: \.value, on: self)
subscription = subject.assign(to: \.value, on: self, ownership: .strong)
// Use a `weak` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .weak)
// Use an `unowned` reference to `self`
subscription = subject.assign(to: \.value, on: self, ownership: .unowned)
Amb 接受多个发布者,并镜像第一个发出事件的发布者。 您可以将其视为发布者的竞赛,其中第一个发出事件的发布者传递其事件,而其他发布者则被忽略(还有一个 Collection.amb
方法可以简化与多个发布者的协作)。
名称 amb
来自 Reactive Extensions 操作符,在 RxJS 中也称为 race
。
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
subject1
.amb(subject2)
.sink(receiveCompletion: { print("amb: completed with \($0)") },
receiveValue: { print("amb: \($0)") })
subject2.send(3) // Since this subject emit first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)
subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)
amb: 3
amb: 6
amb: completed with .finished
将任何发布者转换为其事件的发布者。 给定一个 Publisher<Output, MyError>
,此操作符将返回一个 Publisher<Event<Output, MyError>, Never>
,这意味着您的失败实际上将是一个常规值,这使得在许多用例中错误处理更加简单。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.sink(receiveCompletion: { print("materialized: completed with \($0)") },
receiveValue: { print("materialized: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
materialize: .value("Hello")
materialize: .value("World")
materialize: .value("What's up?")
materialize: .failure(.ohNo)
materialize: completed with .finished
给定一个已实现(materialized)的发布者,仅发布发出的上游值,省略失败。 给定一个 Publisher<Event<String, MyError>, Never>
,此操作符将返回一个 Publisher<String, Never>
。
注意:此操作符仅适用于使用 materialize()
操作符实现的发布者。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.values()
.sink(receiveValue: { print("values: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
values: "Hello"
values: "World"
values: "What's up?"
给定一个已实现(materialized)的发布者,仅发布发出的上游失败,省略值。 给定一个 Publisher<Event<String, MyError>, Never>
,此操作符将返回一个 Publisher<MyError, Never>
。
注意:此操作符仅适用于使用 materialize()
操作符实现的发布者。
let values = PassthroughSubject<String, MyError>()
enum MyError: Swift.Error {
case ohNo
}
values
.materialize()
.failures()
.sink(receiveValue: { print("failures: \($0)") })
values.send("Hello")
values.send("World")
values.send("What's up?")
values.send(completion: .failure(.ohNo))
failure: MyError.ohNo
将先前实现的发布者转换回其原始形式。 给定一个 Publisher<Event<String, MyError>, Never>
,此操作符将返回一个 Publisher<String, MyError>
注意:此操作符仅适用于使用 materialize()
操作符实现的发布者。
将发布者的值分区为两个独立的发布者,分别包含匹配和不匹配提供的谓词的值。
let source = PassthroughSubject<Int, Never>()
let (even, odd) = source.partition { $0 % 2 == 0 }
even.sink(receiveValue: { print("even: \($0)") })
odd.sink(receiveValue: { print("odd: \($0)") })
source.send(1)
source.send(2)
source.send(3)
source.send(4)
source.send(5)
odd: 1
even: 2
odd: 3
even: 4
odd: 5
此仓库包括 Combine 的 Publisher.zip
方法的两个重载(在编写本文时,最多只有三个参数)。
这使您可以任意压缩多个发布者,并接收一个内部发布者输出的数组。
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()
subscription = first
.zip(with: second, third, fourth)
.map { $0.reduce(0, +) }
.sink(receiveValue: { print("zipped: \($0)") })
first.send(1)
second.send(2)
third.send(3)
fourth.send(4)
您还可以直接在具有相同输出和失败类型的发布者的集合上使用 .zip()
,例如
[first, second, third, fourth]
.zip()
.map { $0.reduce(0, +) }
.sink(receiveValue: { print("zipped: \($0)") })
zipped: 10
此仓库包含 Collection 的扩展,允许您直接在具有相同输出和失败类型的发布者集合上调用 .merge()
。
这使您可以任意合并多个发布者,并从单个发布者接收内部发布者输出。
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()
subscription = [first, second, third, fourth]
.merge()
.sink(receiveValue: { print("output: \($0)") })
first.send(1)
second.send(2)
third.send(3)
fourth.send(4)
output: 1
output: 2
output: 3
output: 4
此仓库包括 Combine 的 Publisher.combineLatest
方法的两个重载(在编写本文时,最多只有三个参数)和一个受约束的 Collection.combineLatest
扩展。
这使您可以任意组合多个发布者,并接收一个内部发布者输出的数组。
let first = PassthroughSubject<Bool, Never>()
let second = PassthroughSubject<Bool, Never>()
let third = PassthroughSubject<Bool, Never>()
let fourth = PassthroughSubject<Bool, Never>()
subscription = [first, second, third, fourth]
.combineLatest()
.sink(receiveValue: { print("combineLatest: \($0)") })
first.send(true)
second.send(true)
third.send(true)
fourth.send(true)
first.send(false)
combineLatest: [true, true, true, true]
combineLatest: [false, true, true, true]
将发布者集合的元素过滤到新的发布者集合中。
let intArrayPublisher = PassthroughSubject<[Int], Never>()
intArrayPublisher
.filterMany { $0.isMultiple(of: 2) }
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 4, 3, 8])
none
[10, 2, 4, 8]
将发布者集合的每个元素投影到新的发布者集合形式中。
let intArrayPublisher = PassthroughSubject<[Int], Never>()
intArrayPublisher
.mapMany(String.init)
.sink(receiveValue: { print($0) })
intArrayPublisher.send([10, 2, 2, 4, 3, 8])
["10", "2", "2", "4", "3", "8"]
当 Output
约束为 Never
时,Publisher.setOutputType(to:)
是 .setFailureType(to:)
的类似物。 在调用 .ignoreOutput()
之后链接操作符时,这尤其有用。
Publisher.removeAllDuplicates
和 .removeAllDuplicates(by:)
是 Apple 的 Publisher.removeDuplicates
和 .removeDuplicates(by:)
的更严格形式— 操作符删除所有先前值事件中的重复项,而不是成对删除。
如果您的 Output
不符合 Hashable
或 Equatable
,您可以改用此操作符的基于比较器的版本来决定两个元素是否相等。
subscription = [1, 1, 2, 1, 3, 3, 4].publisher
.removeAllDuplicates()
.sink(receiveValue: { print("removeAllDuplicates: \($0)") })
removeAllDuplicates: 1
removeAllDuplicates: 2
removeAllDuplicates: 3
removeAllDuplicates: 4
与 Publisher.share
类似,.share(replay:)
可用于创建具有引用语义的发布者实例,该实例将预定义数量的值事件重播给更多订阅者。
let subject = PassthroughSubject<Int, Never>()
let replayedPublisher = subject
.share(replay: 3)
subscription1 = replayedPublisher
.sink(receiveValue: { print("first subscriber: \($0)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subscription2 = replayedPublisher
.sink(receiveValue: { print("second subscriber: \($0)") })
first subscriber: 1
first subscriber: 2
first subscriber: 3
first subscriber: 4
second subscriber: 2
second subscriber: 3
second subscriber: 4
Publisher.prefix
的重载,它重新发布提供 duration
(以秒为单位)的值,然后完成。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.prefix(duration: 0.5, on: DispatchQueue.main)
.sink(receiveValue: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
subject.send(4)
}
1
2
3
Publisher.prefix(while:)
的重载,允许包含第一个不通过 while
谓词的元素。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.prefix(
while: { $0 % 2 == 0 },
behavior: .inclusive
)
.sink(
receivecompletion: { print($0) },
receiveValue: { print($0) }
)
subject.send(0)
subject.send(2)
subject.send(4)
subject.send(5)
0
2
4
5
finished
切换发布者集合的每个布尔元素。
let subject = PassthroughSubject<Bool, Never>()
subscription = subject
.toggle()
.sink(receiveValue: { print($0) })
subject.send(true)
subject.send(false)
subject.send(true)
false
true
false
将源发布者的元素分组为 N 个连续元素的数组。
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.nwise(3)
.sink(receiveValue: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
将源发布者的元素分组为前一个元素和当前元素的元组
let subject = PassthroughSubject<Int, Never>()
subscription = subject
.pairwise()
.sink(receiveValue: { print("\($0.0) -> \($0.1)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject.send(5)
1 -> 2
2 -> 3
3 -> 4
4 -> 5
同时忽略发布者的值事件并重写其 Output
泛型的简写。
let onlyAFour = ["1", "2", "3"].publisher
.ignoreOutput(setOutputType: Int.self)
.append(4)
CombineExt 提供了几个重载来忽略错误,并可以选择指定新的错误类型以及是否在这种情况下触发完成。
ignoreFailure(completeImmediately:)
ignoreFailure(setFailureType:completeImmediately:)
enum AnError {
case someError
}
let subject = PassthroughSubject<Int, AnError>()
subscription = subject
.ignoreFailure() // The `completeImmediately` parameter defaults to `true`.
.sink(receiveValue: { print($0) }, receiveCompletion: { print($0) })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))
1
2
3
.finished
将类型为 AnyPublisher<Output, Failure>
的发布者转换为 AnyPublisher<Result<Output, Failure>, Never>
enum AnError: Error {
case someError
}
let subject = PassthroughSubject<Int, AnError>()
let subscription = subject
.mapToResult()
.sink(receiveCompletion: { print("completion: \($0)") },
receiveValue: { print("value: \($0)") })
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .failure(.someError))
value: success(1)
value: success(2)
value: success(3)
value: failure(AnError.someError)
completion: finished
Collection.flatMapBatches(of:)
按批次订阅接收器包含的发布者,并也按批次返回它们的输出(同时保持顺序)。 只有在先前的批次成功完成后,才会订阅后续的发布者批次——任何一个失败都会向下游转发。
let ints = (1...6).map(Just.init)
subscription = ints
.flatMapBatches(of: 2)
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
[1, 2]
[3, 4]
[5, 6]
.finished
本节概述了 CombineExt 提供的一些自定义 Combine 发布者
一个接受带有订阅者参数的闭包的发布者,您可以动态地向该闭包发送值或完成事件。
这使您可以轻松创建自定义发布者来包装任何非发布者异步工作,同时仍然尊重下游消费者的背压需求。
您应该从闭包中返回一个符合 Cancellable
的对象,您可以在其中定义任何在发布者完成或取消对发布者的订阅时执行的清理操作。
AnyPublisher<String, MyError>.create { subscriber in
// Values
subscriber.send("Hello")
subscriber.send("World!")
// Complete with error
subscriber.send(completion: .failure(MyError.someError))
// Or, complete successfully
subscriber.send(completion: .finished)
return AnyCancellable {
// Perform cleanup
}
}
您还可以使用具有相同签名的 AnyPublisher
初始化程序
AnyPublisher<String, MyError> { subscriber in
/// ...
return AnyCancellable { }
CurrentValueRelay
与 CurrentValueSubject
相同,但有两个主要区别
.finished
事件。let relay = CurrentValueRelay<String>("well...")
relay.sink(receiveValue: { print($0) }) // replays current value, e.g. "well..."
relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")
well...
values
only
provide
great
guarantees
PassthroughRelay
与 PassthroughSubject
相同,但有两个主要区别
.finished
事件。let relay = PassthroughRelay<String>()
relay.accept("well...")
relay.sink(receiveValue: { print($0) }) // does not replay past value(s)
relay.accept("values")
relay.accept("only")
relay.accept("provide")
relay.accept("great")
relay.accept("guarantees")
values
only
provide
great
guarantees
Combine 类似于 Rx 的 ReplaySubject
类型。 它类似于 CurrentValueSubject
,因为它会缓冲值,但是,它更进一步,允许消费者指定要缓冲和重播给未来订阅者的值的数量。 此外,它将处理在订阅时清除缓冲区后转发任何完成事件。
let subject = ReplaySubject<Int, Never>(bufferSize: 3)
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)
subject
.sink(receiveValue: { print($0) })
subject.send(5)
2
3
4
5
当然是 MIT ;-) 请参阅 LICENSE 文件。
Apple 徽标和 Combine 框架是 Apple Inc. 的财产。