influxdb-client-swift

CircleCI codecov Platforms License Documentation GitHub issues GitHub pull requests Slack Status

该仓库包含 InfluxDB 2.x 的 Swift 客户端参考实现。

文档

此部分包含指向客户端库文档的链接。

特性

InfluxDB 2.x 客户端包含两个包

支持的平台

此软件包需要 Swift 5 和 Xcode 12+。

安装

Swift 包管理器

将此行添加到您的 Package.swift 文件中

// swift-tools-version:5.3
import PackageDescription

let package = Package(
    name: "MyPackage",
    dependencies: [
        .package(name: "influxdb-client-swift", url: "https://github.com/influxdata/influxdb-client-swift", from: "1.7.0"),
    ],
    targets: [
        .target(name: "MyModule", dependencies: [
          .product(name: "InfluxDBSwift", package: "influxdb-client-swift"),
          // or InfluxDBSwiftApis for management API
          .product(name: "InfluxDBSwiftApis", package: "influxdb-client-swift")
        ])
    ]
)

使用方法

重要提示:您应该在应用程序结束时调用 close() 以释放已分配的资源。

创建客户端

通过参数指定 urltoken

let client = InfluxDBClient(url: "https://:8086", token: "my-token")

...

client.close()

客户端选项

选项 描述 类型 默认值
bucket 写入的默认目标存储桶 字符串
org 写入的默认组织存储桶 字符串
precision 正文行协议中 unix 时间戳的默认精度 TimestampPrecision ns
timeoutIntervalForRequest 等待其他数据时使用的超时时间间隔。 TimeInterval 60 秒
timeoutIntervalForResource 资源请求允许花费的最长时间。 TimeInterval 5 分钟
enableGzip 为 HTTP 请求启用 Gzip 压缩。 Bool false
debugging 启用 HTTP 请求/响应的调试。 Bool false
配置默认的 BucketOrganizationPrecision
let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        precision: .ns)

let client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)

...

client.close()

InfluxDB 1.8 API 兼容性

client = InfluxDBClient(
        url: "https://:8086", 
        username: "user", 
        password: "pass",
        database: "my-db", 
        retentionPolicy: "autogen")

...

client.close()

写入

WriteApi 支持将异步写入 InfluxDB 2.x。写入结果可以通过 (response, error)Swift.ResultCombine 处理。

数据可以写入为

  1. 格式化为 InfluxDB 行协议的 String
  2. 数据点 结构体
  3. 键为 measurementtagsfieldstime 的元组样式映射
  4. 上述项目的数组

以下示例演示如何使用数据点结构写入数据。 更多信息请参考文档和 示例

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct WriteData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension WriteData {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    //
    // Record defined as Data Point
    //
    let recordPoint = InfluxDBClient
            .Point("demo")
            .addTag(key: "type", value: "point")
            .addField(key: "value", value: .int(2))
    //
    // Record defined as Data Point with Timestamp
    //
    let recordPointDate = InfluxDBClient
            .Point("demo")
            .addTag(key: "type", value: "point-timestamp")
            .addField(key: "value", value: .int(2))
            .time(time: .date(Date()))

    try await client.makeWriteAPI().write(points: [recordPoint, recordPointDate])
    print("Written data:\n\n\([recordPoint, recordPointDate].map { "\t- \($0)" }.joined(separator: "\n"))")
    print("\nSuccess!")

    client.close()
  }
}

查询

通过 QueryApi 检索的结果可以格式化为

  1. FluxRecord 的惰性序列
  2. 原始查询响应,格式为 Data

查询到 FluxRecord

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct QueryCpu: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension QueryCpu {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    // Flux query
    let query = """
                from(bucket: "\(self.bucket)")
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == "cpu")
                    |> filter(fn: (r) => r["cpu"] == "cpu-total")
                    |> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
                    |> last()
                """

    print("\nQuery to execute:\n\(query)\n")

    let records = try await client.queryAPI.query(query: query)

    print("Query results:")
    try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }

    client.close()
  }
}

查询到 Data

@main
struct QueryCpuData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension QueryCpuData {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    // Flux query
    let query = """
                from(bucket: "\(self.bucket)")
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == "cpu")
                    |> filter(fn: (r) => r["cpu"] == "cpu-total")
                    |> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
                    |> last()
                """

    print("\nQuery to execute:\n\(query)\n")

    let response = try await client.queryAPI.queryRaw(query: query)

