I have a function that creates an async stream. In it, I define a local variable that can be mutated by the stream. For my use case, the returned stream will always be used by a single consumer. Is there a way to make it concurrency safe, possibly by disallowing multiple consumers?
Right now, I figured out I can mark state as nonisolated(unsafe) to make it work without warnings for the single consumer use case. But I'm not sure how to enforce that to be the only use possible.
nonisolated func makeStream() -> AsyncStream<Bool> {
    var state = false
    return AsyncStream {
        state.toggle() // Warning: Mutation of captured var 'state' in concurrently-executing code
        return state   // Warning: Reference to captured var 'state' in concurrently-executing code
    }
}
// This is how I use the stream
func singleConsumer() async {
    for await value in makeStream() {
        print(value)
        try? await Task.sleep(for: .seconds(1))
    }
}
// I want to disallow following usage
func multipleConsumers() {
    let stream = makeStream()
    Task {
        for await value in stream {
            print("1", value)
            try? await Task.sleep(for: .seconds(1))
        }
    }
    Task {
        for await value in stream {
            print("2", value)
            try? await Task.sleep(for: .seconds(1))
        }
    }
}
i can think of a number of ideas, none of which seems 'perfect', but they might be worth consideration.
the first would be to expose the stream's AsyncIterator directly to the client code, rather than the stream itself, since that is a non-Sendable type. this should ensure the iterator cannot be used concurrently (assuming strict concurrency checking is on). however, you do lose the nice for-await syntax, and have to resort to while loops.
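a rough sketch of that idea (`makeIteratorStream` and `consumeFirstThree` are hypothetical names; the `nonisolated(unsafe)` marker from the original question suppresses the captured-var warning, which is only acceptable because a single consumer is assumed):

```swift
// return the non-Sendable iterator instead of the stream itself
nonisolated func makeIteratorStream() -> AsyncStream<Bool>.AsyncIterator {
    // unsafe opt-out, acceptable only under the single-consumer assumption
    nonisolated(unsafe) var state = false
    let stream = AsyncStream {
        state.toggle()
        return state
    }
    return stream.makeAsyncIterator()
}

// the iterator is not Sendable, so strict checking rejects moving it into
// another Task; consumption uses a while loop instead of for-await
func consumeFirstThree() async -> [Bool] {
    var iterator = makeIteratorStream()
    var values: [Bool] = []
    while values.count < 3, let value = await iterator.next() {
        values.append(value)
    }
    return values
}
```

since the underlying stream never finishes, the consumer here bounds its own iteration.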
alternatively, you could perhaps wrap the stream in a non-Sendable 'box' type that forwards to it, but removes visibility of the Sendable conformance. also, depending on how strictly you wish to control things, you could try adding runtime validation to ensure it cannot produce more than one iterator (e.g. here's a draft PR that does this).
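a rough sketch of such a box (`SingleConsumerStream` is a hypothetical name): classes are not Sendable by default, so strict checking prevents sharing the wrapper across concurrency domains, and a precondition adds a runtime backstop against multiple iterators.

```swift
// non-Sendable wrapper that forwards to the stream but hides its Sendability
final class SingleConsumerStream<Element>: AsyncSequence {
    private let stream: AsyncStream<Element>
    private var iteratorCreated = false

    init(_ stream: AsyncStream<Element>) {
        self.stream = stream
    }

    func makeAsyncIterator() -> AsyncStream<Element>.AsyncIterator {
        // runtime validation: trap if a second iterator is ever requested
        precondition(!iteratorCreated, "this stream supports a single consumer")
        iteratorCreated = true
        return stream.makeAsyncIterator()
    }
}
```

the usual for-await syntax still works, since the box itself conforms to AsyncSequence.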
lastly, leveraging non-Copyable and non-Escapable types might be worth some exploration. they have some limitations (and probably some bugs), but do offer tools for more precise control of instance lifetimes, so could potentially be helpful.
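a rough exploratory sketch of the non-Copyable direction (Swift 5.9+; `ConsumeOnce` is a hypothetical name, specialized to Int for brevity): because `makeIterator()` is `consuming`, taking the iterator destroys the wrapper, so the compiler rejects any second use of it.

```swift
// non-Copyable wrapper: the value can yield its iterator exactly once
struct ConsumeOnce: ~Copyable {
    private let stream: AsyncStream<Int>

    init(_ stream: AsyncStream<Int>) {
        self.stream = stream
    }

    // consumes self; the ConsumeOnce value is unusable after this call
    consuming func makeIterator() -> AsyncStream<Int>.AsyncIterator {
        stream.makeAsyncIterator()
    }
}
```

note this enforces single *iterator creation* at compile time, even from the same actor, which the non-Sendable approaches alone cannot do.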
as i think about this a bit more, there is an outstanding question to address – do you care if there are multiple serialized consumers? for example, if both the Tasks in the multipleConsumers() example were annotated with @MainActor, or otherwise shared the same actor isolation, would that be okay or do you wish to prevent that? if multi-consumption even from the same actor is undesirable, you may need to resort to runtime checks, as the non-Sendable type approaches won't prevent that.
In answer to the question as to whether you can prohibit an AsyncStream from having multiple consumers (i.e., multiple iterators), I know of no way to do that. So, given that, I would just synchronize the mutable shared state:
import Synchronization

nonisolated func makeStream() -> any AsyncSequence<Int, Never> {
    let state = Mutex(0)
    return AsyncStream {
        state.withLock {
            $0 += 1
            return $0
        }
    }
}
Obviously, one could replace Mutex (part of the Synchronization module) with whatever synchronization pattern you prefer (e.g., actors, OSAllocatedUnfairLock, etc.).
As an aside, I made this an incrementing Int rather than a toggling Bool, so it is easier to observe some of the more subtle behaviors of multiple iterators. But the same idea applies either way.
Alternatively, you can just make your own AsyncSequence conforming type:
struct AsyncCounter: AsyncSequence {
    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        var current = 0

        mutating func next() async -> Int? {
            defer { current += 1 }
            return current
        }
    }
}
Note, this is placing the mutable state in the iterator (the idiomatic pattern), so the behavior is a little different. If you had multiple for await value in sequence {…} loops, each would have its own state. The AsyncStream approach shares this mutable state (which is why we have to synchronize it).
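To see that concretely, here is a runnable sketch (repeating the AsyncCounter type above, with a hypothetical helper): each for-await loop calls makeAsyncIterator() afresh, so each gets its own independent `current`.

```swift
struct AsyncCounter: AsyncSequence {
    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        var current = 0

        mutating func next() async -> Int? {
            defer { current += 1 }
            return current
        }
    }
}

// each loop gets a fresh iterator, and therefore fresh state
func demonstrateIndependentState() async -> ([Int], [Int]) {
    let counter = AsyncCounter()
    var first: [Int] = []
    for await value in counter.prefix(3) { first.append(value) }
    var second: [Int] = []
    for await value in counter.prefix(3) { second.append(value) }
    return (first, second) // both loops observe 0, 1, 2
}
```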
As an aside, there’s really nothing asynchronous going on in this particular example, so I might just make it a Sequence:
struct Counter: Sequence {
    func makeIterator() -> Iterator {
        return Iterator()
    }

    struct Iterator: IteratorProtocol {
        var current = 0

        mutating func next() -> Int? {
            defer { current += 1 }
            return current
        }
    }
}
If yours is really doing something asynchronous and you need AsyncSequence, then follow the async pattern. But if not, I would not be inclined to adopt AsyncSequence just for the convenience of AsyncStream.
If the sequence really was asynchronous, then you could adopt the continuation-based AsyncStream:
nonisolated func makeStream() -> any AsyncSequence<Int, Never> {
    AsyncStream { continuation in
        let task = Task {
            var current = 0
            while true {
                continuation.yield(current)
                current += 1
                try await Task.sleep(for: .seconds(1))
            }
            // this stream never ends (unless cancelled), so no need to call `continuation.finish()`.
        }
        continuation.onTermination = { state in
            if case .cancelled = state {
                task.cancel()
            }
        }
    }
}
In this value-yielding pattern, the mutable state is captured within the task, so there are no races.
somewhat tangentially: this reminds me that while the continuation-based initializer does allow encapsulating state in the manner you describe, it has different behavior than the init(unfolding:) flavor of stream. in particular i think of the former as 'push'-based with buffering, while the latter is 'pull'-based and does not. the continuation-initialized stream may create UnsafeContinuations internally while it has consumers waiting for more elements to come in, but the unfolding-initialized stream will invoke its closure every time an iterator awaits next() to produce a new element.
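to make that contrast concrete, here's a rough sketch (`compareInitializers` is a hypothetical name; the `nonisolated(unsafe)` counter is just to observe how often the unfolding closure runs):

```swift
func compareInitializers() async -> (pushed: [Int], pulled: [Int], pullCalls: Int) {
    // 'push': values are buffered as soon as they're yielded,
    // even before any consumer exists
    let pushed = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }

    // 'pull': the unfolding closure runs once per awaited next()
    nonisolated(unsafe) var calls = 0
    let pulled = AsyncStream<Int> {
        calls += 1
        return calls <= 2 ? calls : nil
    }

    var pushedValues: [Int] = []
    for await value in pushed { pushedValues.append(value) }

    var pulledValues: [Int] = []
    for await value in pulled { pulledValues.append(value) }

    return (pushedValues, pulledValues, calls)
}
```

both consumers see the same elements, but only the pulled stream does its work on demand; the pushed stream had already buffered everything before iteration began.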
There's a few ways to answer this question I guess...
Instead of trying to prevent the multi-use, perhaps you could consider leaning into the good ol' functional programming fold function. You could then pass around the partially applied fold; that would allow multiple folks to run it, I suppose.
stream.fold(0) { accumulated, element in accumulated + 1 }
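For what it's worth, AsyncSequence already ships a left fold under the name reduce; a minimal runnable sketch (`countElements` is a hypothetical name):

```swift
// counts the elements of a finite stream with AsyncSequence's built-in reduce
func countElements() async -> Int {
    let stream = AsyncStream<Int> { continuation in
        for i in 1...3 { continuation.yield(i) }
        continuation.finish()
    }
    // reduce(_:_:) is effectively a left fold over the sequence
    return await stream.reduce(0) { accumulated, _ in accumulated + 1 }
}
```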
Thank you everyone, these are some good ideas. Indeed, this could be solved in a number of ways depending on the desired outcome. My main motivation was to avoid a situation where values could be lost due to multiple consumers modifying the same inner state.
This looks to be the best solution for my use case. I see that unless someone manually creates an iterator and reuses it, separate for loops will get separate state. This is enough.
Yeah, that was just an extremely simplified example. Real implementation has asynchronous code.
For completeness, these are the approaches I tried now with their outputs when used in multipleConsumers example:
Original
nonisolated func makeStream() -> AsyncStream<Int> {
    var current = 0
    return AsyncStream {
        defer { current += 1 }
        return current
    }
}
prints:
1 0
2 1
1 2
2 2
2 4
1 4
1 6
2 6
This one has the issue where values could be lost.
Stream with continuation
nonisolated func makeStream2() -> some AsyncSequence<Int, Never> {
    AsyncStream { continuation in
        let task = Task {
            var current = 0
            while true {
                continuation.yield(current)
                current += 1
            }
        }
        continuation.onTermination = { state in
            if case .cancelled = state {
                task.cancel()
            }
        }
    }
}
prints:
2 1
1 0
1 2
2 3
2 4
1 5
2 6
1 7
State is shared and next value can be returned to either of the consumers.