Using Actors Correctly

As a learning exercise, I am trying to create a channel for intertask communications.

Here is my first attempt:

/// A minimal FIFO channel for handing optional values from one task to another.
///
/// `get()` never waits for data: it reports an empty channel through the first
/// element of its result, so a consumer has to poll.
actor Channel<T> {
    /// Queued values, oldest first.
    private var store: [T?] = []

    /// Appends `u` to the back of the queue.
    func put(_ u: T?) {
        store.append(u)
    }

    /// Removes and returns the oldest queued value.
    /// - Returns: `(true, nil)` when the channel is empty, otherwise `(false, value)`.
    func get() async -> (Bool, T?) {
        guard !store.isEmpty else {
            return (true, nil)
        }
        return (false, store.removeFirst())
    }
}

This works fine, but the consumer has to spin inefficiently until the channel has something to offer.

I thought if I suspend the calling task when the channel is empty, I will eliminate the need to spin.

Behold my second attempt using continuations! :sweat_smile:

/// Second attempt: a FIFO channel whose `get()` suspends the calling task while
/// the store is empty instead of making the consumer poll.
///
/// NOTE(review): the author reports that this version does not work; the code
/// shown here does not by itself establish why.
actor Channel <T> {
    /// Values that have been put and not yet taken, oldest first.
    private var store = [T?] ()
    /// Continuations of tasks currently suspended inside `get()`, oldest first.
    private var suspendeds = [UnsafeContinuation <Void, Never>] ()
    
    /// Appends `u` to the store, then wakes at most one suspended task.
    /// The continuation carries no value; the woken task reads the store itself.
    func put (_ u: T?) {
        store.append (u)
        // resume the first task if any are suspended
        if !suspendeds.isEmpty {
            suspendeds.removeFirst().resume()
        }
    }
    
    /// Returns the oldest stored value, suspending while the store is empty.
    func get () async -> T? {
        repeat {
            // suspend the task if the store is empty
            if store.isEmpty {
                await withUnsafeContinuation {cont in
                    suspendeds.append (cont)
                }
            }
        }
        // Re-check after being resumed: the store is read again rather than
        // assumed non-empty (presumably because another `get()` may have taken
        // the value while this task was suspended — TODO confirm).
        while store.isEmpty
                
        return store.removeFirst()
    }
    
    /// Number of tasks currently suspended in `get()`.
    var suspendedsCount : Int {
        suspendeds.count
    }
}

But of course this does not work, and it bothers me that I can't explain why!

Thank you in advance for any insights.

EDIT: I am now using a slightly modified version of @MarSe32m's code below

But still seeing problems:

  • segmentation fault ;
  • malloc: pointer 0x600000f1c000 being freed was not allocated; and
  • consumers hanging.

Producer/consumer code is here:

Test Code
Main
/// Program entry point: hands control to the V3 producer/consumer driver.
@main
enum Main {
    static func main() async { await V3.run() }
}
Driver
import Foundation

/// Test driver for `V3.Channel`.
///
/// `M` producers each emit `N` data packets followed by an end-of-transmission
/// packet, all through one shared channel. `M` consumers, one per producer, pull
/// packets from that channel and put back any packet that is not addressed to them.
extension V3 {
    // number of producer and consumer tasks, each consumer has its own unique producer
    static let M = 32
    static let TaskIDRange = 1...M

    // number of packets emitted by a producer and expected to be received by a consumer
    static let N = 32
    static let PacketIDRange = 0..<N

    // sentinel for signalling end of packets
    static let eot = -1

    typealias TaskID   = Int
    typealias PacketID = Int
    typealias Packet   = (TaskID, PacketID)

    static let recorder = Recorder (capacity:M)

