Async Channels for Swift concurrency

Ok, I’ve updated actors implementation to pass tests (not here yet). It has mostly remained the same (1.5s for base cases), but degraded for buffered versions by ~0.2s due to moving values between buffer and senders queues (wonder if that can be optimized by merging senders queue and buffer?).

Select is performing at 1.7s too, which I suspect can also be due to the buffers mostly. For select implementation I’ve just reused existing code from the repo, including AsyncSemaphore.

The overall code is actually close to the locks version, just with much less locks.

UPD.

Used senders' queue for buffer, got a performance boost on buffered concurrency version, but not on syncRw.

Code:

//  Channel.swift

infix operator <- :AssignmentPrecedence

@inline(__always)
@inlinable
public func <- <T>(c: Channel<T>, value: T) async {
    await c.send(value)
}

@inline(__always)
@inlinable
public func <- <T>(value: inout T?, chan: Channel<T>) async {
    await value = chan.receive()
}

prefix operator <-

@inline(__always)
@inlinable
@discardableResult
public prefix func <- <T>(chan: Channel<T>) async -> T? {
    return await chan.receive()
}

public actor Channel<T> where T: Sendable {
    private typealias Container = LinkedList

    private typealias Receiver = UnsafeContinuation<T?, Never>
    private var receivers = Container<Receiver>()

    private typealias Sender = (continuation: UnsafeContinuation<Void, Never>?, value: T)
    private var senders = Container<Sender>()

    private var sema: AsyncSemaphore?

    private let capacity: Int
    private var buffer: Int = 0

    @usableFromInline
    internal var isClosed: Bool { closed }
    private var closed = false

    public init(capacity: Int = 0) {
        self.capacity = capacity
    }

    @usableFromInline
    internal func send(_ value: T) async {
        if _send(value) {
            return
        }
        await withUnsafeContinuation { continuation in
            senders.append((continuation, value))
            sema?.signal()
        }
    }

    @usableFromInline
    internal func sendOrListen(_ sm: AsyncSemaphore, value: T) -> Bool {
        if _send(value) {
            return true
        }
        sema = sm
        return false
    }

    @inline(__always)
    private func _send(_ value: T) -> Bool {
        if closed {
            fatalError("Cannot send on a closed channel")
        }
        if !receivers.isEmpty {
            let receiver = receivers.popFirst()!
            receiver.resume(returning: value)
            return true
        }
        if buffer < capacity {
            buffer += 1
            senders.append((nil, value))
            return true
        }
        return false
    }

    @usableFromInline
    internal func receive() async -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        return await withUnsafeContinuation { continuation in
            receivers.append(continuation)
        }
    }

    @usableFromInline
    internal func receiveOrListen(_ sm: AsyncSemaphore) -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        sema = sm
        return nil
    }

    @inline(__always)
    private func _receive() -> T? {
        if senders.isEmpty {
            return nil
        }
        let (continuation, value) = senders.popFirst()!
        if continuation == nil {
            buffer -= 1
        }
        continuation?.resume()
        return value
    }

    public func close() {
        closed = true
        while let receiver = receivers.popFirst() {
            receiver.resume(returning: nil)
        }
    }
}

extension Channel: AsyncSequence, AsyncIteratorProtocol {
    public typealias Element = T
    
    public nonisolated func makeAsyncIterator() -> Channel { self }
    
    public func next() async -> T? { await receive() }
}

//  Select.swift

import os
import Foundation

public struct SelectHandler {
    internal let inner: any SelectProtocol
}

internal protocol SelectProtocol {
    func handle(_ sm: AsyncSemaphore) async -> Bool
}

internal struct RxHandler<T>: SelectProtocol where T: Sendable {
    let chan: Channel<T>
    let outFunc: (T?) async -> ()
       
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        if let val = await chan.receiveOrListen(sm) {
            await outFunc(val)
            return true
        }
        if await chan.isClosed {
            await outFunc(nil)
            return true
        }
        return false
    }
}

internal struct NoneHandler: SelectProtocol {
    let handler: () async -> ()

    func handle(_ sm: AsyncSemaphore) async -> Bool {
        await handler()
        return true
    }
}

internal struct TxHandler<T>: SelectProtocol where T: Sendable {
    let chan: Channel<T>
    let val: T
    let onSend: () async -> ()
       
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        if await chan.sendOrListen(sm, value: val) {
            await onSend()
            return true
        }
        return false
    }
}

