Swift Kafka 客户端库提供了一种便捷的方式,通过利用 Swift 的新并发特性 与 Apache Kafka 进行交互。此软件包封装了原生 librdkafka
库。
要在 SwiftPM 项目中使用 Kafka
库,请将以下行添加到你的 Package.swift
文件中的依赖项
.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")
在你的可执行目标中包含 "Kafka"
作为依赖项
.target(name: "<target>", dependencies: [
.product(name: "Kafka", package: "swift-kafka-client"),
]),
最后,将 import Kafka
添加到你的源代码中。
为了正确处理启动和关闭,Kafka
应该在 Swift Service Lifecycle
ServiceGroup
中使用。KafkaProducer
和 KafkaConsumer
都实现了 Service
协议。
KafkaProducer
的 send(_:)
方法返回一个消息 ID,该 ID 稍后可用于识别相应的确认。确认通过 events
AsyncSequence
接收。每个确认都指示消息生成成功或返回错误。
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [producer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task sending message and receiving events
group.addTask {
let messageID = try producer.send(
KafkaProducerMessage(
topic: "topic-name",
value: "Hello, World!"
)
)
for await event in events {
switch event {
case .deliveryReports(let deliveryReports):
// Check what messages the delivery reports belong to
default:
break // Ignore any other events
}
}
}
}
在用要读取的主题分区对初始化 KafkaConsumer
后,可以使用 messages
AsyncSequence
消费消息。
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .partition(
KafkaPartition(rawValue: 0),
topic: "topic-name"
),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
Kafka 还允许用户订阅作为消费者组一部分的主题数组。
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
}
}
}
默认情况下,KafkaConsumer
在收到相应的消息后会自动提交消息偏移量。但是,我们允许用户禁用此设置并手动提交消息偏移量。
let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false
let consumer = try KafkaConsumer(
configuration: configuration,
logger: logger
)
await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
let serviceGroup = ServiceGroup(
services: [consumer],
configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
logger: logger
)
try await serviceGroup.run()
}
// Task receiving messages
group.addTask {
for try await message in consumer.messages {
// Do something with message
// ...
try await consumer.commitSync(message)
}
}
}
KafkaProducer
和 KafkaConsumer
都可以配置为使用不同的安全机制。
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext
var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()
let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
keytab: "KEYTAB_FILE"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)
let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
username: "USERNAME",
password: "PASSWORD"
)
var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
saslMechanism: saslMechanism
)
该软件包依赖于 librdkafka 库,该库作为 git 子模块包含在内。它具有在 Package.swift
中排除的源文件。
我们为该软件包提供了一个 Docker 环境。这将自动启动本地 Kafka 服务器并运行软件包测试。
docker-compose -f docker/docker-compose.yaml run test