influxdb-client-swift

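This repository contains the reference Swift client for InfluxDB 2.x.

Documentation

This section contains links to the client library documentation.

Features

The InfluxDB 2.x client consists of two packages: InfluxDBSwift for querying and writing data, and InfluxDBSwiftApis for the management API.

Supported Platforms

This package requires Swift 5 and Xcode 12+.

Installation

Swift Package Manager

Add the package as a dependency in your Package.swift: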

// swift-tools-version:5.3
import PackageDescription

let package = Package(
    name: "MyPackage",
    dependencies: [
        .package(name: "influxdb-client-swift", url: "https://github.com/influxdata/influxdb-client-swift", from: "1.7.0"),
    ],
    targets: [
        .target(name: "MyModule", dependencies: [
          .product(name: "InfluxDBSwift", package: "influxdb-client-swift"),
          // or InfluxDBSwiftApis for management API
          .product(name: "InfluxDBSwiftApis", package: "influxdb-client-swift")
        ])
    ]
)

Usage

Important: You should call close() at the end of your application to release allocated resources.
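
One way to make sure close() always runs, even when an operation throws, is Swift's defer statement. The sketch below uses a hypothetical StubClient and a hypothetical withClient helper (neither is part of the library) purely to show the pattern; with the real client, the same defer { client.close() } line would go right after the client is created.

```swift
import Foundation

// Hypothetical stand-in for InfluxDBClient; only close() matters for this sketch.
final class StubClient {
    private(set) var isClosed = false
    func close() { isClosed = true }
}

// Hypothetical error used to simulate a failing operation.
struct WriteFailed: Error {}

// Hypothetical helper: runs `body` and closes the client afterwards,
// whether `body` returns normally or throws.
func withClient<T>(_ client: StubClient, _ body: (StubClient) throws -> T) rethrows -> T {
    defer { client.close() }
    return try body(client)
}

let client = StubClient()
do {
    _ = try withClient(client) { _ -> Int in throw WriteFailed() }
} catch {
    print("operation failed, client closed: \(client.isClosed)")
}
```

The defer block runs when the enclosing scope exits, so the client is released on both the success path and the error path.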

Creating a client

Specify url and token via parameters:

let client = InfluxDBClient(url: "http://localhost:8086", token: "my-token")

...

client.close()

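Client Options

| Option | Description | Type | Default |
|---|---|---|---|
| bucket | Default destination bucket for writes | String | |
| org | Default organization for writes | String | |
| precision | Default precision for the unix timestamps within the body line-protocol | TimestampPrecision | ns |
| timeoutIntervalForRequest | The timeout interval to use when waiting for additional data. | TimeInterval | 60 sec |
| timeoutIntervalForResource | The maximum amount of time that a resource request should be allowed to take. | TimeInterval | 5 min |
| enableGzip | Enable Gzip compression for HTTP requests. | Bool | false |
| debugging | Enable debugging for HTTP request/response. | Bool | false |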
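
To illustrate what the precision option controls, the sketch below converts a Date into a Unix timestamp at each precision. The Precision enum and the unixTimestamp function are hypothetical stand-ins written for this example; they are not the library's TimestampPrecision type.

```swift
import Foundation

// Hypothetical stand-in for the client's TimestampPrecision option.
// The raw value is the number of units per second.
enum Precision: Int64 {
    case s = 1
    case ms = 1_000
    case us = 1_000_000
    case ns = 1_000_000_000
}

// Converts a Date to a Unix timestamp in the given precision.
// Whole seconds and the sub-second fraction are scaled separately,
// and the fraction is truncated to the chosen unit.
func unixTimestamp(_ date: Date, precision: Precision) -> Int64 {
    let interval = date.timeIntervalSince1970
    let wholeSeconds = interval.rounded(.down)
    let fraction = interval - wholeSeconds
    return Int64(wholeSeconds) * precision.rawValue
        + Int64(fraction * Double(precision.rawValue))
}

let date = Date(timeIntervalSince1970: 1.5)
for precision in [Precision.s, .ms, .us, .ns] {
    print(precision, unixTimestamp(date, precision: precision))
}
```

The same instant therefore yields a different integer depending on the precision, which is why the precision used when writing has to match the timestamps in the line protocol body.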

Configure default Bucket, Organization and Precision

let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        precision: .ns)

let client = InfluxDBClient(url: "http://localhost:8086", token: "my-token", options: options)

...

client.close()

InfluxDB 1.8 API compatibility

let client = InfluxDBClient(
        url: "http://localhost:8086",
        username: "user", 
        password: "pass",
        database: "my-db", 
        retentionPolicy: "autogen")

...

client.close()

Writes

The WriteApi supports asynchronous writes into InfluxDB 2.x. The result of a write can be handled with (response, error), Swift.Result, or Combine.

Data can be written as:

  1. A String formatted as InfluxDB Line Protocol
  2. A Data Point structure
  3. A tuple-style mapping with the keys measurement, tags, fields, and time
  4. An array of the above items
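
For orientation, a Line Protocol string has the shape measurement,tags fields timestamp. The sketch below builds such a string by hand. FieldValue, escape, render, and lineProtocol are hypothetical helpers written for this example; they are not the client's own serializer, which may differ in details.

```swift
import Foundation

// Hypothetical field value type for this sketch (not the client's own type).
enum FieldValue {
    case int(Int)
    case double(Double)
    case string(String)
    case bool(Bool)
}

// Prefixes every character found in `special` with a backslash.
func escape(_ text: String, special: String) -> String {
    var result = ""
    for character in text {
        if special.contains(character) { result.append("\\") }
        result.append(character)
    }
    return result
}

// Renders one field value: integers get an "i" suffix, strings are double-quoted.
func render(_ value: FieldValue) -> String {
    switch value {
    case .int(let number): return "\(number)i"
    case .double(let number): return "\(number)"
    case .string(let text): return "\"" + escape(text, special: "\"\\") + "\""
    case .bool(let flag): return flag ? "true" : "false"
    }
}

// Builds one line: measurement, sorted tags, sorted fields, optional timestamp.
func lineProtocol(measurement: String,
                  tags: [String: String] = [:],
                  fields: [String: FieldValue],
                  timestamp: Int64? = nil) -> String {
    let tagPart = tags.sorted { $0.key < $1.key }
        .map { "," + escape($0.key, special: ",= ") + "=" + escape($0.value, special: ",= ") }
        .joined()
    let fieldPart = fields.sorted { $0.key < $1.key }
        .map { escape($0.key, special: ",= ") + "=" + render($0.value) }
        .joined(separator: ",")
    let timePart = timestamp.map { " \($0)" } ?? ""
    let head = escape(measurement, special: ", ") + tagPart
    return head + " " + fieldPart + timePart
}

print(lineProtocol(measurement: "demo", tags: ["type": "point"], fields: ["value": .int(2)]))
```

In practice the Data Point structure is usually the more convenient option, because the client takes care of escaping and formatting.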

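The following example demonstrates how to write data with the Data Point structure using async/await. For further information see the docs and examples.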

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct WriteData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension WriteData {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    //
    // Record defined as Data Point
    //
    let recordPoint = InfluxDBClient
            .Point("demo")
            .addTag(key: "type", value: "point")
            .addField(key: "value", value: .int(2))
    //
    // Record defined as Data Point with Timestamp
    //
    let recordPointDate = InfluxDBClient
            .Point("demo")
            .addTag(key: "type", value: "point-timestamp")
            .addField(key: "value", value: .int(2))
            .time(time: .date(Date()))

    try await client.makeWriteAPI().write(points: [recordPoint, recordPointDate])
    print("Written data:\n\n\([recordPoint, recordPointDate].map { "\t- \($0)" }.joined(separator: "\n"))")
    print("\nSuccess!")

    client.close()
  }
}

Queries

The result retrieved by QueryApi can be formatted as:

  1. A lazy sequence of FluxRecord
  2. The raw query response as Data
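
When working with the raw Data response, the payload is annotated CSV. Below is a deliberately naive sketch of turning that text into dictionaries. parseAnnotatedCSV is a hypothetical helper: it assumes no cell contains a quoted comma or a newline, and it ignores the datatype annotations, so every value stays a String.

```swift
import Foundation

// Hypothetical helper: parses annotated CSV into one dictionary per data row.
// Assumption: cells never contain quoted commas or line breaks.
func parseAnnotatedCSV(_ csv: String) -> [[String: String]] {
    var header: [String]? = nil
    var rows: [[String: String]] = []
    let lines = csv.split(omittingEmptySubsequences: false, whereSeparator: { $0.isNewline })

    for line in lines {
        // A blank line ends the current table; the next table brings its own header.
        if line.isEmpty { header = nil; continue }
        // Annotation rows (#datatype, #group, #default) are skipped in this sketch.
        if line.hasPrefix("#") { continue }

        let cells = line.split(separator: ",", omittingEmptySubsequences: false).map(String.init)
        guard let columns = header else { header = cells; continue }

        var row: [String: String] = [:]
        // The first column is the unnamed annotation column, so empty names are dropped.
        for (column, cell) in zip(columns, cells) where !column.isEmpty {
            row[column] = cell
        }
        rows.append(row)
    }
    return rows
}

let sample = """
#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,_field
,_result,0,2020-01-01T00:00:00Z,1.5,usage_user
"""

for row in parseAnnotatedCSV(sample) {
    print(row["_field"] ?? "?", row["_value"] ?? "?")
}
```

For anything beyond quick inspection, the FluxRecord sequence is the safer choice, since the client already handles typing and quoting.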

Query to FluxRecord

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct QueryCpu: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension QueryCpu {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    // Flux query
    let query = """
                from(bucket: "\(self.bucket)")
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == "cpu")
                    |> filter(fn: (r) => r["cpu"] == "cpu-total")
                    |> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
                    |> last()
                """

    print("\nQuery to execute:\n\(query)\n")

    let records = try await client.queryAPI.query(query: query)

    print("Query results:")
    try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }

    client.close()
  }
}

Query to Data

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct QueryCpuData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong, help: "The name or id of the organization destination.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension QueryCpuData {
  mutating func run() async throws {
    //
    // Initialize Client with default Bucket and Organization
    //
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    // Flux query
    let query = """
                from(bucket: "\(self.bucket)")
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == "cpu")
                    |> filter(fn: (r) => r["cpu"] == "cpu-total")
                    |> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
                    |> last()
                """

    print("\nQuery to execute:\n\(query)\n")

    let response = try await client.queryAPI.queryRaw(query: query)

    let csv = String(decoding: response, as: UTF8.self)
    print("InfluxDB response: \(csv)")

    client.close()
  }
}

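Parameterized queries

InfluxDB Cloud supports parameterized queries, which let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more reusable and can also help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named params. Use dot or bracket notation to access parameters in the params record in your Flux query. Parameterized Flux queries support only int, float, and string data types. To convert the supported data types into other Flux basic data types, use Flux type conversion functions.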
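
As a rough illustration of what is sent, the sketch below encodes a Flux query together with its params object as JSON. QueryBody and makeQueryBody are hypothetical names for this example; the client builds the real request for you, and the actual body may carry additional fields.

```swift
import Foundation

// Hypothetical shape of a query request body: the Flux text plus its parameters.
struct QueryBody: Codable {
    let query: String
    let params: [String: String]
}

// Encodes the body as JSON with sorted keys so the output is stable.
func makeQueryBody(query: String, params: [String: String]) throws -> Data {
    let encoder = JSONEncoder()
    encoder.outputFormatting = [.sortedKeys]
    return try encoder.encode(QueryBody(query: query, params: params))
}

let flux = "from(bucket: params.bucketParam) |> range(start: -10m)"
let body = try makeQueryBody(query: flux, params: ["bucketParam": "my-bucket"])
print(String(decoding: body, as: UTF8.self))
```

Because the values travel in params rather than being spliced into the Flux text, the query string itself stays constant across calls.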

Parameterized query example

⚠️ Parameterized queries are supported only in InfluxDB Cloud; InfluxDB OSS does not currently support them.

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct ParameterizedQuery: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "The bucket to query. The name or id of the bucket destination.")
  private var bucket: String

  @Option(name: .shortAndLong,
          help: "The organization executing the query. Takes either the `ID` or `Name` interchangeably.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension ParameterizedQuery {
  mutating func run() async throws {
    // Initialize Client with default Bucket and Organization
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))

    for index in 1...3 {
      let point = InfluxDBClient
              .Point("demo")
              .addTag(key: "type", value: "point")
              .addField(key: "value", value: .int(index))
      try await client.makeWriteAPI().write(point: point)
    }

    // Flux query
    let query = """
                from(bucket: params.bucketParam)
                    |> range(start: -10m)
                    |> filter(fn: (r) => r["_measurement"] == params.measurement)
                """

    // Query parameters [String:String]
    let queryParams = ["bucketParam": "\(bucket)", "measurement": "demo"]

    print("\nQuery to execute:\n\n\(query)\n\n\(queryParams)")

    let records = try await client.queryAPI.query(query: query, params: queryParams)

    print("\nSuccess response...\n")

    try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }

    client.close()
  }
}

Delete data

The DeleteAPI supports deleting points from an InfluxDB bucket. Use a DeletePredicateRequest to identify which points to delete.
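
A delete is described by a time range plus a predicate. The sketch below shows one way to represent that as JSON with RFC 3339 timestamps. DeleteBody and makeDeleteBody are hypothetical names for this example, not library types, and the predicate string is only an example value.

```swift
import Foundation

// Hypothetical shape of a delete request: a time range and a predicate.
struct DeleteBody: Codable {
    let start: String
    let stop: String
    let predicate: String
}

// Formats both dates as RFC 3339 (UTC) and encodes the body as JSON.
func makeDeleteBody(start: Date, stop: Date, predicate: String) throws -> Data {
    let formatter = ISO8601DateFormatter()
    let body = DeleteBody(
        start: formatter.string(from: start),
        stop: formatter.string(from: stop),
        predicate: predicate)
    let encoder = JSONEncoder()
    encoder.outputFormatting = [.sortedKeys]
    return try encoder.encode(body)
}

let deleteBody = try makeDeleteBody(
    start: Date(timeIntervalSince1970: 0),
    stop: Date(timeIntervalSince1970: 60),
    predicate: "_measurement=\"server\"")
print(String(decoding: deleteBody, as: UTF8.self))
```

Only points inside the start/stop range that also match the predicate are affected, so a narrow range is a useful safety net while testing a predicate.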

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct DeleteData: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "Specifies the bucket name to delete data from.")
  private var bucket: String

  @Option(name: .shortAndLong,
          help: "Specifies the organization name to delete data from.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String

  @Option(name: .shortAndLong, help: "InfluxQL-like delete predicate statement.")
  private var predicate: String
}

extension DeleteData {
  mutating func run() async throws {
    // Initialize Client with default Organization
    let client = InfluxDBClient(
            url: url,
            token: token,
            options: InfluxDBClient.InfluxDBOptions(org: self.org))

    // Create DeletePredicateRequest
    let predicateRequest = DeletePredicateRequest(
            start: Date(timeIntervalSince1970: 0),
            stop: Date(),
            predicate: predicate)

    try await client.deleteAPI.delete(predicate: predicateRequest, bucket: bucket, org: org)

    print("\nDeleted data by predicate:\n\n\t\(predicateRequest)")

    // Print data after Delete
    try await queryData(client: client)

    client.close()
  }

  private func queryData(client: InfluxDBClient) async throws {
    let query = """
                from(bucket: "\(bucket)")
                    |> range(start: 0)
                    |> filter(fn: (r) => r["_measurement"] == "server")
                    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                """

    let response = try await client.queryAPI.query(query: query)

    print("\nRemaining data after delete:\n")

    try response.forEach { record in
      let provider = record.values["provider"]!
      let production = record.values["production"]
      let app = record.values["app"]
      return print("\t\(provider),production=\(production!),app=\(app!)")
    }
  }
}

Management API

The client supports the following management APIs:

| API | Docs |
|---|---|
| AuthorizationsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Authorizations |
| BucketsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Buckets |
| DBRPsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/DBRPs |
| HealthAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Health |
| PingAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Ping |
| LabelsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Labels |
| OrganizationsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Organizations |
| ReadyAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Ready |
| ScraperTargetsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/ScraperTargets |
| SecretsAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Secrets |
| SetupAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Setup |
| SourcesAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Sources |
| TasksAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Tasks |
| UsersAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Users |
| VariablesAPI | https://docs.influxdb.org.cn/influxdb/latest/api/#tag/Variables |

The following example demonstrates how to use the InfluxDB 2.0 Management API to create a new bucket. For further information see the docs and examples.

import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis

@main
struct CreateNewBucket: AsyncParsableCommand {
  @Option(name: .shortAndLong, help: "New bucket name.")
  private var name: String

  @Option(name: .shortAndLong, help: "Duration bucket will retain data.")
  private var retention: Int64 = 3600

  @Option(name: .shortAndLong, help: "Specifies the organization name.")
  private var org: String

  @Option(name: .shortAndLong, help: "Authentication token.")
  private var token: String

  @Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
  private var url: String
}

extension CreateNewBucket {
  mutating func run() async throws {
    // Initialize Client and API
    let client = InfluxDBClient(url: url, token: token)
    let api = InfluxDB2API(client: client)

    let orgId = (try await api.organizationsAPI.getOrgs(org: org)!).orgs?.first?.id

    // Bucket configuration
    let request = PostBucketRequest(
            orgID: orgId!,
            name: name,
            retentionRules: [RetentionRule(type: RetentionRule.ModelType.expire, everySeconds: retention)])

    // Create Bucket
    let bucket = try await api.bucketsAPI.postBuckets(postBucketRequest: request)!

    // Create Authorization with permission to read/write created bucket
    let bucketResource = Resource(
            type: Resource.ModelType.buckets,
            id: bucket.id,
            orgID: orgId
    )

    // Authorization configuration
    let authorizationRequest = AuthorizationPostRequest(
            description: "Authorization to read/write bucket: \(name)",
            orgID: orgId!,
            permissions: [
              Permission(action: Permission.Action.read, resource: bucketResource),
              Permission(action: Permission.Action.write, resource: bucketResource)
            ])

    // Create Authorization
    let authorization = try await api.authorizationsAPI.postAuthorizations(authorizationPostRequest: authorizationRequest)!

    print("The bucket: '\(bucket.name)' is successfully created.")
    print("The following token can be used to read/write:")
    print("\t\(authorization.token!)")

    client.close()
  }
}

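Advanced Usage

Default Tags

Sometimes it is useful to store the same information in every measurement, for example hostname, location, or customer. The client can use a static value or an environment variable as a tag value.

Expressions

A tag value is either a static value such as California Miner, or an expression of the form ${env.NAME} that is replaced by the value of the NAME environment variable, for example ${env.SENSOR_ID}.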
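
To make the expression form concrete, here is a minimal sketch of how a ${env.NAME} value could be resolved. resolveTagValue is a hypothetical function written for this example; the client's actual resolution logic is not shown in this document and may behave differently, for instance when the variable is missing.

```swift
import Foundation

// Hypothetical resolver: returns static values unchanged and looks up
// "${env.NAME}" expressions in the given environment dictionary.
// Returns nil when the referenced variable is not set.
func resolveTagValue(_ value: String,
                     environment: [String: String] = ProcessInfo.processInfo.environment) -> String? {
    let prefix = "${env."
    let suffix = "}"
    guard value.hasPrefix(prefix), value.hasSuffix(suffix) else {
        return value // static value
    }
    let name = String(value.dropFirst(prefix.count).dropLast(suffix.count))
    return environment[name]
}

let environment = ["SENSOR_ID": "123-456-789"]
for raw in ["California Miner", "${env.SENSOR_ID}", "${env.MISSING}"] {
    print(raw, "->", resolveTagValue(raw, environment: environment) ?? "(unset)")
}
```

Passing the environment as a parameter keeps the function easy to test; by default it reads the real process environment.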

Example

let client = InfluxDBClient(
        url: "http://localhost:8086",
        token: "my-token",
        options: InfluxDBClient.InfluxDBOptions(bucket: "my-bucket", org: "my-org"))

let tuple: InfluxDBClient.Point.Tuple
        = (measurement: "mem", tags: ["tag": "a"], fields: ["value": .int(3)], time: nil)

let records: [Any] = [
        InfluxDBClient.Point("mining")
                .addTag(key: "sensor_state", value: "normal")
                .addField(key: "depth", value: .int(2)),
        tuple
]

let defaultTags = InfluxDBClient.PointSettings()
        .addDefaultTag(key: "customer", value: "California Miner")
        .addDefaultTag(key: "sensor_id", value: "${env.SENSOR_ID}")

try await client.makeWriteAPI(pointSettings: defaultTags).writeRecords(records: records)
print("Successfully written default tags")

Example Output (with the SENSOR_ID environment variable set to 123-456-789)

mining,customer=California\ Miner,sensor_id=123-456-789,sensor_state=normal depth=2i
mem,customer=California\ Miner,sensor_id=123-456-789,tag=a value=3i

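Proxy and redirects

⚠️ The connectionProxyDictionary cannot be defined on Linux. You have to set the HTTPS_PROXY or HTTP_PROXY system environment variable instead.

You can configure the client to tunnel requests through an HTTP proxy with the connectionProxyDictionary option: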

var connectionProxyDictionary = [AnyHashable: Any]()
connectionProxyDictionary[kCFNetworkProxiesHTTPEnable as String] = 1
connectionProxyDictionary[kCFNetworkProxiesHTTPProxy as String] = "localhost"
connectionProxyDictionary[kCFNetworkProxiesHTTPPort as String] = 3128

let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        precision: .ns,
        connectionProxyDictionary: connectionProxyDictionary)

let client = InfluxDBClient(url: "http://localhost:8086", token: "my-token", options: options)

For more info see URLSessionConfiguration.connectionProxyDictionary and Global Proxy Settings Constants.

Redirects

The client automatically follows HTTP redirects. You can disable redirects with a urlSessionDelegate configuration:

class DisableRedirect: NSObject, URLSessionTaskDelegate {
    func urlSession(_ session: URLSession,
                    task: URLSessionTask,
                    willPerformHTTPRedirection response: HTTPURLResponse,
                    newRequest request: URLRequest,
                    completionHandler: @escaping (URLRequest?) -> Void) {
        completionHandler(nil)
    }
}

let options = InfluxDBClient.InfluxDBOptions(
        bucket: "my-bucket",
        org: "my-org",
        urlSessionDelegate: DisableRedirect())

let client = InfluxDBClient(url: "http://localhost:8086", token: "my-token", options: options)

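For more info see URLSessionDelegate.

Contributing

If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request into the master branch.

Build Requirements

Build the source and test targets: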

swift build --build-tests

Run the tests:

./Scripts/influxdb-restart.sh
swift test

Check code coverage:

./Scripts/influxdb-restart.sh
swift test --enable-code-coverage

You can also use docker-cli without installing the Swift SDK:

make docker-cli
swift build

License

The client is available as open source under the terms of the MIT License.