OK, I’ve updated the actor implementation so that it passes the tests (the tests are not posted here yet). Performance has mostly remained the same (1.5s for the base cases), but it degraded by ~0.2s for the buffered versions because values are moved between the buffer and the senders queue (I wonder whether that can be optimized by merging the senders queue and the buffer?).
Select is performing at 1.7s too, which I suspect is also mostly due to the buffers. For the select implementation I’ve just reused existing code from the repo, including AsyncSemaphore.
The overall code is actually close to the locks version, just with far fewer locks.
UPD.
I used the senders' queue as the buffer and got a performance boost on the buffered concurrency version, but not on syncRw.
Code:
// Channel.swift
infix operator <- : AssignmentPrecedence

/// Sends `value` on the channel `c`, spelled `channel <- value`.
///
/// Forwards to `Channel.send(_:)` and returns once that call completes.
@inlinable
@inline(__always)
public func <- <T>(c: Channel<T>, value: T) async {
    await c.send(value)
}
/// Receives from `chan` into `value`, spelled `value <- channel`.
///
/// `value` is overwritten with whatever `Channel.receive()` returns,
/// including `nil`.
@inlinable
@inline(__always)
public func <- <T>(value: inout T?, chan: Channel<T>) async {
    value = await chan.receive()
}
prefix operator <-

/// Receives from `chan`, spelled `<-channel`.
///
/// The result is discardable so the operator can be used purely to wait for
/// (and drop) the next element.
///
/// - Returns: Whatever `Channel.receive()` returns.
@inlinable
@inline(__always)
@discardableResult
public prefix func <- <T>(chan: Channel<T>) async -> T? {
    await chan.receive()
}
/// A channel for passing `Sendable` values between tasks, implemented as an actor.
///
/// A single FIFO queue (`senders`) holds both buffered values (entries with
/// no continuation) and the values of suspended senders (entries with a
/// continuation). `buffer` counts only the buffered entries and is bounded by
/// `capacity`.
///
/// Notes grounded in this implementation:
/// - Sending on a closed channel traps with `fatalError`.
/// - A sender that suspended because the buffer was full is resumed only when
///   its own value is received, not when a buffer slot becomes free.
/// - Only one `select` semaphore is remembered at a time; a later
///   `sendOrListen`/`receiveOrListen` overwrites an earlier registration.
public actor Channel<T> where T: Sendable {
    private typealias Container = LinkedList
    private typealias Receiver = UnsafeContinuation<T?, Never>
    private typealias Sender = (continuation: UnsafeContinuation<Void, Never>?, value: T)

    /// Receivers suspended in `receive()`, oldest first.
    private var receivers = Container<Receiver>()
    /// Queued values, oldest first. A `nil` continuation marks a buffered
    /// value; a non-nil continuation belongs to a suspended sender.
    private var senders = Container<Sender>()
    /// Semaphore of the most recent `select` poll that could not complete on
    /// this channel. Consumed by `notifyListener()`.
    private var sema: AsyncSemaphore?
    /// Maximum number of buffered (continuation-less) entries.
    private let capacity: Int
    /// Number of buffered entries currently in `senders`.
    private var buffer: Int = 0

    @usableFromInline
    internal var isClosed: Bool { closed }
    private var closed = false

    /// Creates a channel.
    ///
    /// - Parameter capacity: How many values can be queued without suspending
    ///   the sender. The default `0` makes every send wait for a receiver.
    public init(capacity: Int = 0) {
        self.capacity = capacity
    }

    /// Sends `value`, suspending until it is received if it can neither be
    /// handed to a waiting receiver nor buffered.
    @usableFromInline
    internal func send(_ value: T) async {
        if _send(value) {
            return
        }
        await withUnsafeContinuation { continuation in
            senders.append((continuation, value))
            // A value is now available to a `select` polling for receive.
            notifyListener()
        }
    }

    /// Non-suspending send used by `select`.
    ///
    /// - Returns: `true` if the value was delivered or buffered. Otherwise
    ///   `sm` is remembered so it can be signalled when the channel's state
    ///   changes, and `false` is returned.
    @usableFromInline
    internal func sendOrListen(_ sm: AsyncSemaphore, value: T) -> Bool {
        if _send(value) {
            return true
        }
        sema = sm
        return false
    }

    /// Tries to complete a send without suspending.
    ///
    /// - Returns: `true` if `value` was handed to a waiting receiver or
    ///   stored in the buffer, `false` if the caller has to wait.
    @inline(__always)
    private func _send(_ value: T) -> Bool {
        if closed {
            fatalError("Cannot send on a closed channel")
        }
        if !receivers.isEmpty, let receiver = receivers.popFirst() {
            receiver.resume(returning: value)
            return true
        }
        if buffer < capacity {
            buffer += 1
            senders.append((nil, value))
            // Previously a buffered send never woke a `select` that was
            // waiting to receive on this channel.
            notifyListener()
            return true
        }
        return false
    }

    /// Receives the next value, suspending while the channel is open and empty.
    ///
    /// - Returns: The next queued value, or `nil` once the channel is closed
    ///   and drained.
    @usableFromInline
    internal func receive() async -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        return await withUnsafeContinuation { continuation in
            receivers.append(continuation)
            // A parked receiver lets a `select` that is waiting to send
            // complete its hand-off, so wake it.
            notifyListener()
        }
    }

    /// Non-suspending receive used by `select`.
    ///
    /// - Returns: The next queued value if there is one. Otherwise `nil`; in
    ///   that case `sm` is remembered for a later signal unless the channel
    ///   is closed.
    @usableFromInline
    internal func receiveOrListen(_ sm: AsyncSemaphore) -> T? {
        if let value = _receive() {
            return value
        }
        if closed {
            return nil
        }
        sema = sm
        return nil
    }

    /// Pops the oldest queued value, if any, without suspending.
    @inline(__always)
    private func _receive() -> T? {
        guard !senders.isEmpty, let entry = senders.popFirst() else {
            return nil
        }
        if let continuation = entry.continuation {
            // The value belonged to a suspended sender: let it continue.
            continuation.resume()
        } else {
            // The value was buffered: a slot is free again, which a `select`
            // waiting to send needs to hear about.
            buffer -= 1
            notifyListener()
        }
        return entry.value
    }

    /// Closes the channel and resumes every suspended receiver with `nil`.
    ///
    /// Values that are already queued stay receivable.
    public func close() {
        closed = true
        while !receivers.isEmpty, let receiver = receivers.popFirst() {
            receiver.resume(returning: nil)
        }
        // Wake a waiting `select` so it can observe the closed state.
        notifyListener()
    }

    /// Signals the remembered `select` semaphore, if any, exactly once.
    ///
    /// The registration is dropped after signalling: a woken `select` polls
    /// again with a fresh semaphore and re-registers if it still cannot
    /// proceed, so keeping the old one would only signal a semaphore nobody
    /// waits on.
    @inline(__always)
    private func notifyListener() {
        guard let listener = sema else { return }
        sema = nil
        listener.signal()
    }
}
// Lets a channel be consumed with `for await value in channel`; the channel
// acts as its own iterator.
extension Channel: AsyncSequence, AsyncIteratorProtocol {
    public typealias Element = T

