OK, I’ve updated the actor-based implementation so that it passes the tests (the tests are not included here yet). Performance has mostly remained the same (1.5s for the base cases), but it degraded for the buffered versions by ~0.2s due to moving values between the buffer and the senders queue (I wonder if that can be optimized by merging the senders queue and the buffer?).
Select also runs slower, at about 1.7s, which I suspect is mostly due to the buffers as well. For the select implementation I’ve just reused the existing code from the repo, including AsyncSemaphore.
The overall code is actually close to the lock-based version, just with far fewer locks.
UPD.
I used the senders' queue as the buffer and got a performance boost on the buffered concurrency version, but not on syncRw.
Code:
// Channel.swift
infix operator <- : AssignmentPrecedence

/// Go-style send: `channel <- value`.
///
/// Forwards `value` to `Channel.send(_:)` and returns once that call returns.
///
/// - Parameters:
///   - c: The channel to send on.
///   - value: The value to hand to the channel.
@inlinable
@inline(__always)
public func <- <T>(c: Channel<T>, value: T) async {
    await c.send(value)
}
/// Go-style receive into a variable: `await value <- channel`.
///
/// Stores the result of `Channel.receive()` in `value`; the stored result is
/// `nil` when `receive()` returns `nil`.
///
/// - Parameters:
///   - value: Destination that is overwritten with the received value.
///   - chan: The channel to receive from.
@inlinable
@inline(__always)
public func <- <T>(value: inout T?, chan: Channel<T>) async {
    let received = await chan.receive()
    value = received
}
prefix operator <-

