Async Channels for Swift concurrency

Great idea, I'll give that a go. I was having similar thoughts that this may be an artifact of compiler optimization.

Makes sense. Would be easy to update the benchmarks to run against a couple of different types.

+1 I think it depends on what you are trying to compare against. In this case we are comparing to go and similar workloads. But I am in favor of more benchmark variations - I agree what we have today is rather limited.

Thanks for all the feedback and discussion. I'll continue to test and update the linked list branch

2 Likes

Ive updated the testing harness to run on multiple data types and ran it against a few simple cases (other than integers). The results are fairly consistent:

Test case Data type Result (seconds)
testSingleReaderManyWriter Int 0.684
testHighConcurrency Int 0.719
testHighConcurrencyBuffered Int 1.104
testSyncRw Int 1.508
testSelect Int 1.545
testSingleReaderManyWriter String 0.811
testHighConcurrency String 0.838
testHighConcurrencyBuffered String 1.137
testSyncRw String 1.706
testSelect String 1.562
testSingleReaderManyWriter ValueData 0.750
testHighConcurrency ValueData 0.772
testHighConcurrencyBuffered ValueData 1.159
testSyncRw ValueData 1.518
testSelect ValueData 1.560
testSingleReaderManyWriter RefData 0.871
testHighConcurrency RefData 0.926
testHighConcurrencyBuffered RefData 1.187
testSyncRw RefData 1.681
testSelect RefData 1.543

FWIW these are simple structs/classes without many fields. But it further helps demonstrate that the Int test are not way off or biased by compiler behaviors.

1 Like

Ive significantly revamped the benchmarks with better test cases and more coverage across the board. This should hopefully lead to more fair comparisons against changes and other libraries.

I gave this a go today. Surprisingly, It didn't seem to be an improvement over LinkedList. I captured the difference between the current benches and copy/pasted Deque here.

I'm not sure posting a branch with the copy pasted code in the repo is a great idea - but if anyone want's the patch I can DM it (or it can be relatively easily re-created). This is still probably worth some more investigation. I'll post more updates if I dig into this more.

1 Like

Ok, I’ve updated actors implementation to pass tests (not here yet). It has mostly remained the same (1.5s for base cases), but degraded for buffered versions by ~0.2s due to moving values between buffer and senders queues (wonder if that can be optimized by merging senders queue and buffer?).

Select is performing at 1.7s too, which I suspect can also be due to the buffers mostly. For select implementation I’ve just reused existing code from the repo, including AsyncSemaphore.

The overall code is actually close to the locks version, just with much less locks.

UPD.

Used senders' queue for buffer, got a performance boost on buffered concurrency version, but not on syncRw.

Code:

//  Channel.swift

infix operator <- :AssignmentPrecedence

@inline(__always)
@inlinable
public func <- <T>(c: Channel<T>, value: T) async {
    await c.send(value)
}

@inline(__always)
@inlinable
public func <- <T>(value: inout T?, chan: Channel<T>) async {
    await value = chan.receive()
}

prefix operator <-

@inline(__always)
@inlinable
@discardableResult
public prefix func <- <T>(chan: Channel<T>) async -> T? {
    return await chan.receive()
}

