As a learning exercise, I am trying to create a channel for inter-task communication.
Here is my first attempt:
/// A minimal polling channel: values are queued by `put` and handed out
/// in insertion order by `get`. `get` never waits; it reports emptiness instead.
actor Channel<T> {
    /// Queued values, oldest first.
    private var store: [T?] = []

    /// Appends `u` to the end of the queue.
    func put(_ u: T?) {
        store.append(u)
    }

    /// Removes and returns the oldest queued value.
    ///
    /// - Returns: `(true, nil)` when nothing is queued; otherwise `false`
    ///   paired with the value that was removed from the front of the queue.
    func get() async -> (Bool, T?) {
        guard !store.isEmpty else {
            return (true, nil)
        }
        let oldest = store.removeFirst()
        return (false, oldest)
    }
}
This works fine, but the consumer has to spin inefficiently until the channel has something to offer.
I thought if I suspend the calling task when the channel is empty, I will eliminate the need to spin.
Behold my second attempt using continuations!
/// A channel whose `get` suspends the caller while nothing is queued.
/// Waiting callers are parked as continuations and woken one at a time by `put`.
actor Channel<T> {
    /// Queued values, oldest first.
    private var store: [T?] = []
    /// Continuations of callers parked inside `get`, in arrival order.
    private var suspendeds: [UnsafeContinuation<Void, Never>] = []

    /// Queues `u` and, if any caller is parked, resumes the one that has waited longest.
    func put(_ u: T?) {
        store.append(u)
        guard !suspendeds.isEmpty else { return }
        let waiter = suspendeds.removeFirst()
        waiter.resume()
    }

    /// Removes and returns the oldest queued value, parking the caller while
    /// the queue is empty.
    ///
    /// The emptiness check is repeated after every resumption, so a caller
    /// that wakes up to an empty queue simply parks itself again.
    func get() async -> T? {
        while store.isEmpty {
            await withUnsafeContinuation { continuation in
                suspendeds.append(continuation)
            }
        }
        return store.removeFirst()
    }

    /// Number of callers currently parked in `get`.
    var suspendedsCount: Int {
        suspendeds.count
    }
}
But of course this does not work, and it bothers me that I can't explain why!
Thank you in advance for any insights.
EDIT: I am now using a slightly modified version of @MarSe32m's code below, but I am still seeing problems:
- segmentation fault;
- malloc: pointer 0x600000f1c000 being freed was not allocated;
- consumers hanging.
The producer/consumer code is here:
Test Code
Main
/// Program entry point; it only forwards to the `V3` test driver.
@main
enum Main {
    /// Runs the V3 producer/consumer test and returns once `V3.run()` returns.
    static func main() async {
        await V3.run()
    }
}
Driver
import Foundation
/// Test driver for `V3.Channel`: `M` producers each emit `N` packets followed by
/// an end-of-transmission packet, and `M` consumers each collect the packets
/// addressed to them from the single shared channel.
extension V3 {
    // number of producer and consumer tasks, each consumer has its own unique producer
    static let M = 32
    static let TaskIDRange = 1...M
    // number of packets emitted by a producer and expected to be received by a consumer
    static let N = 32
    static let PacketIDRange = 0..<N
    // sentinel for signalling end of packets
    static let eot = -1

    typealias TaskID = Int
    typealias PacketID = Int
    /// A packet is addressed by task ID and carries either a packet ID or `eot`.
    typealias Packet = (TaskID, PacketID)

    /// Collects what every consumer received, plus finished-task counters.
    static let recorder = Recorder(capacity: M)

    /// Spawns the consumers, then the producers, polls every five seconds until
    /// all consumers report completion, and finally verifies that each consumer
    /// recorded exactly the packet IDs in `PacketIDRange`.
    static func run() async {
        let chan = Channel<Packet>()

        // Spawn the consumers...
        Task {
            for i in TaskIDRange {
                consumer(id: i, using: chan)
            }
        }

        if true {
            // Allow consumers to settle down in wait state
            try! await Task.sleep(nanoseconds: 5_000_000_000)
        }

        // Spawn the producers...
        Task {
            for i in TaskIDRange {
                producer(id: i, using: chan)
            }
        }

        // Wait for the consumers to finish
        var secs = 0
        var finisheds = 0
        repeat {
            try! await Task.sleep(nanoseconds: 5_000_000_000)
            secs += 5
            let count = await chan.count
            finisheds = await recorder.consumerFinisheds
            print("\(secs) seconds elapsed - \(count) \(TaskIDRange.count) consumers \(finisheds) finished...")
        } while finisheds < TaskIDRange.count

        // Check records
        let count = await recorder.record.count
        assert(count == M)
        for taskID in TaskIDRange {
            guard let values = await recorder.record[taskID] else {
                fatalError()
            }
            let svalues = values.sorted()
            print(taskID, svalues)
            // Distinct index name: the original inner loop shadowed the task index.
            for packetID in PacketIDRange {
                assert(packetID == svalues[packetID])
            }
        }
    }

