I've finally given actors a shot. I'm using a simple linked list to store senders and receivers (as unsafe continuations), and as the buffer as well; Array obviously performs badly, and Deque is also less efficient than the linked list. I haven't implemented select so far. Adding inline annotations didn't improve the timings for me; either I used them incorrectly, or the compiler already optimises this well without explicit annotations.
So far, the measurements are a bit down for the first 3 tests (a consistent 1.5 s), and the syncRw case improved by more than 1 second (still a long way off the Go version). Tested on a MacBook Pro M1.
hyperfine -w 3 '.build/release/SwiftChannelsDemo 1'
Benchmark 1: .build/release/SwiftChannelsDemo 1
Time (mean ± σ): 1.512 s ± 0.002 s [User: 1.531 s, System: 0.524 s]
Range (min … max): 1.508 s … 1.515 s 10 runs
hyperfine -w 3 '.build/release/SwiftChannelsDemo 2'
Benchmark 1: .build/release/SwiftChannelsDemo 2
Time (mean ± σ): 1.524 s ± 0.005 s [User: 1.541 s, System: 0.532 s]
Range (min … max): 1.519 s … 1.536 s 10 runs
hyperfine -w 3 '.build/release/SwiftChannelsDemo 3'
Benchmark 1: .build/release/SwiftChannelsDemo 3
Time (mean ± σ): 1.524 s ± 0.005 s [User: 1.540 s, System: 0.530 s]
Range (min … max): 1.519 s … 1.534 s 10 runs
hyperfine -w 3 '.build/release/SwiftChannelsDemo 4'
Benchmark 1: .build/release/SwiftChannelsDemo 4
Time (mean ± σ): 1.891 s ± 0.030 s [User: 1.853 s, System: 0.005 s]
Range (min … max): 1.877 s … 1.976 s 10 runs
I still haven't measured how much time push/pop on the buffer takes, but I suspect the main factor currently affecting performance is the suspension points. Still, actors perform well in my opinion, especially considering the level of low-level optimisation that has gone into the Go implementation over the years. As for the channel idea itself: despite being a fantastic concurrency model in Go, I believe the approach in Swift should generally differ from it, since Swift has a completely different set of abstractions and tools.
The code:
// Channel.swift
infix operator <- : AssignmentPrecedence

/// Infix form of `Channel.send(_:)`: `await channel <- value`.
///
/// Forwards `value` to the channel's `send` and returns once that call returns.
public func <- <T>(channel: Channel<T>, value: T) async {
    await channel.send(value)
}
/// Infix form of `Channel.receive()` that stores the result: `await value <- channel`.
///
/// Overwrites `value` with whatever the channel's `receive` returns.
public func <- <T>(value: inout T?, chan: Channel<T>) async {
    let received = await chan.receive()
    value = received
}
prefix operator <-

/// Prefix form of `Channel.receive()`: `await <-channel`.
///
/// The result may be ignored when the caller only wants to take an element off the channel.
@discardableResult
public prefix func <- <T>(chan: Channel<T>) async -> T? {
    await chan.receive()
}
/// A Go-style channel whose state is protected by actor isolation.
///
/// Values are delivered in the order they were sent. With `capacity == 0` every
/// `send` suspends until a matching `receive` takes the value; with a positive
/// capacity up to `capacity` values are buffered before senders start to suspend.
///
/// Invariants maintained by `send` and `receive`:
/// - `receivers` is non-empty only while `buffer` and `senders` are both empty;
/// - `senders` is non-empty only while `buffer` holds `capacity` values.
public actor Channel<T> where T: Sendable {
    private typealias Receiver = UnsafeContinuation<T?, Never>
    private typealias Sender = (continuation: UnsafeContinuation<Void, Never>, value: T)

    /// Callers suspended in `receive()`, oldest first.
    private var receivers = LinkedList<Receiver>()
    /// Callers suspended in `send(_:)` together with the value they want to deliver, oldest first.
    private var senders = LinkedList<Sender>()
    /// Maximum number of values held in `buffer`.
    private let capacity: Int
    /// Values that were accepted without a waiting receiver, oldest first.
    private var buffer: LinkedList<T> = LinkedList()

    /// Creates a channel.
    /// - Parameter capacity: Number of values that can be buffered before `send`
    ///   suspends. The default of `0` gives an unbuffered channel.
    public init(capacity: Int = 0) {
        self.capacity = capacity
    }

    /// Sends `value` into the channel.
    ///
    /// Returns immediately when a receiver is already waiting or the buffer has
    /// room; otherwise suspends until a receiver accepts the value or moves it
    /// into the buffer.
    public func send(_ value: T) async {
        // A receiver is already parked: hand the value over directly.
        if !receivers.isEmpty, let receiver = receivers.pop() {
            receiver.resume(returning: value)
            return
        }
        // No receiver, but there is still room in the buffer.
        if buffer.count < capacity {
            buffer.push(value)
            return
        }
        // Buffer is full (or the channel is unbuffered): park until `receive()` resumes us.
        await withUnsafeContinuation { continuation in
            senders.push((continuation, value))
        }
    }

    /// Receives the oldest value from the channel, suspending while none is available.
    /// - Returns: The received value.
    public func receive() async -> T? {
        if !buffer.isEmpty {
            let value = buffer.pop()
            // A buffer slot just became free. Move the oldest blocked sender's value
            // into it and resume that sender; otherwise a later `send` could slip
            // its value into the buffer ahead of the blocked one and break FIFO order.
            if !senders.isEmpty, let sender = senders.pop() {
                buffer.push(sender.value)
                sender.continuation.resume()
            }
            return value
        }
        // Nothing buffered: take the value straight from a blocked sender, if any.
        if !senders.isEmpty, let sender = senders.pop() {
            sender.continuation.resume()
            return sender.value
        }
        // Nothing available yet: park until `send(_:)` resumes us with a value.
        return await withUnsafeContinuation { continuation in
            receivers.push(continuation)
        }
    }
}
// LinkedList.swift
/// A minimal singly linked FIFO queue: `push` appends at the tail, `pop` removes from the head.
///
/// NOTE(review): nodes are reference types, so a copy of this struct shares its
/// nodes with the original; treat each list as uniquely owned.
internal struct LinkedList<T> {
    private final class Node {
        var value: T
        var next: Node?

