Studies on Reactive Patterns with Swift Concurrency

I'd like to share some experimental work I've done exploring reactive programming patterns within the Swift Concurrency model. During my research, I identified certain gaps between traditional reactive patterns and the new async/await paradigm, and I've proposed experimental implementations to address these challenges.

My study repository is available at: GitHub - o-nnerb/swift-async-stream: Experimental implementations of AsyncSignal, ValueSubject, PassthroughSubject and AsyncExpectation

Context and Motivation

Swift Concurrency has brought significant improvements to asynchronous programming in Swift, yet I noticed that some common patterns from frameworks like Combine lack direct equivalents in this new paradigm. Specifically, I explored three areas I consider important for discussion:

  1. Synchronization mechanisms beyond actor for complex scenarios
  2. Implementation of reactive subjects (ValueSubject and PassthroughSubject) compatible with async sequences
  3. An asynchronous testing utility similar to XCTestExpectation, but compatible with the new Swift Testing framework

1. AsyncLock: Granular Synchronization in Asynchronous Contexts

While actor is the recommended solution for state isolation in Swift Concurrency, there are scenarios requiring more granular control over synchronization. Consider a method that must guarantee mutual exclusion throughout its entire execution—including asynchronous calls:

actor SomeManager {
    private(set) var value = 0
    
    func doSomething(_ block: @Sendable () async -> Void) async {
        value += 1
        await block()
        value += 2
    }
    
    func updateValue(_ value: Int) {
        self.value = value
    }
}

In this example, there's no guarantee of atomicity during doSomething—other operations may interleave during the suspension inside block(). To address this, I implemented an AsyncLock enabling explicit synchronization:

final class SomeManager: @unchecked Sendable {
    private let lock = AsyncLock()
    private var _value = 0
    
    var value: Int {
        get async {
            await lock.withLock { _value }
        }
    }
    
    func doSomething(_ block: @Sendable () async -> Void) async {
        await lock.withLock {
            _value += 1
            await block()
            _value += 2
        }
    }
    
    func updateValue(_ value: Int) async {
        await lock.withLock { _value = value }
    }
}

This approach ensures no other operation can modify _value while doSomething is running—even across async suspensions. I’d appreciate community input on whether this is a valid approach or if there are more idiomatic alternatives within Swift Concurrency.

2. Reactive Subjects for AsyncSequence

The swift-async-algorithms package provides excellent tools for working with async sequences, but I noticed it lacks direct equivalents to Combine’s Subject types. The provided AsyncChannel has important limitations:

  • Supports only a single consumer
  • Doesn’t retain the current value for new subscribers

In my experiments, I implemented ValueSubject and PassthroughSubject as async sequences supporting multiple consumers:

let valueSubject = ValueSubject(1)
Task {
    for await index in valueSubject {
        print("Task 1 received: \(index)")
    }
}

Task {
    for await index in valueSubject {
        print("Task 2 received: \(index)")
    }
}

// Both tasks receive all values
valueSubject.value = 5

A critical aspect of this implementation is managing reference cycles. To address this, I introduced AnyAsyncSequence, which "erases" subscriber references back to the producer:

let erasedSubject = valueSubject.eraseToAnyAsyncSequence()

While functional in my experimental scenarios, I recognize this approach may have performance or safety implications requiring deeper community review.

3. AsyncExpectation for Swift Testing

The new Swift Testing framework currently lacks a direct equivalent to XCTestExpectation, making it difficult to write tests that depend on asynchronous conditions. I implemented AsyncExpectation to fill this gap:

import SwiftAsyncTesting

func testAsyncOperation() async throws {
    let expectation = AsyncExpectation(description: "Async operation completes")
    
    Task {
        await someAsyncFunction()
        expectation.fulfill()
    }
    
    try await expectations([expectation], timeout: 10.0)
}

This implementation is compatible with both Swift Testing and XCTest, and provides features such as:

  • Support for multiple simultaneous expectations
  • Configurable timeout-based waiting
  • Support for inverted expectations (which fail when fulfilled)

Conclusion and Next Steps

These implementations emerged as an academic exercise to better understand the nuances of Swift Concurrency and how it interacts with traditional reactive patterns. I recognize that some solutions may not be fully optimized or idiomatic, and I’m actively seeking community feedback on the following questions:

  1. Are there better alternatives for granular synchronization beyond actor?
  2. How could AsyncChannel evolve to efficiently support multiple consumers?
  3. Is there an official plan to bring XCTestExpectation-like functionality to Swift Testing?

Thank you in advance for any contributions, constructive criticism, or suggestions for improvement. The repository is open for discussions and PRs, and I intend to continue refining these ideas based on community feedback.

2 Likes

This question has come up before e.g. here. The short answer is that, no, we don't plan to implement this exact shape of API in Swift Testing.

The somewhat longer answer is that most problems for which XCTestExpectation/XCTWaiter are a solution can be solved directly with Swift Concurrency. In the case of your example here, the solution is probably to just use await:

@Test func `async operation`() async {
  await someAsyncFunction()
}

Of course, you indicate that you want to make sure that someAsyncFunction() returns in a reasonable amount of time, and you've set a 10-second timer on your waiter. When you use XCTest, one test runs at a time in the current process, meaning that the kernel's scheduler and the Swift runtime's task scheduler aren't juggling your test alongside others. Swift Testing uses in-process parallelization instead which is much faster, but means that the schedulers do not guarantee that your test's task will get 10 seconds of execution time (or any other non-zero amount of execution time) before your 10 second timer fires[1].

All that said… we are experimenting with similar API that would cover cases like "check if this condition has been met after n seconds" that are hard to model with Swift Concurrency, but our experiments likely require changes in the Swift runtime to implement correctly. My colleague on the Testing Workgroup, @maartene, is working on a pitch in this area (that I won't spoil for him) that would eventually allow us to implement API of this approximate shape.


  1. We don't recommend fine-grained timeouts for your tests even when using XCTest, for what it's worth. Strictly speaking, you don't have any time guarantees with XCTest either, and you may find that while a 10 second timeout is sufficient on your M5 MacBook Pro, your colleague's 15-year-old Pentium 4 Linux system takes 30 seconds and your CI system takes 2 minutes. ā†©ļøŽ

9 Likes

Not sure this is the safest approach.

If I’m reading it correctly you want to ā€˜set up’ your object in some whatever invariant/state , invoke client code to use and potentially alter that state and ā€˜finalise’ the transaction with another state altering. So inside block: () … there will definetly be calls to get/set that state which will go trough the lock, which is already ā€˜locked’ at this point —> deadlock. You could solve it if you re-organize the lock calls, but at this point what is even the point of using an actor anyway?

It’s always strange to me to see classic synchronization methods used inside actors. Imo the whole point of using actors is that you’re ok with their ā€˜weak consistency’ model.

2 Likes