public actor Channel<T> where T: Sendable {
    private typealias Container = LinkedList

    private typealias Receiver = UnsafeContinuation<T?, Never>
    private var receivers = Container<Receiver>()

    private typealias Sender = (continuation: UnsafeContinuation<Void, Never>?, value: T)
    private var senders = Container<Sender>()

    private var sema: AsyncSemaphore?

    private let capacity: Int
    private var buffer: Int = 0

    @usableFromInline
    internal var isClosed: Bool { closed }
    private var closed = false

    public init(capacity: Int = 0) {
        self.capacity = capacity
    }

    @usableFromInline
    internal func send(_ value: T) async {
        if _send(value) {
            return
        }
        await withUnsafeContinuation { continuation in
            senders.append((continuation, value))
            sema?.signal()
        }
    }

    @usableFromInline
    internal func sendOrListen(_ sm: AsyncSemaphore, value: T) -> Bool {
        if _send(value) {
            return true
        }
        sema = sm
        return false
    }

    @inline(__always)
    private func _send(_ value: T) -> Bool {
        if closed {
            fatalError("Cannot send on a closed channel")
        }
        if !receivers.isEmpty {
            let receiver = receivers.popFirst()!
            receiver.resume(returning: value)
            return true
        }
        if buffer < capacity {
            buffer += 1
            senders.append((nil, value))
            return true
        }
        return false
    }

    @usableFromInline
    internal func receive() async -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        return await withUnsafeContinuation { continuation in
            receivers.append(continuation)
        }
    }

    @usableFromInline
    internal func receiveOrListen(_ sm: AsyncSemaphore) -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        sema = sm
        return nil
    }

    @inline(__always)
    private func _receive() -> T? {
        if senders.isEmpty {
            return nil
        }
        let (continuation, value) = senders.popFirst()!
        if continuation == nil {
            buffer -= 1
        }
        continuation?.resume()
        return value
    }

    public func close() {
        closed = true
        while let receiver = receivers.popFirst() {
            receiver.resume(returning: nil)
        }
    }
}

extension Channel: AsyncSequence, AsyncIteratorProtocol {
    public typealias Element = T
    
    public nonisolated func makeAsyncIterator() -> Channel { self }
    
    public func next() async -> T? { await receive() }
}

//  Select.swift

import os
import Foundation

public struct SelectHandler {
    internal let inner: any SelectProtocol
}

internal protocol SelectProtocol {
    func handle(_ sm: AsyncSemaphore) async -> Bool
}

internal struct RxHandler<T>: SelectProtocol where T: Sendable {
    let chan: Channel<T>
    let outFunc: (T?) async -> ()
       
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        if let val = await chan.receiveOrListen(sm) {
            await outFunc(val)
            return true
        }
        if await chan.isClosed {
            await outFunc(nil)
            return true
        }
        return false
    }
}

internal struct NoneHandler: SelectProtocol {
    let handler: () async -> ()

    func handle(_ sm: AsyncSemaphore) async -> Bool {
        await handler()
        return true
    }
}

internal struct TxHandler<T>: SelectProtocol where T: Sendable {
    let chan: Channel<T>
    let val: T
    let onSend: () async -> ()
       
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        if await chan.sendOrListen(sm, value: val) {
            await onSend()
            return true
        }
        return false
    }
}

@resultBuilder
public struct SelectCollector {
    @inlinable
    @inline(__always)
    public static func buildBlock(_ handlers: SelectHandler...) -> [SelectHandler] { handlers }
}

@inlinable
@inline(__always)
public func select(@SelectCollector cases: () -> ([SelectHandler])) async {
    let handlers = cases()
    while true {
        let sm = AsyncSemaphore()
        if await handle(sm, handlers: handlers) {
            return
        }
        await sm.wait()
    }
}

@usableFromInline
internal func handle(_ sm: AsyncSemaphore, handlers: [SelectHandler]) async -> Bool {
    var defaultCase: NoneHandler?
    for handler in handlers.shuffled() {
        if let noneHandler = handler.inner as? NoneHandler {
            defaultCase = noneHandler
        } else if await handler.inner.handle(sm) {
            return true
        }
    }
    return await defaultCase?.handle(sm) ?? false
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping (T?) async -> ()) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: outFunc))
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: { _ in await outFunc() }))
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: { _ in }))
}

@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T) -> SelectHandler {
    return SelectHandler(inner: TxHandler(chan: chan, val: val, onSend: {}))
}

@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T, _ onSend: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: TxHandler(chan: chan, val: val, onSend: onSend))
}

@inline(__always)
public func none(handler: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: NoneHandler(handler: handler))
}

@usableFromInline
internal final class AsyncSemaphore: @unchecked Sendable {
    private var permits: Int = 0
    private var continuationQueue = LinkedList<UnsafeContinuation<Void, Never>>()
    private let lock = OSAllocatedUnfairLock()

    @usableFromInline
    init() {
    }

    @usableFromInline
    func wait() async {
        await withUnsafeContinuation { continuation in
            lock.lock()
            if permits > 0 {
                permits -= 1
                lock.unlock()
                continuation.resume()
            } else {
                continuationQueue.append(continuation)
                lock.unlock()
            }
        }
    }
    
