AsyncSequenceReader

Test Status

AsyncSequenceReader 提供了构建模块,可以轻松地使用 Swift 的 AsyncSequence

快速链接

安装

在您的 Package.swift 文件中添加 AsyncSequenceReader 作为依赖项以开始使用它。然后,在任何您希望使用该库的文件中添加 import AsyncSequenceReader

请查看 发布 以获取推荐版本。

dependencies: [
    .package(url: "https://github.com/mochidev/AsyncSequenceReader.git", .upToNextMinor(from: "0.3.1")),
],
...
targets: [
    .target(
        name: "MyPackage",
        dependencies: [
            "AsyncSequenceReader",
        ]
    )
]

什么是 AsyncSequenceReader

AsyncSequenceReader 是一系列构建模块的集合,旨在简化读取信息并将 AsyncSequence 转换为您的应用程序可以理解的数据类型。

尽管可以通过 for await 循环来使用 AsyncSequence,但这通常不是最简单的数据使用方式

for await byte in url.resourceBytes {
    // Buffer enough bytes to read an int
    // Buffer the amount of bytes specified by the int to read a frame
    // Repeat until all frames are consumed...
}

如果序列化格式比这更复杂,那么编写易于阅读、易于理解且易于维护的代码可能会变得非常困难。

AsyncSequenceReader 提供了 3 个主要工具来帮助您解决这个问题:迭代器映射、计数集合和终止集合。

迭代器映射

此软件包提供的最基本的构建模块称为迭代器映射。迭代器映射为您提供了一种一次读取序列中一个值的方法,而无需担心缓冲或状态管理。此外,它们允许您在构建完整对象时返回它们,让应用程序的其他部分在这些对象可用时立即使用它们!

让我们构建一个迭代器映射

struct DataFrame {
    var command: String
    var payload: [UInt8]
}

let url = ...
let sequence = url.resourceBytes

let results = sequence.iteratorMap { iterator -> DataFrame? in
    /// Reads go here
}

// Do something with the results:
for await dataFrame in results {
    print(dataFrame)
}

在闭包中,您可以执行以下三件事之一

读取值就像调用 let value = try await iterator.next() 一样简单。此值将与序列的类型匹配,在本例中为 UInt8。如果值为 nil,则表示您已到达序列的末尾。我们稍后将查看其他读取值的方法。

注意:请抵制在迭代器映射中捕获错误的冲动,因为一旦读取了值,它将不再可用。

返回一个对象将使其可供任何使用结果序列的人使用,从而准备好再次调用您的闭包以获取下一个对象。请注意,除非有东西使用您的 results 序列(通过 for await,或通过使用 .reduce 或其他 AsyncSequence 方法),否则不会调用您的闭包。

注意:不要将迭代器复制到其他方法而不将其标记为 inout,因为作为值类型,会创建一个副本,并且进一步的读取可能会变得不同步。

计数集合

一次在一个迭代器映射中读取值很有用,但通常我们需要缓冲更大量的数据。我们可以通过几种方式来实现这一点

var fourByteSequence = try await iterator.collect(4) // [UInt8, UInt8, UInt8, UInt8]?
var largeSequence = try await iterator.collect(max: 256) // Array of [UInt8]? with a max size of 256, but may be shorter if the sequence had less than 256 characters available.
var limitedSequence = try await iterator.collect(min: 128, max: 256) // Array of [UInt8]? that will throw if less than 128 bytes are available, but will be no larger than 256.

对于最后一个示例,请注意,只有在读取所有字节后,limitedSequence 才会变为可用。即,如果序列仍在进行中,即使现在只有 128 个字节可用,您也不会获得结果。

如果无法收集到最少数量的字节,则会抛出 AsyncSequenceReaderError.insufficientElements 错误。

您还可以使用序列转换将元素收集到另一个异步序列中

var veryLargeSequence = try await iterator.collect(1024*1024*1024) { sequence -> Summary in
    let results = sequence.iteratorMap { iterator -> DataFrame? in
        guard let values = try await iterator.collect(count: 1024*1024) else { return nil }
        
        return DataFrame(values)
    }
    
    let averages = try await results.reduce(into: []) { $0.append($1.average) }
    return Summary(averages)
}

在上面的示例中,我们的序列转换使我们可以访问一个最大为 1024*1024*1024 字节(即 1 GB!)的序列。然而,我们没有将数据累积到数组中,而是获得了一个序列,我们可以将迭代器映射附加到该序列,以便我们可以一次处理 1 MB 的数据,并将这些数据组合成 DataFrame 类型。然后,我们可以使用这个转换后的序列,将其归约为计算每个数据帧的平均值,并将这些平均值存储在一个 Summary 对象中。

