这个库旨在帮助 Swift 在一个新的领域取得进展:集群式多节点分布式系统。
通过这个库,我们提供可重用的运行时无关的成员管理协议实现,这些实现可以应用于各种集群使用场景。
集群成员管理协议是分布式系统的一个关键组成部分,例如计算密集型集群、调度器、数据库、键值存储等等。通过发布这个包,我们旨在简化此类系统的构建,因为它们不再需要依赖外部服务来处理服务成员管理。我们也想邀请社区合作开发更多的成员管理协议。
在核心层面,成员管理协议需要回答问题 “谁是我的(活跃)对等节点?”。 在分布式系统中,由于延迟或丢失的消息、网络分区和无响应但仍然 “活跃” 的节点是司空见惯的,因此这项看似简单的任务实际上并非如此简单。 提供对这个问题的可预测、可靠的答案就是集群成员管理协议的作用。
在实现成员管理协议时,可以进行各种权衡,并且它仍然是一个有趣的研究和持续改进的领域。 因此,集群成员管理包的重点不在于单一实现,而是作为该领域各种分布式算法的协作空间。
为了更深入地讨论协议和此实现中的修改,我们建议阅读 SWIM API 文档 以及下面链接的相关论文。
可扩展的弱一致性感染风格进程组 Membership 算法(也称为 “SWIM”),以及 2018 年 Lifeguard:更准确的故障检测的本地健康感知 论文中记录的一些值得注意的协议扩展。
SWIM 是一种 gossip 协议,其中对等节点定期交换关于他们对其他节点状态的观察的信息,最终将信息传播给集群中的所有其他成员。 这种分布式算法非常能够抵抗任意的消息丢失、网络分区和类似的问题。
在高层次上,SWIM 的工作方式如下:
.ack
作为回复。 参见下图 A
如何最初探测 B
。payload
,它是关于消息发送者所知的其他对等节点的(部分)信息,以及它们的成员状态(.alive
、.suspect
等)。.ack
,则该对等节点被认为是仍然 .alive
的。 否则,目标对等节点可能已终止/崩溃,或者由于其他原因而无响应。.pingRequest
消息,然后这些节点直接 ping 该对等节点(如下图中探测对等节点 E)。.suspect
。.nack
(“否定确认”)消息,以便通知 ping 请求源节点中间节点确实收到了这些 .pingRequest
消息,但目标似乎没有响应。 我们使用此信息来调整本地健康乘数,这会影响超时时间的计算。 要了解有关此的更多信息,请参阅 API 文档和 Lifeguard 论文。上述机制不仅用作故障检测机制,还用作 gossip 机制,该机制携带有关集群已知成员的信息。 这样,成员最终会了解其对等节点的状态,即使没有预先列出所有成员也是如此。 然而,值得指出的是,这种成员管理视图是 弱一致性的,这意味着无法保证(或者无法在没有额外信息的情况下知道)所有成员在任何给定时间点都对成员管理具有完全相同的视图。 然而,它是更高级别的工具和系统在其之上构建更强保证的绝佳构建块。
一旦故障检测机制检测到无响应节点,最终它将被标记为 .dead,导致其不可撤销地从集群中删除。 我们的实现提供了一个可选的扩展,向可能的状态添加一个 .unreachable 状态,但是大多数用户会发现它没有必要,并且默认情况下已禁用它。 有关合法状态转换的详细信息和规则,请参阅 SWIM.Status 或下图。
Swift 集群成员管理实现协议的方式是提供协议的 “Instances
”。 例如,SWIM 实现封装在运行时无关的 SWIM.Instance
中,该实例需要由网络运行时和实例本身之间的一些粘合代码 “驱动” 或 “解释”。 我们将实现的这些粘合部分称为 “Shell
”,并且该库附带一个使用 SwiftNIO 的 DatagramChannel
实现的 SWIMNIOShell
,该通道通过 UDP 异步执行所有消息传递。 替代实现可以使用完全不同的传输方式,或者将 SWIM 消息搭载在一些其他现有的 gossip 系统上等等。
SWIM 实例还内置了对发出指标(使用 swift-metrics)的支持,并且可以通过传递 swift-log Logger
来配置以记录有关内部详细信息的详细信息。
该库的主要目的是在各种需要某种形式的进程内成员管理服务的实现之间共享 SWIM.Instance
实现。 项目的 README 中详细记录了实现自定义运行时的信息(https://github.com/apple/swift-cluster-membership/),因此如果您有兴趣通过某些不同的传输实现 SWIM,请查看那里。
实现新的传输方式可以归结为 “填空” 练习。
首先,必须使用目标传输方式来实现 Peer 协议(https://github.com/apple/swift-cluster-membership/blob/main/Sources/SWIM/Peer.swift)。
/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
public protocol SWIMPeer: SWIMAddressablePeer {
/// Perform a probe of this peer by sending a `ping` message.
///
/// <... more docs here - please refer to the API docs for the latest version ...>
func ping(
payload: SWIM.GossipPayload,
from origin: SWIMPingOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse
// ...
}
这通常意味着使用发送消息和在适用时调用适当的回调的能力来包装某些连接、通道或其他标识。
然后,在对等节点的接收端,必须实现接收这些消息并调用 SWIM.Instance
上定义的所有相应的 on<SomeMessage>(...)
回调(在 SWIMProtocol 下分组)。
下面列出了 SWIMProtocol 的一部分,以让您了解它。
public protocol SWIMProtocol {
/// MUST be invoked periodically, in intervals of `self.swim.dynamicLHMProtocolInterval`.
///
/// MUST NOT be scheduled using a "repeated" task/timer", as the interval is dynamic and may change as the algorithm proceeds.
/// Implementations should schedule each next tick by handling the returned directive's `scheduleNextTick` case,
/// which includes the appropriate delay to use for the next protocol tick.
///
/// This is the heart of the protocol, as each tick corresponds to a "protocol period" in which:
/// - suspect members are checked if they're overdue and should become `.unreachable` or `.dead`,
/// - decisions are made to `.ping` a random peer for fault detection,
/// - and some internal house keeping is performed.
///
/// Note: This means that effectively all decisions are made in interval sof protocol periods.
/// It would be possible to have a secondary periodic or more ad-hoc interval to speed up
/// some operations, however this is currently not implemented and the protocol follows the fairly
/// standard mode of simply carrying payloads in periodic ping messages.
///
/// - Returns: `SWIM.Instance.PeriodicPingTickDirective` which must be interpreted by a shell implementation
mutating func onPeriodicPingTick() -> [SWIM.Instance.PeriodicPingTickDirective]
mutating func onPing( ... ) -> [SWIM.Instance.PingDirective]
mutating func onPingRequest( ... ) -> [SWIM.Instance.PingRequestDirective]
mutating func onPingResponse( ... ) -> [SWIM.Instance.PingResponseDirective]
// ...
}
这些调用在内部执行所有 SWIM 协议特定的任务,并返回指令,这些指令是关于实现应如何响应消息的易于解释的 “命令”。 例如,在收到 .pingRequest
消息后,返回的指令可能会指示 shell 向某些节点发送 ping。 该指令准备了所有适当的目标、超时和其他信息,从而更易于简单地遵循其指令并正确实现调用,例如,像这样:
self.swim.onPingRequest(
target: target,
pingRequestOrigin: pingRequestOrigin,
payload: payload,
sequenceNumber: sequenceNumber
).forEach { directive in
switch directive {
case .gossipProcessed(let gossipDirective):
self.handleGossipPayloadProcessedDirective(gossipDirective)
case .sendPing(let target, let payload, let pingRequestOriginPeer, let pingRequestSequenceNumber, let timeout, let sequenceNumber):
self.sendPing(
to: target,
payload: payload,
pingRequestOrigin: pingRequestOriginPeer,
pingRequestSequenceNumber: pingRequestSequenceNumber,
timeout: timeout,
sequenceNumber: sequenceNumber
)
}
}
通常,这允许将所有棘手的 “何时执行什么” 封装在协议实例中,而 Shell 只需要遵循实现它们的指令即可。 实际的实现通常需要执行一些更复杂的并发和网络任务,例如等待一系列响应并以特定方式处理它们等等,但是协议的总体轮廓是由实例的指令编排的。
有关每个回调的详细文档、何时调用它们以及所有这些如何组合在一起,请参阅 API 文档。
该存储库包含一个 端到端示例 和一个名为 SWIMNIOExample 的示例实现,该实现利用 SWIM.Instance
来启用一个简单的基于 UDP 的对等节点监视系统。 这允许对等节点通过发送由 SwiftNIO 驱动的数据报,使用 SWIM 协议互相 gossip 并通知彼此节点故障。
📘
SWIMNIOExample
实现仅作为示例提供,并没有考虑到生产用途,但是经过一些努力,它肯定可以在某些用例中表现良好。 如果您有兴趣了解有关集群成员管理算法、可扩展性基准测试以及使用 SwiftNIO 本身的更多信息,这是一个很好的入门模块,并且一旦该模块足够成熟,我们可能会考虑使其不仅是一个示例,而且是基于 Swift NIO 的集群式应用程序的可重用组件。
以最简单的形式,将提供的 SWIM 实例和 NIO shell 组合在一起以构建简单的服务器,可以将提供的处理程序嵌入到典型的 NIO 通道管道中,如下所示:
let bootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.channelInitializer { channel in
channel.pipeline
// first install the SWIM handler, which contains the SWIMNIOShell:
.addHandler(SWIMNIOHandler(settings: settings)).flatMap {
// then install some user handler, it will receive SWIM events:
channel.pipeline.addHandler(SWIMNIOExampleHandler())
}
}
bootstrap.bind(host: host, port: port)
然后,示例处理程序可以接收和处理 SWIM 集群成员管理更改事件。
final class SWIMNIOExampleHandler: ChannelInboundHandler {
public typealias InboundIn = SWIM.MemberStatusChangedEvent
let log = Logger(label: "SWIMNIOExampleHandler")
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let change: SWIM.MemberStatusChangedEvent = self.unwrapInboundIn(data)
self.log.info("Membership status changed: [\(change.member.node)] is now [\(change.status)]", metadata: [
"swim/member": "\(change.member.node)",
"swim/member/status": "\(change.status)",
])
}
}
如果您有兴趣贡献和完善 SWIMNIO 实现,请前往 issues 并选择一个任务或提出您自己的改进建议!
我们通常对使用类似的 “Instance” 风格促进其他成员管理实现的讨论和实现感兴趣。
如果您对这样的算法感兴趣,并且有一个您希望看到的已实现的协议,请随时通过 issues 或 Swift 论坛 与我们联系。
非常鼓励经验报告、反馈、改进意见和贡献! 我们期待您的来信。