The following piece of code triggers the unusual concurrency-related compiler error, shown below the code.
extension ProcPipeline {
#if false
protocol Proc: Sendable {
var name: String {get}
func run () async
}
#else
protocol Proc {
var name: String {get}
func run () async
}
#endif
ProcPipeline.swift:30:23: error: value of non-Sendable type '@isolated(any) @async @callee_guaranteed @substituted <τ_0_0> () -> @out τ_0_0 for <()>' accessed after being transferred; later accesses could race
28 | let procs: [any Proc] = [producer, scaler, scaler2, scaler3, adder]
29 | for proc in procs.reversed() {
30 | group.addTask {
| |- error: value of non-Sendable type '@isolated(any) @async @callee_guaranteed @substituted <τ_0_0> () -> @out τ_0_0 for <()>' accessed after being transferred; later accesses could race
| `- note: access can happen concurrently
31 | await proc.run()
32 | }
make: *** [compile] Error 1
swiftc -parse-as-library -swift-version 6 -strict-concurrency=complete ProcPipeline.swift
Code
// ProcPipeline.swift
@main
enum ProcPipeline {
static func main () async throws {
let lv: [Int] = [1, 2, 3, 4, 5, 10]
for l in lv {
if l > 0 {
print ()
}
// let r = 8 * (l * (l + 1)) / 2
let r = 4 * (l * (l + 1))
print ("sl = \(l) A1 = \(r)...")
await run (seqLen: l, multiplier: 2, expect: r)
}
}
}
extension ProcPipeline {
static func run (seqLen: Int, multiplier m: Int, expect r: Int) async {
await withTaskGroup (of: Void.self) {group in
let producer = NumberPoducer (name: "P1", count: seqLen, output: Port <Int> ())
let scaler = NumberScaler (name: "S1", multiplier: m, input: producer.output, output: Port <Int> ())
let scaler2 = NumberScaler (name: "S2", multiplier: m, input: scaler.output, output: Port <Int> ())
let scaler3 = NumberScaler (name: "S3", multiplier: m, input: scaler2.output, output: Port <Int> ())
let adder = NumberAdder (name: "A1", expect: r, input: scaler3.output)
let procs: [any Proc] = [producer, scaler, scaler2, scaler3, adder]
for proc in procs.reversed() {
group.addTask {
await proc.run()
}
}
}
}
}
extension ProcPipeline {
struct Port <T>: Sendable where T:Sendable {
let stream : AsyncStream <T>
let cont : AsyncStream <T>.Continuation
init () {
let u = Self.makeAsyncStream ()
self.stream = u.0
self.cont = u.1
}
static func makeAsyncStream () -> (AsyncStream <T>, AsyncStream <T>.Continuation) {
var cont: AsyncStream <T>.Continuation?
let stream = AsyncStream (T.self) { continuation in
cont = continuation
}
return (stream, cont!)
}
}
}
extension ProcPipeline {
#if false
protocol Proc: Sendable {
var name: String {get}
func run () async
}
#else
protocol Proc {
var name: String {get}
func run () async
}
#endif
protocol Producer: Proc {
associatedtype OutputValue: Sendable
var output: Port <OutputValue> {get}
}
protocol Consumer: Proc {
associatedtype InputValue: Sendable
var input : Port <InputValue> {get}
}
protocol Filter: Proc {
associatedtype InputValue: Sendable
associatedtype OutputValue: Sendable
var input : Port <InputValue> {get}
var output: Port <OutputValue> {get}
}
}
extension ProcPipeline {
struct NumberPoducer: Producer {
typealias OutputValue = Int
let name : String
let count: Int
let output: Port <OutputValue>
func run() async {
assert (count >= 1)
let values = (1...count).map {$0}
print ("-->", name, values)
let cont = output.cont
for i in values {
cont.yield (i)
}
cont.finish()
}
}
}
extension ProcPipeline {
struct NumberScaler: Filter {
typealias InputValue = Int
typealias OutputValue = Int
let name : String
let multiplier: Int
let input : Port <OutputValue>
let output: Port <OutputValue>
func run() async {
let ins = input.stream
let cont = output.cont
for await u in ins {
let v = multiplier * u
print ("-->", name, u, v)
cont.yield (v)
}
cont.finish()
}
}
}
extension ProcPipeline {
struct NumberAdder: Consumer {
typealias InputValue = Int
let name : String
let expect: Int
let input: Port <InputValue>
func run() async {
var sum = 0
let ins = input.stream
for await u in ins {
sum += u
}
print ("-->", name, sum)
assert (sum == expect)
}
}
}
Q: What is <τ_0_0> () -> @out τ_0_0 for <()>?
Thank you.