    let csv = String(decoding: response, as: UTF8.self)
    print("InfluxDB response: \(csv)")

    client.close()
  }
}

参数化查询

InfluxDB Cloud 支持 参数化查询,它允许您使用 InfluxDB API 动态更改查询中的值。 参数化查询使 Flux 查询更具可重用性,也可用于帮助防止注入攻击。

InfluxDB Cloud 将 params 对象作为名为 params 的 Flux 记录插入到 Flux 查询中。 使用点或括号表示法访问 Flux 查询中 params 记录中的参数。 参数化 Flux 查询仅支持 intfloatstring 数据类型。 要将支持的数据类型转换为其他 Flux 基本数据类型,请使用 Flux 类型转换函数

参数化查询示例

⚠️参数化查询仅在 InfluxDB Cloud 中受支持,目前在 InfluxDB OSS 中尚不支持。

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct ParameterizedQuery: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The bucket to query. The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong,
          help: "The organization executing the query. Takes either the `ID` or `Name` interchangeably.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension ParameterizedQuery {
  mutating func run() async throws {
    // Initialize Client with default Organization
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    for index in 1...3 {
      let point = InfluxDBClient
              .Point("demo")
              .addTag(key: "type", value: "point")
              .addField(key: "value", value: .int(index))
      try await client.makeWriteAPI().write(point: point)
    }

    // Flux query
    let query = """
                from(bucket: params.bucketParam)
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == params.measurement)
                """

    // Query parameters [String:String]
    let queryParams = ["bucketParam": "\(bucket)", "measurement": "demo"]

    print("\nQuery to execute:\n\n\(query)\n\n\(queryParams)")

    let records = try await client.queryAPI.query(query: query, params: queryParams)

    print("\nSuccess response...\n")

    try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }

    client.close()
  }
}

删除数据

DeleteAPI 支持从 InfluxDB 存储桶中删除 。 使用 DeletePredicateRequest 标识要删除的点。

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct DeleteData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "Specifies the bucket name to delete data from.")
  private var bucket: String

  @Option(name: .shortAndLong,
          help: "Specifies the organization name to delete data from.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String

  @Option(name: .shortAndLong, help: "InfluxQL-like delete predicate statement.")
  private var predicate: String
}

extension DeleteData {
  mutating func run() async throws {
    // Initialize Client with default Organization
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(org: self.org))

    // Create DeletePredicateRequest
    let predicateRequest = DeletePredicateRequest(
            start: Date(timeIntervalSince1970: 0),
            stop: Date(),
            predicate: predicate)

    try await client.deleteAPI.delete(predicate: predicateRequest, bucket: bucket, org: org)

    print("\nDeleted data by predicate:\n\n\t\(predicateRequest)")

    // Print date after Delete
    try await queryData(client: client)

    client.close()
  }

  private func queryData(client: InfluxDBClient) async throws {
    let query = """
                from(bucket: "\(bucket)")
                    |> range(start: 0)
                    |> filter(fn: (r) => r["_measurement"] == "server")
                    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                """

    let response = try await client.queryAPI.query(query: query)

    print("\nRemaining data after delete:\n")

    try response.forEach { record in
      let provider = record.values["provider"]!
      let production = record.values["production"]
      let app = record.values["app"]
      return print("\t\(provider),production=\(production!),app=\(app!)")
    }
  }
}

管理 API

客户端支持以下管理 API

API 文档
AuthorizationsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Authorizations
BucketsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Buckets
DBRPsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/DBRPs
HealthAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Health
PingAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Ping
LabelsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Labels
OrganizationsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Organizations
ReadyAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Ready
ScraperTargetsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/ScraperTargets
SecretsAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Secrets
SetupAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Tasks
SourcesAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Sources
TasksAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Tasks
UsersAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Users
VariablesAPI https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Variables

