I have a hard time integrating Swift concurrency into some existing file I/O code. For instance, reading from a FileHandle like this forces me to constantly spawn new unstructured Tasks.
let wg = DispatchGroup()
fileHandle.readabilityHandler = { handle in
    let data = handle.availableData
    wg.enter() // 1
    Task {
        defer {
            wg.leave()
        }
        // work with `data`...
    }
    wg.wait() // 2
}
If AsyncStream had a BufferingPolicy that would block on yield(), that would go a long way: I could wrap the entire handler in an AsyncStream, remove the code between (1) and (2), and simply call a blocking continuation.yield() there instead, something like this:
AsyncStream(Data.self, bufferingPolicy: .blocking(1)) { cont in
    fileHandle.readabilityHandler = { handle in
        let data = handle.availableData
        if data.isEmpty {
            cont.finish()
        } else {
            // BLOCKS the non-async readabilityHandler until enough
            // elements (1) have been consumed
            cont.yield(data)
        }
    }
}
The current policies only allow discarding or buffering elements when the consumer is not fast enough to keep up. In my case fileHandle is a process pipe, and I don't want the process to proceed with output unless I have consumed all emitted bytes.
PS: I know FileHandle has an async bytes property, but reading everything byte by byte is suboptimal here.
You clearly did not understand the problem. AsyncStream is explicitly for bridging non-async (blocking/callback) code to async. Its AsyncStream.Continuation.yield() is always called from non-async code, so I don't get why you think this would block an async Task.
Sorry, I understood your complaint to be about the code example you provided, in which I thought you were proposing to call yield() from an unstructured Task.
Nitpick: That is generally not correct. You can have an unstructured task which drives the continuation or even a structured one if the continuation was extracted from the stream.
What exactly do you mean by blocking in your context?
What is it about the AsyncStream buffer, that needs to be blocking?
The buffer is meant to be consumed somewhere via back pressure. Does it really matter if the buffer gets filled with data faster than the consumer pulls them?
Does this trick potentially help?
var continuation: AsyncStream<...>.Continuation! = nil

let stream = AsyncStream {
    continuation = $0 // While not pretty, this is totally valid!
}

// Use the continuation from outside the stream.
continuation.yield(...)
continuation.finish()

// Consume the stream somewhere with back pressure by pulling the elements from the buffer.
for await element in stream { ... }
Unlike other continuations in Swift, AsyncStream.Continuation supports escaping. Source
AsyncStream.Continuation conforms to Sendable.
You can think of an AsyncStream as if there were three entities involved:
- Producer - the code that drives the continuation and essentially feeds the buffer
- Buffer - the AsyncStream's internal buffer
- Consumer - the side that consumes the values from the buffer
There's no direct connection between the Consumer and the Producer. If the Producer yields / pushes elements faster than the Consumer pulls them, then they get buffered based on the buffering policy.
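To illustrate that last point with a minimal sketch of my own (not from the code above): with .bufferingNewest(1), a producer that runs ahead of the consumer simply loses its older elements, with no backpressure at all.

```swift
// Sketch: the build closure runs synchronously at init, so the producer
// yields all five values before any consumer pulls. With .bufferingNewest(1)
// only the newest value survives; the rest are silently discarded.
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { cont in
    for i in 1...5 { cont.yield(i) }
    cont.finish()
}

var received: [Int] = []
for await value in stream {
    received.append(value)
}
// received is [5]: no backpressure, earlier elements were dropped.
```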
The nice thing about AsyncStream is that it allows you to communicate from a sync context to an async context. But it is distinctly limited in that it only goes from sync to async, i.e. yield on AsyncStream.Continuation is not marked as async, and there is no non-async version of next on AsyncIterator. The case that Eric is describing has come up a lot for me as well. I want to go from async context to async context, and I want the yielding task to hang until the enqueued value is read. The only way to do this with AsyncStream is to call yield from within with(Unsafe|Checked)Continuation.
If I understand what is being requested is to have an async version of yield.
The default executor manages a pool of threads. When you make an async call, execution of the callee is enqueued on the executor, and the caller’s thread becomes available for another Task to run on.
A blocking call is synchronous, never yielding the thread. In the case of I/O, this can tie up the thread for a significant period. This is a no-no when running on an executor’s thread.
Ok, so you are distinguishing blocking as being what happens on a thread and suspending as being the identical concept on a task. I haven't seen that distinction being made and I've heard a lot of people loosely use the terms interchangeably, i.e. speaking of a task as being "blocked" rather than "suspended", or a thread as being "suspended" rather than "blocked".
Standardizing on your terminology here: what is needed is a suspending version of yield, i.e. one marked as async. And on the other side, I also need a non-suspending version of next on AsyncIterator.
No. Why would I want an async version of yield? I want a BLOCKING version of yield for non-async contexts, otherwise there is no way to put backpressure on a source that produces elements faster than I can/want to consume them.
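For what it's worth, a blocking yield can be approximated today by pairing each element with a semaphore. This is my own sketch (the helper name and the (Data, DispatchSemaphore) element type are illustrative, not an existing API), and it assumes the producer runs on its own thread, e.g. a Dispatch queue, never on the cooperative pool:

```swift
import Foundation

// Sketch: each element travels with a semaphore. The non-async producer
// blocks its thread on the semaphore; the consumer signals it once the
// element has actually been processed, which provides the backpressure.
func blockingYield(
    _ data: Data,
    on cont: AsyncStream<(Data, DispatchSemaphore)>.Continuation
) {
    let consumed = DispatchSemaphore(value: 0)
    cont.yield((data, consumed))
    consumed.wait() // blocks the producer thread, NOT a Swift concurrency task
}
```

The consumer loops with `for await (data, consumed) in stream { process(data); consumed.signal() }`. Blocking is tolerable here only because readabilityHandler runs on a Dispatch thread, not on the cooperative thread pool.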
If you stick to Swift concurrency it’s not a big issue to use the “blocking” terminology, but since @Erik_Aigner’s code snippet specifically mixes thread-based concurrency (DispatchGroup, which is a semaphore that counts backwards), Swift concurrency, and I/O (which comes in both blocking and non-blocking flavors), it’s important to be precise.
An async version of yield would provide the back pressure by not returning until the value had been consumed. I had missed what @ksluder points out which is that you are doing this from non-async context.
One thing to consider is that in the larger vision of Swift Concurrency, structured concurrency enables the use of async I/O without sacrificing the priority-donation benefits that blocking APIs have.
Precisely. You are in a non-async context, and I was mistakenly thinking that was not the case. In an async context, it provides the back pressure by not returning until the value is consumed.
In my eyes this would limit the whole type to pushing and pulling one element at a time, which eliminates the buffer entirely.
No. Consider calling yield inside withUnsafeContinuation<Void> and passing the continuation as part of the Element of the stream, i.e. Element on this particular stream looks like the tuple (Continuation, Value). Then the for await in the consumer calls continuation.resume(). None of this affects the buffering of the stream at all, but it "suspends" the calling task while the element is enqueued, and provides back pressure to the upstream tasks.
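A minimal sketch of that pattern (all names are mine; the element type is the (continuation, value) tuple just described, using the checked variant for clarity):

```swift
import Foundation

typealias Handoff = (CheckedContinuation<Void, Never>, Data)

var handoffCont: AsyncStream<Handoff>.Continuation!
let handoffs = AsyncStream<Handoff> { handoffCont = $0 }
let send = handoffCont!

// Producer (async context): stays suspended until the consumer resumes it.
func produce(_ data: Data) async {
    await withCheckedContinuation { done in
        send.yield((done, data))
    }
}

// Consumer: resume() is the backpressure signal back to the producer.
// for await (done, data) in handoffs {
//     process(data)
//     done.resume()
// }
```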
That would make the AsyncIterator non-async. So we're moving out of the AsyncSequence here.
This would be somewhat the equivalent of peek on a RunLoop. But yes, I'm finding AsyncSequence to be constricting.
The proper primitive for this is perhaps swift-async-algorithms' AsyncChannel, which is intended to expose backpressure to the sender. The primary means is the async AsyncChannel.send(_:), where the channel can keep the sender suspended (by not returning) until the consumer is ready to move on.
AsyncStream was rather designed to be a one-way street for interop with existing observer/notification patterns where the sender can simply fire-and-forget.
Though AsyncChannel does not currently offer a synchronous, blocking send(_:). But it does seem a more natural fit than AsyncStream for this use case.
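For reference, AsyncChannel usage looks roughly like this (a sketch assuming the swift-async-algorithms package is added as a dependency):

```swift
import AsyncAlgorithms
import Foundation

let channel = AsyncChannel<Data>()

// Producer: each send(_:) suspends until the consumer has taken the value,
// so the producer can never run ahead of consumption.
let producer = Task {
    for chunk in [Data([1]), Data([2])] {
        await channel.send(chunk)
    }
    channel.finish()
}

// Consumer: pulling an element is what resumes the suspended sender.
for await chunk in channel {
    // work with `chunk`...
}
```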