    /// Spawns the consumers, waits five seconds, spawns the producers, then polls
    /// the recorder every five seconds until every consumer has reported
    /// completion. Finally asserts that each consumer recorded exactly the
    /// packet IDs in `PacketIDRange`.
    static func run () async {
        let chan = Channel <Packet> ()

        // Spawn the consumers...
        Task {
            for i in TaskIDRange {
                consumer (id: i, using: chan)
            }
        }

        // Allow consumers to settle down in wait state
        try! await Task.sleep (nanoseconds: 5_000_000_000)

        // Spawn the producers...
        Task {
            for i in TaskIDRange {
                producer (id: i, using: chan)
            }
        }

        // Wait for the consumers to finish
        var secs = 0
        var finisheds = 0
        repeat {
            try! await Task.sleep (nanoseconds: 5_000_000_000)
            secs += 5
            let count = await chan.count
            finisheds = await recorder.consumerFinisheds
            print ("\(secs) seconds elapsed - \(count) \(TaskIDRange.count) consumers \(finisheds) finished...")
        } while finisheds < TaskIDRange.count

        // Check records
        let count = await recorder.record.count
        assert (count == M)

        for taskID in TaskIDRange {
            guard let values = await recorder.record [taskID] else {
                fatalError()
            }

            let svalues = values.sorted()
            print (taskID, svalues)

            // Distinct index name: the original inner loop shadowed the task index.
            for packetID in PacketIDRange {
                assert (packetID == svalues [packetID])
            }
        }
    }

    /// Starts a task that consumes the packets addressed to `id`.
    ///
    /// - Parameters:
    ///   - id: the consumer's task ID; only packets whose first element equals it are kept.
    ///   - chan: the shared channel to read from and to return foreign packets to.
    static func consumer (id: TaskID, using chan: Channel <Packet>) {
        Task {
            var n = 0

            // Phase 1: collect our N data packets.
            while n < N {
                guard let u = await chan.get() else {
                    // producer never sends a nil
                    fatalError()
                }

                if u.0 == id && u.1 != eot {
                    // ours: count and record it
                    n += 1
                    await recorder.update (key: id, value: u.1)
                }
                else {
                    // Either someone else's packet, or our own eot seen before all
                    // N data packets were counted: put it back. (Inside this loop
                    // n < N always holds, so the eot can never be accepted here.)
                    await chan.put (u)
                }
            }

            // Phase 2: all data packets received; wait for our own eot.
            var receivedEot = false
            repeat {
                guard let u = await chan.get() else {
                    // producer never sends a nil
                    fatalError()
                }

                if u.0 != id {
                    // not ours, put it back
                    await chan.put (u)
                }
                else {
                    assert (u.1 == eot)
                    receivedEot = true
                }
            } while !receivedEot

            print ("-->", id, n, "finished")

            await recorder.consumerFinished()
        }
    }

    /// Starts a task that puts the packets `(id, 0)` through `(id, N - 1)` on
    /// `chan`, followed by `(id, eot)`, and then reports completion to the recorder.
    static func producer (id: TaskID, using chan: Channel <Packet>) {
        Task {
            for i in PacketIDRange {
                await chan.put ((id, i))
            }
            // signal end of transmission
            await chan.put ((id, eot))
            await recorder.producerFinished()
        }
    }
}
Channel
import Foundation

/// Namespace for the third iteration of the channel experiment.
enum V3 {
    /// FIFO channel whose `get()` suspends the caller until a value is available.
    actor Channel<T> {
        /// Values put while no consumer was waiting, oldest first.
        private var store: [T?] = []
        /// Continuations of consumers suspended in `get()`, oldest first.
        private var suspendeds: [UnsafeContinuation<T?, Never>] = []

        /// Enqueues `u`; if a consumer is suspended, the longest-waiting one is
        /// resumed with the value at the front of the store.
        func put(_ u: T?) {
            store.append(u)
            guard !suspendeds.isEmpty else { return }
            let waiter = suspendeds.removeFirst()
            waiter.resume(returning: store.removeFirst())
        }

        /// Returns the oldest stored value, or suspends until `put` supplies one.
        func get() async -> T? {
            guard store.isEmpty else {
                return store.removeFirst()
            }
            return await withUnsafeContinuation { waiter in
                suspendeds.append(waiter)
            }
        }

        /// Current number of stored packets and of suspended consumers.
        var count: (packets: Int, suspendeds: Int) {
            (packets: store.count, suspendeds: suspendeds.count)
        }
    }
}
Recorder
import Foundation

