Swift Concurrency & Reactive programming

After watching all the concurrency-related WWDC sessions and reading through all the related proposals again, I kept thinking about how this could affect the future of reactive programming in Swift. First of all, a big thank you to everyone involved in the Concurrency efforts so far. What you created is truly amazing and pushes Swift miles ahead!

Concurrency & Combine

Swift Concurrency provides two new types that enable reactive programming natively: AsyncSequence and AsyncStream. The former allows you to use for await in to asynchronously wait for new values to be produced; this can be compared to Combine's Publisher type. It also provides a lot of operators like map, compactMap, flatMap, prefix, etc. AsyncStream, on the other hand, allows you to adapt code that yields multiple values over time. It already has baked-in back pressure support, which theoretically enables it to do everything that Combine does as well. First, let's look into how to make current Combine-based code accessible with the new Concurrency primitives.
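
As a quick aside before the Combine bridge, here is a minimal sketch of the operator chaining described above. The helper names makeNumbers and firstTwoEvenSquares are made up for illustration; only AsyncStream and the filter, map, and prefix operators come from the standard library.

```swift
// Hypothetical helper: a finite stream that yields 1 through 5 and then finishes.
func makeNumbers() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for i in 1...5 {
            continuation.yield(i)
        }
        continuation.finish()
    }
}

// Chain operators much like on a Combine publisher, then consume with for await.
func firstTwoEvenSquares() async -> [Int] {
    var result: [Int] = []
    let squares = makeNumbers()
        .filter { $0.isMultiple(of: 2) }  // keep even numbers
        .map { $0 * $0 }                  // square them
        .prefix(2)                        // stop after two elements
    for await value in squares {
        result.append(value)
    }
    return result
}
```

The chain is lazy in the same way a Combine pipeline is: nothing is pulled from the stream until the for await loop starts iterating.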

One can easily wrap any Publisher into an AsyncThrowingStream with an extension like this:

import Combine

extension Publisher {
    /// Bridges this publisher into an AsyncThrowingStream.
    var asAsyncStream: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream(Output.self) { continuation in
            let cancellable = sink { completion in
                switch completion {
                case .finished:
                    continuation.finish()
                case .failure(let error):
                    continuation.finish(throwing: error)
                }
            } receiveValue: { output in
                continuation.yield(output)
            }

            // Cancel the Combine subscription when the consumer stops iterating.
            continuation.onTermination = { _ in cancellable.cancel() }
        }
    }
}

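Now I can replace my subscription code with a for try await in loop (try is needed because the stream can throw). It looks something like this:

class Foo {
    init() {
        Task {
            do {
                // somePublisher stands for any Combine publisher in scope.
                for try await value in somePublisher.asAsyncStream {
                    print(value)
                }
            } catch {
                print("Stream failed: \(error)")
            }
        }
    }
}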

What seems missing?

The above already lays a very good foundation to natively enable reactive programming; however, there are some things that I think are missing to fully replace Combine.

Subjects

Subjects like CurrentValueSubject and PassthroughSubject, along with simple publishers like Just, are at the core of most Combine-based code. With the above-mentioned AsyncStream, one should be able to easily create subjects as well. Below is my stab at recreating a CurrentValueSubject.

class CurrentValueSubject<Output, Failure: Error>: AsyncSequence {
    typealias AsyncIterator = AsyncThrowingStream<Output, Error>.Iterator
    typealias Element = Output

    private var stream: AsyncThrowingStream<Output, Error>! = nil
    private var continuation: AsyncThrowingStream<Output, Error>.Continuation! = nil

    init(_ value: Output) {
        // Buffer only the newest element to mimic "current value" semantics.
        stream = AsyncThrowingStream(Output.self, bufferingPolicy: .bufferingNewest(1)) { continuation in
            self.continuation = continuation
            continuation.yield(value)
        }
    }

    func yield(_ input: Output) {
        continuation.yield(input)
    }

    // Pass nil to finish normally, or an error to finish with a failure.
    func finish(_ throwing: Failure? = nil) {
        continuation.finish(throwing: throwing)
    }

    // Note: the underlying stream is meant for a single consumer; it does not broadcast.
    func makeAsyncIterator() -> AsyncThrowingStream<Output, Error>.Iterator {
        return stream.makeAsyncIterator()
    }
}

Scheduling

An important part of Combine is the notion of time, which is abstracted behind the Scheduler protocol and its associated types. This makes it possible to implement operators such as debounce, throttle, etc. At the moment, Concurrency is lacking this notion of time (there is a very simplistic Task.sleep() method though). However, work is already being done on laying the foundation for custom executors.
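
To illustrate how far Task.sleep alone gets you, here is a rough sketch of a timer-like sequence. The ticks function and its parameter names are hypothetical; it only relies on AsyncStream and Task.sleep(nanoseconds:), and it is nowhere near a replacement for a real Scheduler abstraction.

```swift
// Hypothetical helper: emits 0, 1, 2, ... (count values in total),
// pausing for the given interval before each value.
func ticks(count: Int, everyNanoseconds interval: UInt64) -> AsyncStream<Int> {
    AsyncStream { continuation in
        let task = Task {
            for i in 0..<count {
                // Task.sleep throws when the task is cancelled; stop emitting then.
                do {
                    try await Task.sleep(nanoseconds: interval)
                } catch {
                    break
                }
                continuation.yield(i)
            }
            continuation.finish()
        }
        // Stop the producer when the consumer stops iterating.
        continuation.onTermination = { _ in task.cancel() }
    }
}

// Collects all ticks into an array, mainly to show the consuming side.
func collectTicks() async -> [Int] {
    var values: [Int] = []
    for await tick in ticks(count: 3, everyNanoseconds: 1_000_000) {
        values.append(tick)
    }
    return values
}
```

Operators like debounce or throttle would need more than this, for example a way to inject a virtual clock in tests, which is exactly the gap described above.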

Mixing elements from multiple AsyncSequences

Combine provides operators such as merge or combineLatest. They are powerful tools that make it possible to set up complex reactive chains, and AsyncSequence does not offer equivalents yet.
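
A merge for two streams can be approximated by hand with a task group. The sketch below is only an assumption about how such an operator might look; the free function merge is not part of the standard library, and the order in which elements from the two inputs arrive is not deterministic.

```swift
// Hypothetical helper: forwards the elements of both inputs into one output stream.
func merge<T: Sendable>(_ first: AsyncStream<T>, _ second: AsyncStream<T>) -> AsyncStream<T> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: Void.self) { group in
                group.addTask {
                    for await value in first { continuation.yield(value) }
                }
                group.addTask {
                    for await value in second { continuation.yield(value) }
                }
                // The group only returns once both child tasks have finished,
                // i.e. once both inputs are exhausted or the task was cancelled.
            }
            continuation.finish()
        }
        // Cancel both forwarding loops when the consumer stops iterating.
        continuation.onTermination = { _ in task.cancel() }
    }
}

// Collects every element of a stream into an array (consumer-side helper).
func collect<T>(_ stream: AsyncStream<T>) async -> [T] {
    var values: [T] = []
    for await value in stream {
        values.append(value)
    }
    return values
}
```

combineLatest is harder to sketch this way because it has to remember the latest value of each input, which calls for some shared, synchronized state such as an actor.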

Wishes

A common pattern between Combine and Concurrency based code is the handling of AnyCancellable or Task.Handle<Void, Never>. In Combine, one must store the AnyCancellable, otherwise the subscription is cancelled right away. Task handles, on the other hand, do not use reference semantics to cancel automatically; however, in most cases, a reference needs to be kept around to cancel the task at the right moment. This is especially important for infinite sequences. Let's look at a quick example:

class Foo {
    let stream: AsyncStream<Int>
    init(stream: AsyncStream<Int>) {
        self.stream = stream
        
        Task {
            for await number in stream {
                // retain cycle
                self.foo(number)
            }
        }
    }
    
    func foo(_ number: Int) {}
}

The above code introduces a retain cycle in the for await in loop since it uses self. There are two ways out of this: either use weak self, or store the task handle in some variable and cancel it at the right point in time. As far as I can see, this is also how SwiftUI's task modifier works: it stores the handle and cancels it when the view disappears.
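
Both ways out can be combined in one place. The following is a minimal sketch, with the hypothetical names NumberObserver and handle, of capturing self weakly and cancelling the stored task in deinit.

```swift
final class NumberObserver {
    // Keep the task around so it can be cancelled when the observer goes away.
    private var task: Task<Void, Never>?

    init(stream: AsyncStream<Int>) {
        task = Task { [weak self] in
            for await number in stream {
                // Only hold a strong reference while handling a single element.
                guard let self = self else { return }
                self.handle(number)
            }
        }
    }

    deinit {
        // Ends the for await loop even if the stream itself never finishes.
        task?.cancel()
    }

    func handle(_ number: Int) {}
}
```

This is the weak/strong dance in its usual form: it works, but it has to be repeated in every type that observes an infinite sequence.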

My wish/dream here would be that we can make this pattern easier without it being baked into frameworks. I don't know if this is even possible to achieve in the compiler or not. But a syntax like this would be truly amazing.

class Foo {
    let stream: AsyncStream<Int>
    init(stream: AsyncStream<Int>) {
        self.stream = stream
        
        weakAsync {
            for await number in stream {
                self.foo(number)
            }
        }
    }
    
    func foo(_ number: Int) {}
}

The pattern of awaiting some infinite sequence is super common in the projects I have worked on. I think it would be great if we could make this easier without the weak/strong dance. I would love to hear from the compiler folks whether something like this is even possible.

Summary

The newly introduced Concurrency features allow us to write code similar to what we can write with Combine. Some operators and basic functionality are still missing to replace everything; however, the foundation is set. From my point of view, the only thing stopping Concurrency from replacing Combine is the time it takes to fill in the necessary gaps.

What do you all think about Concurrency and the future of reactive programming in Swift?

Some great ideas here!

In addition to these, I think it would be nice to move Combine's Cancellable protocol and AnyCancellable type into the standard library. Task.Handle could then conform to it and be easily stored in a Set of heterogeneous AnyCancellables, along with Combine subscriptions, or just be managed more efficiently if you need to store several Task.Handles with different generic types.

It would also be nice to have a property wrapper like @Published and a protocol like ObservableObject in the standard library. Being able to observe several properties on an object and receive async notifications when any of those properties on the object change is a valuable pattern in many contexts.

Some poorly thought through ideas in no particular order to add:

Publishers and operators that take closures, like Map, FlatMap, Just, Future, Deferred, etc., could all be marked as async. I guess that these markings aren’t backwards compatible with earlier SDKs.

It’d be great to get a publisher from a Task so we could use all the operators available to transform values computed by the task.

Conversely, it’d be great to have a Task Publisher whose task’s lifetime is owned and managed by the subscription.

I think moving these two types into the standard library is not easily possible. However, I also don't really know if we need them. The equivalent to AnyCancellable in Concurrency is just a Task.Handle<Void, Never>, which can be treated similarly. The only slight inconvenience is storing both types in a set, like you mentioned. Thinking about the long-term possibilities here, I would assume that Concurrency will largely replace Combine, so AnyCancellable would only be needed in the transition period. So IMO we should handle this on our own in our code by declaring something like CustomCancellable & CustomAnyCancellable and conforming both types to the former. As soon as the whole code base only uses Concurrency, one can drop these two types.
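
A rough sketch of what such a pair of types might look like follows. CustomCancellable and CustomAnyCancellable are the hypothetical names from this post, not framework types, and the cancel-on-deinit behavior is modeled on what AnyCancellable does.

```swift
// Hypothetical protocol; anything with a cancel() method can conform.
protocol CustomCancellable {
    func cancel()
}

// Task already has a matching cancel() method, so the conformance is empty.
extension Task: CustomCancellable {}

// Type-erasing box that cancels the wrapped value when it is deallocated.
final class CustomAnyCancellable: Hashable {
    private let onCancel: () -> Void

    init(_ cancellable: any CustomCancellable) {
        onCancel = { cancellable.cancel() }
    }

    deinit { onCancel() }

    func cancel() { onCancel() }

    // Identity-based equality and hashing so boxes can be stored in a Set.
    static func == (lhs: CustomAnyCancellable, rhs: CustomAnyCancellable) -> Bool {
        lhs === rhs
    }

    func hash(into hasher: inout Hasher) {
        hasher.combine(ObjectIdentifier(self))
    }
}
```

On Apple platforms, Combine's AnyCancellable could presumably be given the same empty conformance, since it also exposes a cancel() method. Both kinds of handles could then live in one Set<CustomAnyCancellable> during the transition period.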

Regarding @Published and ObservableObject: fully agreed. I think this also goes along the lines that subjects, or more generally speaking ways to kick off an AsyncStream, are missing.

Creating a publisher from a Task should be easily possible; the same goes for a Task Publisher whose lifetime is managed by the subscription. What is going to be interesting here is whether we have to provide that type ourselves for the time being or whether the Combine framework is getting these conveniences before GM. I guess we need to be filing feedback here :)