AsyncStream needs a blocking buffering policy

I just build a type base on the async yielding desire. I mean it's probably very similar to AsyncChannel that was also just mentioned.

actor ThrowingChannel<Value> where Value: Sendable {
  private var pushContinuation: CheckedContinuation<CheckedContinuation<Value, any Error>, Never>?
  private var pullContinuation: CheckedContinuation<Value, any Error>?

  init() {
    self.pushContinuation = nil
    self.pullContinuation = nil
  }

  func push(_ result: Result<Value, any Error>) async {
    precondition(pushContinuation == nil)
    let pullContinuation = self.pullContinuation
    self.pullContinuation = nil
    if let pullContinuation {
      pullContinuation.resume(with: result)
    } else {
      let continuation = await withCheckedContinuation { continuation in
        pushContinuation = continuation
      }
      continuation.resume(with: result)
    }
  }

  func pull() async throws -> Value {
    precondition(pullContinuation == nil)
    return try await withCheckedThrowingContinuation { continuation in
      let pushContinuation = self.pushContinuation
      self.pushContinuation = nil
      if let pushContinuation {
        pushContinuation.resume(returning: continuation)
      } else {
        pullContinuation = continuation
      }
    }
  }
}

It will of course trap if you use it from multiple places. Again, this is just a "basic" AsyncChannel in that sense.

Both scenarios have the same semantics, differing only in whether the sender is in an asynchronous context or not.

Some languages like Kotlin support both with the same Channel primitive — a synchronous trySend(_:) where the blocking (via busy waiting or other means) is left as the sender’s exercise.

Hi, what are the issues with this implementation?

3 Likes