    /// Returns the channel itself as the iterator.
    public nonisolated func makeAsyncIterator() -> Channel {
        self
    }

    /// Produces the next element by forwarding to `receive()`.
    public func next() async -> T? {
        await receive()
    }
}
// Select.swift
import os
import Foundation
/// Public, type-erased wrapper around one `select` case.
///
/// Instances are created by the `rx`, `tx` and `none` factory functions.
public struct SelectHandler {
    /// The concrete case implementation.
    let inner: any SelectProtocol
}
/// Behaviour shared by all `select` cases.
protocol SelectProtocol {
    /// Attempts to run the case.
    ///
    /// - Parameter sm: Semaphore of the polling `select`; cases may hand it
    ///   to a channel so the `select` can be signalled later.
    /// - Returns: `true` if the case ran, `false` if it could not proceed.
    func handle(_ sm: AsyncSemaphore) async -> Bool
}
/// `select` case that receives from a channel.
internal struct RxHandler<T>: SelectProtocol where T: Sendable {
    /// Channel to receive from.
    let chan: Channel<T>
    /// Callback invoked with the received value, or with `nil` when the
    /// channel is closed and has nothing left to deliver.
    let outFunc: (T?) async -> ()

    /// Polls the channel once.
    ///
    /// - Parameter sm: Semaphore passed to `receiveOrListen` so the channel
    ///   can signal the waiting `select`.
    /// - Returns: `true` if `outFunc` was called, `false` if the channel is
    ///   open and currently has no value.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        if let val = await chan.receiveOrListen(sm) {
            await outFunc(val)
            return true
        }
        guard await chan.isClosed else {
            return false
        }
        // The poll above and the `isClosed` check are separate actor calls,
        // so a value may have been queued (and the channel then closed) in
        // between. Poll once more so a pending value is delivered instead of
        // being reported as `nil`. On a closed channel this call returns the
        // pending value or `nil` and does not register `sm`.
        let pending = await chan.receiveOrListen(sm)
        await outFunc(pending)
        return true
    }
}
/// `select` default case: always runs its closure and reports success.
struct NoneHandler: SelectProtocol {
    /// Closure executed when the default case is taken.
    let handler: () async -> ()

