Swift 的 RabbitMq 库

这是一个用于 Swift 的高级 RabbitMQ (AMQP) 库,它深受 go-rabbitmq 库的启发。 它在底层使用了 rabbitmq-nio,但提供了高级抽象,以简化用户体验,并提供了大多数应用程序都可以开箱即用的功能。

这个库专门设计用于与 RabbitMQ 进行 AMQP 消息传递。 它不适用于任何其他 AMQP 兼容的代理。

功能特性

此库目前包含的一些功能包括

请注意:此库仍在开发中,尚未达到 v1 状态。 API 可能会更改,并且可能会根据需要添加新功能。

最终,这个库的目标是为应用程序提供一个友好的 API 来使用 RabbitMQ,而无需过于深入地了解如何从代理发布和消费消息的具体细节。

安装

将以下依赖项添加到您的 Package.swift 文件中

.package(url: "https://github.com/xtremekforever/swift-rabbitmq", from: "0.1.0")

然后,将其添加到您的目标依赖项部分,如下所示

.product(name: "RabbitMq", package: "swift-rabbitmq")

依赖

此库仅支持 Swift 5.10 或更高版本,因为底层的 Semaphore 库至少需要 5.10。

此外,此库需要一个可访问的 RabbitMQ 实例在某处运行,无论是在 Docker 容器内部还是在另一台主机上。

兼容性

该库仅与以下操作系统兼容

用法

在最基本的情况下,这个库可以如下使用

import RabbitMq

// Create connection and connect to the broker
let connection = BasicConnection("amqp://guest:guest@localhost/%2f")
try await connection.connect()

// Publish something
let publisher = Publisher(connection, "MyExchange")
try await publisher.publish("A test message")

// Consume
let consumer = Consumer(connection, "MyQueue", "MyExchange")
let stream = try await consumer.consume()
for await message in stream {
    print(message)
}

// Close the connection
await connection.close()

RabbitMQ 支持的每个选项都可以传递给 PublisherConsumer,因此请查看 API 文档以了解可用的内容。

对于连接恢复模式,必须使用单独的任务,因为 RetryingConnection.run() 方法作为异步任务运行以监视连接。 例子

import RabbitMq

let connection = RetryingConnection("amqp://guest:guest@localhost/%2f", reconnectionInterval: .seconds(10))
let publisher = Publisher(connection, "MyExchange")
let consumer = Consumer(connection, "MyQueue", "MyExchange")

try await withThrowingDiscardingTaskGroup { group in
    // Retrying Connection
    group.addTask {
        try await connection.run()
    }

    // Retrying Publisher
    group.addTask { 
        while !Task.isCancelled {
            try await publisher.retryingPublish("Hi there!", retryInterval: .seconds(5))
            try await Task.sleep(for: .seconds(1))
        }
    }
    
    // Retrying Consumer
    group.addTask {
        let events = try await consumer.retryingConsume(retryInterval: .seconds(5))
        for await message in events {
            print(message)
        }
    }
}

// note: the `connection.run()` method will close the connection when it exits

有关更高级的用法示例,请参见示例项目

贡献

欢迎对该库的任何更新、想法或建议! 我一直在开发这个库作为一个个人项目,可以被所有人用于爱好项目、工作项目或介于两者之间的任何项目。

打开一个 issue 或一个 pull request,我将很乐意审查。