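Swift Kafka Client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.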

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Then include "Kafka" as a dependency of your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),
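Put together, the two fragments above might sit in a manifest like the following. This is a minimal sketch, not the project's own manifest: the package name "MyApp", the target name "MyApp", and the tools version are assumptions chosen for illustration. Depending on the platforms the library supports, a platforms entry may also be required.

```swift
// swift-tools-version:5.9
// Hypothetical manifest: the package name, target name, and tools version
// are assumptions for illustration only.
import PackageDescription

let package = Package(
    name: "MyApp",
    dependencies: [
        // The dependency line shown above.
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main"),
    ],
    targets: [
        .executableTarget(
            name: "MyApp",
            dependencies: [
                // The product line shown above.
                .product(name: "Kafka", package: "swift-kafka-client"),
            ]
        ),
    ]
)
```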

Finally, add import Kafka to your source code.

Usage

To handle startup and shutdown correctly, Kafka should be used within a Swift Service Lifecycle ServiceGroup. Both KafkaProducer and KafkaConsumer implement the Service protocol.

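Producer API

The send(_:) method of KafkaProducer returns a message ID that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement either indicates that the message was produced successfully or carries an error.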

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [producer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task sending message and receiving events
    group.addTask {
        let messageID = try producer.send(
            KafkaProducerMessage(
                topic: "topic-name",
                value: "Hello, World!"
            )
        )

        for await event in events {
            switch event {
            case .deliveryReports(let deliveryReports):
                // Check what messages the delivery reports belong to.
                // A case body needs at least one statement to compile.
                print(deliveryReports)
            default:
                break // Ignore any other events
            }
        }
    }
}
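The step marked "Check what messages the delivery reports belong to" can be sketched roughly as follows. This is an illustration only: MessageID, DeliveryStatus, DeliveryReport, and PendingMessages are hypothetical stand-in types defined here, not the library's types, and the library's real delivery reports may expose their data differently. The idea is to remember each ID returned when sending and look it up when a report arrives.

```swift
// Stand-in types for illustration only; they are NOT the Kafka library's types.
struct MessageID: Hashable {
    let rawValue: UInt
}

enum DeliveryStatus: Equatable {
    case acknowledged
    case failure(String)
}

struct DeliveryReport {
    let id: MessageID
    let status: DeliveryStatus
}

/// Remembers sent payloads by message ID until a delivery report arrives.
struct PendingMessages {
    private var payloads: [MessageID: String] = [:]

    /// Number of messages still waiting for a report.
    var count: Int { payloads.count }

    /// Call with the ID obtained when sending a message.
    mutating func track(_ id: MessageID, payload: String) {
        payloads[id] = payload
    }

    /// Removes every reported message from the pending set and
    /// returns the payloads whose delivery failed.
    mutating func resolve(_ reports: [DeliveryReport]) -> [String] {
        var failed: [String] = []
        for report in reports {
            // Skip reports for IDs that were never tracked.
            guard let payload = payloads.removeValue(forKey: report.id) else { continue }
            if report.status != .acknowledged {
                failed.append(payload)
            }
        }
        return failed
    }
}
```

With a structure like this, the sending task would call track right after sending, and the event loop would call resolve for each batch of reports, retrying or logging whatever comes back as failed.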

Consumer API

After initializing the KafkaConsumer with the topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "topic-name"
    ),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Consumer Groups

Kafka also allows users to subscribe to an array of topics as part of a consumer group.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

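Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.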

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
            // ...
            try await consumer.commitSync(message)
        }
    }
}
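Why committing only after processing matters can be shown with a small in-memory model. This is a sketch under stated assumptions: PartitionLog is a hypothetical stand-in defined here, not part of the Kafka library. It follows the Kafka convention that the committed offset is the offset of the next record to read, so a consumer that restarts resumes from the first record that was not committed.

```swift
// Hypothetical in-memory stand-in for one partition; not part of the Kafka library.
struct PartitionLog {
    let records: [String]

    /// Offset of the next record to read after a restart.
    private(set) var committedOffset = 0

    init(records: [String]) {
        self.records = records
    }

    /// Records a consumer would see after (re)starting.
    func recordsFromCommitted() -> ArraySlice<String> {
        records[committedOffset...]
    }

    /// Marks the record at `offset` as processed by storing the next offset to read.
    /// Commits never move backwards.
    mutating func commit(offset: Int) {
        committedOffset = max(committedOffset, offset + 1)
    }
}
```

If the consumer stops after processing a record but before committing it, that record is delivered again on restart, so processing should tolerate duplicates.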

Security Mechanisms

Both KafkaProducer and KafkaConsumer can be configured to use different security mechanisms.

Plaintext

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext

TLS

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()

SASL

let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
    keytab: "KEYTAB_FILE"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)

SASL + TLS

let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
    username: "USERNAME",
    password: "PASSWORD"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism
)

librdkafka

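The package depends on the librdkafka library, which is included as a git submodule. Some of its source files are excluded in Package.swift.

Development Setup

We provide a Docker environment for this package. It automatically starts a local Kafka server and runs the package tests.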

docker-compose -f docker/docker-compose.yaml run test