基于 MongoKitten 的 MongoDB 任务队列。
加入我们的 Discord 频道,提出任何问题和进行友好交流。
阅读文档 获取更多信息。
使用 MongoKitten 常规连接到 MongoDB
let db = try await MongoDatabase.connect(to: "mongodb:///my_database")
为您的任务队列选择一个集合
let queue = MongoQueue(collection: db["tasks"])
通过遵循 ScheduledTask
(以及隐式的 Codable
) 协议来定义您的任务
struct RegistrationEmailTask: ScheduledTask {
// This type has no context
typealias ExecutionContext = Void
// Computed property, required by ScheduledTask
// This executed the task ASAP
var taskExecutionDate: Date { Date() }
// Stored properties represent the metadata needed to execute the task
let recipientEmail: String
let userId: ObjectId
let fullName: String
func execute(withContext context: ExecutionContext) async throws {
// TODO: Send the email
// Throwing an error triggers `onExecutionFailure`
}
func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction {
// Only attempt the job once. Failing to send the email cancels the job
return .dequeue()
}
}
在 MongoQueue 启动之前注册任务。
// Context is `Void`, so we pass in a void here
queue.registerTask(RegistrationEmailTask.self, context: ())
在后台启动队列 - 这在使用于 HTTP 应用程序内部或创建独立工作进程时非常有用。
try queue.runInBackground()
或者,在前台运行队列并阻塞直到队列停止。仅当您的队列工作进程仅作为工作进程运行时才使用此方法。例如,它不在一旁为用户提供服务。
try await queue.run()
在 MongoDB 中将任务加入队列
let task = RegistrationEmailTask(
recipientEmail: "joannis@orlandos.nl",
userId: ...,
fullName: "Joannis Orlandos"
)
try await queue.queueTask(task)
完成了!只需等待它被执行。
您可以使用以下命令逐个运行队列中当前所有活跃的任务
try await queue.runUntilEmpty()
这不会运行任何计划在将来的任务,并且一旦没有当前任务可用就会退出。
您可以使用以下代码设置 **每个任务队列实例** 的并行化数量
queue.setMaxParallelJobs(to: 6)
如果您有两个容器运行 MongoQueue 实例,那么它将能够同时运行 2 * 6 = 12
个任务。
要从您的 Vapor 请求中访问 queue
,请添加以下代码片段
import Vapor
import MongoKitten
import MongoQueue
extension Request {
public var queue: MongoQueue {
return application.queue
}
}
private struct MongoQueueStorageKey: StorageKey {
typealias Value = MongoQueue
}
extension Application {
public var queue: MongoQueue {
get {
storage[MongoQueueStorageKey.self]!
}
set {
storage[MongoQueueStorageKey.self] = newValue
}
}
public func initializeMongoQueue(withCollection collection: MongoCollection) {
self.queue = MongoQueue(collection: collection)
}
}
从这里,您可以像这样添加任务
app.post("tasks") { req in
try await req.queue.queueTask(MyCustomTask())
return HTTPStatus.created
}
在深入了解更多(详细的)API 之前,这里概述了它的工作原理
当您将任务加入队列时,它用于派生排队任务的基本信息。这些要求的部分内容在协议中,但具有 MongoQueue 提供的默认值。
每个任务都有一个 **类别**,这是一个唯一的字符串,用于在数据库中标识此任务的类型。当您在 MongoQueue 中注册您的任务时,类别用于了解如何在工作进程获取任务后解码和执行任务。
MongoQueue 定期在计时器上检查(如果可能,使用更改流以获得更好的响应性)是否有新任务准备好抓取。当它从 MongoDB 中拉取任务时,它会获取计划在此日期执行的 **最高优先级** 任务。
默认情况下,优先级为 .normal
,但可以在任务的 var priority: TaskPriority { get }
中增加或降低紧急程度。
当任务从队列中取出时,其 status
将设置为 executing
。这意味着其他作业现在无法执行此任务。在执行此操作时,任务模型的 maxTaskDuration
用作任务预期持续时间的指示。预期的截止日期通过将 maxTaskDuration
添加到当前日期来设置在 MongoDB 的模型中。
如果达到截止日期,其他工作进程可以(并且将会)将任务出队并将其放回 scheduled
。这假设工作进程已崩溃。但是,在任务花费异常时间的情况下,工作进程将相应地更新截止日期。
由于此系统,建议将紧急且生命周期短的任务设置为较短的 maxTaskDuration
。但是要考虑网络连接,因为将其设置得非常低(例如 5 秒)可能会导致在可以延长截止日期之前就已达到截止日期。
如果任务出队,您的任务模型会在 func onDequeueTask(withId taskId: ObjectId, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws
中收到通知。
同样,在执行失败时,您会在 func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction
上收到调用,您可以在其中决定是否重新排队,以及是否应用最大尝试次数。