Conbini 为便利的 Publisher
、操作符和 Subscriber
提供了榨取 Apple Combine 框架最大价值的方法。
要使用此库,您需要
Conbini
添加到您的项目中。
// swift-tools-version:5.2
import PackageDescription
let package = Package(
/* Your package name, supported platforms, and generated products go here */
dependencies: [
.package(url: "https://github.com/dehesa/package-conbini.git", from: "0.6.2")
],
targets: [
.target(name: /* Your target name here */, dependencies: ["Conbini"])
]
)
如果您想使用 Conbini 的 testing 扩展,您需要在您的 SPM targets 或 testing targets 中定义 CONBINI_FOR_TESTING
标志。Conbini 的 testing 扩展需要 XCTest
,它在某些平台上(例如 watchOS)的运行时不可用,或者您可能不想链接到这样的动态库(例如,在构建命令行工具时)。
targets: [
.testTarget(name: /* Your target name here */, dependencies: ["Conbini"], swiftSettings: [.define("CONBINI_FOR_TESTING")])
]
Conbini
。
import Conbini
testing 便利性取决于 XCTest,它在常规执行中不可用。 这就是 Conbini 提供两种风格的原因
import Conbini
包含除 testing 便利性之外的所有代码。import ConbiniForTesting
仅包含 testing 功能。Publisher 操作符
handleEnd(_:)
当 publisher 完成(无论成功或失败)或 publisher 被取消时,执行(仅一次)提供的闭包。
它执行与标准 handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:)
相同的操作,如果您将类似的闭包添加到 receiveCompletion
和 receiveCancel
中。
let publisher = upstream.handleEnd { (completion) in
switch completion {
case .none: // The publisher got cancelled.
case .finished: // The publisher finished successfully.
case .failure(let error): // The publisher generated an error.
}
}
retry(on:intervals:)
尝试使用上游 publisher 重新创建失败的订阅,并在失败的尝试之间等待指定的秒数,重复给定的次数。
let apiCallPublisher.retry(on: queue, intervals: [0.5, 2, 5])
// Same functionality to retry(3), but waiting between attemps 0.5, 2, and 5 seconds after each failed attempt.
此操作符接受任何符合 Scheduler
的调度器(例如 DispatchQueue
、RunLoop
等)。您还可以选择调整容错度和调度器操作。
then(maxDemand:_:)
忽略所有值,并在收到成功完成时执行提供的 publisher。 如果发出失败的完成,它将向下游转发。
let publisher = setConfigurationOnServer.then {
subscribeToWebsocket.publisher
}
此操作符可以选择使用其 maxDemand
参数来控制背压。 该参数的行为类似于 flatMap
的 maxPublishers
,它指定在任何给定时间向上游请求的最大需求。
Subscriber 操作符
assign(to:on:)
变体。
当 "on" 对象也持有 publisher 的 cancellable 时,Combine 的 assign(to:on:)
操作会创建内存循环。 当将值分配给 self
时,通常会发生这种情况。
class CustomObject {
var value: Int = 0
var cancellable: AnyCancellable? = nil
func performOperation() {
cancellable = numberPublisher.assign(to: \.value, on: self)
}
}
Conbini 的 assign(to:onWeak:)
操作符以弱引用指向给定的对象,并具有在对象被反初始化时取消管道的额外好处。
Conbini 还引入了 assign(to:onUnowned:)
操作符,它也避免了内存循环,但使用了 unowned
代替。
await
同步等待接收 publisher 的响应。
let publisher = Just("Hello")
.delay(for: 2, scheduler: DispatchQueue.global())
let greeting = publisher.await
同步等待是通过 DispatchGroup
执行的。请考虑您在哪里使用 await
,因为执行队列会停止并等待答案
DispatchQueue.main
或任何其他正在执行任何后台任务的队列中调用此属性。invoke(_:on:)
变体。
此操作符在给定的值/引用上调用指定的函数,并传递上游值。
struct Custom {
func performOperation(_ value: Int) { /* do something */ }
}
let instance = Custom()
let cancellable = [1, 2, 3].publisher.invoke(Custom.performOperation, on: instance)
Conbini 还提供了变体 invoke(_:onWeak:)
和 invoke(_:onUnowned:)
,它们避免了引用类型上的内存循环。
result(onEmpty:_:)
它订阅接收 publisher 并在收到值时执行提供的闭包。 如果发生故障,则使用该故障执行处理程序。
let cancellable = serverRequest.result { (result) in
switch result {
case .success(let value): ...
case .failure(let error): ...
}
}
该操作符允许您选择为上游在没有值的情况下完成的情况生成错误(该错误将由您的 handler
消耗)。
sink(fixedDemand:)
它订阅上游并请求精确的 fixedDemand
值(之后 subscriber 完成)。 subscriber 可能会在完成之前收到零到 fixedDemand
的值,但永远不会超过该值。
let cancellable = upstream.sink(fixedDemand: 5, receiveCompletion: { (completion) in ... }) { (value) in ... }
sink(maxDemand:)
它订阅上游请求 maxDemand
值并始终保持相同的背压。
let cancellable = upstream.sink(maxDemand: 3) { (value) in ... }
Deferred
变体。
这些 publishers 接受一个闭包,该闭包在请求 *大于零* 的需求时执行。 有几种风格
DeferredValue
发出一个单值,然后完成。
该值未提供/缓存,而是由闭包生成。 该闭包在收到肯定订阅后执行一次。
let publisher = DeferredValue<Int,CustomError> {
return intenseProcessing()
}
还提供了 Try
变体,使您可以从闭包中 throw
。 它失去了具体的错误类型(即,它被转换为 Swift.Error
)。
DeferredResult
根据生成的 Result
向下游转发一个值或一个故障。
let publisher = DeferredResult {
guard someExpression else { return .failure(CustomError()) }
return .success(someValue)
}
DeferredComplete
转发完成事件(无论是成功还是失败)。
let publisher = DeferredComplete {
return errorOrNil
}
还提供了 Try
变体,使您可以从闭包中 throw
;但它失去了具体的错误类型(即,被转换为 Swift.Error
)。
DeferredPassthrough
在闭包中提供一个 passthrough subject,用于向下游发送值。
它类似于在 Deferred
闭包上包装 Passthrough
subject,不同之处在于闭包上给定的 Passthrough
已经*连接*在 publisher 链上,并且可以立即开始发送值。 此外,内存管理由 Conbini 负责,每个新 subscriber 都会收到一个新的 subject(闭包重新执行)。
let publisher = DeferredPassthrough { (subject) in
subject.send(something)
subject.send(somethingElse)
subject.send(completion: .finished)
}
存在这些 publishers 而不是使用其他 Combine
提供的闭包(例如 Just
、Future
或 Deferred
)有几个原因
Just
立即转发一个值,每个新 subscriber 总是收到相同的值。Future
立即(在初始化时)执行其闭包,然后缓存返回的值。 然后为任何未来的订阅转发该值。Deferred...
publishers 在执行之前等待订阅和 *大于零* 的需求。 这也意味着,闭包将为任何新的 subscriber 重新执行。Deferred
具有与 Conbini 类似的功能,但它只接受 publisher。DelayedRetry
它提供了 retry(on:intervals:)
操作符的功能。
Then
它提供了 then
操作符的功能。
HandleEnd
它提供了 handleEnd(_:)
操作符的功能。
额外功能
Publishers.PrefetchStrategy
它已扩展了 .fatalError(message:file:line:)
选项,以在缓冲区已满时停止执行。 这在开发和调试期间以及在您确定缓冲区永远不会被填满的情况下非常有用。
publisher.buffer(size: 10, prefetch: .keepFull, whenFull: .fatalError())
FixedSink
它在订阅时请求固定数量的值,一旦它收到了所有这些值,它就会完成/取消管道。 这些值是通过背压请求的,因此上游生成的数值不会超过允许的数量。
let subscriber = FixedSink(demand: 5) { (value) in ... }
upstream.subscribe(subscriber)
GraduatedSink
它在订阅时请求固定数量的值,并通过在输入接收时请求一个附加值来始终保持相同的需求。 标准的 Subscribers.Sink
在订阅时请求 .unlimited
数量的值。 这可能不是我们想要的,因为有时可能需要控制正在进行的值(例如,只允许同时进行 *n* 个正在进行的 *API 调用)。
let subscriber = GraduatedSink(maxDemand: 3) { (value) in ... }
upstream.subscribe(subscriber)
这些 subscribers 的名称不是很好/准确。 欢迎任何建议。
Conbini 提供了便利的 subscribers 以简化代码 testing。 这些 subscribers 使测试等待直到满足特定期望(或者使测试在负面情况下失败)。 此外,如果超时时间过长或未满足期望,受影响的测试行将在 Xcode 中正确地标记为 *红色*。
expectsAll(timeout:on:)
它订阅 publisher,使运行测试等待零个或多个值和一个成功完成。
let emittedValues = publisherChain.expectsAll(timeout: 0.8, on: test)
expectsAtLeast(timeout:on:)
它订阅 publisher,使运行测试等待至少提供的值数量。 一旦收到提供的值数量,publisher 就会被取消,并且值被返回。
let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test)
此操作符/subscriber 接受一个可选的闭包来检查收到的每个值。
let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test) { (value) in
XCTAssert...
}
expectsCompletion(timeout:on:)
它订阅 publisher,使运行测试等待成功完成,同时忽略所有发出的值。
publisherChain.expectsCompletion(timeout: 0.8, on: test)
expectsFailure(timeout:on:)
它订阅 publisher,使运行测试等待失败完成,同时忽略所有发出的值。
publisherChain.expectsFailure(timeout: 0.8, on: test)
expectsOne(timeout:on:)
它订阅 publisher,使运行测试等待单个值和一个成功完成。 如果发出多个值或者 publisher 失败,则订阅将被取消,并且测试失败。
let emittedValue = publisherChain.expectsOne(timeout: 0.8, on: test)
XCTestCase
已被 *扩展* 以支持以下功能。
wait(seconds:)
锁定接收测试 interval
秒。
final class CustomTests: XCTestCase {
func testSomething() {
let subject = PassthroughSubject<Int,Never>()
let cancellable = subject.sink { print($0) }
let queue = DispatchQueue.main
queue.asyncAfter(.now() + 1) { subject.send(1) }
queue.asyncAfter(.now() + 2) { subject.send(2) }
self.wait(seconds: 3)
cancellable.cancel()
}
}