/// Go-style receive expression: `await <-channel`.
///
/// - Parameter chan: The channel to receive from.
/// - Returns: Whatever `Channel.receive()` returns. The result may be ignored.
@inlinable
@inline(__always)
@discardableResult
public prefix func <- <T>(chan: Channel<T>) async -> T? {
    await chan.receive()
}
/// An actor-isolated channel that passes values of type `T` between tasks.
///
/// With `capacity == 0` a send only completes once a receiver takes the value.
/// With `capacity > 0` up to `capacity` values are accepted without a receiver.
///
/// Buffered values and parked senders share one FIFO queue (`senders`); an
/// entry whose continuation is `nil` is a buffered value.
///
/// Known limitations of this design:
/// - Only one `select` can be registered per channel at a time; a later
///   registration replaces the earlier one.
/// - A parked sender is resumed only when its own value is received; it is not
///   moved into a buffer slot that frees up earlier.
public actor Channel<T> where T: Sendable {
    private typealias Container = LinkedList
    private typealias Receiver = UnsafeContinuation<T?, Never>
    /// `continuation == nil` marks a buffered value; otherwise the sender is parked.
    private typealias Sender = (continuation: UnsafeContinuation<Void, Never>?, value: T)

    /// Tasks parked in `receive()`, in arrival order.
    private var receivers = Container<Receiver>()
    /// Buffered values and parked senders, in arrival order.
    private var senders = Container<Sender>()
    /// Semaphore of the `select` currently interested in this channel, if any.
    private var sema: AsyncSemaphore?
    /// Maximum number of values accepted without a receiver.
    private let capacity: Int
    /// Number of entries in `senders` that are buffered values.
    private var buffer: Int = 0

    @usableFromInline
    internal var isClosed: Bool { closed }
    private var closed = false

    /// Creates a channel.
    /// - Parameter capacity: Number of values that may be sent without a
    ///   receiver being present. Defaults to `0` (unbuffered).
    public init(capacity: Int = 0) {
        self.capacity = capacity
    }

    /// Sends `value`, parking the caller until a receiver takes it when it can
    /// neither be handed to a waiting receiver nor buffered.
    ///
    /// Traps if the channel is closed.
    @usableFromInline
    internal func send(_ value: T) async {
        if _send(value) {
            return
        }
        await withUnsafeContinuation { (continuation: UnsafeContinuation<Void, Never>) in
            senders.append((continuation, value))
            // A value is now available: let a waiting `select` retry.
            wakeSelector()
        }
    }

    /// Non-suspending send used by `select`.
    ///
    /// - Returns: `true` if the value was delivered or buffered. Otherwise `sm`
    ///   is registered to be signalled on the next relevant state change and
    ///   `false` is returned.
    @usableFromInline
    internal func sendOrListen(_ sm: AsyncSemaphore, value: T) -> Bool {
        if _send(value) {
            return true
        }
        sema = sm
        return false
    }

    /// Tries to deliver `value` without suspending.
    /// - Returns: `true` if a parked receiver got the value or it was buffered.
    @inline(__always)
    private func _send(_ value: T) -> Bool {
        if closed {
            fatalError("Cannot send on a closed channel")
        }
        if !receivers.isEmpty, let receiver = receivers.popFirst() {
            receiver.resume(returning: value)
            return true
        }
        if buffer < capacity {
            buffer += 1
            senders.append((nil, value))
            // Without this a `select` waiting to receive would never learn
            // that a value was buffered.
            wakeSelector()
            return true
        }
        return false
    }

    /// Receives the next value, parking the caller while none is available.
    /// - Returns: The value, or `nil` once the channel is closed and drained.
    @usableFromInline
    internal func receive() async -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        return await withUnsafeContinuation { (continuation: Receiver) in
            receivers.append(continuation)
            // A receiver is now available: let a `select` waiting to send retry.
            wakeSelector()
        }
    }

    /// Non-suspending receive used by `select`.
    ///
    /// - Returns: A value if one was available. Otherwise `nil`; unless the
    ///   channel is closed, `sm` is registered to be signalled on the next
    ///   relevant state change.
    @usableFromInline
    internal func receiveOrListen(_ sm: AsyncSemaphore) -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        sema = sm
        return nil
    }

    /// Takes the oldest queued value without suspending, resuming its sender
    /// if that sender was parked.
    @inline(__always)
    private func _receive() -> T? {
        guard !senders.isEmpty, let sender = senders.popFirst() else {
            return nil
        }
        if let continuation = sender.continuation {
            continuation.resume()
        } else {
            buffer -= 1
            // A buffer slot was freed: let a `select` waiting to send retry.
            wakeSelector()
        }
        return sender.value
    }

    /// Closes the channel and resumes every parked receiver with `nil`.
    ///
    /// Values that are already queued can still be received afterwards.
    public func close() {
        closed = true
        while !receivers.isEmpty, let receiver = receivers.popFirst() {
            receiver.resume(returning: nil)
        }
        // A `select` waiting on this channel must observe the close.
        wakeSelector()
    }

    /// Signals the registered `select` semaphore, if any, and forgets it.
    /// The `select` registers a fresh semaphore when it retries.
    @inline(__always)
    private func wakeSelector() {
        guard let waiter = sema else { return }
        sema = nil
        waiter.signal()
    }
}
// MARK: - AsyncSequence

/// Makes a channel usable as an `AsyncSequence`; the channel is its own iterator.
extension Channel: AsyncSequence, AsyncIteratorProtocol {
    public typealias Element = T

    /// Returns the channel itself as the iterator.
    public nonisolated func makeAsyncIterator() -> Channel {
        return self
    }

    /// Produces the next element by calling `receive()`.
    public func next() async -> T? {
        return await receive()
    }
}
// Select.swift
import os
import Foundation
/// One case of a `select` block, as produced by `rx`, `tx` or `none`.
public struct SelectHandler {
    /// The type-erased implementation of this case.
    let inner: any SelectProtocol
}
/// Common interface of all `select` cases.
protocol SelectProtocol {
    /// Attempts the case once.
    /// - Parameter semaphore: The semaphore the enclosing `select` waits on
    ///   when no case could be carried out.
    /// - Returns: `true` if the case was carried out.
    func handle(_ semaphore: AsyncSemaphore) async -> Bool
}
/// `select` case that receives from a channel.
struct RxHandler<T: Sendable>: SelectProtocol {
    /// Channel to receive from.
    let chan: Channel<T>
    /// Callback invoked with the received value, or with `nil` when the
    /// channel is found closed.
    let outFunc: (T?) async -> ()

