Kitura

APIDoc Build Status - Master macOS Linux Apache 2 Slack Status

SwiftKafka

一个 Swift 实现的 Kafka 库,用于生产和消费事件流。

其通过封装 librdkafka C 库来实现功能。

Swift 版本

最新版本的 SwiftKafka 需要 Swift 5.0 或更高版本。 您可以按照此链接下载此版本的 Swift 二进制文件。

用法

Swift Package Manager

添加依赖项

SwiftKafka 包添加到应用程序的 Package.swift 文件中的 dependencies 列表中。 将 "x.x.x" 替换为最新的 SwiftKafka release 版本。

.package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")

SwiftKafka 添加到你的 target 的 dependencies 中

.target(name: "example", dependencies: ["SwiftKafka"]),

导入包

import SwiftKafka

开始使用

要使用 SwiftKafka,您需要安装 librdkafka

macOS

brew install librdkafka

Linux

从 Confluent APT 仓库安装 librdkafka - 请参阅此处的说明 (按照步骤 1 和 2 添加 Confluent 包签名密钥和 apt 仓库),然后安装 librdkafka

sudo apt install librdkafka

在本地运行 Kafka 实例

为了在本地进行实验,您可以设置自己的 Kafka 服务器来进行生产/消费。

在 macOS 上,您可以按照此 使用 Homebrew 安装 Kafka 指南来运行本地服务器。

在 Linux 上,您可以按照此 在 Ubuntu 上手动安装 指南。

KafkaConfig

KafkaConfig 类包含 KafkaConsumer/KafkaProducer 的配置设置。

该类使用默认值初始化,然后可以使用辅助函数进行更改。 例如,要启用所有日志记录,您需要设置 debug 变量

let config = KafkaConfig()
config.debug = [.all]

或者,您可以直接在 KafkaConfig 对象上访问配置字典

let config = KafkaConfig()
config["debug"] = "all"

配置键和描述的列表可以在 librdkafka CONFIGURATION.md 中找到。

当您将此类传递给生产者/消费者时,将创建一个副本,因此对实例的进一步更改不会影响现有配置。

KafkaProducer

KafkaProducer 类将消息生成到 Kafka 服务器。

您可以使用 KafkaConfig 实例或默认配置来初始化 KafkaProducer

生产者发送具有以下字段的 KafkaProducerRecord

send() 函数是异步的。 结果在回调中返回,其中包含成功时的 KafkaConsumerRecord 或失败时的 KafkaError

以下示例将值为“Hello World”的消息生成到在 localhost 上运行的 Kafka 服务器的“test”主题中。

do {
    let producer = try KafkaProducer()
    guard producer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
        switch result {
        case .success(let message):
            print("Message at offset \(message.offset) successfully sent")
        case .failure(let error):
            print("Error producing: \(error)")
        }
    }
} catch {
    print("Error creating producer: \(error)")
}

KafkaConsumer

KafkaConsumer 类从 Kafka 服务器消费消息。

您可以使用 KafkaConfig 实例或默认配置来初始化 KafkaConsumer

然后,您可以使用 subscribe() 订阅主题。 这将在具有相同组 ID 的消费者之间均匀分配主题分区。 如果您未设置组 ID,将使用随机 UUID。

或者,您可以使用 assign() 手动设置消费者的分区和偏移量。

subscribe()assign() 都是异步的,会立即返回,但是它们可能需要长达 sessionTimeoutMs(默认 10 秒)* 2 的时间才能完全连接消费者。

要从 Kafka 消费消息,您可以调用 poll(timeout:)。 这将轮询 Kafka,阻塞 timeout 秒。 完成后,它将返回一个 KafkaConsumerRecord 数组,其中包含以下字段

完成消费后,您可以调用 close() 来关闭连接并取消分配消费者。 然后,未分配的分区将在组中的其他消费者之间重新平衡。 如果未调用 close(),则在释放该类时将关闭消费者。

以下示例消费并打印来自 Kafka 服务器的“test”主题的所有未读消息。

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    guard consumer.connect(brokers: "localhost:9092") == 1 else {
        throw KafkaError(rawValue: 8)
    }
    try consumer.subscribe(topics: ["test"])
    while(true) {
        let records = try consumer.poll()
        print(records)
    }
} catch {
    print("Error creating consumer: \(error)")
}

API 文档

有关更多信息,请访问我们的 API 参考

社区

我们喜欢谈论服务器端 Swift 和 Kitura。 加入我们的 Slack 与团队见面!

许可证

此库根据 Apache 2.0 获得许可。 完整的许可证文本可在 LICENSE 中找到。