/// Collects the packet IDs received by each consumer, plus counts of finished
/// consumers and producers.
actor Recorder {
    /// Packet IDs received so far, keyed by consumer ID.
    private(set) var record: [Int: [Int]] = [:]
    /// Number of `consumerFinished()` calls so far.
    private(set) var consumerFinisheds = 0
    /// Number of `producerFinished()` calls so far.
    private(set) var producerFinisheds = 0

    /// Creates a recorder with an empty entry for every key in `1...capacity`.
    /// - Parameter capacity: number of keys to pre-create. A value below 1
    ///   yields an empty record instead of trapping on an invalid range.
    init(capacity: Int) {
        guard capacity >= 1 else { return }
        for key in 1...capacity {
            record[key] = []
        }
    }

    /// Appends `value` to the entry for `key`.
    /// Keys without a pre-created entry are ignored.
    func update(key: Int, value: Int) {
        assert(key > 0)
        record[key]?.append(value)
    }

    /// Notes that one more consumer has finished.
    func consumerFinished() {
        consumerFinisheds += 1
    }

    /// Notes that one more producer has finished.
    func producerFinished() {
        producerFinisheds += 1
    }
}

How doesn't it work? I suppose your implementation isn't "fair" for the waiting tasks since some task might be waiting forever for an item while others yoink all of the sent items.

The way I would implement this to be a fair channel would be like this

/// FIFO channel that hands a value directly to a waiting consumer when there is
/// one, and only stores it otherwise.
actor Channel<T> {
    /// Values put while no consumer was waiting, oldest first.
    private var store: [T?] = []
    /// Continuations of consumers suspended in `get()`, oldest first.
    private var suspendeds: [UnsafeContinuation<T?, Never>] = []

    /// Delivers `u` to the longest-waiting consumer, or stores it if none is waiting.
    func put(_ u: T?) {
        guard suspendeds.isEmpty else {
            let waiter = suspendeds.removeFirst()
            waiter.resume(returning: u)
            return
        }
        store.append(u)
    }

    /// Returns the oldest stored value, or suspends until `put` delivers one.
    func get() async -> T? {
        guard store.isEmpty else {
            return store.removeFirst()
        }
        return await withUnsafeContinuation { waiter in
            suspendeds.append(waiter)
        }
    }

    /// Number of consumers currently suspended in `get()`.
    var suspendedsCount: Int { suspendeds.count }
}

Thank you. That was my first attempt, but it turned out to be problematic: store.removeFirst () would crash frequently with the error:

Fatal error: Can't remove first element from an empty collection

The implementation in OP still crashes with the same error but much less frequently.

Another error I get is a malloc error complaining about an attempt to release unallocated memory.

All appear to be random errors.

I don't see how the store.removeFirst() is ever reached on an empty array. The function returns immediately after it has awaited the value. Notice that I turned the continuations from UnsafeContinuation<Void, Never> into UnsafeContinuation<T?, Never>.

Oops! Sorry, I should have looked more carefully. Mine did not have a return.

Now, I am using your version.

But still no luck. It only works with one producer and one consumer.

With multiple producers and consumers, some consumers seem to hang.

Thank you again.

Just a thought – are you testing on a real device or on the iOS Simulator?

If I understand correctly, the iOS Simulator is currently limited to only use one thread for the async/await thread pool. I guess this may create unexpected behavior when testing multiple producers/consumers scenarios.

I'm pretty sure that all well-formed async code should work correctly on a runtime of any thread width. It would be nice if we could test this ourselves, but running in the sim is as close as we can get right now.

Right, limiting the pool to a single thread shouldn't be a problem unless you're blocking the thread in a way you shouldn't be. It's hard to say what's going wrong without seeing all the code in question.

Sebastian's code looks right to me.

Yes, I agree it “should” work with a single-width thread pool. And as far as I can tell, Sebastian's code looks right to me as well.

But since it reportedly did work with one producer/consumer, yet appears to fail with multiple producers/consumers, my first reaction is to look at the code generating the producers and consumers.

It’s at least possible to write code that unintentionally blocks the runtime somehow by mistake. But I haven’t seen the code, and can only speculate.

Sometimes running “concurrent” code in a single-threaded runtime can be confusing. Especially if you are expecting tasks running in parallel.

I have updated the OP with the test code. My apologies in advance if I have done something silly in there. :slight_smile:

Ah, yeah, a consumer that pulls something from the channel and then puts it back if it wasn’t what it wanted will certainly degrade to a busy-wait if producers aren’t actually running in parallel. As long as there’s something in the channel, the task will cycle through it endlessly looking for acceptable values that will never come.

The runtime could make a better effort to yield at suspension points when it realizes that a task has been hogging the CPU, but unless it really guaranteed fairness, this sort of system can always break. And really you just shouldn’t design systems like this; it would be extremely inefficient even if the scheduler bailed you out enough to guarantee progress.