    /// Runs `handler`; the semaphore is not used.
    ///
    /// - Returns: Always `true`.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        await handler()
        return true
    }
}
/// `select` case that sends a fixed value on a channel.
struct TxHandler<T>: SelectProtocol where T: Sendable {
    /// Channel to send on.
    let chan: Channel<T>
    /// Value to send.
    let val: T
    /// Callback invoked after the value has been accepted by the channel.
    let onSend: () async -> ()

    /// Polls the channel once via `sendOrListen`.
    ///
    /// - Returns: `true` if the value was accepted and `onSend` ran,
    ///   `false` otherwise.
    func handle(_ sm: AsyncSemaphore) async -> Bool {
        let accepted = await chan.sendOrListen(sm, value: val)
        guard accepted else {
            return false
        }
        await onSend()
        return true
    }
}
/// Result builder that gathers the cases listed in a `select` body into an array.
@resultBuilder
public struct SelectCollector {
    /// Returns the listed handlers in the order they were written.
    @inlinable
    @inline(__always)
    public static func buildBlock(_ handlers: SelectHandler...) -> [SelectHandler] {
        handlers
    }
}
/// Runs exactly one of the given cases, waiting until one can proceed.
///
/// The cases are built once. Each round polls them with a fresh semaphore;
/// if none ran, the call waits on that semaphore and then polls again.
///
/// - Parameter cases: Builder closure listing the `rx`, `tx` and `none` cases.
@inlinable
@inline(__always)
public func select(@SelectCollector cases: () -> ([SelectHandler])) async {
    let handlers = cases()
    var sm = AsyncSemaphore()
    while !(await handle(sm, handlers: handlers)) {
        await sm.wait()
        // Every polling round gets its own semaphore.
        sm = AsyncSemaphore()
    }
}
/// Polls the cases once, in random order.
///
/// A `NoneHandler` is not polled in the loop; it is remembered and run only
/// if no other case ran. If several are present, the one visited last in the
/// shuffled order is used.
///
/// - Parameters:
///   - sm: Semaphore handed to each polled case.
///   - handlers: The cases of the `select`.
/// - Returns: `true` if some case (or the default case) ran.
@usableFromInline
internal func handle(_ sm: AsyncSemaphore, handlers: [SelectHandler]) async -> Bool {
    var fallback: NoneHandler?
    for candidate in handlers.shuffled() {
        switch candidate.inner {
        case let noneCase as NoneHandler:
            fallback = noneCase
        case let regularCase:
            if await regularCase.handle(sm) {
                return true
            }
        }
    }
    guard let fallback = fallback else {
        return false
    }
    return await fallback.handle(sm)
}
/// Creates a receive case whose callback gets the received value.
///
/// - Parameters:
///   - chan: Channel to receive from.
///   - outFunc: Called with the received value, or `nil` when the channel is closed.
@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping (T?) async -> ()) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: outFunc)
    return SelectHandler(inner: receiveCase)
}
/// Creates a receive case whose callback ignores the received value.
///
/// - Parameters:
///   - chan: Channel to receive from.
///   - outFunc: Called once the case has run.
@inline(__always)
public func rx<T>(_ chan: Channel<T>, _ outFunc: @escaping () async -> ()) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: { _ in
        await outFunc()
    })
    return SelectHandler(inner: receiveCase)
}
/// Creates a receive case that discards the received value and runs no callback.
///
/// - Parameter chan: Channel to receive from.
@inline(__always)
public func rx<T>(_ chan: Channel<T>) -> SelectHandler {
    let receiveCase = RxHandler(chan: chan, outFunc: { _ in })
    return SelectHandler(inner: receiveCase)
}
/// Creates a send case with no completion callback.
///
/// - Parameters:
///   - chan: Channel to send on.
///   - val: Value to send.
@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T) -> SelectHandler {
    let sendCase = TxHandler(chan: chan, val: val, onSend: {})
    return SelectHandler(inner: sendCase)
}
/// Creates a send case that runs a callback after the value has been sent.
///
/// - Parameters:
///   - chan: Channel to send on.
///   - val: Value to send.
///   - onSend: Called after the channel accepted `val`.
@inline(__always)
public func tx<T>(_ chan: Channel<T>, _ val: T, _ onSend: @escaping () async -> ()) -> SelectHandler {
    let sendCase = TxHandler(chan: chan, val: val, onSend: onSend)
    return SelectHandler(inner: sendCase)
}
/// Creates the default case, which runs when no other case ran in a polling round.
///
/// - Parameter handler: Closure executed when the default case is taken.
@inline(__always)
public func none(handler: @escaping () async -> ()) -> SelectHandler {
    let defaultCase = NoneHandler(handler: handler)
    return SelectHandler(inner: defaultCase)
}
/// Counting semaphore for async code, starting with zero permits.
///
/// `permits` and `continuationQueue` are only touched while `lock` is held;
/// continuations are always resumed after the lock has been released.
@usableFromInline
internal final class AsyncSemaphore: @unchecked Sendable {
    /// Signals that arrived while nobody was waiting.
    private var permits: Int = 0
    /// Suspended waiters, oldest first.
    private var continuationQueue = LinkedList<UnsafeContinuation<Void, Never>>()
    private let lock = OSAllocatedUnfairLock()

