使用 Apple 的 Combine 框架创建自定义发布者需要编写许多行代码。但是,自定义发布者之间的代码通常非常相似。在 Combine 中,只有 Future
类允许仅从一个简单的闭包构建发布者。不幸的是,Future
不允许取消该闭包内的任务。此外,Future
只允许一个输出项。
PubFactory
提供了 Create
类来构建完全可取消且可以发出项目流的自定义发布者。
每个异步调用都可以使用 PubFactory.Create
轻松转换为自定义发布者。在其最简单的形式中,PubFactory.Create
使用闭包进行初始化。该闭包有一个参数(订阅者),使闭包能够向下游连接的订阅者发送项目和完成(完成或失败)。闭包的唯一任务是启动异步操作并返回能够停止此操作的可取消对象。Create
类需要两个泛型:发布者的输出类型及其错误类型。
let publisher = Create<Int, Never> { subscriber in
// Start asynchronous operation
return AnyCancellable {
// Action to stop asynchronous operation
}
}
异步操作可以使用 subscriber
对象来通知下游的订阅者。以下代码将整数序列 4、2、7 发送到连接的订阅者。最后,它完成流,没有失败。
subscriber.receive(4)
subscriber.receive(2)
subscriber.receive(7)
subscriber.receive(completion: .finished)
让我们看一个例子。我们通过使用 PubFactory.Create
来实现我们自己的 URLSession.dataTaskPublisher
版本
extension URLSession {
func myDataTaskPublisher(for url: URL) -> Create<Data?, Error> {
Create<Data?, Error> { subscriber in
let task = URLSession.shared.dataTask(with: url) { dataOrNil, _, errorOrNil in
if let error = errorOrNil {
subscriber.receive(completion: .failure(error))
}
else {
subscriber.receive(dataOrNil)
subscriber.receive(completion: .finished)
}
}
task.resume()
return AnyCancellable { task.cancel() }
}
}
}
在此示例中,闭包创建并启动 URLSession.dataTask
以加载网站。如果网站内容已加载或发生错误,则会调用数据任务本身的完成闭包。如果发生错误,闭包会通过调用以下代码向订阅者发送失败
subscriber.receive(completion: .failure(error))
如果没有发生错误,它会将数据发送给订阅者,并在之后立即完成流
subscriber.receive(dataOrNil)
subscriber.receive(completion: .finished)
闭包返回一个可取消对象,该对象在订阅取消后立即停止任务
return AnyCancellable { task.cancel() }
在这种特殊情况下,您可以使用 Combine 框架中的 Future
。使用 Future
的缺点是,数据任务无法取消。
Future<Data?, Error> { subscriber in
let task = URLSession.shared.dataTask(with: url) { dataOrNil, _, errorOrNil in
if let error = errorOrNil {
subscriber(.failure(error))
}
else {
subscriber(.success(dataOrNil))
}
}
task.resume()
}
在以下示例中,我们定义了一个发布者,该发布者发送从 0 开始递增的整数数字。
let numberGenerator = Create<Int, Never> { subscriber in
thread = Thread {
var i = 0
while !Thread.current.isCancelled {
subscriber.receive(i)
i += 1
}
}
thread?.start()
return AnyCancellable { thread?.cancel() }
}
为此,闭包启动自己的线程,该线程递增变量 i
。每次更改时,i
的内容都会发送到 subscriber
。此过程一直持续到线程被取消。因此,闭包返回一个 AnyCancellable
,如果订阅已取消,它将停止线程。
非常重要的是,闭包不会阻塞。如果它会阻塞,那么应该停止闭包内任务的 Cancellable 将永远不会被构造。因此,以下代码是无效的
let numberGenerator = Create<Int, Never> { subscriber in
var isCancelled = false
var i = 0
while !isCancelled {
subscriber.receive(i)
i += 1
}
return AnyCancellable { isCancelled = true }
}
Combine 具有集成的背压处理。Create
也提供基本的背压处理。如果订阅者的需求为 .none
,则 subscriber.receive(:)
发送的所有项目都将被简单地忽略,直到需求再次增加。
还有第二种可能性可以使用 Create
创建发布者。它是为像数字生成器这样的用例而设计的。使用带有两个参数的闭包。第一个参数是 subscriber
,第二个参数是 context
。此闭包不返回可取消对象,因为它期望闭包根据 context
的状态自行停止
let publisher = Create<Int, Never> { subscriber, context in
var i = 0
while !context.cancelled {
subscriber.receive(i)
i += 1
}
}
context
提供有关订阅状态的信息
protocol Context {
var paused: Bool { get }
var cancelled: Bool { get }
func waitIfPaused()
}
它还提供背压支持。如果订阅者的需求为 .none
,则可以通过添加一个命令来轻松地临时停止数字生成器
let publisher = Create<Int, Never> { subscriber, context in
var i = 0
while !context.cancelled {
context.waitIfPaused()
subscriber.receive(i)
i += 1
}
}
该软件包可以使用 Swift Package Manager 进行安装。
MIT 许可证
版权所有 2020 Patrick Sturm
特此授予许可,免费授予任何获得本软件和相关文档文件(“软件”)副本的人员,在不受限制的情况下处理本软件,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本的权利,并允许向其提供软件的人员这样做,但须符合以下条件
上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。
本软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于适销性、特定用途的适用性和不侵权的保证。在任何情况下,作者或版权持有者均不对任何索赔、损害或其他责任负责,无论是在合同诉讼、侵权行为或其他方面,由软件或软件的使用或其他交易引起、产生或与之相关