        init(value: T) {
            self.value = value
        }
    }

    private var head: Node?
    private var tail: Node?

    /// Number of elements currently stored; never negative.
    private(set) var count: Int = 0

    /// `true` when the list holds no elements. Derived from `head` so it cannot
    /// drift out of sync with the actual contents.
    var isEmpty: Bool { head == nil }

    /// Appends `value` at the end of the list.
    mutating func push(_ value: T) {
        let node = Node(value: value)
        if let last = tail {
            last.next = node
        } else {
            head = node
        }
        tail = node
        count += 1
    }

    /// Removes and returns the oldest element, or `nil` when the list is empty.
    ///
    /// Popping an empty list leaves `count` untouched.
    mutating func pop() -> T? {
        guard let first = head else {
            return nil
        }
        head = first.next
        if head == nil {
            // The list became empty; drop the stale tail reference as well.
            tail = nil
        }
        count -= 1
        return first.value
    }
}
And the app that runs them is pretty much a copy-paste of the benchmarks; instead of timing in Swift, I've used hyperfine.
// App.swift
import Foundation
import SwiftChannels
// To run benchmark
// swift build -c release
// hyperfine -w 3 '.build/release/SwiftChannelsDemo <number>'
/// Benchmark entry point: runs the scenario selected by the last command-line argument.
@main
struct App {
    /// Parses the last argument as an integer and runs scenario 1–4.
    /// Any other value (including a non-numeric argument) runs nothing.
    static func main() async {
        let scenario = CommandLine.arguments.last.flatMap { Int($0) }
        switch scenario {
        case 1:
            await testSingleReaderManyWriter()
        case 2:
            await testHighConcurrency()
        case 3:
            await testHighConcurrencyBuffered()
        case 4:
            await testSyncRw()
        default:
            break
        }
    }
}
/// Scenario 1: 100 detached tasks each send 10 000 ones into an unbuffered
/// channel while the calling task receives until the running total reaches 1 000 000.
func testSingleReaderManyWriter() async {
    let writerCount = 100
    let messagesPerWriter = 10_000
    let channel = Channel<Int>()

    for _ in 0..<writerCount {
        Task.detached {
            for _ in 0..<messagesPerWriter {
                await channel <- 1
            }
        }
    }

    var total = 0
    while total < 1_000_000 {
        let received = (await <-channel)!
        total += received
    }
}
/// Scenario 2: 1 000 tasks each send 1 000 ones into an unbuffered channel
/// while the calling task receives until the running total reaches 1 000 000.
func testHighConcurrency() async {
    let writerCount = 1000
    let messagesPerWriter = 1000
    let channel = Channel<Int>()

    for _ in 0..<writerCount {
        Task {
            for _ in 0..<messagesPerWriter {
                await channel <- 1
            }
        }
    }

    var total = 0
    while total < 1_000_000 {
        let received = (await <-channel)!
        total += received
    }
}
/// Scenario 3: same workload as the high-concurrency scenario (1 000 tasks × 1 000 sends),
/// but over a channel created with `capacity: 20`.
func testHighConcurrencyBuffered() async {
    let writerCount = 1000
    let messagesPerWriter = 1000
    let channel = Channel<Int>(capacity: 20)

    for _ in 0..<writerCount {
        Task {
            for _ in 0..<messagesPerWriter {
                await channel <- 1
            }
        }
    }

    var total = 0
    while total < 1_000_000 {
        let received = (await <-channel)!
        total += received
    }
}
/// Scenario 4: a single task alternates one send and one receive, 5 000 000 times,
/// on a channel created with `capacity: 1`.
func testSyncRw() async {
    let iterations = 5_000_000
    let channel = Channel<Int>(capacity: 1)

    for index in 0..<iterations {
        await channel <- index
        // The received value is intentionally discarded.
        await <-channel
    }
}