@resultBuilder
public struct SelectCollector {
    @inlinable
    @inline(__always)
    public static func buildBlock(_ handlers: SelectHandler...) -> [SelectHandler] { handlers }
}

@inlinable
@inline(__always)
public func select(@SelectCollector cases: () -> ([SelectHandler])) async {
    let handlers = cases()
    while true {
        let sm = AsyncSemaphore()
        if await handle(sm, handlers: handlers) {
            return
        }
        await sm.wait()
    }
}

@usableFromInline
internal func handle(_ sm: AsyncSemaphore, handlers: [SelectHandler]) async -> Bool {
    var defaultCase: NoneHandler?
    for handler in handlers.shuffled() {
        if let noneHandler = handler.inner as? NoneHandler {
            defaultCase = noneHandler
        } else if await handler.inner.handle(sm) {
            return true
        }
    }
    return await defaultCase?.handle(sm) ?? false
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping (T?) async -> ()) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: outFunc))
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: { _ in await outFunc() }))
}

@inline(__always)
public func rx<T>(_ chan: Channel<T>) -> SelectHandler {
    return SelectHandler(inner: RxHandler(chan: chan, outFunc: { _ in }))
}

@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T) -> SelectHandler {
    return SelectHandler(inner: TxHandler(chan: chan, val: val, onSend: {}))
}

@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T, _ onSend: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: TxHandler(chan: chan, val: val, onSend: onSend))
}

@inline(__always)
public func none(handler: @escaping () async -> ()) -> SelectHandler {
    return SelectHandler(inner: NoneHandler(handler: handler))
}

@usableFromInline
internal final class AsyncSemaphore: @unchecked Sendable {
    private var permits: Int = 0
    private var continuationQueue = LinkedList<UnsafeContinuation<Void, Never>>()
    private let lock = OSAllocatedUnfairLock()

    @usableFromInline
    init() {
    }

    @usableFromInline
    func wait() async {
        await withUnsafeContinuation { continuation in
            lock.lock()
            if permits > 0 {
                permits -= 1
                lock.unlock()
                continuation.resume()
            } else {
                continuationQueue.append(continuation)
                lock.unlock()
            }
        }
    }
    
    func signal() {
        lock.lock()
        if let next = continuationQueue.popFirst() {
            lock.unlock()
            next.resume()
        } else {
            permits += 1
            lock.unlock()
        }
    }
}

//  LinkedList.swift

internal struct LinkedList<T> {
    private final class Node {
        var value: T
        var next: Node?

        init(value: T) {
            self.value = value
        }
    }

    private(set) var isEmpty: Bool = true
    private var head: Node?
    private var tail: Node?
    private(set) var count: Int = 0

    mutating func append(_ value: T) {
        let node = Node(value: value)
        if head == nil {
            head = node
            tail = node
        } else {
            tail?.next = node
            tail = node
        }
        count += 1
        isEmpty = false
    }
    
    mutating func popFirst() -> T? {
        let value = head?.value
        head = head?.next
        if head == nil {
            isEmpty = true
            tail = nil
        }
        count -= 1
        return value
    }
}

UPD 2. Measurements

hyperfine '.build/release/SwiftChannelsDemo 1 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 1 Int
  Time (mean ± σ):      1.483 s ±  0.007 s    [User: 1.536 s, System: 0.519 s]
  Range (min … max):    1.471 s …  1.494 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 2 Int' 
Benchmark 1: .build/release/SwiftChannelsDemo 2 Int
  Time (mean ± σ):      1.487 s ±  0.004 s    [User: 1.540 s, System: 0.524 s]
  Range (min … max):    1.482 s …  1.492 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 3 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 3 Int
  Time (mean ± σ):      1.437 s ±  0.005 s    [User: 1.499 s, System: 0.492 s]
  Range (min … max):    1.430 s …  1.448 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 4 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 4 Int
  Time (mean ± σ):      2.133 s ±  0.014 s    [User: 2.116 s, System: 0.001 s]
  Range (min … max):    2.123 s …  2.171 s    10 runs

hyperfine '.build/release/SwiftChannelsDemo 5 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 5 Int
  Time (mean ± σ):      1.674 s ±  0.008 s    [User: 1.869 s, System: 0.911 s]
  Range (min … max):    1.663 s …  1.684 s    10 runs
2 Likes