    /// Tries to receive once via `receiveOrListen`.
    /// - Returns: `true` if a value was received or the channel is closed.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        guard let received = await chan.receiveOrListen(sm) else {
            // No value: that is only a result if the channel is closed.
            guard await chan.isClosed else {
                return false
            }
            await outFunc(nil)
            return true
        }
        await outFunc(received)
        return true
    }
}
/// Default `select` case; it always succeeds.
struct NoneHandler: SelectProtocol {
    /// Callback to run when this case is chosen.
    let handler: () async -> ()

    /// Runs `handler` and reports success; the semaphore is not used.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        await handler()
        return true
    }
}
/// `select` case that sends a value on a channel.
struct TxHandler<T: Sendable>: SelectProtocol {
    /// Channel to send on.
    let chan: Channel<T>
    /// Value to send.
    let val: T
    /// Callback invoked after the value was accepted by the channel.
    let onSend: () async -> ()

    /// Tries to send once via `sendOrListen`.
    /// - Returns: `true` if the channel accepted the value.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        let accepted = await chan.sendOrListen(sm, value: val)
        guard accepted else {
            return false
        }
        await onSend()
        return true
    }
}
/// Result builder that gathers the cases written inside a `select` block.
@resultBuilder
public struct SelectCollector {
    /// Returns the listed cases as an array, in source order.
    @inline(__always)
    @inlinable
    public static func buildBlock(_ handlers: SelectHandler...) -> [SelectHandler] {
        return handlers
    }
}
/// Runs exactly one of the given cases, waiting until one can proceed.
///
/// Each attempt uses a fresh semaphore; when no case could be carried out the
/// call waits on that semaphore and then tries again.
///
/// - Parameter cases: Builder closure listing the `rx`, `tx` and `none` cases.
@inline(__always)
@inlinable
public func select(@SelectCollector cases: () -> ([SelectHandler])) async {
    let handlers = cases()
    var semaphore = AsyncSemaphore()
    while await !handle(semaphore, handlers: handlers) {
        await semaphore.wait()
        // The next attempt gets its own semaphore.
        semaphore = AsyncSemaphore()
    }
}
/// Makes one pass over the cases in random order.
///
/// Non-default cases are attempted first; a default (`none`) case is only run
/// if none of the others succeeded.
///
/// - Parameters:
///   - sm: Semaphore passed to every attempted case.
///   - handlers: The cases of the `select` block.
/// - Returns: `true` if some case was carried out.
@usableFromInline
internal func handle(_ sm: AsyncSemaphore, handlers: [SelectHandler]) async -> Bool {
    var fallback: NoneHandler?
    for candidate in handlers.shuffled() {
        switch candidate.inner {
        case let noneCase as NoneHandler:
            // Remember the default case; it runs only as a last resort.
            fallback = noneCase
        default:
            if await candidate.inner.handle(sm) {
                return true
            }
        }
    }
    if let fallback = fallback {
        return await fallback.handle(sm)
    }
    return false
}
/// Creates a receive case for `select`.
/// - Parameters:
///   - chan: Channel to receive from.
///   - outFunc: Called with the received value, or `nil` if the channel is closed.
@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping (T?) async -> ()) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: outFunc)
    return SelectHandler(inner: receiveCase)
}
/// Creates a receive case for `select` whose callback ignores the value.
/// - Parameters:
///   - chan: Channel to receive from.
///   - outFunc: Called after a receive (or when the channel is closed).
@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping () async -> ()) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: { _ in
        await outFunc()
    })
    return SelectHandler(inner: receiveCase)
}
/// Creates a receive case for `select` that discards the received value.
/// - Parameter chan: Channel to receive from.
@inline(__always)
public func rx<T>(_ chan: Channel<T>) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: { _ in })
    return SelectHandler(inner: receiveCase)
}
/// Creates a send case for `select` without a completion callback.
/// - Parameters:
///   - chan: Channel to send on.
///   - val: Value to send.
@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T) -> SelectHandler {
    let sendCase = TxHandler(chan: chan, val: val, onSend: {})
    return SelectHandler(inner: sendCase)
}
/// Creates a send case for `select`.
/// - Parameters:
///   - chan: Channel to send on.
///   - val: Value to send.
///   - onSend: Called after the channel accepted the value.
@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T, _ onSend: @escaping () async -> ()) -> SelectHandler {
    let sendCase = TxHandler(chan: chan, val: val, onSend: onSend)
    return SelectHandler(inner: sendCase)
}
/// Creates the default case for `select`, run when no other case succeeds.
/// - Parameter handler: Callback to run for the default case.
@inline(__always)
public func none(handler: @escaping () async -> ()) -> SelectHandler {
    let defaultCase = NoneHandler(handler: handler)
    return SelectHandler(inner: defaultCase)
}
/// Counting semaphore for async code, guarded by an unfair lock.
///
/// `signal()` either resumes the oldest waiter or banks a permit; `wait()`
/// either consumes a banked permit or parks the caller.
@usableFromInline
internal final class AsyncSemaphore: @unchecked Sendable {
    /// Signals received while nobody was waiting.
    private var permits: Int = 0
    /// Parked waiters, oldest first.
    private var continuationQueue = LinkedList<UnsafeContinuation<Void, Never>>()
    /// Protects `permits` and `continuationQueue`.
    private let lock = OSAllocatedUnfairLock()

