After watching all WWDC related sessions to concurrency and reading through all related proposals again, I kept on thinking about how this can affect the future of reactive programming in Swift. First of all, a big thank you to everyone involved in the Concurrency efforts so far. What you created is truly amazing and pushes Swift miles ahead!
Concurrency & Combine
Swift Concurrency provides two new types that are enabling reactive programming natively AsyncSequence
and AsyncStream
. The former allows you to use for await in
to asynchronously wait for new values to be produced this can be compared to Combine's Publisher
type. It also provides a lot of operators like map
, compactMap
, flatMap
, prefix
, etc. . On the other hand, AsyncStream
allows to adapt code that yields multiple values over time. It already has backed-in back pressure support which enables it to theoretically do everything that Combine does as well. First, let's look into how to make the current Combine based code be accessible with the new Concurrency primatives.
One can easy wrap any Publisher
into an AsyncStream
with an extension like this:
extension Publisher {
var asAsyncStream: AsyncThrowingStream<Output> {
AsyncThrowingStream(Output.self) { continuation in
let cancellable = sink { completion in
switch completion {
case .finished:
continuation.finish()
case .failure(error):
continuation.finish(throwing: error)
}
} receiveValue: { output in
continuation.yield(output)
}
continuation.onTermination = { _ in cancellable.cancel() }
}
}
}
Now I can replace my subscription code with for await in
. Looking something like this:
class Foo {
init() {
Task {
for await value in somePublisher.asAsyncStream {
print(value)
}
}
}
}
What seems missing?
The above already lays a very good foundation to natively enable reactive programming; however, there are some things that I think are missing to fully replace Combine.
Subjects
Subjects like CurrentValueSubject
, PassthroughSubjects
, Just
, etc. are at the core of most of the Combine based code. With the above-mentioned AsyncStream
one should be able to easily create subjects as well. Below is my stab at recreating a CurrentValueSubject
.
class CurrentValueSubject<Output, Failure: Error>: AsyncSequence {
typealias AsyncIterator = AsyncThrowingStream<Output>.Iterator
typealias Element = Output
private var stream: AsyncThrowingStream<Output>! = nil
private var continuation: AsyncThrowingStream<Output>.Continuation! = nil
init(_ value: Output) {
stream = AsyncThrowingStream(maxBufferedElements: 1) { continuation in
self.continuation = continuation
continuation.yield(value)
}
}
func yield(_ input: Output) {
continuation.yield(input)
}
func finish(_ throwing: Failure?) {
continuation.finish(throwing: throwing)
}
func makeAsyncIterator() -> AsyncThrowingStream<Output>.Iterator {
return stream.makeAsyncIterator()
}
}
Scheduling
An important part of Combine is the notion of time which is abstracted behind the Scheduler
and its associated types. This allows to implement operators such as debounce
, throttle
, etc. . At the moment, Concurrency is lacking this notion of time (there is a very simplistic Task.sleep()
method though). However, there is already work being done with laying the foundation of custom excutors.
Mixing elements from multiple AsyncSequences
Combine provides operators such as merge
or combineLatest
, they are powerful tools that allow to setup complex reactive chains.
Wishes
A common pattern between Combine and Concurrency based code is the handling of AnyCancellable
or Task.Handle<Void, Never>
. In Combine, one most store the AnyCancellable
otherwise the subscription is cancelled right away. On the other hand, the task handles are not using reference semantics to automatically cancel; however, in most cases, a reference needs to be kept around to cancel the task at the right moment. This is especially important for infinite sequences. Let's look at a quick example
class Foo {
let stream: AsyncStream<Int>
init(stream: AsyncStream<Int>) {
self.stream = stream
async {
for await number in stream {
// retain cycle
self.foo(number)
}
}
}
func foo(_ number: Int) {}
}
The above code introduces a retain cycle in the for await in
loop since it uses self. There are two ways out of this. Either by using weak self
or by storing the task handle in some local variable and canceling it at the right point in time. As far as I can see, this is also how I think SwiftUIs task
modifier works. It stores the handle and cancels it when the view disappears.
My wish/dream here would be that we can make this pattern easier without it being baked into frameworks. I don't know if this is even possible to achieve in the compiler or not. But a syntax like this would be truly amazing.
class Foo {
let stream: AsyncStream<Int>
init(stream: AsyncStream<Int>) {
self.stream = stream
weakAsync {
for await number in stream {
self.foo(number)
}
}
}
func foo(_ number: Int) {}
}
The pattern to await on some infinite sequence is super common as far as I can see from the projects I worked on, I think it would be great if we could somehow achieve to make this easier without the weak/strong dance. I would love to hear from the compiler folks if something like this is even possible.
Summary
The newly introduced things with Concurrency allow us to write similar code as we can with Combine. There are some operators and basic functionality missing to replace everything; however, the foundation is set. From my point of view, the only thing that is stopping Concurrency to replace Combine is the time it takes to fill in the necessary gaps.
What do you all think about Concurrency and the future of reactive programming in Swift?