AsyncSequenceReader
提供了构建模块,可以轻松地使用 Swift 的 AsyncSequence
。
在您的 Package.swift
文件中添加 AsyncSequenceReader
作为依赖项以开始使用它。然后,在任何您希望使用该库的文件中添加 import AsyncSequenceReader
。
请查看 发布 以获取推荐版本。
dependencies: [
.package(url: "https://github.com/mochidev/AsyncSequenceReader.git", .upToNextMinor(from: "0.3.1")),
],
...
targets: [
.target(
name: "MyPackage",
dependencies: [
"AsyncSequenceReader",
]
)
]
AsyncSequenceReader
是一系列构建模块的集合,旨在简化读取信息并将 AsyncSequence
转换为您的应用程序可以理解的数据类型。
尽管可以通过 for await
循环来使用 AsyncSequence
,但这通常不是最简单的数据使用方式
for await byte in url.resourceBytes {
// Buffer enough bytes to read an int
// Buffer the amount of bytes specified by the int to read a frame
// Repeat until all frames are consumed...
}
如果序列化格式比这更复杂,那么编写易于阅读、易于理解且易于维护的代码可能会变得非常困难。
AsyncSequenceReader
提供了 3 个主要工具来帮助您解决这个问题:迭代器映射、计数集合和终止集合。
此软件包提供的最基本的构建模块称为迭代器映射。迭代器映射为您提供了一种一次读取序列中一个值的方法,而无需担心缓冲或状态管理。此外,它们允许您在构建完整对象时返回它们,让应用程序的其他部分在这些对象可用时立即使用它们!
让我们构建一个迭代器映射
struct DataFrame {
var command: String
var payload: [UInt8]
}
let url = ...
let sequence = url.resourceBytes
let results = sequence.iteratorMap { iterator -> DataFrame? in
/// Reads go here
}
// Do something with the results:
for await dataFrame in results {
print(dataFrame)
}
在闭包中,您可以执行以下三件事之一
nil
,表示序列的结束。读取值就像调用 let value = try await iterator.next()
一样简单。此值将与序列的类型匹配,在本例中为 UInt8
。如果值为 nil,则表示您已到达序列的末尾。我们稍后将查看其他读取值的方法。
注意:请抵制在迭代器映射中捕获错误的冲动,因为一旦读取了值,它将不再可用。
返回一个对象将使其可供任何使用结果序列的人使用,从而准备好再次调用您的闭包以获取下一个对象。请注意,除非有东西使用您的 results
序列(通过 for await
,或通过使用 .reduce
或其他 AsyncSequence
方法),否则不会调用您的闭包。
注意:不要将迭代器复制到其他方法而不将其标记为 inout
,因为作为值类型,会创建一个副本,并且进一步的读取可能会变得不同步。
一次在一个迭代器映射中读取值很有用,但通常我们需要缓冲更大量的数据。我们可以通过几种方式来实现这一点
var fourByteSequence = try await iterator.collect(4) // [UInt8, UInt8, UInt8, UInt8]?
var largeSequence = try await iterator.collect(max: 256) // Array of [UInt8]? with a max size of 256, but may be shorter if the sequence had less than 256 characters available.
var limitedSequence = try await iterator.collect(min: 128, max: 256) // Array of [UInt8]? that will throw if less than 128 bytes are available, but will be no larger than 256.
对于最后一个示例,请注意,只有在读取所有字节后,limitedSequence
才会变为可用。即,如果序列仍在进行中,即使现在只有 128 个字节可用,您也不会获得结果。
如果无法收集到最少数量的字节,则会抛出 AsyncSequenceReaderError.insufficientElements
错误。
您还可以使用序列转换将元素收集到另一个异步序列中
var veryLargeSequence = try await iterator.collect(1024*1024*1024) { sequence -> Summary in
let results = sequence.iteratorMap { iterator -> DataFrame? in
guard let values = try await iterator.collect(count: 1024*1024) else { return nil }
return DataFrame(values)
}
let averages = try await results.reduce(into: []) { $0.append($1.average) }
return Summary(averages)
}
在上面的示例中,我们的序列转换使我们可以访问一个最大为 1024*1024*1024
字节(即 1 GB!)的序列。然而,我们没有将数据累积到数组中,而是获得了一个序列,我们可以将迭代器映射附加到该序列,以便我们可以一次处理 1 MB 的数据,并将这些数据组合成 DataFrame
类型。然后,我们可以使用这个转换后的序列,将其归约为计算每个数据帧的平均值,并将这些平均值存储在一个 Summary
对象中。
请注意,整个过程中,一次最多只会使用大约 1 MB 的内存,因为它实际上只会在归约结果时被使用,而归约结果一次只会读取 1 MB 的数据,并且一旦读取了总共 1 GB 的数据就会停止。
终止集合实际上与计数集合的工作方式相同,但它们会读取直到遇到某个元素(或元素序列)为止
var nullTerminatedString = try await iterator.collect(upToIncluding: 0, throwsIfOver: 1024) // [UInt8]?, ending in `\0`
var httpHeaderEntry = try await iterator.collect(upToExcluding: ["\r".asciiValue, "\n".asciiValue], throwsIfOver: 1024) // [UInt8]?, without the `\r\n`
当扫描字符串或其他已知边界时,这尤其有用,允许您获取一个元素数组,其中包含或排除您指定的终止符。
请注意,throwsIfOver
参数是必要的 —— 这是为了防止无限制的读取失控。如果未检测到终止符,或者已达到您的最大元素允许量,则会抛出 AsyncSequenceReaderError.terminationNotFound
错误。
如果您使用序列转换,则可以绕过 throwsIfOver
参数,如果您的算法处理大量数据,这可能是一个更好的选择。如果您提前停止读取,后续请求仍然可以读取元素,从而使您可以更好地控制如何读取数据。
另请注意,如果您使用序列转换,您只能收集到一个序列,直到并包括您的终止符,并且如果从未遇到您的终止符,则不会抛出错误,因为您可以轻松地检查 result.suffix(termination.count) == termination
以自行验证这一点,从而使您有可能处理不同的数据长度。
当您将 AsyncSequenceReader
与 Bytes 结合使用时,它会真正发挥作用,Bytes 是另一个专门用于处理和转换字节序列的软件包。例如,如果您想解码由四字节有效负载大小、空终止的标头字符串和有效负载组成的数据帧,您可以轻松地这样做
struct DataFrame {
var command: String
var payload: [UInt8]
}
let url = ...
let sequence = url.resourceBytes
let results = sequence.iteratorMap { iterator -> DataFrame? in
guard let payloadCountBytes = try await iterator.count(4) else { throw DataFrameError.missingPayloadSize }
var payloadSize = try UInt32(bigEndianBytes: payloadCountBytes)
guard let commandBytes = try await iterator.count(upToExcluding: 0, throwsIfOver: min(256, payloadSize)) else { throw DataFrameError.missingCommand }
let commandString = String(utf8Bytes: commandBytes)
payloadSize -= commandBytes.count - 1 // Don't forget the null byte we skipped
guard let payloadBytes = try await iterator.count(payloadSize) else { throw DataFrameError.missingPayload }
return DataFrame(command: commandString, payload: payloadBytes)
}
// Do something with the results:
for await dataFrame in results {
print(dataFrame)
}
更好的是,Bytes 还支持直接从 AsyncIteratorProtocol
读取,允许您将上述内容简化为
struct DataFrame {
var command: String
var payload: [UInt8]
}
let url = ...
let sequence = url.resourceBytes
let results = sequence.iteratorMap { iterator -> DataFrame? in
var payloadSize = try await iterator.next(bigEndian: UInt32.self)
let commandString = try await iterator.next(utf8: String.self, upToExcluding: 0, throwsIfOver: min(256, payloadSize))
payloadSize -= commandString.utf8.count - 1 // Don't forget the null byte we skipped
let payloadBytes = try await iterator.next(bytes: Bytes.self, count: payloadSize)
return DataFrame(command: commandString, payload: payloadBytes)
}
// Do something with the results:
for await dataFrame in results {
print(dataFrame)
}
有关更多示例,请查看此软件包中提供的单元测试。如果未列出好的示例,请考虑提交 PR 以展示如何完成!
欢迎贡献!请查看已有的 issue,或发起新的讨论以提出新功能。尽管无法保证功能请求,但欢迎符合项目目标并在事先讨论过的 PR!
请确保所有提交都具有清晰的提交历史记录、文档齐全且经过全面测试。请在提交之前 rebase 您的 PR 而不是合并 main
。需要线性历史记录,因此 PR 中将不接受合并提交。