    @usableFromInline
    init() {
    }

    /// Consumes a permit if one is available; otherwise suspends until `signal()`.
    @usableFromInline
    func wait() async {
        await withUnsafeContinuation { (waiter: UnsafeContinuation<Void, Never>) in
            lock.lock()
            let permitAvailable = permits > 0
            if permitAvailable {
                permits -= 1
            } else {
                continuationQueue.append(waiter)
            }
            lock.unlock()

            if permitAvailable {
                waiter.resume()
            }
        }
    }

    /// Resumes the oldest waiter, or stores a permit if nobody is waiting.
    func signal() {
        lock.lock()
        let waiter = continuationQueue.popFirst()
        if waiter == nil {
            permits += 1
        }
        lock.unlock()

        waiter?.resume()
    }
}
// LinkedList.swift
/// Minimal singly linked FIFO queue with O(1) `append` and `popFirst`.
///
/// The nodes are class instances, so a copy of the struct shares its nodes
/// with the original.
internal struct LinkedList<T> {
    private final class Node {
        var value: T
        var next: Node?

        init(value: T) {
            self.value = value
        }
    }

    /// `true` while the list holds no elements.
    private(set) var isEmpty: Bool = true
    private var head: Node?
    private var tail: Node?
    /// Number of elements currently in the list; never negative.
    private(set) var count: Int = 0

    /// Adds `value` at the end of the list.
    mutating func append(_ value: T) {
        let node = Node(value: value)
        if let last = tail {
            last.next = node
        } else {
            head = node
        }
        tail = node
        count += 1
        isEmpty = false
    }

    /// Removes and returns the first element.
    ///
    /// - Returns: The oldest element, or `nil` if the list is empty. Popping
    ///   an empty list leaves `count` at zero (it used to be decremented
    ///   below zero).
    mutating func popFirst() -> T? {
        guard let first = head else {
            return nil
        }
        head = first.next
        if head == nil {
            tail = nil
            isEmpty = true
        }
        count -= 1
        return first.value
    }
}
UPD 2. Measurements
hyperfine '.build/release/SwiftChannelsDemo 1 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 1 Int
Time (mean ± σ): 1.483 s ± 0.007 s [User: 1.536 s, System: 0.519 s]
Range (min … max): 1.471 s … 1.494 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 2 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 2 Int
Time (mean ± σ): 1.487 s ± 0.004 s [User: 1.540 s, System: 0.524 s]
Range (min … max): 1.482 s … 1.492 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 3 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 3 Int
Time (mean ± σ): 1.437 s ± 0.005 s [User: 1.499 s, System: 0.492 s]
Range (min … max): 1.430 s … 1.448 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 4 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 4 Int
Time (mean ± σ): 2.133 s ± 0.014 s [User: 2.116 s, System: 0.001 s]
Range (min … max): 2.123 s … 2.171 s 10 runs
hyperfine '.build/release/SwiftChannelsDemo 5 Int'
Benchmark 1: .build/release/SwiftChannelsDemo 5 Int
Time (mean ± σ): 1.674 s ± 0.008 s [User: 1.869 s, System: 0.911 s]
Range (min … max): 1.663 s … 1.684 s 10 runs