    /// Starts a task that drains the packets addressed to `id` from `chan`.
    ///
    /// Packets addressed to other consumers are put back on the channel.
    /// The task finishes after it has recorded `N` data packets and then
    /// received its end-of-transmission packet.
    static func consumer(id: TaskID, using chan: Channel<Packet>) {
        Task {
            var n = 0

            // Phase 1: collect our N data packets.
            while n < N {
                guard let u = await chan.get() else {
                    // producer never sends a nil
                    fatalError()
                }
                if u.0 != id {
                    // not ours, put it back
                    await chan.put(u)
                } else if u.1 == eot {
                    // Our EOT arrived while data packets are still outstanding
                    // (n < N always holds inside this loop), so put it back.
                    await chan.put(u)
                } else {
                    // count packets
                    n += 1
                    await recorder.update(key: id, value: u.1)
                }
            }

            // Phase 2: all data received; wait for our EOT packet.
            var receivedEot = false
            repeat {
                guard let u = await chan.get() else {
                    // producer never sends a nil
                    fatalError()
                }
                if u.0 != id {
                    // not ours, put it back
                    await chan.put(u)
                } else {
                    assert(u.1 == eot)
                    receivedEot = true
                }
            } while !receivedEot

            print("-->", id, n, "finished")
            await recorder.consumerFinished()
        }
    }

    /// Starts a task that puts one packet per ID in `PacketIDRange` on `chan`,
    /// all addressed to `id`, followed by an end-of-transmission packet.
    static func producer(id: TaskID, using chan: Channel<Packet>) {
        Task {
            for i in PacketIDRange {
                await chan.put((id, i))
            }
            // signal end of transmission
            await chan.put((id, eot))
            await recorder.producerFinished()
        }
    }
}
Channel
import Foundation
/// Namespace for the third iteration of the channel experiment.
enum V3 {
    /// A channel whose `get` parks the caller while nothing is queued.
    /// `put` hands a value directly to the longest-waiting parked caller, if any.
    actor Channel<T> {
        /// Queued values, oldest first.
        private var store: [T?] = []
        /// Continuations of callers parked inside `get`, in arrival order.
        private var suspendeds: [UnsafeContinuation<T?, Never>] = []

        /// Queues `u`; if a caller is parked, it is resumed with the oldest
        /// queued value (which is removed from the queue).
        func put(_ u: T?) {
            store.append(u)
            guard !suspendeds.isEmpty else { return }
            let waiter = suspendeds.removeFirst()
            let oldest = store.removeFirst()
            waiter.resume(returning: oldest)
        }

        /// Returns the oldest queued value. When the queue is empty the caller
        /// is parked and later receives whatever value `put` resumes it with.
        func get() async -> T? {
            guard store.isEmpty else {
                return store.removeFirst()
            }
            return await withUnsafeContinuation { continuation in
                suspendeds.append(continuation)
            }
        }

        /// Snapshot of the number of queued packets and of parked callers.
        var count: (packets: Int, suspendeds: Int) {
            (packets: store.count, suspendeds: suspendeds.count)
        }
    }
}
Recorder
import Foundation
/// Collects the values received per key and counts finished consumer and
/// producer tasks.
actor Recorder {
    /// Values recorded so far, grouped by key.
    private(set) var record: [Int: [Int]] = [:]
    /// Number of `consumerFinished()` calls so far.
    private(set) var consumerFinisheds = 0
    /// Number of `producerFinished()` calls so far.
    private(set) var producerFinisheds = 0

    /// Creates a recorder with an empty value list for every key in `1...capacity`.
    ///
    /// - Parameter capacity: Highest key to pre-register. Values below 1
    ///   produce a recorder with no keys.
    init(capacity: Int) {
        // `stride` yields nothing when capacity < 1, whereas building the
        // closed range `1...capacity` would trap at run time.
        for key in stride(from: 1, through: capacity, by: 1) {
            record[key] = []
        }
    }

    /// Appends `value` to the list stored under `key`.
    /// Keys that were not pre-registered by `init` are ignored.
    func update(key: Int, value: Int) {
        assert(key > 0)
        record[key]?.append(value)
    }

    /// Notes that one more consumer has finished.
    func consumerFinished() {
        consumerFinisheds += 1
    }

    /// Notes that one more producer has finished.
    func producerFinished() {
        producerFinisheds += 1
    }
}