    func signal() {
        lock.lock()
        if let next = continuationQueue.popFirst() {
            lock.unlock()
            next.resume()
        } else {
            permits += 1
            lock.unlock()
        }
    }
}

//  LinkedList.swift

internal struct LinkedList<T> {
    private final class Node {
        var value: T
        var next: Node?

        init(value: T) {
            self.value = value
        }
    }

    private(set) var isEmpty: Bool = true
    private var head: Node?
    private var tail: Node?
    private(set) var count: Int = 0

    mutating func append(_ value: T) {
        let node = Node(value: value)
        if head == nil {
            head = node
            tail = node
        } else {
            tail?.next = node
            tail = node
        }
        count += 1
        isEmpty = false
    }
    
    mutating func popFirst() -> T? {
        let value = head?.value
        head = head?.next
        if head == nil {
            isEmpty = true
            tail = nil
        }
        count -= 1
        return value
    }
}

UPD 2. Measurements

hyperfine '.build/release/SwiftChannelsDemo 1 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 1 Int
  Time (mean ± σ):      1.483 s ±  0.007 s    [User: 1.536 s, System: 0.519 s]
  Range (min … max):    1.471 s …  1.494 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 2 Int' 
Benchmark 1: .build/release/SwiftChannelsDemo 2 Int
  Time (mean ± σ):      1.487 s ±  0.004 s    [User: 1.540 s, System: 0.524 s]
  Range (min … max):    1.482 s …  1.492 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 3 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 3 Int
  Time (mean ± σ):      1.437 s ±  0.005 s    [User: 1.499 s, System: 0.492 s]
  Range (min … max):    1.430 s …  1.448 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 4 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 4 Int
  Time (mean ± σ):      2.133 s ±  0.014 s    [User: 2.116 s, System: 0.001 s]
  Range (min … max):    2.123 s …  2.171 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 5 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 5 Int
  Time (mean ± σ):      1.674 s ±  0.008 s    [User: 1.869 s, System: 0.911 s]
  Range (min … max):    1.663 s …  1.684 s    10 runs
2 Likes

Fastest implementation of channels in Rust
https://github.com/fereidani/kanal

1 Like

btw, can we also try to measure taskgroup and memory consumption? :thinking:

As for TaskGroup, I've tried the following modification on one test:

func testSingleReaderManyWriter() async {
    let a = Channel<Int>()
    await withDiscardingTaskGroup { group in
        for _ in 0..<100 {
            group.addTask {
                for _ in 0..<10_000 {
                    await a <- 1
                }
            }
        }
        var sum = 0
        while sum < 1_000_000 {
            sum += await <-a
        }
    }
}

And this one got zero difference in running time, which was a bit surprising for me.

2 Likes

As of now, ˋsyncRw` test is the one that bothers me the most. It has the worst performance, which I think should be due to the executor hops which are more likely to happen 4 times per each iteration (at least in actor version) — hop to channel to send, back to generic, hop to channel to receive, back to generic. But I see no way to reduce it. In locks version I assume there should be no hops at all, since all the code is non-isolated, which now means I can be wrong about executor hops…

shouldn't while loop be outside taskGroup? :thinking:

Only if we put group inside task:

func testSingleReaderManyWriter() async {
    let a = Channel<Int>()
    Task.detached {
         await withDiscardingTaskGroup { group in
            for _ in 0..<100 {
                group.addTask {
                    for _ in 0..<10_000 {
                        await a <- 1
                    }
                }
            }
        }
    }
    var sum = 0
    while sum < 1_000_000 {
        sum += await <-a
    }
}

But in general it also makes no differences in running time. If we move out while-loop outside of group without introducing a Task, we would need buffered channel with 1 million elements to prevent blocking it forever.

1 Like

Not sure I understand the code now :sweat_smile:
Why not putting while loop inside group.addTask then if inside TaskGroup?

It makes a little sense I think, to wrap task group with a task in such way :slight_smile: Just an example.

I'm not sure if it makes any difference from the initial version?