WebSocket options with AsyncSequence Iterator?

Hi! I'm about to get started on a small WebSocket client. I see that Vapor currently has a WebSocket client with this API for receiving messages:[1]

ws.onText { ws, text in
    // String received by this WebSocket.
}

ws.onBinary { ws, binary in
    // [UInt8] received by this WebSocket.
}

I was kind of thinking it might be possible to listen for WebSocket messages using AsyncSequence:[2]

for await (ws, text) in ws.onText {
    // String received by this WebSocket.
}

for await (ws, binary) in ws.onBinary {
    // [UInt8] received by this WebSocket.
}

I guess I could try building that myself. Does anyone know of an open-source solution that already does this? Thanks!

  1. https://docs.vapor.codes/advanced/websockets/

  2. AsyncSequence | Apple Developer Documentation

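Keep in mind that would mean you need to put each loop inside its own task, since a for await loop suspends until its sequence finishes and the second loop would otherwise not start while the first is still running: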

Task {
    for await (ws, text) in ws.onText {
        // String received by this WebSocket.
    }
}

Task {
    for await (ws, binary) in ws.onBinary {
        // [UInt8] received by this WebSocket.
    }
}
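
For what it's worth, the "one task per loop" idea can also be written with a task group instead of two unstructured Tasks. This is only a sketch using the standard library: the two AsyncStream parameters are hypothetical stand-ins for the text and binary sequences (Vapor does not expose ws.onText or ws.onBinary as sequences), and consumeBoth is a made-up name.

```swift
/// Consumes two message streams concurrently, one child task per loop,
/// and returns a combined log once both streams have finished.
/// `texts` and `binaries` are hypothetical stand-ins for socket message sequences.
func consumeBoth(
    texts: AsyncStream<String>,
    binaries: AsyncStream<[UInt8]>
) async -> [String] {
    await withTaskGroup(of: [String].self) { group in
        // Each for-await loop suspends until its stream ends,
        // so each one gets its own child task.
        group.addTask {
            var log: [String] = []
            for await text in texts {
                log.append("text: \(text)")
            }
            return log
        }
        group.addTask {
            var log: [String] = []
            for await bytes in binaries {
                log.append("binary: \(bytes.count) bytes")
            }
            return log
        }
        // Collect the per-loop logs in whatever order the child tasks finish.
        var combined: [String] = []
        for await log in group {
            combined += log
        }
        return combined
    }
}
```

The group waits for both loops, so cancellation and lifetime follow the calling task rather than two detached pieces of work. The order of the combined log depends on which child finishes first.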

Might be less boilerplate to just do the callbacks.

But if you still wanted async streams, you could do something along the lines of...

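import AsyncAlgorithms  // AsyncChannel lives in the swift-async-algorithms package

let channel = AsyncChannel<[UInt8]>()
ws.onBinary { _, binary in
    // Assumes the async overload of onBinary; Vapor delivers a ByteBuffer, so copy out the bytes.
    await channel.send(Array(buffer: binary))
}

// elsewhere
Task {
    // AsyncChannel never throws, so a plain `for await` is enough.
    for await binary in channel {
        // handle
    }
}

If you haven’t seen it, AsyncBufferedByteIterator might be helpful to you. It’s in the AsyncAlgorithms package.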
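
If pulling in AsyncAlgorithms is not an option, roughly the same bridging can be sketched with AsyncStream from the standard library (makeStream needs Swift 5.9 or later). Everything named here is an assumption for illustration: CallbackSocket is a hypothetical stand-in for a callback-based socket, not Vapor's WebSocket type, and binaryMessages and simulateIncoming are made-up helpers.

```swift
/// Hypothetical stand-in for a callback-based socket.
/// Not thread-safe; intended only for a single-threaded demonstration.
final class CallbackSocket {
    private var handler: (([UInt8]) -> Void)?

    /// Registers the callback invoked for every binary message.
    func onBinary(_ callback: @escaping ([UInt8]) -> Void) {
        handler = callback
    }

    /// Simulates a binary frame arriving from the network.
    func simulateIncoming(_ bytes: [UInt8]) {
        handler?(bytes)
    }
}

extension CallbackSocket {
    /// Bridges the callback into an AsyncStream.
    /// The continuation is returned too, so the caller can finish
    /// the stream when the socket closes.
    func binaryMessages() -> (
        stream: AsyncStream<[UInt8]>,
        continuation: AsyncStream<[UInt8]>.Continuation
    ) {
        let (stream, continuation) = AsyncStream.makeStream(of: [UInt8].self)
        onBinary { bytes in
            // yield never suspends; elements are buffered until consumed.
            continuation.yield(bytes)
        }
        return (stream, continuation)
    }
}
```

The trade-off compared with AsyncChannel: yield returns immediately and the default buffering policy is unbounded, so a slow consumer lets messages pile up, whereas AsyncChannel's send suspends until the value is consumed.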

Here's an attempt at an AsyncStream version that seems to be working at "n equals 1" scale:

import Foundation

extension URLSessionWebSocketTask {
  func stream() -> AsyncThrowingStream<String, any Error> {
    let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self)
    continuation.onTermination = { @Sendable _ in
      //  TODO: CLOSE TASK
    }
    self.listen(continuation: continuation)
    return stream
  }
}

extension URLSessionWebSocketTask {
  func listen(continuation: AsyncThrowingStream<String, any Error>.Continuation) {
    // receive delivers a single message, so re-arm it after every success.
    self.receive { result in
      do {
        switch try result.get() {
        case .data:
          // Assumption: binary frames are ignored; this stream only carries text.
          break
        case .string(let string):
          continuation.yield(string)
        @unknown default:
          // Assumption: unknown future message kinds are ignored as well.
          break
        }
        self.listen(continuation: continuation)
      } catch {
        continuation.finish(throwing: error)
        //  TODO: CLOSE TASK
      }
    }
  }
}

func main() async throws {
  let url = URL(string: "ws://localhost:8080/echo")!
  let task = URLSession.shared.webSocketTask(with: url)
  // URLSession tasks start suspended, so resume before expecting messages.
  task.resume()
  try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
      for try await string in task.stream() {
        // Placeholder handling: print each text message.
        print(string)
      }
    }
    try await group.waitForAll()
  }
}

try await main()