CombineGRPC

CombineGRPC 是一个为 Combine framework 集成 Swift gRPC 的库。

CombineGRPC 提供了两种功能,callhandle。使用 call 在客户端进行 gRPC 调用,并使用 handle 在服务器端处理传入的请求。该库为所有 RPC 风格提供了 callhandle 的版本。以下是每种风格的输入和输出类型。

RPC 风格 输入和输出类型
一元 请求 -> AnyPublisher<Response, RPCError>
服务器流式 请求 -> AnyPublisher<Response, RPCError>
客户端流式 AnyPublisher<Request, Error> -> AnyPublisher<Response, RPCError>
双向流式 AnyPublisher<Request, Error> -> AnyPublisher<Response, RPCError>

当你进行一元调用时,你需要提供一个请求消息,并获得一个响应发布者。该响应发布者将发布单个响应,或者因 RPCError 错误而失败。 类似地,如果你正在处理一元 RPC 调用,你需要提供一个处理程序,该处理程序接受一个请求参数并返回一个 AnyPublisher<Response, RPCError>

你可以遵循相同的直觉来理解其他 RPC 风格的类型。唯一的区别是,流式 RPC 的发布者可能会发布零个或多个消息,而不是一元响应发布者所期望的单个响应消息。

快速入门

让我们看一个快速示例。考虑以下简单 echo 服务的 protobuf 定义。该服务定义了一个双向 RPC。你向其发送消息流,它会将消息回显给你。

syntax = "proto3";

service EchoService {
  rpc SayItBack (stream EchoRequest) returns (stream EchoResponse);
}

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

服务器端

要实现服务器,你需要提供一个处理函数,该函数接受一个输入流 AnyPublisher<EchoRequest, Error> 并返回一个输出流 AnyPublisher<EchoResponse, RPCError>

import Foundation
import Combine
import CombineGRPC
import GRPC
import NIO

class EchoServiceProvider: EchoProvider {
  
  // Simple bidirectional RPC that echoes back each request message
  func sayItBack(context: StreamingResponseCallContext<EchoResponse>) -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void> {
    CombineGRPC.handle(context) { requests in
      requests
        .map { req in
          EchoResponse.with { $0.message = req.message }
        }
        .setFailureType(to: RPCError.self)
        .eraseToAnyPublisher()
    }
  }
}

启动服务器。这与使用 Swift gRPC 的过程相同。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
  try! eventLoopGroup.syncShutdownGracefully()
}

// Start the gRPC server and wait until it shuts down.
_ = try Server
  .insecure(group: eventLoopGroup)
  .withServiceProviders([EchoServiceProvider()])
  .bind(host: "localhost", port: 8080)
  .flatMap { $0.onClose }
  .wait()

客户端

现在让我们设置客户端。同样,这与你使用 Swift gRPC 时会经历的过程相同。

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let channel = ClientConnection
  .insecure(group: eventLoopGroup)
  .connect(host: "localhost", port: 8080)
let echoClient = EchoServiceNIOClient(channel: channel)

要调用服务,请创建一个 GRPCExecutor 并使用其 call 方法。 你需要为其提供一个请求流 AnyPublisher<EchoRequest, Error>,你将从服务器获得一个响应流 AnyPublisher<EchoResponse, RPCError>

let requests = repeatElement(EchoRequest.with { $0.message = "hello"}, count: 10)
let requestStream: AnyPublisher<EchoRequest, Error> =
  Publishers.Sequence(sequence: requests).eraseToAnyPublisher()
let grpc = GRPCExecutor()

grpc.call(echoClient.sayItBack)(requestStream)
  .filter { $0.message == "hello" }
  .count()
  .sink(receiveValue: { count in
    assert(count == 10)
  })

就是这样!你已经设置了服务器和客户端之间的双向流式传输。EchoServiceNIOClient 的方法 sayItBack 由 Swift gRPC 生成。请注意,call 是柯里化的。你可以使用偏函数应用预先选择 RPC 调用。

let sayItBack = grpc.call(echoClient.sayItBack)

sayItBack(requestStream).map { response in
  // ...
}

配置 RPC 调用

GRPCExecutor 允许你为你的 RPC 调用配置 CallOptions。你可以为 GRPCExecutor 的初始化器提供一个流 AnyPublisher<CallOptions, Never>,并且在进行调用时将使用最新的 CallOptions 值。

let timeoutOptions = CallOptions(timeout: try! .seconds(5))
let grpc = GRPCExecutor(callOptions: Just(timeoutOptions).eraseToAnyPublisher())

重试策略

你还可以配置 GRPCExecutor 以通过指定 RetryPolicy 自动重试失败的调用。在以下示例中,我们重试状态为 .unauthenticated 的失败调用。我们使用 CallOptions 将 Bearer 令牌添加到授权标头,然后重试调用。

// Default CallOptions with no authentication
let callOptions = CurrentValueSubject<CallOptions, Never>(CallOptions())

let grpc = GRPCExecutor(
  callOptions: callOptions.eraseToAnyPublisher(),
  retry: .failedCall(
    upTo: 1,
    when: { error in
      error.status.code == .unauthenticated
    },
    delayUntilNext: { retryCount, error in  // Useful for implementing exponential backoff
      // Retry the call with authentication
      callOptions.send(CallOptions(customMetadata: HTTPHeaders([("authorization", "Bearer xxx")])))
      return Just(()).eraseToAnyPublisher()
    },
    didGiveUp: {
      print("Authenticated call failed.")
    }
  )
)

grpc.call(client.authenticatedRpc)(request)
  .map { response in
    // ...
  }

你可以想象做类似的事情,以便在 ID 令牌过期时无缝地重试调用。后端服务以状态 .unauthenticated 响应,你使用你的刷新令牌获取新的 ID 令牌,然后重试调用。

更多示例

查看 CombineGRPC 测试 以获取所有不同 RPC 调用和处理程序实现的示例。你可以在 这里 找到匹配的 protobuf。

后勤

从 Protobuf 生成 Swift 代码

要从你的 .proto 文件生成 Swift 代码,你需要首先安装 protoc Protocol Buffer 编译器。

brew install protobuf swift-protobuf grpc-swift

现在你已准备好从 protobuf 接口定义文件生成 Swift 代码。

让我们为 Swift 生成消息类型、gRPC 服务器和 gRPC 客户端。

protoc example_service.proto --swift_out=Generated/
protoc example_service.proto --grpc-swift_out=Generated/

你会看到 protoc 已经为我们创建了两个源文件。

ls Generated/
example_service.grpc.swift
example_service.pb.swift

将 CombineGRPC 添加到你的项目

你可以使用 Swift Package Manager 轻松地将 CombineGRPC 添加到你的项目。要将包依赖项添加到你的 Package.swift

dependencies: [
  .package(url: "https://github.com/vyshane/grpc-swift-combine.git", from: "1.1.0"),
],

兼容性

由于此库与 Combine 集成,因此它仅在支持 Combine 的平台上工作。 这目前意味着以下最低版本

平台 最低支持版本
macOS 10.15 (Catalina)
iOS & iPadOS 13
tvOS 13
watchOS 6

功能状态

RPC 客户端调用

服务器端处理程序

端到端测试

贡献

为测试中使用的 protobuf 生成 Swift 源代码

make protobuf

然后你可以在 Xcode 中打开 Package.swift,构建并运行测试。