vanvoorden
(Rick van Voorden)
1
Hi! I'm about to get started on a small WebSocket client. I see that Vapor currently has a WebSocket client with this API for receiving messages:
ws.onText { ws, text in
    // String received by this WebSocket.
    print(text)
}
ws.onBinary { ws, binary in
    // [UInt8] received by this WebSocket.
    print(binary)
}
I was kind of thinking it might be possible to listen for WebSocket messages using AsyncSequence:
for await (ws, text) in ws.onText {
    // String received by this WebSocket.
    print(text)
}
for await (ws, binary) in ws.onBinary {
    // [UInt8] received by this WebSocket.
    print(binary)
}
I guess I could try building that myself. Does anyone know of an open-source solution that already does this? Thanks!
joshw
(Josh)
2
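Keep in mind that you would need to put each loop inside its own task, since the first loop would otherwise not return until its stream ends and the second would never start: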
Task {
    for await (ws, text) in ws.onText {
        // String received by this WebSocket.
        print(text)
    }
}
Task {
    for await (ws, binary) in ws.onBinary {
        // [UInt8] received by this WebSocket.
        print(binary)
    }
}
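As a rough sketch of the same idea with structured concurrency, the two loops could also run as child tasks of a task group instead of unstructured `Task`s. The function name `consumeBoth` and the plain `AsyncStream` parameters are assumptions for illustration; they stand in for whatever sequences a real client would expose.

```swift
// Hypothetical helper: drains a text stream and a binary stream concurrently.
// The streams are stand-ins, not part of Vapor's API.
func consumeBoth(
    texts: AsyncStream<String>,
    binaries: AsyncStream<[UInt8]>
) async -> [String] {
    await withTaskGroup(of: [String].self) { group in
        // Each loop gets its own child task so neither blocks the other.
        group.addTask {
            var log: [String] = []
            for await text in texts {
                log.append("text: \(text)")
            }
            return log
        }
        group.addTask {
            var log: [String] = []
            for await bytes in binaries {
                log.append("binary: \(bytes.count) bytes")
            }
            return log
        }
        // Collect both partial logs; sort so the result is deterministic.
        var combined: [String] = []
        for await partial in group {
            combined += partial
        }
        return combined.sorted()
    }
}
```

The group returns only after both streams have finished, so cancelling the caller also cancels both loops.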
It might be less boilerplate to just use the callbacks.
But if you still want async streams, you could do something along the lines of:
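// AsyncChannel comes from the swift-async-algorithms package.
import AsyncAlgorithms
let channel = AsyncChannel<[UInt8]>()
ws.onBinary { _, binary in
    await channel.send(binary)
}
// elsewhere
Task {
    // AsyncChannel does not throw, so no `try` is needed here.
    for await binary in channel {
        // handle
    }
}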
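For comparison, here is a minimal sketch of the same callback-to-sequence bridge using only the standard library's `AsyncStream` (Swift 5.9 or later for `makeStream`). `CallbackSocket`, `simulateIncoming`, `textStream`, and `collectDemo` are hypothetical names made up for this example; the class only imitates the shape of a callback API and is not Vapor's type.

```swift
// Hypothetical stand-in for a callback-based socket (not Vapor's real type).
final class CallbackSocket: @unchecked Sendable {
    private var textHandler: (@Sendable (String) -> Void)?

    // Registers a text callback, mirroring the shape of `onText`.
    func onText(_ handler: @escaping @Sendable (String) -> Void) {
        textHandler = handler
    }

    // Simulates a text frame arriving from the network.
    func simulateIncoming(_ text: String) {
        textHandler?(text)
    }
}

extension CallbackSocket {
    // Bridges the callback into an AsyncStream and returns the continuation
    // so the caller can finish the stream when the socket closes.
    func textStream() -> (stream: AsyncStream<String>, continuation: AsyncStream<String>.Continuation) {
        let (stream, continuation) = AsyncStream.makeStream(of: String.self)
        onText { text in
            continuation.yield(text)
        }
        return (stream, continuation)
    }
}

// Usage: deliver two messages, finish, then drain the stream.
func collectDemo() async -> [String] {
    let socket = CallbackSocket()
    let (stream, continuation) = socket.textStream()
    socket.simulateIncoming("hello")
    socket.simulateIncoming("world")
    continuation.finish()
    var received: [String] = []
    for await text in stream {
        received.append(text)
    }
    return received
}
```

One design difference worth keeping in mind: `AsyncChannel.send` suspends until the value is consumed, which gives backpressure, while `AsyncStream` buffers yielded values (unbounded by default), so a slow consumer lets messages accumulate in memory.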
If you haven’t seen it, AsyncBufferedByteIterator might be helpful to you. It’s in the AsyncAlgorithms package.
vanvoorden
(Rick van Voorden)
4
Here's an attempt using AsyncThrowingStream that seems to work at "n equals 1" scale:
import Foundation
extension URLSessionWebSocketTask {
    func stream() -> AsyncThrowingStream<String, any Error> {
        let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self)
        continuation.onTermination = { @Sendable _ in
            // TODO: CLOSE TASK
        }
        self.listen(continuation: continuation)
        self.resume()
        return stream
    }
}
extension URLSessionWebSocketTask {
    func listen(continuation: AsyncThrowingStream<String, any Error>.Continuation) {
        // receive(completionHandler:) delivers a single message, so re-arm after each one.
        self.receive { result in
            do {
                switch try result.get() {
                case .data:
                    // Binary messages are ignored for now.
                    break
                case .string(let string):
                    continuation.yield(string)
                @unknown default:
                    break
                }
                self.listen(continuation: continuation)
            } catch {
                continuation.finish(throwing: error)
                // TODO: CLOSE TASK
            }
        }
    }
}
func main() async throws {
    let url = URL(string: "ws://localhost:8080/echo")!
    let task = URLSession.shared.webSocketTask(with: url)
    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            for try await string in task.stream() {
                print(string)
            }
        }
        try await group.waitForAll()
    }
}
try await main()
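The two `TODO: CLOSE TASK` markers could probably be handled in one place, because `onTermination` fires both when the consumer is cancelled and when the stream is finished (including `finish(throwing:)`). Below is a minimal sketch of that behavior; `FakeConnection` and `messages(from:)` are hypothetical stand-ins, used instead of a real socket so the example runs without a server.

```swift
import Foundation

// Hypothetical stand-in for the socket; records why it was closed.
final class FakeConnection: @unchecked Sendable {
    private let lock = NSLock()
    private var storedReason: String?

    var closeReason: String? {
        lock.lock(); defer { lock.unlock() }
        return storedReason
    }

    // Only the first close is recorded; later calls are ignored.
    func close(reason: String) {
        lock.lock(); defer { lock.unlock() }
        if storedReason == nil { storedReason = reason }
    }
}

// Builds a stream whose termination closes the connection exactly once.
func messages(from connection: FakeConnection) -> (
    stream: AsyncThrowingStream<String, any Error>,
    continuation: AsyncThrowingStream<String, any Error>.Continuation
) {
    let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self)
    continuation.onTermination = { @Sendable termination in
        switch termination {
        case .cancelled:
            // The consuming task was cancelled or the stream was dropped.
            connection.close(reason: "consumer cancelled")
        case .finished:
            // finish() or finish(throwing:) was called on the continuation.
            connection.close(reason: "stream finished")
        @unknown default:
            connection.close(reason: "unknown")
        }
    }
    return (stream, continuation)
}
```

With `URLSessionWebSocketTask`, the body of `close` would presumably be a call to `cancel(with:reason:)`, for example with the `.goingAway` close code; that wiring is left out here because it needs a live connection.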