请注意,整个过程中,一次最多只会使用大约 1 MB 的内存,因为它实际上只会在归约结果时被使用,而归约结果一次只会读取 1 MB 的数据,并且一旦读取了总共 1 GB 的数据就会停止。

终止集合

终止集合实际上与计数集合的工作方式相同,但它们会读取直到遇到某个元素(或元素序列)为止

var nullTerminatedString = try await iterator.collect(upToIncluding: 0, throwsIfOver: 1024) // [UInt8]?, ending in `\0`
var httpHeaderEntry = try await iterator.collect(upToExcluding: ["\r".asciiValue, "\n".asciiValue], throwsIfOver: 1024) // [UInt8]?, without the `\r\n`

当扫描字符串或其他已知边界时,这尤其有用,允许您获取一个元素数组,其中包含或排除您指定的终止符。

请注意,throwsIfOver 参数是必要的 —— 这是为了防止无限制的读取失控。如果未检测到终止符,或者已达到您的最大元素允许量,则会抛出 AsyncSequenceReaderError.terminationNotFound 错误。

如果您使用序列转换,则可以绕过 throwsIfOver 参数,如果您的算法处理大量数据,这可能是一个更好的选择。如果您提前停止读取,后续请求仍然可以读取元素,从而使您可以更好地控制如何读取数据。

另请注意,如果您使用序列转换,您只能收集到一个序列,直到并包括您的终止符,并且如果从未遇到您的终止符,则不会抛出错误,因为您可以轻松地检查 result.suffix(termination.count) == termination 以自行验证这一点,从而使您有可能处理不同的数据长度。

与 Bytes 集成

当您将 AsyncSequenceReaderBytes 结合使用时,它会真正发挥作用,Bytes 是另一个专门用于处理和转换字节序列的软件包。例如,如果您想解码由四字节有效负载大小、空终止的标头字符串和有效负载组成的数据帧,您可以轻松地这样做

struct DataFrame {
    var command: String
    var payload: [UInt8]
}

let url = ...
let sequence = url.resourceBytes

let results = sequence.iteratorMap { iterator -> DataFrame? in
    guard let payloadCountBytes = try await iterator.count(4) else { throw DataFrameError.missingPayloadSize }
    var payloadSize = try UInt32(bigEndianBytes: payloadCountBytes)
    
    guard let commandBytes = try await iterator.count(upToExcluding: 0, throwsIfOver: min(256, payloadSize)) else { throw DataFrameError.missingCommand }
    let commandString = String(utf8Bytes: commandBytes)
    payloadSize -= commandBytes.count - 1 // Don't forget the null byte we skipped
    
    guard let payloadBytes = try await iterator.count(payloadSize) else { throw DataFrameError.missingPayload }
    
    return DataFrame(command: commandString, payload: payloadBytes)
}

// Do something with the results:
for await dataFrame in results {
    print(dataFrame)
}

更好的是,Bytes 还支持直接从 AsyncIteratorProtocol 读取,允许您将上述内容简化为

struct DataFrame {
    var command: String
    var payload: [UInt8]
}

let url = ...
let sequence = url.resourceBytes

let results = sequence.iteratorMap { iterator -> DataFrame? in
    var payloadSize = try await iterator.next(bigEndian: UInt32.self)
    
    let commandString = try await iterator.next(utf8: String.self, upToExcluding: 0, throwsIfOver: min(256, payloadSize))
    payloadSize -= commandString.utf8.count - 1 // Don't forget the null byte we skipped
    
    let payloadBytes = try await iterator.next(bytes: Bytes.self, count: payloadSize)
    
    return DataFrame(command: commandString, payload: payloadBytes)
}

// Do something with the results:
for await dataFrame in results {
    print(dataFrame)
}

更多

有关更多示例,请查看此软件包中提供的单元测试。如果未列出好的示例,请考虑提交 PR 以展示如何完成!

贡献

欢迎贡献!请查看已有的 issue,或发起新的讨论以提出新功能。尽管无法保证功能请求,但欢迎符合项目目标并在事先讨论过的 PR!

请确保所有提交都具有清晰的提交历史记录、文档齐全且经过全面测试。请在提交之前 rebase 您的 PR 而不是合并 main。需要线性历史记录,因此 PR 中将不接受合并提交。