该仓库包含 InfluxDB 2.x 的 Swift 客户端参考实现。
此部分包含指向客户端库文档的链接。
InfluxDB 2.x 客户端包含两个包
InfluxDBSwift
InfluxDBSwiftApis
InfluxDBSwift
之上此软件包需要 Swift 5 和 Xcode 12+。
将此行添加到您的 Package.swift
文件中
// swift-tools-version:5.3
import PackageDescription
let package = Package(
name: "MyPackage",
dependencies: [
.package(name: "influxdb-client-swift", url: "https://github.com/influxdata/influxdb-client-swift", from: "1.7.0"),
],
targets: [
.target(name: "MyModule", dependencies: [
.product(name: "InfluxDBSwift", package: "influxdb-client-swift"),
// or InfluxDBSwiftApis for management API
.product(name: "InfluxDBSwiftApis", package: "influxdb-client-swift")
])
]
)
重要提示:您应该在应用程序结束时调用
close()
以释放已分配的资源。
通过参数指定 url 和 token
let client = InfluxDBClient(url: "https://:8086", token: "my-token")
...
client.close()
选项 | 描述 | 类型 | 默认值 |
---|---|---|---|
bucket | 写入的默认目标存储桶 | 字符串 | 无 |
org | 写入的默认组织存储桶 | 字符串 | 无 |
precision | 正文行协议中 unix 时间戳的默认精度 | TimestampPrecision | ns |
timeoutIntervalForRequest | 等待其他数据时使用的超时时间间隔。 | TimeInterval | 60 秒 |
timeoutIntervalForResource | 资源请求允许花费的最长时间。 | TimeInterval | 5 分钟 |
enableGzip | 为 HTTP 请求启用 Gzip 压缩。 | Bool | false |
debugging | 启用 HTTP 请求/响应的调试。 | Bool | false |
let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
bucket: "my-bucket",
org: "my-org",
precision: .ns)
let client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)
...
client.close()
client = InfluxDBClient(
url: "https://:8086",
username: "user",
password: "pass",
database: "my-db",
retentionPolicy: "autogen")
...
client.close()
WriteApi 支持将异步写入 InfluxDB 2.x。写入结果可以通过 (response, error)
、Swift.Result
或 Combine
处理。
数据可以写入为
String
measurement
、tags
、fields
和 time
的元组样式映射以下示例演示如何使用数据点结构写入数据。 更多信息请参考文档和 示例。
import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis
@main
struct WriteData: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
private var bucket: String
@Option(name: .shortAndLong, help: "The name or id of the organization destination.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
}
extension WriteData {
mutating func run() async throws {
//
// Initialize Client with default Bucket and Organization
//
let client = InfluxDBClient(
url: url,
token: token,
options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))
//
// Record defined as Data Point
//
let recordPoint = InfluxDBClient
.Point("demo")
.addTag(key: "type", value: "point")
.addField(key: "value", value: .int(2))
//
// Record defined as Data Point with Timestamp
//
let recordPointDate = InfluxDBClient
.Point("demo")
.addTag(key: "type", value: "point-timestamp")
.addField(key: "value", value: .int(2))
.time(time: .date(Date()))
try await client.makeWriteAPI().write(points: [recordPoint, recordPointDate])
print("Written data:\n\n\([recordPoint, recordPointDate].map { "\t- \($0)" }.joined(separator: "\n"))")
print("\nSuccess!")
client.close()
}
}
通过 QueryApi 检索的结果可以格式化为
Data
。import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis
@main
struct QueryCpu: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
private var bucket: String
@Option(name: .shortAndLong, help: "The name or id of the organization destination.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
}
extension QueryCpu {
mutating func run() async throws {
//
// Initialize Client with default Bucket and Organization
//
let client = InfluxDBClient(
url: url,
token: token,
options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))
// Flux query
let query = """
from(bucket: "\(self.bucket)")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
|> last()
"""
print("\nQuery to execute:\n\(query)\n")
let records = try await client.queryAPI.query(query: query)
print("Query results:")
try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }
client.close()
}
}
@main
struct QueryCpuData: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The name or id of the bucket destination.")
private var bucket: String
@Option(name: .shortAndLong, help: "The name or id of the organization destination.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
}
extension QueryCpuData {
mutating func run() async throws {
//
// Initialize Client with default Bucket and Organization
//
let client = InfluxDBClient(
url: url,
token: token,
options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))
// Flux query
let query = """
from(bucket: "\(self.bucket)")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
|> last()
"""
print("\nQuery to execute:\n\(query)\n")
let response = try await client.queryAPI.queryRaw(query: query)
let csv = String(decoding: response, as: UTF8.self)
print("InfluxDB response: \(csv)")
client.close()
}
}
InfluxDB Cloud 支持 参数化查询,它允许您使用 InfluxDB API 动态更改查询中的值。 参数化查询使 Flux 查询更具可重用性,也可用于帮助防止注入攻击。
InfluxDB Cloud 将 params 对象作为名为 params
的 Flux 记录插入到 Flux 查询中。 使用点或括号表示法访问 Flux 查询中 params
记录中的参数。 参数化 Flux 查询仅支持 int
、float
和 string
数据类型。 要将支持的数据类型转换为其他 Flux 基本数据类型,请使用 Flux 类型转换函数。
参数化查询示例
⚠️ 参数化查询仅在 InfluxDB Cloud 中受支持,目前在 InfluxDB OSS 中尚不支持。
import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis
@main
struct ParameterizedQuery: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The bucket to query. The name or id of the bucket destination.")
private var bucket: String
@Option(name: .shortAndLong,
help: "The organization executing the query. Takes either the `ID` or `Name` interchangeably.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
}
extension ParameterizedQuery {
mutating func run() async throws {
// Initialize Client with default Organization
let client = InfluxDBClient(
url: url,
token: token,
options: InfluxDBClient.InfluxDBOptions(bucket: bucket, org: org))
for index in 1...3 {
let point = InfluxDBClient
.Point("demo")
.addTag(key: "type", value: "point")
.addField(key: "value", value: .int(index))
try await client.makeWriteAPI().write(point: point)
}
// Flux query
let query = """
from(bucket: params.bucketParam)
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == params.measurement)
"""
// Query parameters [String:String]
let queryParams = ["bucketParam": "\(bucket)", "measurement": "demo"]
print("\nQuery to execute:\n\n\(query)\n\n\(queryParams)")
let records = try await client.queryAPI.query(query: query, params: queryParams)
print("\nSuccess response...\n")
try records.forEach { print(" > \($0.values["_field"]!): \($0.values["_value"]!)") }
client.close()
}
}
DeleteAPI 支持从 InfluxDB 存储桶中删除 点。 使用 DeletePredicateRequest 标识要删除的点。
import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis
@main
struct DeleteData: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "Specifies the bucket name to delete data from.")
private var bucket: String
@Option(name: .shortAndLong,
help: "Specifies the organization name to delete data from.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
@Option(name: .shortAndLong, help: "InfluxQL-like delete predicate statement.")
private var predicate: String
}
extension DeleteData {
mutating func run() async throws {
// Initialize Client with default Organization
let client = InfluxDBClient(
url: url,
token: token,
options: InfluxDBClient.InfluxDBOptions(org: self.org))
// Create DeletePredicateRequest
let predicateRequest = DeletePredicateRequest(
start: Date(timeIntervalSince1970: 0),
stop: Date(),
predicate: predicate)
try await client.deleteAPI.delete(predicate: predicateRequest, bucket: bucket, org: org)
print("\nDeleted data by predicate:\n\n\t\(predicateRequest)")
// Print date after Delete
try await queryData(client: client)
client.close()
}
private func queryData(client: InfluxDBClient) async throws {
let query = """
from(bucket: "\(bucket)")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "server")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
"""
let response = try await client.queryAPI.query(query: query)
print("\nRemaining data after delete:\n")
try response.forEach { record in
let provider = record.values["provider"]!
let production = record.values["production"]
let app = record.values["app"]
return print("\t\(provider),production=\(production!),app=\(app!)")
}
}
}
客户端支持以下管理 API
以下示例演示如何使用 InfluxDB 2.0 管理 API 创建新存储桶。 更多信息请参考文档和 示例。
import ArgumentParser
import Foundation
import InfluxDBSwift
import InfluxDBSwiftApis
@main
struct CreateNewBucket: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "New bucket name.")
private var name: String
@Option(name: .shortAndLong, help: "Duration bucket will retain data.")
private var retention: Int64 = 3600
@Option(name: .shortAndLong, help: "Specifies the organization name.")
private var org: String
@Option(name: .shortAndLong, help: "Authentication token.")
private var token: String
@Option(name: .shortAndLong, help: "HTTP address of InfluxDB.")
private var url: String
}
extension CreateNewBucket {
mutating func run() async throws {
// Initialize Client and API
let client = InfluxDBClient(url: url, token: token)
let api = InfluxDB2API(client: client)
let orgId = (try await api.organizationsAPI.getOrgs(org: org)!).orgs?.first?.id
// Bucket configuration
let request = PostBucketRequest(
orgID: orgId!,
name: name,
retentionRules: [RetentionRule(type: RetentionRule.ModelType.expire, everySeconds: retention)])
// Create Bucket
let bucket = try await api.bucketsAPI.postBuckets(postBucketRequest: request)!
// Create Authorization with permission to read/write created bucket
let bucketResource = Resource(
type: Resource.ModelType.buckets,
id: bucket.id,
orgID: orgId
)
// Authorization configuration
let authorizationRequest = AuthorizationPostRequest(
description: "Authorization to read/write bucket: \(name)",
orgID: orgId!,
permissions: [
Permission(action: Permission.Action.read, resource: bucketResource),
Permission(action: Permission.Action.write, resource: bucketResource)
])
// Create Authorization
let authorization = try await api.authorizationsAPI.postAuthorizations(authorizationPostRequest: authorizationRequest)!
print("The bucket: '\(bucket.name)' is successfully created.")
print("The following token could be use to read/write:")
print("\t\(authorization.token!)")
client.close()
}
}
有时,在每个测量中存储相同的信息很有用,例如 hostname
、location
、customer
。 客户端能够使用静态值或环境变量作为标签值。
表达式
California Miner
- 静态值${env.HOST_NAME}
- 环境变量client = InfluxDBClient(
url: "https://:8086",
token: "my-token",
options: InfluxDBClient.InfluxDBOptions(bucket: "my-bucket", org: "my-org"))
let tuple: InfluxDBClient.Point.Tuple
= (measurement: "mem", tags: ["tag": "a"], fields: ["value": .int(3)], time: nil)
let records: [Any] = [
InfluxDBClient.Point("mining")
.addTag(key: "sensor_state", value: "normal")
.addField(key: "depth", value: .int(2)),
tuple
]
let defaultTags = InfluxDBClient.PointSettings()
.addDefaultTag(key: "customer", value: "California Miner")
.addDefaultTag(key: "sensor_id", value: "${env.SENSOR_ID}")
try await client.makeWriteAPI(pointSettings: defaultTags).writeRecords(records: records)
print("Successfully written default tags")
mining,customer=California\ Miner,sensor_id=123-456-789,sensor_state=normal depth=2i
mining,customer=California\ Miner,sensor_id=123-456-789,sensor_state=normal pressure=3i
⚠️ connectionProxyDictionary
不能在 Linux 上定义。 您必须设置HTTPS_PROXY
或HTTP_PROXY
系统环境变量。
您可以通过 connectionProxyDictionary
选项将客户端配置为通过 HTTP 代理隧道传输请求
var connectionProxyDictionary = [AnyHashable: Any]()
connectionProxyDictionary[kCFNetworkProxiesHTTPEnable as String] = 1
connectionProxyDictionary[kCFNetworkProxiesHTTPProxy as String] = "localhost"
connectionProxyDictionary[kCFNetworkProxiesHTTPPort as String] = 3128
let options: InfluxDBClient.InfluxDBOptions = InfluxDBClient.InfluxDBOptions(
bucket: "my-bucket",
org: "my-org",
precision: .ns,
connectionProxyDictionary: connectionProxyDictionary)
client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)
更多信息请参考 - URLSessionConfiguration.connectionProxyDictionary, Global Proxy Settings Constants.
客户端自动跟踪 HTTP 重定向。 您可以通过 urlSessionDelegate
配置禁用重定向
class DisableRedirect: NSObject, URLSessionTaskDelegate {
func urlSession(_ session: URLSession,
task: URLSessionTask,
willPerformHTTPRedirection response: HTTPURLResponse,
newRequest request: URLRequest,
completionHandler: @escaping (URLRequest?) -> Void) {
completionHandler(nil)
}
}
let options = InfluxDBClient.InfluxDBOptions(
bucket: "my-bucket",
org: "my-org",
urlSessionDelegate: DisableRedirect())
client = InfluxDBClient(url: "https://:8086", token: "my-token", options: options)
更多信息请参考 - URLSessionDelegate.
如果您想贡献代码,可以通过 GitHub 完成,方法是 fork 存储库并将 pull request 发送到 master
分支。
构建要求
构建源代码和测试目标
swift build --build-tests
运行测试
./Scripts/influxdb-restart.sh
swift test
检查代码覆盖率
./Scripts/influxdb-restart.sh
swift test --enable-code-coverage
您也可以使用 docker-cli
而无需安装 Swift SDK
make docker-cli
swift build
该客户端根据 MIT 许可证 的条款作为开源提供。