1 Like

Thank you, John.

I actually did that deliberately to see how the concurrency system would fare.

I am totally puzzled, because after a reasonably short time I am observing near zero cpu usage. I have a hunch that some continuations seem to be disappearing before being resumed.

What I can't explain is why it all works if I use this channel instead:

import Foundation

/// Namespace for the first, polling-based iteration of the channel.
enum V1 {
    /// FIFO channel whose `get()` returns immediately, flagging emptiness.
    actor Channel<T> {
        /// Queued values, oldest first.
        private var store: [T?] = []

        /// Appends `u` to the back of the queue.
        func put(_ u: T?) {
            store.append(u)
        }

        /// Removes and returns the oldest queued value.
        /// - Returns: `(true, nil)` when nothing is queued, otherwise `(false, value)`.
        func get() async -> (Bool, T?) {
            if store.isEmpty {
                return (true, nil)
            } else {
                let oldest = store.removeFirst()
                return (false, oldest)
            }
        }
    }
}

But then a consumer has to spin in a loop waiting for a packet to arrive on the channel. That's why I started tinkering with continuations. :slightly_smiling_face:

Yeah, I don't have an immediate answer for why one works and the other doesn't; I'd need to actually pull up a debugger.

The code looks good, but I do see a crash when testing on macOS with this test code:

/// Stress test for `Channel<Int>`: many concurrent `put` and `get` calls issued
/// from child tasks with randomly chosen priorities.
final class ChannelTests: XCTestCase {
    /// For each `i` in `1...count`, adds one task that puts `i` and one task that
    /// gets a value, so the number of puts equals the number of gets.
    func testIt() async throws {
        let count = 5000
        
        let channel = Channel<Int>()
        
        await withTaskGroup(of: Void.self) { group in
                for i in 1...count {
                    // Producer: sleeps a random 0...10_000 ns, then puts `i`.
                    group.addTask(priority: .random()) {
                        try! await Task.sleep(nanoseconds: UInt64.random(in: 0...10_000))
                        await channel.put(i)
                        print("-- put: \(i)")
                    }
                    // Consumer: sleeps a random 0...10_000 ns, then takes one value.
                    group.addTask(priority: .random()) {
                        try! await Task.sleep(nanoseconds: UInt64.random(in: 0...10_000))
                        let got = await channel.get()
                        if let got = got {
                            print("-- got: \(got)")
                        } else {
                            print("-- got: nil")
                        }
                    }
                    // Yield after each producer/consumer pair has been added.
                    await Task.yield()
                }
        }
    }
}

extension TaskPriority {
    /// Returns one of `.background`, `.high`, `.low`, `.medium` or
    /// `.userInitiated`, each selected by a uniformly random index.
    static func random() -> TaskPriority {
        // Same order as the original switch cases 0 through 4.
        let choices: [TaskPriority] = [.background, .high, .low, .medium, .userInitiated]
        return choices[Int.random(in: choices.indices)]
    }
}

Crashes here

This code, however, doesn't use an actor and seems to work fine:

/// Lock-protected FIFO channel (no actor): `put` hands a value directly to a
/// waiting consumer when there is one, and stores it otherwise.
class Channel<T> {
    /// Values put while no consumer was waiting, oldest first.
    private var store: [T?] = []
    /// Continuations of consumers suspended in `get()`, oldest first.
    private var suspendeds: [UnsafeContinuation<T?, Never>] = []
    /// Guards `store` and `suspendeds`.
    private let lock = NSLock()

    /// Delivers `u` to the longest-waiting consumer, or stores it if none is waiting.
    func put(_ u: T?) {
        lock.lock()
        defer { lock.unlock() }

        guard suspendeds.isEmpty else {
            let waiter = suspendeds.removeFirst()
            waiter.resume(returning: u)
            return
        }
        store.append(u)
    }

    /// Returns the oldest stored value, or suspends until `put` delivers one.
    func get() async -> T? {
        lock.lock()
        guard store.isEmpty else {
            defer { lock.unlock() }
            return store.removeFirst()
        }
        // The lock is released only after the continuation has been registered.
        return await withUnsafeContinuation { waiter in
            suspendeds.append(waiter)
            lock.unlock()
        }
    }
}

I would love to know where the issue is here with the actor version of the code.