AsyncConcurrentQueue

一个主要*遵循 FIFO 原则的队列系统,它利用了 Swift 的 async/await 结构化并发机制。 还包括 FutureTask,它允许你任意延迟/启动任务。

<style>.foootnote { font-size: 0.6em; }</style>

*主要:async/await 用于将任务添加到队列,因此,如果快速连续添加多个任务,任务的正确排序取决于 async/await 的机制。 但是,一旦任务被添加,启动**顺序是有保证的。

**启动:如果队列允许多个并发任务,那么在较长任务之后启动的较短任务可能会在较长的第一个任务之前完成。

用法

import AsyncConcurrentQueue

// AsyncConcurrentQueue
Task {
	// initialize a new queue
	let queue = AsyncConcurrentQueue()
	
	// set how many tasks may run concurrently
	queue.setMaximumConcurrentTasks(5)
	
	// feed it a bunch of tasks to perform.
	for i in 1...20 {
		let task = await queue.createTask {
			print("starting \(i)")
			try await Task.sleep(for: .seconds(Double.random(in: 0.5...2)))
			print("finishing \(i)")
			return i
		}

		Task {
			let value = try await task.value
			print("Got \(value) from \(i)")
		}
	}
}

// FutureTask
Task {
	// create future task. Note that it does not run right away.
	let futureTask = FutureTask {
		print("Waited until called")
	}

	// create normal task for comparison. Note that it runs almost immediately.
	Task {
		print("Ran immediately")
	}

	// create a separate future task. It doesn't run right away, but also has a cancellation handler.
	let canceller = FutureTask(
		operation: {
			print("Will never run")
		},
		onCancellation: {
			print("Cancelled before activation")
		})

	// wait half a second
	try await Task.sleep(for: .seconds(0.5))
	// cancel the `canceller` task. The cancellation handler will run.
	canceller.cancel()

	// wait another couple seconds
	try await Task.sleep(for: .seconds(2))
	
	// activate the `futureTask`, allowing it to proceed now.
	futureTask.activate()
}

关于 FIFO 的说明

当前实现在快速连续添加任务时,处理 FIFO 的效果不佳。 我未来的计划是添加同步方法来添加任务,接受异步闭包(类似于现在),但要改进内部机制,以便立即将任务添加到数组中。 这将加强顺序的保证。 然后,异步系统将逐个检索数组的第一个元素来创建异步任务。(至少这是计划。会有一些挑战,例如保持排队任务的泛型返回值,但这将是周一 Michael 要解决的问题。)