一个 Swift 实现的 Kafka 库,用于生产和消费事件流。
其通过封装 librdkafka C 库来实现功能。
最新版本的 SwiftKafka 需要 Swift 5.0 或更高版本。 您可以按照此链接下载此版本的 Swift 二进制文件。
将 SwiftKafka
包添加到应用程序的 Package.swift
文件中的 dependencies 列表中。 将 "x.x.x"
替换为最新的 SwiftKafka
release 版本。
.package(url: "https://github.com/IBM-Swift/SwiftKafka.git", from: "x.x.x")
将 SwiftKafka
添加到你的 target 的 dependencies 中
.target(name: "example", dependencies: ["SwiftKafka"]),
import SwiftKafka
要使用 SwiftKafka,您需要安装 librdkafka
包
brew install librdkafka
从 Confluent APT 仓库安装 librdkafka - 请参阅此处的说明 (按照步骤 1 和 2 添加 Confluent 包签名密钥和 apt 仓库),然后安装 librdkafka
sudo apt install librdkafka
为了在本地进行实验,您可以设置自己的 Kafka 服务器来进行生产/消费。
在 macOS 上,您可以按照此 使用 Homebrew 安装 Kafka 指南来运行本地服务器。
在 Linux 上,您可以按照此 在 Ubuntu 上手动安装 指南。
KafkaConfig
类包含 KafkaConsumer
/KafkaProducer
的配置设置。
该类使用默认值初始化,然后可以使用辅助函数进行更改。 例如,要启用所有日志记录,您需要设置 debug 变量
let config = KafkaConfig()
config.debug = [.all]
或者,您可以直接在 KafkaConfig
对象上访问配置字典
let config = KafkaConfig()
config["debug"] = "all"
配置键和描述的列表可以在 librdkafka CONFIGURATION.md 中找到。
当您将此类传递给生产者/消费者时,将创建一个副本,因此对实例的进一步更改不会影响现有配置。
KafkaProducer
类将消息生成到 Kafka 服务器。
您可以使用 KafkaConfig
实例或默认配置来初始化 KafkaProducer
。
生产者发送具有以下字段的 KafkaProducerRecord
send()
函数是异步的。 结果在回调中返回,其中包含成功时的 KafkaConsumerRecord
或失败时的 KafkaError
。
以下示例将值为“Hello World”的消息生成到在 localhost 上运行的 Kafka 服务器的“test”主题中。
do {
let producer = try KafkaProducer()
guard producer.connect(brokers: "localhost:9092") == 1 else {
throw KafkaError(rawValue: 8)
}
producer.send(producerRecord: KafkaProducerRecord(topic: "test", value: "Hello world", key: "Key")) { result in
switch result {
case .success(let message):
print("Message at offset \(message.offset) successfully sent")
case .failure(let error):
print("Error producing: \(error)")
}
}
} catch {
print("Error creating producer: \(error)")
}
KafkaConsumer
类从 Kafka 服务器消费消息。
您可以使用 KafkaConfig
实例或默认配置来初始化 KafkaConsumer
。
然后,您可以使用 subscribe()
订阅主题。 这将在具有相同组 ID 的消费者之间均匀分配主题分区。 如果您未设置组 ID,将使用随机 UUID。
或者,您可以使用 assign()
手动设置消费者的分区和偏移量。
subscribe()
和 assign()
都是异步的,会立即返回,但是它们可能需要长达 sessionTimeoutMs(默认 10 秒)* 2 的时间才能完全连接消费者。
要从 Kafka 消费消息,您可以调用 poll(timeout:)
。 这将轮询 Kafka,阻塞 timeout
秒。 完成后,它将返回一个 KafkaConsumerRecord
数组,其中包含以下字段
完成消费后,您可以调用 close()
来关闭连接并取消分配消费者。 然后,未分配的分区将在组中的其他消费者之间重新平衡。 如果未调用 close()
,则在释放该类时将关闭消费者。
以下示例消费并打印来自 Kafka 服务器的“test”主题的所有未读消息。
do {
let config = KafkaConfig()
config.groupId = "Kitura"
config.autoOffsetReset = .beginning
let consumer = try KafkaConsumer(config: config)
guard consumer.connect(brokers: "localhost:9092") == 1 else {
throw KafkaError(rawValue: 8)
}
try consumer.subscribe(topics: ["test"])
while(true) {
let records = try consumer.poll()
print(records)
}
} catch {
print("Error creating consumer: \(error)")
}
有关更多信息,请访问我们的 API 参考。
我们喜欢谈论服务器端 Swift 和 Kitura。 加入我们的 Slack 与团队见面!
此库根据 Apache 2.0 获得许可。 完整的许可证文本可在 LICENSE 中找到。