异步通道

用于 Swift 并发的性能卓越的通道。

不要通过共享内存进行通信;通过通信共享内存

- Rob Pike

通道是一种类型化的管道,您可以通过它发送和接收值 - 通常跨线程,或者在本例中,跨 Swift 异步任务。这个库是模仿 Go 的通道行为而设计的。

如果您熟悉 golang 和 go 生态系统,您可以跳到go 比较部分。

示例

let msg = Channel<String>(capacity: 3)
let done = Channel<Bool>()

Task {
    for await message in msg {
        print(message)
    }
    await done <- true
}

await msg <- "Swift"
await msg <- "❤️"
await msg <- "Channels"

msg.close()
await <-done

基准测试

此库与 Go 和 Apple 的 Async Algorithms 通道实现的比较

Swift vs Go

此库与等效的 go 代码的比较

Swift vs Go

显然,达到 Go 的速度是一个崇高的目标,我们可能永远无法实现,但它仍然非常快!

以上结果来自详细基准测试结果中的 Int 测试

有关更详细的结果(关于各种数据类型),请参阅基准测试自述文件。

用法

  1. https://github.com/gh123man/Async-Channels 作为 Swift 包依赖项添加到您的项目中。
  2. import AsyncChannels 然后开始吧!

通道操作

非缓冲通道

// Create an un-buffered channel
let msg = Channel<String>()

Task {
    // Send a value. Send will suspend until the channel is read. 
    await msg <- "foo" 
}

Task {
    // Receive a value. Receive will suspend until a value is ready
    let foo = await <-msg
}

缓冲通道

// Create a buffered channel that can hold 2 items
let msg = Channel<String>(capacity: 2)

// Writing to a buffered channel will not suspend until the channel is full
await msg <- "foo" 
await msg <- "bar" 

// Messages are received in the order they are sent
print(await <-msg) // foo
print(await <-msg) // bar

// The msg channel is now empty. 

关闭通道

可以关闭通道。在 Swift 中,通道接收 (<-) 运算符返回 T?,因为当通道关闭时,通道读取将返回 nil。如果尝试写入已关闭的通道,则会触发 fatalError。

let a = Channel<String>()
let done = Channel<Bool>()

Task {
    // a will suspend because there is nothing to receive
    await <-a 
    await done <- true
}

// Close will send `nil` causing a to resume in the task above
a.close() 
// done is signaled 
await <-done

序列操作

Channel 实现了 AsyncSequence,所以你可以这样写

let a = Channel<String>() 

for await message in a {
    print(message)
}

当通道关闭时,循环将中断。

Select

select 允许单个任务等待多个通道操作。 select 将暂停,直到至少有一个 case 准备好。 如果多个 case 准备就绪,它将随机选择一个。

操作

receive(c) 接收一个值,但不做任何处理。

receive(c) { v in ... } 接收一个值,并对其进行一些处理。

send("foo", to: c) 发送一个值,不做任何处理。

send("foo", to: c) { ... } 如果发送成功,则运行一些代码。

none { ... } 如果没有任何通道操作准备就绪,则执行 none 的代码。

any(x1, x2, ...) { x in ... }any(seq) { el in ... } 对序列进行操作,可用于对多个通道执行相同的操作。

示例

let c = Channel<String>()
let d = Channel<String>()

Task {
    await c <- "foo"
    await d <- "bar"
}

// Will print foo or bar
await select {
    receive(d) { print($0!) }
    receive(c) { print($0!) }
}
let a = Channel<String>(capacity: 10)
let b = Channel<String>(capacity: 10)

// Fill up channel a
for _ in (0..<10) {
    await a <- "a"
}

for _ in (0..<20) {
    await select {
        // receive from a and print it
        receive(a) { print($0!) }
        // send "b" to b
        send("b", to: b)
        // if both a and b suspend, print "NONE"
        none {
            print("NONE")
        }
    }
}

Wait Group

此库还包括 WaitGroup 实现。 当您想等待多个任务完成时,等待组很有用。

示例

let wg = WaitGroup()
let signal = Channel<Bool>()
let done = Channel<Bool>()

// Task that drains the signal channel
Task {
    for await _ in signal { }
    await done <- true
}

// 100 workers that write to the signal channel
for _ in 0..<100 {
    await wg.add(1)
    Task {
        await signal <- true
        await wg.done()
    }
}
// When all workers are done - signal is drained, so wg will be done.
await wg.wait()

// Closing the signal channel means it's empty, so done is signaled.
signal.close()
await <-done

高级用法

这个库还包括一些额外的功能,这些功能是通过 Swift 的 resultBuilder 的灵活性实现的。

示例

使用 select any 复用 n:1 通道

let channels = (0..<100).map { _ in Channel<Bool>() }
let collected = Channel<Bool>()

// 100 tasks writing to 100 channels
for c in channels {
    Task {
        await c <- true
    }
}

// 1 task recieving from 100 channels and writing the results to 1 channel. 
Task {
    for _ in 0..<100 {
        await select {
            any(channels) { channel in
                receive(channel) { val in
                    await collected <- val!
                }
            }
        }
    }
    collected.close()
}

var sum = 0
for await _ in collected {
    sum += 1
}

条件 cases

let a = Channel<String>()
let b = Channel<String>()

Task {
    await a <- "foo"
}

var enableRecieve = true
await select {
    if enableRecieve {
        receive(a) { await result <- $0! }
    }
    send("b", to: b)
}

代码示例

有关实际用法的示例,请参阅Examples文件夹。

特别感谢

如果没有 forums.swift.org 的朋友和 github 上的贡献者的帮助,我不可能走到今天。 非常感谢