For giggles and grins, I took a pass at writing a simple SyncStream
, a synchronous analog to AsyncStream
:
final class SyncStream<Element>: Sequence, @unchecked Sendable {
private var buffer: [Element] = []
private var isFinished = false
private let condition = NSCondition()
init(_ build: @Sendable (Continuation) -> Void) {
let continuation = Continuation { [weak self] value in
guard let self else { return }
condition.lock()
buffer.append(value)
condition.signal()
condition.unlock()
} finish: { [weak self] in
guard let self else { return }
condition.lock()
isFinished = true
condition.broadcast()
condition.unlock()
}
build(continuation)
}
func makeIterator() -> AnyIterator<Element> {
AnyIterator { [self] in
condition.withLock {
while buffer.isEmpty, !isFinished {
condition.wait()
}
if !buffer.isEmpty {
return buffer.removeFirst()
} else {
return nil
}
}
}
}
}
extension SyncStream {
final class Continuation: Sendable {
private let yieldClosure: @Sendable (Element) -> Void
private let finishClosure: @Sendable () -> Void
init(yield: @Sendable @escaping (Element) -> Void, finish: @Sendable @escaping () -> Void) {
self.yieldClosure = yield
self.finishClosure = finish
}
func yield(_ value: Element) {
yieldClosure(value)
}
func finish() {
finishClosure()
}
}
}
Then you can do things like:
let sequence = SyncStream { continuation in
for i in 0 ..< 10 {
continuation.yield(i)
}
continuation.finish()
}
for value in sequence {
print(value)
}
Or, I might push the work to another thread if the sequence takes a long time to generate. E.g., here is a rendition with a Thread.sleep
to simulate some slow yielding process:
let sequence2 = SyncStream { continuation in
DispatchQueue.global().async {
for i in 0 ..< 10 {
continuation.yield(i)
Thread.sleep(forTimeInterval: 1)
}
continuation.finish()
}
}
for value in sequence2 {
print(value)
}
Anyway, if I wanted to do this a lot, I might write a convenience method to do that for me:
extension SyncStream {
/// Initializes the stream and runs the continuation on a background thread.
static func background(
qos: DispatchQoS = .unspecified,
build: @Sendable @escaping (Continuation) -> Void
) -> SyncStream {
SyncStream { continuation in
DispatchQueue(label: "SyncStream", qos: qos).async {
build(continuation)
}
}
}
}
And then:
let sequence3 = SyncStream.background { continuation in
for i in 0 ..< 10 {
continuation.yield(i)
Thread.sleep(forTimeInterval: 1)
}
continuation.finish()
}
for value in sequence3 {
print(value)
}
Now, clearly I haven’t implemented all the richness of AsyncStream
. And it seems like there are all sorts of possible variations of the theme. And I confess that I threw this together without a ton of testing. But maybe this is a starting point.
For example, you might want back-pressure semantics, e.g., perhaps have the build
closure wait when the buffer is full (to avoid an unbridled buffer size):
class SyncStream<Element>: @unchecked Sendable {
private var buffer: [Element] = []
private var isFinished = false
private let maxBufferSize: Int?
private let condition = NSCondition()
init(
maxBufferSize: Int? = nil,
_ build: (Continuation) -> Void
) {
self.maxBufferSize = maxBufferSize
let continuation = Continuation { [weak self] value in
guard let self else { return }
condition.lock()
while let max = self.maxBufferSize, buffer.count >= max {
condition.wait()
}
buffer.append(value)
condition.signal() // notify consumer
condition.unlock()
} finish: { [weak self] in
guard let self else { return }
condition.lock()
isFinished = true
condition.broadcast() // notify all waiters
condition.unlock()
}
build(continuation)
}
}
extension SyncStream: Sequence {
func makeIterator() -> AnyIterator<Element> {
AnyIterator { [self] in
condition.lock()
defer { condition.unlock() }
while buffer.isEmpty, !isFinished {
condition.wait()
}
if !buffer.isEmpty {
let value = buffer.removeFirst()
condition.signal() // notify producer (if waiting)
return value
} else {
return nil
}
}
}
}
extension SyncStream {
final class Continuation: Sendable {
private let yieldClosure: @Sendable (Element) -> Void
private let finishClosure: @Sendable () -> Void
fileprivate init(
yield: @Sendable @escaping (Element) -> Void,
finish: @Sendable @escaping () -> Void
) {
self.yieldClosure = yield
self.finishClosure = finish
}
func yield(_ value: Element) {
yieldClosure(value)
}
func finish() {
finishClosure()
}
}
}
extension SyncStream {
/// Initializes the stream and runs the continuation on a background thread.
static func background(
maxBufferSize: Int? = nil,
qos: DispatchQoS = .unspecified,
build: @Sendable @escaping (Continuation) -> Void
) -> SyncStream {
SyncStream(maxBufferSize: maxBufferSize) { continuation in
DispatchQueue(label: "SyncStream", qos: qos).async {
build(continuation)
}
}
}
}
And then:
let sequence4 = SyncStream.background(maxBufferSize: 10) { continuation in
for i in 0 ..< 100 {
continuation.yield(i)
}
continuation.finish()
}
for value in sequence4 {
print(value)
Thread.sleep(forTimeInterval: 0.2)
}
Note, if you have the build
closure wait
once the buffer fills, it becomes essential to ensure that the build
closure runs on a different thread, or else it can deadlock, which is why I used my background
rendition in this scenario.
As you can see, you can go nuts with whatever buffer/back-pressure semantics make sense for you. But if your tree is potentially huge, it probably makes sense to limit the buffer somehow.
But the bottom line, I probably would avoid creating an AsyncStream
if it is really a synchronous sequence. I’d either write an Iterator
or do something like this.