WebSocket options with AsyncSequence Iterator?

Hi! I'm about to try and get started with a small little WebSocket client. I see that Vapor currently has a WebSocket client with this API to receive messages:[1]

ws.onText { ws, text in
    // String received by this WebSocket.
    print(text)
}

ws.onBinary { ws, binary in
    // [UInt8] received by this WebSocket.
    print(binary)
}

I was kind of thinking it might be possible to listen for WebSocket messages using AsyncSequence:[2]

for await (ws, text) in ws.onText {
    // String received by this WebSocket.
    print(text)
}

for await (ws, binary) in ws.onBinary {
    // [UInt8] received by this WebSocket.
    print(binary)
}

I guess I could try building that myself. Did anyone have an open-source solution that already does this? Thanks!


  1. https://docs.vapor.codes/advanced/websockets/ ↩︎

  2. AsyncSequence | Apple Developer Documentation ↩︎

Keep in mind that would mean you need to put those inside a task -

Task {
    for await (ws, text) in ws.onText {
        // String received by this WebSocket.
        print(text)
    }
}

Task {
    for await (ws, binary) in ws.onBinary {
        // [UInt8] received by this WebSocket.
        print(binary)
    }
}

Might be less boilerplate to just do the callbacks.

But if you still wanted async streams could do something along the lines of...

let channel = AsyncChannel<[UInt8]>()
ws.onBinary { _, binary in
    await channel.send(binary)
}

// elswhere
Task {
    for try await binary in channel {
        // handle
    }
}
1 Like

If you haven’t seen it, AsyncBufferedByteIterator might be helpful to you. It’s in the AsyncAlgorithms package.

1 Like

Here's an attempt on AsyncStream that seems to be working at "n equals 1" scale:

import Foundation

extension URLSessionWebSocketTask {
  func stream() -> AsyncThrowingStream<String, any Error> {
    let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self)
    continuation.onTermination = { @Sendable _ in
      //  TODO: CLOSE TASK
    }
    self.listen(continuation: continuation)
    self.resume()
    return stream
  }
}

extension URLSessionWebSocketTask {
  func listen(continuation: AsyncThrowingStream<String, any Error>.Continuation) {
    self.receive { result in
      do {
        switch try result.get() {
        case .data(let data):
          break
        case .string(let string):
          continuation.yield(string)
        @unknown default:
          break
        }
        self.listen(continuation: continuation)
      } catch {
        continuation.finish(throwing: error)
        //  TODO: CLOSE TASK
      }
    }
  }
}

func main() async throws {
  let url = URL(string: "ws://localhost:8080/echo")!
  let task = URLSession.shared.webSocketTask(with: url)
  try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
      for try await string in task.stream() {
        print(string)
      }
    }
    try await group.waitForAll()
  }
}

try await main()