    @usableFromInline
    init() {
    }

    /// Returns immediately if a permit is banked; otherwise parks until `signal()`.
    @usableFromInline
    func wait() async {
        await withUnsafeContinuation { (continuation: UnsafeContinuation<Void, Never>) in
            lock.lock()
            let permitAvailable = permits > 0
            if permitAvailable {
                permits -= 1
            } else {
                continuationQueue.append(continuation)
            }
            lock.unlock()
            // Resume only after the lock has been released.
            if permitAvailable {
                continuation.resume()
            }
        }
    }

    /// Resumes the oldest waiter, or banks a permit when nobody is waiting.
    func signal() {
        lock.lock()
        let waiter = continuationQueue.popFirst()
        if waiter == nil {
            permits += 1
        }
        lock.unlock()
        // Resume only after the lock has been released.
        waiter?.resume()
    }
}
// LinkedList.swift
/// Minimal singly linked FIFO queue with O(1) `append` and `popFirst`.
internal struct LinkedList<T> {
    /// Heap node holding one element and a link to its successor.
    private final class Node {
        var value: T
        var next: Node?
        init(value: T) {
            self.value = value
        }
    }

    /// `true` while the list holds no elements.
    private(set) var isEmpty: Bool = true
    private var head: Node?
    private var tail: Node?
    /// Number of elements currently stored; never negative.
    private(set) var count: Int = 0

    /// Adds `value` at the end of the list.
    mutating func append(_ value: T) {
        let node = Node(value: value)
        if head == nil {
            head = node
        } else {
            tail?.next = node
        }
        tail = node
        count += 1
        isEmpty = false
    }

    /// Removes and returns the first element.
    /// - Returns: The oldest element, or `nil` if the list is empty, in which
    ///   case the list is left unchanged.
    mutating func popFirst() -> T? {
        // Popping an empty list must not touch `count`.
        guard let first = head else {
            return nil
        }
        head = first.next
        if head == nil {
            isEmpty = true
            tail = nil
        }
        count -= 1
        return first.value
    }
}
UPD 2. Measurements
hyperfine '.build/release/SwiftChannelsDemo 1 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 1 Int
Time (mean ± σ): 1.483 s ± 0.007 s [User: 1.536 s, System: 0.519 s]
Range (min … max): 1.471 s … 1.494 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 2 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 2 Int
Time (mean ± σ): 1.487 s ± 0.004 s [User: 1.540 s, System: 0.524 s]
Range (min … max): 1.482 s … 1.492 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 3 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 3 Int
Time (mean ± σ): 1.437 s ± 0.005 s [User: 1.499 s, System: 0.492 s]
Range (min … max): 1.430 s … 1.448 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 4 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 4 Int
Time (mean ± σ): 2.133 s ± 0.014 s [User: 2.116 s, System: 0.001 s]
Range (min … max): 2.123 s … 2.171 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 5 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 5 Int
Time (mean ± σ): 1.674 s ± 0.008 s [User: 1.869 s, System: 0.911 s]
Range (min … max): 1.663 s … 1.684 s 10 runs