PubFactory

使用 Apple 的 Combine 框架创建自定义发布者需要编写许多行代码。但是,自定义发布者之间的代码通常非常相似。在 Combine 中,只有 Future 类允许仅从一个简单的闭包构建发布者。不幸的是,Future 不允许取消该闭包内的任务。此外,Future 只允许一个输出项。

PubFactory 提供了 Create 类来构建完全可取消且可以发出项目流的自定义发布者。

概念

每个异步调用都可以使用 PubFactory.Create 轻松转换为自定义发布者。在其最简单的形式中,PubFactory.Create 使用闭包进行初始化。该闭包有一个参数(订阅者),使闭包能够向下游连接的订阅者发送项目和完成(完成或失败)。闭包的唯一任务是启动异步操作并返回能够停止此操作的可取消对象。Create 类需要两个泛型:发布者的输出类型及其错误类型。

let publisher = Create<Int, Never> { subscriber in
    // Start asynchronous operation
    return AnyCancellable {
        // Action to stop asynchronous operation
    }
}

异步操作可以使用 subscriber 对象来通知下游的订阅者。以下代码将整数序列 4、2、7 发送到连接的订阅者。最后,它完成流,没有失败。

subscriber.receive(4)
subscriber.receive(2)
subscriber.receive(7)
subscriber.receive(completion: .finished)

示例:URLSession

让我们看一个例子。我们通过使用 PubFactory.Create 来实现我们自己的 URLSession.dataTaskPublisher 版本

extension URLSession {
    func myDataTaskPublisher(for url: URL) -> Create<Data?, Error> {
        Create<Data?, Error> { subscriber in
            let task = URLSession.shared.dataTask(with: url) { dataOrNil, _, errorOrNil in
                if let error = errorOrNil {
                    subscriber.receive(completion: .failure(error))
                }
                else {
                    subscriber.receive(dataOrNil)
                    subscriber.receive(completion: .finished)
                }
            }
            task.resume()
            return AnyCancellable { task.cancel() }
        }
    }
}

在此示例中,闭包创建并启动 URLSession.dataTask 以加载网站。如果网站内容已加载或发生错误,则会调用数据任务本身的完成闭包。如果发生错误,闭包会通过调用以下代码向订阅者发送失败

subscriber.receive(completion: .failure(error))

如果没有发生错误,它会将数据发送给订阅者,并在之后立即完成流

subscriber.receive(dataOrNil)
subscriber.receive(completion: .finished)

闭包返回一个可取消对象,该对象在订阅取消后立即停止任务

return AnyCancellable { task.cancel() }

在这种特殊情况下,您可以使用 Combine 框架中的 Future。使用 Future 的缺点是,数据任务无法取消。

Future<Data?, Error> { subscriber in
    let task = URLSession.shared.dataTask(with: url) { dataOrNil, _, errorOrNil in
        if let error = errorOrNil {
            subscriber(.failure(error))
        }
        else {
            subscriber(.success(dataOrNil))
        }
    }
    task.resume()
}

示例:数字生成器

在以下示例中,我们定义了一个发布者,该发布者发送从 0 开始递增的整数数字。

let numberGenerator = Create<Int, Never> { subscriber in
    thread = Thread {
        var i = 0
        while !Thread.current.isCancelled {
            subscriber.receive(i)
            i += 1
        }
    }
    thread?.start()
    return AnyCancellable { thread?.cancel() }
}

为此,闭包启动自己的线程,该线程递增变量 i。每次更改时,i 的内容都会发送到 subscriber。此过程一直持续到线程被取消。因此,闭包返回一个 AnyCancellable,如果订阅已取消,它将停止线程。

非常重要的是,闭包不会阻塞。如果它会阻塞,那么应该停止闭包内任务的 Cancellable 将永远不会被构造。因此,以下代码是无效的

let numberGenerator = Create<Int, Never> { subscriber in
    var isCancelled = false
    var i = 0
    while !isCancelled {
        subscriber.receive(i)
        i += 1
    }
    return AnyCancellable { isCancelled = true }
}

Create 的背压

Combine 具有集成的背压处理。Create 也提供基本的背压处理。如果订阅者的需求为 .none,则 subscriber.receive(:) 发送的所有项目都将被简单地忽略,直到需求再次增加。

带有上下文的 Create

还有第二种可能性可以使用 Create 创建发布者。它是为像数字生成器这样的用例而设计的。使用带有两个参数的闭包。第一个参数是 subscriber,第二个参数是 context。此闭包不返回可取消对象,因为它期望闭包根据 context 的状态自行停止

let publisher = Create<Int, Never> { subscriber, context in
    var i = 0
    while !context.cancelled {
        subscriber.receive(i)
        i += 1
    }
}

context 提供有关订阅状态的信息

protocol Context {
    var paused: Bool { get }
    var cancelled: Bool { get }
    func waitIfPaused()
}

它还提供背压支持。如果订阅者的需求为 .none,则可以通过添加一个命令来轻松地临时停止数字生成器

let publisher = Create<Int, Never> { subscriber, context in
    var i = 0
    while !context.cancelled {
        context.waitIfPaused()
        subscriber.receive(i)
        i += 1
    }
}

用法

该软件包可以使用 Swift Package Manager 进行安装。

许可证

MIT 许可证

版权所有 2020 Patrick Sturm

特此授予许可,免费授予任何获得本软件和相关文档文件(“软件”)副本的人员,在不受限制的情况下处理本软件,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本的权利,并允许向其提供软件的人员这样做,但须符合以下条件

上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。

本软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于适销性、特定用途的适用性和不侵权的保证。在任何情况下,作者或版权持有者均不对任何索赔、损害或其他责任负责,无论是在合同诉讼、侵权行为或其他方面,由软件或软件的使用或其他交易引起、产生或与之相关