以下示例演示如何使用 InfluxDB 2.0 管理 API 创建新存储桶。 更多信息请参考文档和 示例

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct CreateNewBucket: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "New bucket name.")
  private var name: String

  @Option(name: .shortAndLong, help: "Duration bucket will retain data.")
  private var retention: Int64 = 3600

  @Option(name: .shortAndLong, help: "Specifies the organization name.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension CreateNewBucket {
  mutating func run() async throws {
    // Initialize Client and API
    let client = InfluxDBClient(url: url, token: token)
    let api = InfluxDB2API(client: client)

    let orgId = (try await api.organizationsAPI.getOrgs(org: org)!).orgs?.first?.id

    // Bucket configuration
    let request = PostBucketRequest(
            orgID: orgId!,
            name: name,
            retentionRules: [RetentionRule(type: RetentionRule.ModelType.expire, everySeconds: retention)])

    // Create Bucket
    let bucket = try await api.bucketsAPI.postBuckets(postBucketRequest: request)!

    // Create Authorization with permission to read/write created bucket
    let bucketResource = Resource(
            type: Resource.ModelType.buckets,
            id: bucket.id,
            orgID: orgId
    )

    // Authorization configuration
    let authorizationRequest = AuthorizationPostRequest(
            description: "Authorization to read/write bucket: \(name)",
            orgID: orgId!,
            permissions: [
              Permission(action: Permission.Action.read, resource: bucketResource),
              Permission(action: Permission.Action.write, resource: bucketResource)
            ])

    // Create Authorization
    let authorization = try await api.authorizationsAPI.postAuthorizations(authorizationPostRequest: authorizationRequest)!

    print("The bucket: '\(bucket.name)' is successfully created.")
    print("The following token could be use to read/write:")
    print("\t\(authorization.token!)")

    client.close()
  }
}

高级用法

默认标签

有时,在每个测量中存储相同的信息很有用,例如 hostnamelocationcustomer。 客户端能够使用静态值或环境变量作为标签值。

表达式

示例

client = InfluxDBClient(
        url: "https://:8086",
        token: "my-token",
        options: InfluxDBClient.InfluxDBOptions(bucket: "my-bucket", org: "my-org"))

let tuple: InfluxDBClient.Point.Tuple
        = (measurement: "mem", tags: ["tag": "a"], fields: ["value": .int(3)], time: nil)

let records: [Any] = [
        InfluxDBClient.Point("mining")
                .addTag(key: "sensor_state", value: "normal")
                .addField(key: "depth", value: .int(2)),
        tuple
]

let defaultTags = InfluxDBClient.PointSettings()
        .addDefaultTag(key: "customer", value: "California Miner")
        .addDefaultTag(key: "sensor_id", value: "${env.SENSOR_ID}")

try await client.makeWriteAPI(pointSettings: defaultTags).writeRecords(records: records)
print("Successfully written default tags")
示例输出
mining,customer=California\ Miner,sensor_id=123-456-789,sensor_state=normal depth=2i
mining,customer=California\ Miner,sensor_id=123-456-789,sensor_state=normal pressure=3i

代理和重定向

⚠️connectionProxyDictionary 不能在 Linux 上定义。 您必须设置 HTTPS_PROXYHTTP_PROXY 系统环境变量。

您可以通过 connectionProxyDictionary 选项将客户端配置为通过 HTTP 代理隧道传输请求

var connectionProxyDictionary = [AnyHashable: Any]()
connectionProxyDictionary[kCFNetworkProxiesHTTPEnable as String] = 1
connectionProxyDictionary[kCFNetworkProxiesHTTPProxy as String] = "localhost"
connectionProxyDictionary[kCFNetworkProxiesHTTPPort as String] = 3128

let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        precision: .ns,
        connectionProxyDictionary: connectionProxyDictionary)

client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)

更多信息请参考 - URLSessionConfiguration.connectionProxyDictionary, Global Proxy Settings Constants.

重定向

客户端自动跟踪 HTTP 重定向。 您可以通过 urlSessionDelegate 配置禁用重定向

class DisableRedirect: NSObject, URLSessionTaskDelegate {
    func urlSession(_ session: URLSession,
                    task: URLSessionTask,
                    willPerformHTTPRedirection response: HTTPURLResponse,
                    newRequest request: URLRequest,
                    completionHandler: @escaping (URLRequest?) -> Void) {
        completionHandler(nil)
    }
}

let options = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        urlSessionDelegate: DisableRedirect())

client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)

更多信息请参考 - URLSessionDelegate.

贡献

如果您想贡献代码,可以通过 GitHub 完成,方法是 fork 存储库并将 pull request 发送到 master 分支。

构建要求

构建源代码和测试目标

swift build --build-tests

运行测试

./Scripts/influxdb-restart.sh
swift test

检查代码覆盖率

./Scripts/influxdb-restart.sh
swift test --enable-code-coverage

您也可以使用 docker-cli 而无需安装 Swift SDK

make docker-cli
swift build

许可

该客户端根据 MIT 许可证 的条款作为开源提供。