用于 Swift 并发的性能卓越的通道。
不要通过共享内存进行通信;通过通信共享内存
- Rob Pike
通道是一种类型化的管道,您可以通过它发送和接收值 - 通常跨线程,或者在本例中,跨 Swift 异步任务。这个库是模仿 Go 的通道行为而设计的。
如果您熟悉 golang 和 go 生态系统,您可以跳到go 比较部分。
let msg = Channel<String>(capacity: 3)
let done = Channel<Bool>()
Task {
for await message in msg {
print(message)
}
await done <- true
}
await msg <- "Swift"
await msg <- "❤️"
await msg <- "Channels"
msg.close()
await <-done
显然,达到 Go 的速度是一个崇高的目标,我们可能永远无法实现,但它仍然非常快!
以上结果来自详细基准测试结果中的 Int
测试
有关更详细的结果(关于各种数据类型),请参阅基准测试自述文件。
https://github.com/gh123man/Async-Channels
作为 Swift 包依赖项添加到您的项目中。import AsyncChannels
然后开始吧!非缓冲通道
// Create an un-buffered channel
let msg = Channel<String>()
Task {
// Send a value. Send will suspend until the channel is read.
await msg <- "foo"
}
Task {
// Receive a value. Receive will suspend until a value is ready
let foo = await <-msg
}
缓冲通道
// Create a buffered channel that can hold 2 items
let msg = Channel<String>(capacity: 2)
// Writing to a buffered channel will not suspend until the channel is full
await msg <- "foo"
await msg <- "bar"
// Messages are received in the order they are sent
print(await <-msg) // foo
print(await <-msg) // bar
// The msg channel is now empty.
可以关闭通道。在 Swift 中,通道接收 (<-
) 运算符返回 T?
,因为当通道关闭时,通道读取将返回 nil
。如果尝试写入已关闭的通道,则会触发 fatalError。
let a = Channel<String>()
let done = Channel<Bool>()
Task {
// a will suspend because there is nothing to receive
await <-a
await done <- true
}
// Close will send `nil` causing a to resume in the task above
a.close()
// done is signaled
await <-done
Channel
实现了 AsyncSequence
,所以你可以这样写
let a = Channel<String>()
for await message in a {
print(message)
}
当通道关闭时,循环将中断。
select
允许单个任务等待多个通道操作。 select
将暂停,直到至少有一个 case 准备好。 如果多个 case 准备就绪,它将随机选择一个。
receive(c)
接收一个值,但不做任何处理。
receive(c) { v in ... }
接收一个值,并对其进行一些处理。
send("foo", to: c)
发送一个值,不做任何处理。
send("foo", to: c) { ... }
如果发送成功,则运行一些代码。
none { ... }
如果没有任何通道操作准备就绪,则执行 none 的代码。
any(x1, x2, ...) { x in ... }
或 any(seq) { el in ... }
对序列进行操作,可用于对多个通道执行相同的操作。
let c = Channel<String>()
let d = Channel<String>()
Task {
await c <- "foo"
await d <- "bar"
}
// Will print foo or bar
await select {
receive(d) { print($0!) }
receive(c) { print($0!) }
}
let a = Channel<String>(capacity: 10)
let b = Channel<String>(capacity: 10)
// Fill up channel a
for _ in (0..<10) {
await a <- "a"
}
for _ in (0..<20) {
await select {
// receive from a and print it
receive(a) { print($0!) }
// send "b" to b
send("b", to: b)
// if both a and b suspend, print "NONE"
none {
print("NONE")
}
}
}
此库还包括 WaitGroup
实现。 当您想等待多个任务完成时,等待组很有用。
let wg = WaitGroup()
let signal = Channel<Bool>()
let done = Channel<Bool>()
// Task that drains the signal channel
Task {
for await _ in signal { }
await done <- true
}
// 100 workers that write to the signal channel
for _ in 0..<100 {
await wg.add(1)
Task {
await signal <- true
await wg.done()
}
}
// When all workers are done - signal is drained, so wg will be done.
await wg.wait()
// Closing the signal channel means it's empty, so done is signaled.
signal.close()
await <-done
这个库还包括一些额外的功能,这些功能是通过 Swift 的 resultBuilder
的灵活性实现的。
使用 select any
复用 n:1
通道
let channels = (0..<100).map { _ in Channel<Bool>() }
let collected = Channel<Bool>()
// 100 tasks writing to 100 channels
for c in channels {
Task {
await c <- true
}
}
// 1 task recieving from 100 channels and writing the results to 1 channel.
Task {
for _ in 0..<100 {
await select {
any(channels) { channel in
receive(channel) { val in
await collected <- val!
}
}
}
}
collected.close()
}
var sum = 0
for await _ in collected {
sum += 1
}
条件 cases
let a = Channel<String>()
let b = Channel<String>()
Task {
await a <- "foo"
}
var enableRecieve = true
await select {
if enableRecieve {
receive(a) { await result <- $0! }
}
send("b", to: b)
}
有关实际用法的示例,请参阅Examples文件夹。
如果没有 forums.swift.org 的朋友和 github 上的贡献者的帮助,我不可能走到今天。 非常感谢
LinkedList
后备数据结构any
函数。