RabbitMQNIO

Platform macOS | Linux Swift 5.10

AMQP 0.9.1 协议的 Swift 实现:解码器 + 编码器 (AMQPProtocol) 和非阻塞客户端 (AMQPClient)。

深受以下项目的启发:amq-protocol (https://github.com/cloudamqp/amq-protocol.cr) 和 amqp-client (https://github.com/cloudamqp/amqp-client.cr)。

Swift-NIO 相关代码基于其他 NIO 项目,例如

项目状态

本项目处于 beta 阶段 - 在第一个稳定版本发布之前,API 仍可能发生更改。
已在生产环境中使用超过一年。

AMQPProtocol 库涵盖了 AMQP 0.9.1 规范的所有内容。 AMQPClient 库使用 NIO Channels 的架构已经完成,并且所有常见的 AMQP 操作都有效(不支持 WebSockets)。
该项目的主要目标是发布第一个稳定版本。 为此,项目需要

基本用法

创建一个连接并使用连接字符串连接到 AMQP 代理。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

var connection: AMQPConnection

do {
    connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(url: "amqp://guest:guest@localhost:5672/%2f"))

    print("Succesfully connected")
} catch {
    print("Error while connecting", error)
}

创建一个连接并使用配置对象连接到 AMQP 代理。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

var connection: AMQPConnection

do {
    connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))

    print("Succesfully connected")
} catch {
    print("Error while connecting", error)
}

打开一个通道。

var channel: AMQPChannel
 
do {
    channel = try await connection.openChannel()

    print("Succesfully opened a channel")
} catch {
    print("Error while opening a channel", error)
}

声明一个队列。

do {
    try await channel.queueDeclare(name: "test", durable: false)

    print("Succesfully created queue")
} catch {
    print("Error while creating queue", error)
}

将消息发布到队列。

do {
    let deliveryTag = try await channel.basicPublish(
        from: ByteBuffer(string: "{}"),
        exchange: "",
        routingKey: "test"
    )

    print("Succesfully publish a message")
} catch {
    print("Error while publishing a message", error)
}

消费单个消息。

do {
    guard let msg = try await channel.basicGet(queue: "test") else {
        print("No message currently available")
        return
    }

    print("Succesfully consumed a message", msg)
} catch {
    print("Error while consuming a message", error)
}

设置 QOS 限制以防止消费者内存溢出。

try await channel.basicQos(count: 1000)

将多个消息作为 AsyncThrowingStream 消费。

do {
    let consumer = try await channel.basicConsume(queue: "test")

    for try await msg in consumer {
        print("Succesfully consumed a message", msg)
        break
    }
} catch {
    print("Delivery failure", error)
}

消费者将在取消初始化时自动取消。 也可以手动取消。

try await channel.basicCancel(consumerTag: consumer.name)

关闭通道、连接。

do {
    try await channel.close()
    try await connection.close()

    print("Succesfully closed", msg)
} catch {
    print("Error while closing", error)
}

连接恢复模式。

处理代理关闭通道或连接断开连接的情况。 与 AMQP 代理的连接通过响应心跳消息来维持,但是,在网络问题或代理重新启动时,连接可能会中断。 代理也可能因错误的命令或其他错误而关闭通道或连接。 目前,RabbitMQNIO 不支持任何连接或通道恢复/重新连接机制,因此客户端必须手动处理。 连接中断后,由它创建的所有通道和连接本身都必须手动重新创建。

示例恢复模式。

检查通道和连接状态(安全通道模式 - 将标准连接和通道包装在一个类中,并在例如任何生产操作之前重用通道)。

@available(macOS 12.0, *)
class SimpleSafeConnection {
    private let eventLoop: EventLoop
    private let config: AMQPConnectionConfiguration

    private var channel: AMQPChannel?
    private var connection: AMQPConnection?

    init(eventLoop: EventLoop, config: AMQPConnectionConfiguration) {
        self.eventLoop = eventLoop
        self.config = config
    }
    
    func reuseChannel() async throws -> AMQPChannel {
        guard let channel = self.channel, channel.isOpen else {
            if self.connection == nil || self.connection!.isConnected {
                self.connection = try await AMQPConnection.connect(use: self.eventLoop, from: self.config)
            }

            self.channel = try await connection!.openChannel()
            return self.channel!
        }
        return channel
    }
}

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))

while(true) {
    let deliveryTag = try await connection.reuseChannel().basicPublish(
        from: ByteBuffer(string: "{}"),
        exchange: "",
        routingKey: "test"
    )
}

处理通道和连接关闭错误(简单的重试模式 - 发生错误时重新创建通道或连接)。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var connection =  try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
var channel = try await connection.openChannel()

for _ in 0..<3 {
    do {
        let deliveryTag = try await channel.basicPublish(
            from: ByteBuffer(string: "{}"),
            exchange: "",
            routingKey: "test"
        )
        break
    } catch AMQPConnectionError.channelClosed {
        do {
            channel = try await connection.openChannel()
        } catch AMQPConnectionError.connectionClosed {
            connection = try await AMQPConnection.connect(use: eventLoopGroup.next(), from: .init(connection: .plain, server: .init()))
            channel = try await connection.openChannel()
        }
    } catch {
        print("Unknown problem", error)
    }
}

以上恢复模式可以混合使用。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let connection = SimpleSafeConnection(eventLoop: eventLoopGroup.next(), config: .init(connection: .plain, server: .init()))
var channel = try await connection.reuseChannel()

for _ in 0..<3 {
    do {
        let deliveryTag = try await channel.basicPublish(
            from: ByteBuffer(string: "{}"),
            exchange: "",
            routingKey: "test"
        )
        break
    } catch AMQPConnectionError.channelClosed {
        channel = try await connection.reuseChannel()
    } catch {
        print("Unknown problem", error)
    }
}