AMQP 0.9.1 协议的 Swift 实现:解码器 + 编码器 (AMQPProtocol) 和非阻塞客户端 (AMQPClient)。
深受以下项目的启发:amq-protocol (https://github.com/cloudamqp/amq-protocol.cr) 和 amqp-client (https://github.com/cloudamqp/amqp-client.cr)。
Swift-NIO 相关代码基于其他 NIO 项目,例如
本项目处于 beta 阶段 - 在第一个稳定版本发布之前,API 仍可能发生更改。
已在生产环境中使用超过一年。
AMQPProtocol 库涵盖了 AMQP 0.9.1 规范的所有内容。 AMQPClient 库使用 NIO Channels 的架构已经完成,并且所有常见的 AMQP 操作都有效(不支持 WebSockets)。
该项目的主要目标是发布第一个稳定版本。 为此,项目需要
创建一个连接并使用连接字符串连接到 AMQP 代理。
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var connection: AMQPConnection
do {
connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(url: "amqp://guest:guest@localhost:5672/%2f"))
print("Succesfully connected")
} catch {
print("Error while connecting", error)
}
创建一个连接并使用配置对象连接到 AMQP 代理。
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var connection: AMQPConnection
do {
connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
print("Succesfully connected")
} catch {
print("Error while connecting", error)
}
打开一个通道。
var channel: AMQPChannel
do {
channel = try await connection.openChannel()
print("Succesfully opened a channel")
} catch {
print("Error while opening a channel", error)
}
声明一个队列。
do {
try await channel.queueDeclare(name: "test", durable: false)
print("Succesfully created queue")
} catch {
print("Error while creating queue", error)
}
将消息发布到队列。
do {
let deliveryTag = try await channel.basicPublish(
from: ByteBuffer(string: "{}"),
exchange: "",
routingKey: "test"
)
print("Succesfully publish a message")
} catch {
print("Error while publishing a message", error)
}
消费单个消息。
do {
guard let msg = try await channel.basicGet(queue: "test") else {
print("No message currently available")
return
}
print("Succesfully consumed a message", msg)
} catch {
print("Error while consuming a message", error)
}
设置 QOS 限制以防止消费者内存溢出。
try await channel.basicQos(count: 1000)
将多个消息作为 AsyncThrowingStream 消费。
do {
let consumer = try await channel.basicConsume(queue: "test")
for try await msg in consumer {
print("Succesfully consumed a message", msg)
break
}
} catch {
print("Delivery failure", error)
}
消费者将在取消初始化时自动取消。 也可以手动取消。
try await channel.basicCancel(consumerTag: consumer.name)
关闭通道、连接。
do {
try await channel.close()
try await connection.close()
print("Succesfully closed", msg)
} catch {
print("Error while closing", error)
}
处理代理关闭通道或连接断开连接的情况。 与 AMQP 代理的连接通过响应心跳消息来维持,但是,在网络问题或代理重新启动时,连接可能会中断。 代理也可能因错误的命令或其他错误而关闭通道或连接。 目前,RabbitMQNIO 不支持任何连接或通道恢复/重新连接机制,因此客户端必须手动处理。 连接中断后,由它创建的所有通道和连接本身都必须手动重新创建。
示例恢复模式。
检查通道和连接状态(安全通道模式 - 将标准连接和通道包装在一个类中,并在例如任何生产操作之前重用通道)。
@available(macOS 12.0, *)
class SimpleSafeConnection {
private let eventLoop: EventLoop
private let config: AMQPConnectionConfiguration
private var channel: AMQPChannel?
private var connection: AMQPConnection?
init(eventLoop: EventLoop, config: AMQPConnectionConfiguration) {
self.eventLoop = eventLoop
self.config = config
}
func reuseChannel() async throws -> AMQPChannel {
guard let channel = self.channel, channel.isOpen else {
if self.connection == nil || self.connection!.isConnected {
self.connection = try await AMQPConnection.connect(use: self.eventLoop, from: self.config)
}
self.channel = try await connection!.openChannel()
return self.channel!
}
return channel
}
}
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))
while(true) {
let deliveryTag = try await connection.reuseChannel().basicPublish(
from: ByteBuffer(string: "{}"),
exchange: "",
routingKey: "test"
)
}
处理通道和连接关闭错误(简单的重试模式 - 发生错误时重新创建通道或连接)。
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
var channel = try await connection.openChannel()
for _ in 0..<3 {
do {
let deliveryTag = try await channel.basicPublish(
from: ByteBuffer(string: "{}"),
exchange: "",
routingKey: "test"
)
break
} catch AMQPConnectionError.channelClosed {
do {
channel = try await connection.openChannel()
} catch AMQPConnectionError.connectionClosed {
connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
channel = try await connection.openChannel()
}
} catch {
print("Unknown problem", error)
}
}
以上恢复模式可以混合使用。
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))
var channel = try await connection.reuseChannel()
for _ in 0..<3 {
do {
let deliveryTag = try await channel.basicPublish(
from: ByteBuffer(string: "{}"),
exchange: "",
routingKey: "test"
)
break
} catch AMQPConnectionError.channelClosed {
channel = try await connection.reuseChannel()
} catch {
print("Unknown problem", error)
}
}