MongoQueue

基于 MongoKitten 的 MongoDB 任务队列。

加入我们的 Discord 频道,提出任何问题和进行友好交流。

阅读文档 获取更多信息。

快速开始

使用 MongoKitten 常规连接到 MongoDB

let db = try await MongoDatabase.connect(to: "mongodb:///my_database")

为您的任务队列选择一个集合

let queue = MongoQueue(collection: db["tasks"])

通过遵循 ScheduledTask (以及隐式的 Codable) 协议来定义您的任务

struct RegistrationEmailTask: ScheduledTask {
    // This type has no context
    typealias ExecutionContext = Void

    // Computed property, required by ScheduledTask
    // This executed the task ASAP
    var taskExecutionDate: Date { Date() }
    
    // Stored properties represent the metadata needed to execute the task
    let recipientEmail: String
    let userId: ObjectId
    let fullName: String
    
    func execute(withContext context: ExecutionContext) async throws {
        // TODO: Send the email
        // Throwing an error triggers `onExecutionFailure`
    }
    
    func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction {
        // Only attempt the job once. Failing to send the email cancels the job
        return .dequeue()
    }
}

在 MongoQueue 启动之前注册任务。

// Context is `Void`, so we pass in a void here
queue.registerTask(RegistrationEmailTask.self, context: ())

在后台启动队列 - 这在使用于 HTTP 应用程序内部或创建独立工作进程时非常有用。

try queue.runInBackground()

或者,在前台运行队列并阻塞直到队列停止。仅当您的队列工作进程仅作为工作进程运行时才使用此方法。例如,它不在一旁为用户提供服务。

try await queue.run()

在 MongoDB 中将任务加入队列

let task = RegistrationEmailTask(
  recipientEmail: "joannis@orlandos.nl",
  userId: ...,
  fullName: "Joannis Orlandos"
)
try await queue.queueTask(task)

完成了!只需等待它被执行。

测试

您可以使用以下命令逐个运行队列中当前所有活跃的任务

try await queue.runUntilEmpty()

这不会运行任何计划在将来的任务,并且一旦没有当前任务可用就会退出。

并行化

您可以使用以下代码设置 **每个任务队列实例** 的并行化数量

queue.setMaxParallelJobs(to: 6)

如果您有两个容器运行 MongoQueue 实例,那么它将能够同时运行 2 * 6 = 12 个任务。

与 Vapor 集成

要从您的 Vapor 请求中访问 queue,请添加以下代码片段

import Vapor
import MongoKitten
import MongoQueue

extension Request {
  public var queue: MongoQueue {
    return application.queue
  }
}

private struct MongoQueueStorageKey: StorageKey {
  typealias Value = MongoQueue
}

extension Application {
  public var queue: MongoQueue {
    get {
      storage[MongoQueueStorageKey.self]!
    }
    set {
      storage[MongoQueueStorageKey.self] = newValue
    }
  }

  public func initializeMongoQueue(withCollection collection: MongoCollection) {
    self.queue = MongoQueue(collection: collection)
  }
}

从这里,您可以像这样添加任务

app.post("tasks") { req in
  try await req.queue.queueTask(MyCustomTask())
  return HTTPStatus.created
}

高级用法

在深入了解更多(详细的)API 之前,这里概述了它的工作原理

当您将任务加入队列时,它用于派生排队任务的基本信息。这些要求的部分内容在协议中,但具有 MongoQueue 提供的默认值。

出队过程

每个任务都有一个 **类别**,这是一个唯一的字符串,用于在数据库中标识此任务的类型。当您在 MongoQueue 中注册您的任务时,类别用于了解如何在工作进程获取任务后解码和执行任务。

MongoQueue 定期在计时器上检查(如果可能,使用更改流以获得更好的响应性)是否有新任务准备好抓取。当它从 MongoDB 中拉取任务时,它会获取计划在此日期执行的 **最高优先级** 任务。

默认情况下,优先级为 .normal,但可以在任务的 var priority: TaskPriority { get } 中增加或降低紧急程度。

当任务从队列中取出时,其 status 将设置为 executing。这意味着其他作业现在无法执行此任务。在执行此操作时,任务模型的 maxTaskDuration 用作任务预期持续时间的指示。预期的截止日期通过将 maxTaskDuration 添加到当前日期来设置在 MongoDB 的模型中。

如果达到截止日期,其他工作进程可以(并且将会)将任务出队并将其放回 scheduled。这假设工作进程已崩溃。但是,在任务花费异常时间的情况下,工作进程将相应地更新截止日期。

由于此系统,建议将紧急且生命周期短的任务设置为较短的 maxTaskDuration。但是要考虑网络连接,因为将其设置得非常低(例如 5 秒)可能会导致在可以延长截止日期之前就已达到截止日期。

如果任务出队,您的任务模型会在 func onDequeueTask(withId taskId: ObjectId, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws 中收到通知。

同样,在执行失败时,您会在 func onExecutionFailure(failureContext: QueuedTaskFailure<ExecutionContext>) async throws -> TaskExecutionFailureAction 上收到调用,您可以在其中决定是否重新排队,以及是否应用最大尝试次数。