Async{Throwing}Stream reimplementation

Good day everyone,

I’ve been working on a reimplementation of Async{Throwing}Stream for the past couple of weeks and would like to hear your thoughts and feedback before I proceed further.

Here’s a summary of the changes and improvements so far. I think some of these changes might require an evolution proposal:

  • The biggest change: the separate state machines are unified and refactored into a single state machine. State and actions are modeled as enums, and transitions are handled exclusively via exhaustive switch statements. A with-style lock is used instead of manual lock/unlock calls.

  • Typed throws for AsyncThrowingStream.

  • Updated execution semantics by adopting nonisolated(nonsending) (not complete yet, waiting for the updated withTaskCancellationHandler version).

  • AsyncThrowingStream’s unfolding initializer now has an onCancel parameter.

  • Improved termination behavior. Now consistent across all possible ways to terminate the stream (cancel, finish, unfolding).

  • YieldResult now conditionally conforms to Hashable.

  • AsyncThrowingStream.Continuation.Termination now conditionally conforms to Hashable.

  • BufferingPolicy now conforms to Hashable.
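To make the first bullet concrete, here is a heavily simplified sketch of the pattern, not the code from the reimplementation: states and actions are enums, every transition is an exhaustive switch, and all access goes through a with-style lock. `ScopedLock`, `StreamStateMachine` and `StreamStorage` are hypothetical names, the element type is fixed to `Int`, Foundation's `NSLock` stands in for the stdlib's internal lock, and real suspension is replaced by a `.wouldSuspend` action.

```swift
import Foundation

// Hypothetical with-style lock (assumption: NSLock instead of the stdlib's
// internal lock). The critical section is a closure, so lock() and unlock()
// are always paired, even if the body throws.
final class ScopedLock: @unchecked Sendable {
  private let lock = NSLock()

  func withLock<R>(_ body: () throws -> R) rethrows -> R {
    lock.lock()
    defer { lock.unlock() }
    return try body()
  }
}

// Toy state machine: states and actions are enums, transitions are
// exhaustive switches. No continuations; next() reports .wouldSuspend.
struct StreamStateMachine {
  enum State {
    case streaming(buffer: [Int])
    case finished(buffer: [Int])  // finished, but buffered elements remain
    case terminated
  }

  enum YieldAction: Equatable { case enqueued, dropped }
  enum NextAction: Equatable { case element(Int), end, wouldSuspend }

  private var state: State = .streaming(buffer: [])

  mutating func yield(_ value: Int) -> YieldAction {
    switch state {
    case .streaming(var buffer):
      buffer.append(value)
      state = .streaming(buffer: buffer)
      return .enqueued
    case .finished, .terminated:
      return .dropped
    }
  }

  mutating func finish() {
    switch state {
    case .streaming(let buffer):
      state = buffer.isEmpty ? .terminated : .finished(buffer: buffer)
    case .finished, .terminated:
      break  // finishing twice is a no-op
    }
  }

  mutating func next() -> NextAction {
    switch state {
    case .streaming(var buffer):
      if buffer.isEmpty { return .wouldSuspend }
      let first = buffer.removeFirst()
      state = .streaming(buffer: buffer)
      return .element(first)
    case .finished(var buffer):
      // Invariant: a .finished buffer is never empty.
      let first = buffer.removeFirst()
      state = buffer.isEmpty ? .terminated : .finished(buffer: buffer)
      return .element(first)
    case .terminated:
      return .end
    }
  }
}

// The storage owns the machine; every access goes through withLock.
final class StreamStorage: @unchecked Sendable {
  private let lock = ScopedLock()
  private var machine = StreamStateMachine()

  func yield(_ value: Int) -> StreamStateMachine.YieldAction {
    lock.withLock { machine.yield(value) }
  }

  func finish() {
    lock.withLock { machine.finish() }
  }

  func next() -> StreamStateMachine.NextAction {
    lock.withLock { machine.next() }
  }
}
```

Because the switches are exhaustive, adding a new state forces every transition to be revisited at compile time, which is much of the appeal of having a single machine.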

Not implemented yet:

  • Terminate stream when continuation goes out of scope.

  • ~Copyable elements (currently not possible to implement, as far as I’m aware).

  • Use Mutex instead of custom lock?

P.S. Out of scope for now, but perhaps something to consider for Swift 7 is unifying AsyncStream and AsyncThrowingStream into a single type.

P.P.S. The tests are still a mess. If someone wants to help improve and expand them, I’d really appreciate the help.

Improvements always welcome!

I'm wondering, though, whether you should split up your work into phases. For example, reworking internals can land at any time as long as behavior remains unchanged. Adding new parameters and reshaping API will need an SE proposal, though, so that will be held off until a proposal is pitched and reviewed.

The other "group" of work is things like typed throws, which take quite some work to land without breaking "everyone"; while the change looks simple on paper, it can sometimes take a long time to actually land. Overall we're definitely supportive of, e.g., converging APIs into "one" and removing the ...Throwing... versions of APIs, but just as a heads up, this may be more difficult than it seems at first.

I believe you're right about ~Copyable: it may not be possible yet, but please explore and let us know what limitations you're hitting.

Using Mutex instead of custom locks would be preferable when possible, yes.

There have been thoughts going around about replacements for this type, but converging the two APIs into one, adopting typed throws, etc. are good directions anyway. If you have the time and are interested, those are welcome improvements.

Hope this helps a bit.

That was the advice I received from Jamie as well, and I see no reason why it shouldn't be possible. I thought it would be a good idea to start with the state machine. Even though it's the biggest change, as you said, it's internal and will only cause a minor behavioral change, which I think is still acceptable: AsyncThrowingStream would have the same weakly defined multiple-consumer behavior that AsyncStream already has. There is also the change around cancellation behavior, as discussed here. I will try to get a PR ready to open this week.

I’m not totally sold on switching to Mutex yet. While it seems like the right thing to do and we wouldn’t have to worry about another lock implementation, I think in this case using a stateless withLock composes better. We could use a Mutex with a Void value, but that feels a little awkward.
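A minimal sketch of the two shapes being compared, assuming the Swift 6 `Synchronization` module (on Apple platforms `Mutex` requires macOS 15 / iOS 18 or later). The counter types are hypothetical; the point is only where the protected state lives relative to the lock.

```swift
import Synchronization

// Shape 1: the Mutex owns the state it protects. The state is only reachable
// inside withLock, so the class can be checked Sendable (no @unchecked).
final class CounterWithState: Sendable {
  private let count = Mutex(0)

  func increment() -> Int {
    count.withLock { value in
      value += 1
      return value
    }
  }
}

// Shape 2: a Mutex<Void> used purely as a lock. The protected state lives
// outside the Mutex, so the compiler cannot check access to it and the class
// falls back to @unchecked Sendable.
final class CounterWithVoidMutex: @unchecked Sendable {
  private let lock = Mutex(())
  private var count = 0

  func increment() -> Int {
    lock.withLock { _ in
      count += 1
      return count
    }
  }
}
```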

I think the two biggest blockers so far, without having tried much yet, are these. First, the Continuation types don't support ~Copyable values yet. If I understand the recent Continuation pitch correctly, UnsafeContinuation would be updated to support ~Copyable, although in the implementation PR ~Copyable is commented out. Is there a timeline for ~Copyable support? Second, similar to the reason we need an UnsafeSendable wrapper, there is YieldResult's dropped case: we cannot statically prove that the element is not both resumed and returned at the same time, even though this never happens in practice.

How many of the changes do you think would require an SE proposal? Would it be a single proposal or multiple ones?

It's unlikely to land ~Copyable in continuations in the near future, but we'll revisit it during that proposal indeed. I'm not sure it'll make it into 6.4 though.

Basically, "new API" or "very different behavior" needs a proposal; I think it could all be lumped into one proposal. The small improvements you mentioned could land without a proposal.

So, I attempted to replace the custom lock with Mutex. However, since the body closure of Mutex's withLock method has a sending return type, with the current state of RBI we are quite limited in what we can do in such cases. I think we need to improve flow analysis first, or even introduce call-once closures, before this would work.

@ktoso I’ve started integrating the new state machine into the standard library and ran into a couple of questions.

Is there an internal UnsafeSendable type I could reuse? Something like this:

struct _UnsafeSendable<Value: ~Copyable>: @unchecked Sendable, ~Copyable {
  private let value: Value

  init(_ value: consuming Value) {
    self.value = value
  }

  consuming func take() -> sending Value {
    return self.value
  }
}

Also, since we can’t use Mutex, I’ll stick with the existing lock implementation. Does this implementation of the create function look correct? It seems to work, but I have my doubts.

  static func create(
    bufferPolicy limit: Continuation.BufferingPolicy
  ) -> _Storage {
    let minimumCapacity = _lockWordCount()

    let storage = unsafe Builtin.allocWithTailElems_1(
      _Storage.self,
      minimumCapacity._builtinWordValue,
      UnsafeRawPointer.self
    )

    // Initialize each stored property in place; the object was allocated
    // with tail storage via a builtin, so no initializer has run.
    let bufferPolicyPtr =
      unsafe UnsafeMutablePointer<Continuation.BufferingPolicy>(
        Builtin.addressof(&storage.bufferPolicy)
      )
    unsafe bufferPolicyPtr.initialize(to: limit)

    let statePtr =
      unsafe UnsafeMutablePointer<State>(
        Builtin.addressof(&storage.state)
      )
    unsafe statePtr.initialize(to: .idle(buffer: []))

    let terminationPtr =
      unsafe UnsafeMutablePointer<TerminationHandler?>(
        Builtin.addressof(&storage.onTermination)
      )
    unsafe terminationPtr.initialize(to: nil)

    // The lock words live in the tail-allocated region after the object.
    let ptr =
      unsafe UnsafeRawPointer(
        Builtin.projectTailElems(storage, UnsafeRawPointer.self)
      )
    unsafe _lockInit(ptr)

    return storage
  } 

That is very closely related to the UniqueBox proposal that is active.

Regarding the storage, the one thing to double-check is the alignment of memory allocated like that; it can sometimes be tricky to get right.
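One way to sanity-check layout and alignment outside the stdlib is to compare against `ManagedBuffer`, the public tail-allocating class, which (to my understanding) relies on the same kind of tail allocation and places the header and elements according to their `MemoryLayout` alignments. The sketch is hypothetical: `Header` stands in for the stored properties above, and `UInt` tail elements stand in for the lock words.

```swift
// Hypothetical header standing in for bufferPolicy, state and onTermination.
struct Header {
  var bufferLimit: Int
  var buffer: [Int]
  var onTermination: (() -> Void)?
}

// Hypothetical storage: UInt tail elements stand in for the lock words.
// ManagedBuffer destroys the header on deallocation; UInt is trivial, so no
// deinit is needed to deinitialize the tail elements.
final class TailAllocatedStorage: ManagedBuffer<Header, UInt> {
  static func make(bufferLimit: Int, lockWordCount: Int) -> TailAllocatedStorage {
    let base = TailAllocatedStorage.create(minimumCapacity: lockWordCount) { _ in
      Header(bufferLimit: bufferLimit, buffer: [], onTermination: nil)
    }
    // create(...) is typed as ManagedBuffer<Header, UInt>, but the dynamic
    // type is the subclass it was called on.
    let storage = unsafeDowncast(base, to: TailAllocatedStorage.self)
    // Only the header is initialized by create; the tail is raw memory.
    storage.withUnsafeMutablePointerToElements { words in
      words.initialize(repeating: 0, count: lockWordCount)
    }
    return storage
  }

  // True if the first tail element sits on a correctly aligned address.
  var tailIsAligned: Bool {
    withUnsafeMutablePointerToElements { words in
      Int(bitPattern: words) % MemoryLayout<UInt>.alignment == 0
    }
  }
}
```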

Interesting idea, but I do wonder how much more overhead this would add compared to a simple @unchecked Sendable type.

Just a quick update. The reimplementation of the state machine backing Async{Throwing}Stream has an open PR here: https://github.com/swiftlang/swift/pull/88017.

As for the other proposed changes, as discussed earlier, they will be part of an evolution proposal. I plan to have a pitch thread ready sometime next week.

To recap, the evolution proposal will cover the following changes:

  1. Add typed throws to AsyncThrowingStream.

  2. Add an onCancel parameter to AsyncThrowingStream's unfolding initializer.

  3. Align termination behavior across all stream termination paths.

  4. Add Hashable conformance to Async{Throwing}Stream.BufferingPolicy, and conditionally to Async{Throwing}Stream.YieldResult and AsyncThrowingStream.Continuation.Termination.

  5. Update execution semantics by adopting nonisolated(nonsending).

  6. Terminate stream when continuation goes out of scope.
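For items 1 and 4, a toy sketch of the shapes involved, not the proposed API: `ToyYieldResult`, `ToyIterator`, `ParseError` and `drain` are hypothetical. It shows a conditional `Hashable` conformance that applies only when `Element: Hashable` (conditional conformances do not imply the inherited `Equatable`, so it is declared explicitly), and a `next()` that throws exactly `Failure`. It assumes a Swift 6 compiler for typed throws.

```swift
// Hypothetical stand-in for YieldResult: Hashable only when Element is.
enum ToyYieldResult<Element> {
  case enqueued(remaining: Int)
  case dropped(Element)
  case terminated
}

extension ToyYieldResult: Equatable where Element: Equatable {}
extension ToyYieldResult: Hashable where Element: Hashable {}

// Hypothetical iterator whose next() throws exactly Failure.
struct ToyIterator<Element, Failure: Error> {
  private var results: [Result<Element, Failure>]

  init(_ results: [Result<Element, Failure>]) {
    self.results = results
  }

  mutating func next() throws(Failure) -> Element? {
    guard !results.isEmpty else { return nil }
    switch results.removeFirst() {
    case .success(let element):
      return element
    case .failure(let error):
      results.removeAll()  // after an error, every later call returns nil
      throw error
    }
  }
}

enum ParseError: Error, Equatable {
  case badInput
}

// Hypothetical caller: this signature only type-checks because next()
// throws exactly ParseError rather than `any Error`.
func drain(
  _ iterator: inout ToyIterator<Int, ParseError>,
  into values: inout [Int]
) throws(ParseError) {
  while let value = try iterator.next() {
    values.append(value)
  }
}
```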

~Copyable support will probably end up as a future direction, depending on the final timeline for that.

Let me know when that is ready for review. We will likely need a few folks to review it for different aspects of the impact, and I will try to gather them up to take a look. We need to be VERY careful w.r.t. adding behaviors and adjusting signatures, because we need to ensure that existing applications don't crash or misbehave due to the changes, and we have also promised that we won't break compilation or linkage of existing apps.

What does that mean exactly? If the deinit of the continuation is called, the stream will finish?

So if I do this:

let stream = AsyncStream<Void> { _ in }
for await _ in stream {}
// am I here immediately?

Yes. When there is no longer any reference to the continuation, the stream will terminate. Some more info here: Should AsyncStream terminate if its continuation goes out of scope?

Okay, I missed that discussion. I just wanted to say that I have used this never-ending stream behavior in the past for testing purposes (I'm not sure yet whether this can be replicated differently). Others are doing the same, see here: swift-concurrency-extras/Sources/ConcurrencyExtras/AsyncStream.swift at f9db8422fa13f554987eba71887f9652c4a46b9b · pointfreeco/swift-concurrency-extras · GitHub
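If the out-of-scope termination lands, one possible way to keep a deliberately never-ending stream for tests is to keep the continuation referenced explicitly. This is a sketch under an unverified assumption: that a continuation captured in its own `onTermination` handler counts as a live reference under the new behavior. `makeNeverEndingStream` is a hypothetical helper. With today's `AsyncStream`, the handler is cleared when it fires, which breaks the reference cycle once the consuming task is cancelled.

```swift
// Hypothetical helper: never yields and never finishes on its own; it ends
// only when the consuming task is cancelled.
func makeNeverEndingStream() -> AsyncStream<Void> {
  AsyncStream<Void> { continuation in
    // Capturing the continuation in its own termination handler keeps it
    // referenced until the stream terminates. The stream drops the handler
    // when it runs, breaking the storage -> handler -> continuation cycle.
    continuation.onTermination = { _ in _ = continuation }
  }
}
```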

Thanks for the info! I already had a feeling that somewhere there was code relying on this exact behavior.

While writing the pitch, I noticed that point 3 (“Align termination behavior across all stream termination paths.”) doesn’t fit neatly with the other changes. I think we can spin this out as a standalone bug fix. Since the state machine PR already modifies termination behavior slightly, this change feels somewhat out of place here. Here is the snippet from the pitch:

Motivation

The current implementation of AsyncStream's unfolding initializer does not clear the onCancel closure after the stream has been terminated, regardless of whether the stream finished because the closure returned nil or because the task iterating it was canceled:

let stream = AsyncStream<Int>(
  unfolding: {
    return nil
  },
  onCancel: {
    print("onCancel Called!")
  }
)

withUnsafeCurrentTask { $0?.cancel() }

var iterator = stream.makeAsyncIterator()

_ = await iterator.next() // onCancel Called!
_ = await iterator.next() // onCancel Called!
_ = await iterator.next() // onCancel Called!

Proposed Change

The implementation will be updated to align the termination behavior with that of the Continuation-based variant. As a result, when an onCancel closure is set, it will not be called more than once. When the stream finishes, the onCancel closure will be cleared.

Any thoughts on this?

Doing so would likely make reviewing them much easier. So, as much as possible, smaller and more digestible changes are preferred.

Even though this is not specified in the original design, I would consider it a bug and malformed behavior. We should consider some way of standardizing cancellation actions, or cancellation side effects, so that they are one-shot.

The one thing we need to watch out for is introducing additional mutable state. It may let us collapse part of the existing state, but we should also make sure that the mutation that clears it out is done safely (i.e., under the scope of the lock).
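A minimal sketch of a one-shot cancellation side effect whose state is cleared under the lock, assuming Foundation's `NSLock` as the lock; `OneShotCancelHandler` is a hypothetical name, not part of the PR.

```swift
import Foundation

// Hypothetical one-shot wrapper for an onCancel-style side effect.
final class OneShotCancelHandler: @unchecked Sendable {
  private let lock = NSLock()
  private var handler: (@Sendable () -> Void)?

  init(_ handler: (@Sendable () -> Void)?) {
    self.handler = handler
  }

  // Runs the handler at most once across all callers. The handler is taken
  // and cleared while holding the lock, then invoked after unlocking.
  func fire() {
    lock.lock()
    let toRun = handler
    handler = nil
    lock.unlock()
    toRun?()
  }

  // Called when the stream finishes normally: drop the handler unrun.
  func clear() {
    lock.lock()
    handler = nil
    lock.unlock()
  }
}
```

Invoking the handler after unlocking is a deliberate choice: a user-supplied `onCancel` that calls back into the stream would otherwise deadlock on a non-recursive lock.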

It just occurred to me that throwing an error from the closure passed to AsyncThrowingStream’s unfolding initializer does not terminate the stream, which, if I understand correctly, results in an ill-formed AsyncSequence:

End of Iteration

After an AsyncIteratorProtocol type returns nil or throws an error from its next() method, all future calls to next() must return nil. This matches the behavior of IteratorProtocol types and is important, since calling an iterator's next() method is the only way to determine whether iteration has finished.

struct SomeError: Error {}

let stream = AsyncThrowingStream<Int, Error> {
  let number = Int.random(in: 0..<10)

  if number > 5 {
    throw SomeError()
  } else {
    return number
  }
}

var iterator = stream.makeAsyncIterator()

do {
  let result = try await iterator.next()
  print("Result: \(result!)")
} catch {
  print(error)
}

do {
  let result = try await iterator.next()
  print("Result: \(result!)")
} catch {
  print(error)
}

do {
  let result = try await iterator.next()
  print("Result: \(result!)")
} catch {
  print(error)
}
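Until the behavior is decided, one way for an implementor to produce a well-formed sequence today is to wrap the closure so that it latches after returning nil or throwing. A sketch under stated assumptions: `TerminatingProducer` and `demo()` are hypothetical names, and an `NSLock` guards the latch flag.

```swift
import Foundation

// Hypothetical wrapper: once the wrapped closure returns nil or throws,
// every later call returns nil, matching the end-of-iteration rule.
final class TerminatingProducer<Element>: @unchecked Sendable {
  private let lock = NSLock()
  private var isDone = false
  private let produce: @Sendable () async throws -> Element?

  init(_ produce: @escaping @Sendable () async throws -> Element?) {
    self.produce = produce
  }

  // Synchronous helpers, so the lock is never held across an await.
  private func checkDone() -> Bool {
    lock.lock()
    defer { lock.unlock() }
    return isDone
  }

  private func markDone() {
    lock.lock()
    defer { lock.unlock() }
    isDone = true
  }

  func next() async throws -> Element? {
    if checkDone() { return nil }
    do {
      guard let element = try await produce() else {
        markDone()
        return nil
      }
      return element
    } catch {
      markDone()
      throw error
    }
  }
}

struct SomeError: Error {}

// Hypothetical usage: the first call throws, every later call returns nil.
func demo() async -> (threw: Bool, afterError: Int?) {
  let producer = TerminatingProducer<Int> { throw SomeError() }
  let stream = AsyncThrowingStream<Int, Error>(unfolding: { try await producer.next() })
  var iterator = stream.makeAsyncIterator()
  var threw = false
  do {
    _ = try await iterator.next()
  } catch {
    threw = true
  }
  let after = try? await iterator.next()
  return (threw, after)
}
```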

Should we consider changing that behavior as well? Any thoughts?

I do remember debating this while writing that. The issue was that the Sequence version (which the unfolding initializer was patterned after) has similar behavior w.r.t. returning nil: the onus is on the implementor of the closure to handle that. At least that was the original thought.

It might make sense to mention this in the pitch. If we decide to keep it as is, we should add documentation clarifying that the implementor is responsible for creating a well-formed AsyncSequence, as we did in the AsyncIteratorProtocol documentation.

I would be very interested in doing this. It would probably require its own evolution proposal.

Will have a draft PR ready later today. I'm thinking we should model it as a state machine, similar to the other PR.