Async{Throwing}Stream reimplementation

Good day everyone,

I’ve been working on a reimplementation of Async{Throwing}Stream for the past couple of weeks and would like to hear your thoughts and feedback before I proceed further.

Here’s a summary of the changes and improvements so far. I think some of these changes might require an evolution proposal:

  • The biggest change: the separate state machines are unified and refactored into a single one. State and actions are modeled as enums, and transitions are handled exclusively via exhaustive switch statements. A with-style lock is used instead of manual lock/unlock calls.

  • Typed throws for AsyncThrowingStream.

  • Updated execution semantics by adopting nonisolated(nonsending) (not complete yet, waiting for the updated withTaskCancellationHandler version).

  • AsyncThrowingStream’s unfolding initializer now has an onCancel parameter.

  • Improved termination behavior, now consistent across all possible ways to terminate the stream (cancel, finish, unfolding).

  • YieldResult now conditionally conforms to Hashable.

  • AsyncThrowingStream.Continuation.Termination now conditionally conforms to Hashable.

  • BufferingPolicy now conforms to Hashable.
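To illustrate the first bullet, here is a minimal sketch of what "state and actions as enums, transitions via exhaustive switch" can look like. It is not the actual implementation: the case names, the `Resume` callback standing in for a continuation, and the single-consumer restriction are all assumptions made to keep the example short and synchronous.

```swift
// Hypothetical stand-in for a suspended consumer's continuation: it is
// called with the next element, or nil once the stream has finished.
typealias Resume<Element> = (Element?) -> Void

// Every state of the stream lives in one enum.
enum StreamState<Element> {
  case idle(buffer: [Element])          // no consumer is waiting
  case waiting(resume: Resume<Element>) // a consumer is suspended
  case finished(buffer: [Element])      // terminated; drain what is left
}

// Side effects are returned to the caller instead of being performed
// inside the transition, so they can run after the lock is released.
enum YieldAction<Element> {
  case resume(Resume<Element>, Element)
  case buffered
  case dropped(Element)
}

enum NextAction<Element> {
  case returnElement(Element?)
  case suspend
}

struct StateMachine<Element> {
  private(set) var state: StreamState<Element> = .idle(buffer: [])

  mutating func yield(_ element: Element) -> YieldAction<Element> {
    switch state {
    case .idle(var buffer):
      buffer.append(element)
      state = .idle(buffer: buffer)
      return .buffered
    case .waiting(let resume):
      state = .idle(buffer: [])
      return .resume(resume, element)
    case .finished:
      return .dropped(element)
    }
  }

  mutating func next(_ resume: @escaping Resume<Element>) -> NextAction<Element> {
    switch state {
    case .idle(var buffer):
      guard !buffer.isEmpty else {
        state = .waiting(resume: resume)
        return .suspend
      }
      let first = buffer.removeFirst()
      state = .idle(buffer: buffer)
      return .returnElement(first)
    case .waiting:
      // This sketch supports a single consumer only.
      preconditionFailure("concurrent calls to next() are not supported")
    case .finished(var buffer):
      guard !buffer.isEmpty else { return .returnElement(nil) }
      let first = buffer.removeFirst()
      state = .finished(buffer: buffer)
      return .returnElement(first)
    }
  }

  // Returns the suspended consumer, if any, so the caller can resume it with nil.
  mutating func finish() -> Resume<Element>? {
    switch state {
    case .idle(let buffer):
      state = .finished(buffer: buffer)
      return nil
    case .waiting(let resume):
      state = .finished(buffer: [])
      return resume
    case .finished:
      return nil
    }
  }
}
```

In a real stream, each `mutating` call would run inside the with-style lock, and the returned action (for example, calling `resume`) would be performed only after the lock has been released, so no user code ever runs while the lock is held.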

Not implemented yet:

  • Terminate stream when continuation goes out of scope.

  • ~Copyable elements (currently not possible to implement, as far as I’m aware).

  • Use Mutex instead of custom lock?
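One possible way to approach the first open item, sketched under assumptions: the continuation is a struct whose copies all share a private reference-type token, and the token's `deinit` finishes the stream once the last copy is released. `StreamStorage`, `TerminationToken` and this `Continuation` are hypothetical names, not stdlib types, and a real implementation would also have to resume a waiting consumer and run `onTermination`.

```swift
import Foundation

// Minimal shared storage, reduced to what is needed to observe termination.
final class StreamStorage<Element>: @unchecked Sendable {
  private let lock = NSLock()
  private var buffer: [Element] = []
  private var finished = false

  var isFinished: Bool {
    lock.lock()
    defer { lock.unlock() }
    return finished
  }

  func yield(_ element: Element) {
    lock.lock()
    defer { lock.unlock() }
    if !finished { buffer.append(element) }  // elements after finish are dropped
  }

  func finish() {
    lock.lock()
    defer { lock.unlock() }
    finished = true  // idempotent: finishing twice is harmless
  }
}

// Hypothetical helper: every copy of a Continuation shares one token, so its
// deinit runs exactly once, when the last copy is released.
final class TerminationToken<Element> {
  let storage: StreamStorage<Element>
  init(storage: StreamStorage<Element>) { self.storage = storage }
  deinit { storage.finish() }
}

struct Continuation<Element> {
  private let token: TerminationToken<Element>

  init(storage: StreamStorage<Element>) {
    token = TerminationToken(storage: storage)
  }

  func yield(_ element: Element) { token.storage.yield(element) }
  func finish() { token.storage.finish() }
}
```

Note that Swift may release an object right after its last use rather than at the end of the lexical scope, so "goes out of scope" effectively means "is no longer used".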

P.S. Out of scope for now, but perhaps something to consider for Swift 7 is unifying AsyncStream and AsyncThrowingStream into a single type.

P.P.S. The tests are still a mess. If someone wants to help improve and expand them, I’d really appreciate the help.


Improvements always welcome!

I'm wondering if you shouldn't split up your work into phases, though. For example, reworking internals can land at any time as long as behavior remains unchanged. Adding new parameters and reshaping API will need an SE proposal, though, so that will be held off until a proposal is pitched and reviewed.

The other "group" of work is things like typed throws, which take quite some work to land without breaking "everyone". While the change looks simple on paper, it can sometimes take a long time to actually land. Overall, we're definitely supportive of, e.g., converging APIs into "one" and removing the ...Throwing... versions of APIs, but consider this a heads-up that it may be more difficult than it seems at first.

I believe you're right about ~Copyable; it may not be possible yet, but please explore and let us know what limitations you're hitting.

Using Mutex instead of custom locks would be preferable when possible, yes.

There have been thoughts going around about replacements for this type, but the work to converge the two APIs into one and to adopt typed throws is a good direction anyway. If you have the time and are interested, those are welcome improvements.

Hope this helps a bit.


That was the advice I received from Jamie as well, and I see no reason why it shouldn't be possible. I thought it would be a good idea to start with the state machine. Even though it's the biggest change, as you said, it's internal and will only cause minor behavioral changes, which I think are still acceptable: AsyncThrowingStream would get the same weakly defined multiple-consumer behavior that AsyncStream already has, plus the change in cancellation behavior as discussed here. I will try to get a PR ready to open this week.

I’m not totally sold on switching to Mutex yet. While it seems like the right thing to do and we wouldn’t have to worry about another lock implementation, I think in this case using a stateless withLock composes better. We could use a Mutex with a Void value, but that feels a little awkward.
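For comparison, a rough sketch of the "stateless withLock" shape, assuming a lock that guards several stored properties of the storage class instead of owning a value. `StatelessLock` and `Storage` are hypothetical names, and `NSLock` stands in for the stdlib's internal lock. With `Mutex`, these properties would either have to move into the mutex's value, or the mutex would hold `Void` and every call would ignore its `inout` argument (`mutex.withLock { _ in ... }`), which is the awkwardness mentioned above.

```swift
import Foundation

// A lock that owns no state; callers decide what it protects.
final class StatelessLock: @unchecked Sendable {
  private let lock = NSLock()

  // With-style API: the lock is released on every exit path, including throws.
  func withLock<R>(_ body: () throws -> R) rethrows -> R {
    lock.lock()
    defer { lock.unlock() }
    return try body()
  }
}

final class Storage<Element>: @unchecked Sendable {
  private let lock = StatelessLock()
  // Both properties are guarded by `lock`.
  private var buffer: [Element] = []
  private var onTermination: (() -> Void)?

  func setOnTermination(_ handler: @escaping () -> Void) {
    lock.withLock { onTermination = handler }
  }

  func append(_ element: Element) -> Int {
    lock.withLock {
      buffer.append(element)
      return buffer.count
    }
  }

  func terminate() {
    // Take the handler out under the lock, but call it after unlocking so
    // user code never runs while the lock is held.
    let handler: (() -> Void)? = lock.withLock {
      defer { onTermination = nil }
      return onTermination
    }
    handler?()
  }
}
```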

I think the two biggest blockers so far, without having tried much yet, are the following. First, the Continuation types don't support ~Copyable values yet. If I understand the recent Continuation pitch correctly, UnsafeContinuation would be updated to support ~Copyable, although ~Copyable is commented out in the implementation PR. Is there a timeline for ~Copyable support? The second blocker, similar to the reason we need an UnsafeSendable wrapper, is YieldResult's dropped case: we cannot statically prove that the element is not both resumed and returned, even though this never happens in practice.

How many of the changes do you think would require an SE proposal? Would it be a single proposal or multiple ones?

It's unlikely that ~Copyable in continuations will land in the near future, but we'll indeed revisit it during that proposal. I'm not sure it'll make it into 6.4, though.

Basically, "new API" or "very different behavior" needs a proposal; I think it could all be lumped into one proposal. The small improvements you mentioned could land without one.

So, I attempted to replace the custom lock with Mutex. However, since the body closure of Mutex's withLock method has a sending return type, we are, with the current state of RBI (region-based isolation), quite limited in what we can do in such cases. I think we need to improve flow analysis first, or even introduce call-once closures, before this would work.

@ktoso I’ve started integrating the new state machine into the standard library and ran into a couple of questions.

Is there an internal UnsafeSendable type I could reuse? Something like this:

struct _UnsafeSendable<Value: ~Copyable>: @unchecked Sendable, ~Copyable {
  private let value: Value

  init(_ value: consuming Value) {
    self.value = value
  }

  consuming func take() -> sending Value {
    return self.value
  }
}

Also, since we can’t use Mutex, I’ll stick with the existing lock implementation. Does this implementation of the create function look correct? It seems to work, but I have my doubts.

  static func create(
    bufferPolicy limit: Continuation.BufferingPolicy
  ) -> _Storage {
    // Number of pointer-sized words reserved for the lock.
    let minimumCapacity = _lockWordCount()

    // Allocate the object with room for the lock words as tail elements.
    let storage = unsafe Builtin.allocWithTailElems_1(
      _Storage.self,
      minimumCapacity._builtinWordValue,
      UnsafeRawPointer.self
    )

    // The stored properties are uninitialized after the raw allocation,
    // so each one is initialized in place.
    let bufferPolicyPtr =
      unsafe UnsafeMutablePointer<Continuation.BufferingPolicy>(
        Builtin.addressof(&storage.bufferPolicy)
      )
    unsafe bufferPolicyPtr.initialize(to: limit)

    let statePtr =
      unsafe UnsafeMutablePointer<State>(
        Builtin.addressof(&storage.state)
      )
    unsafe statePtr.initialize(to: .idle(buffer: []))

    let terminationPtr =
      unsafe UnsafeMutablePointer<TerminationHandler?>(
        Builtin.addressof(&storage.onTermination)
      )
    unsafe terminationPtr.initialize(to: nil)

    // Initialize the lock that lives in the tail-allocated words.
    let ptr =
      unsafe UnsafeRawPointer(
        Builtin.projectTailElems(storage, UnsafeRawPointer.self)
      )
    unsafe _lockInit(ptr)

    return storage
  }

That is very closely related to the UniqueBox proposal that is active.

As for the storage, the one thing to double-check is the alignment of memory allocated like that; it can sometimes be tricky to get right.
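To make the alignment point concrete: tail-allocated elements must start at an offset rounded up to a multiple of their type's alignment. The sketch below shows that rounding rule and checks the invariant with the public `ManagedBuffer`, which also allocates a header followed by tail elements. `Header`, `roundUp(_:toAlignment:)` and `tailIsAligned(count:of:)` are hypothetical names; this only illustrates the invariant and is not a drop-in replacement for the `Builtin`-based code.

```swift
// Rounds `offset` up to the next multiple of `alignment` (a power of two).
func roundUp(_ offset: Int, toAlignment alignment: Int) -> Int {
  precondition(alignment > 0 && alignment & (alignment - 1) == 0,
               "alignment must be a power of two")
  return (offset + alignment - 1) & ~(alignment - 1)
}

// Hypothetical header standing in for _Storage's stored properties.
struct Header {
  var bufferPolicy: Int
  var isFinished: Bool
}

// Allocates a header followed by `count` tail elements with ManagedBuffer
// and checks that the first tail element is suitably aligned. The tail
// elements are never initialized or read, only their address is inspected.
func tailIsAligned<Tail>(count: Int, of _: Tail.Type) -> Bool {
  let buffer = ManagedBuffer<Header, Tail>.create(minimumCapacity: count) { _ in
    Header(bufferPolicy: 0, isFinished: false)
  }
  return buffer.withUnsafeMutablePointerToElements { elements in
    UInt(bitPattern: elements) % UInt(MemoryLayout<Tail>.alignment) == 0
  }
}
```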

Interesting idea, but I do wonder how much more overhead this would add compared to a simple @unchecked Sendable type.