Re-stream chunked data (server-sent events) from another web service through a Vapor endpoint

Hi everybody!
I need to re-stream a chunked payload from another web service through my Vapor endpoint, with some intermediate processing of the chunks before resending them. So the question is: how do I make this work?

As I understand it, there is only one way to stream data out of a Vapor endpoint:

// Streams a text/event-stream response without blocking the request's event loop.
app.get("testStream") { req throws -> Response in
    let response = Response()
    response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
    response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
    response.body = .init(stream: { writer in
        // Don't call wait() here: this closure runs on the route's event loop.
        // Sequence the writes through the futures they return instead.
        let chunk = ByteBuffer(string: "data: ...\n\n") // placeholder payload
        writer.write(.buffer(chunk))
            // ...chain further writes the same way with .flatMap...
            .flatMap { writer.write(.end) }
            .whenFailure { error in
                req.logger.error("Stream write failed: \(error)")
            }
    }) // the original never closed this call's parenthesis
    response.headers.remove(name: .contentLength)
    return response
}

To receive the streamed data, I have found two options.
The first is to create my own HTTPClient instance:

// Ideally we should use req.client.post() here, but I didn't find a way to do that for a streaming payload.
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew)
do {
    var request = HTTPClientRequest(url: "https://api.openai.com/v1/chat/completions")
    request.method = .POST
    request.headers.add(name: "Content-Type", value: "application/json")
    request.headers.add(name: "Authorization", value: "Bearer <Token>")

    let dict: [String: Any] = [
        "model": "gpt-3.5-turbo",
        "stream": true,
        "messages": [
            ["role": "user", "content": "Text of query"]
        ]
    ]

    // `try` rather than `try!`: a serialization failure should reach the
    // catch below instead of crashing the process.
    let data = try JSONSerialization.data(withJSONObject: dict)
    request.body = .bytes(ByteBuffer(data: data))

    let response = try await httpClient.execute(request, timeout: .seconds(30))
    print("HTTP head", response)
    if response.status != .ok {
        // An error status usually carries an error document rather than an event stream.
        print("unexpected status:", response.status)
    }

    var receivedBytes = 0
    for try await buffer in response.body {
        // Decode leniently: a multi-byte UTF-8 character may be split across two
        // chunks, and the previous `String(data:encoding:)!` crashed on that.
        let text = String(decoding: buffer.readableBytesView, as: UTF8.self)
        print("Chunk received: \(text)")
        receivedBytes += buffer.readableBytes
    }
    print("did receive \(receivedBytes) bytes")
} catch {
    print("request failed:", error)
}
// Reached on both the success and failure paths, so the client is always shut down.
try await httpClient.shutdown()

The second option is URLSession with data tasks. For my purpose I found a Swift library for working with OpenAI; it only provides plain callbacks for the received data:

// Requests a streamed chat completion; chunks arrive through `onResult`.
let openAI = OpenAI(apiToken: "<Token>")
openAI.chatsStream(
    query: .init(
        model: .gpt3_5Turbo,
        messages: [
            .init(role: .user, content: "Text of query")
        ]
    ),
    onResult: { chunkResult in
        // Every `case` needs at least one statement; comment-only cases don't compile.
        switch chunkResult {
        case .success(let result):
            // write result to the output
            _ = result
        case .failure(let error):
            // handle error
            _ = error
        }
    },
    completion: { error in
        // handle error of completion
    }
)

Can anybody help me combine these two parts while keeping everything non-blocking and thread-safe?

1 Like

I ran into the same issue and found an open PR by @Joannis_Orlandos that adds SSE support to Vapor: Implement Server-Sent Events by Joannis · Pull Request #2960 · vapor/vapor · GitHub. It is currently blocked by Add support for asynchronous body stream writing by Joannis · Pull Request #2998 · vapor/vapor · GitHub, and the latest comment there, by @0xTim, says it will be prioritized soon.
Until then, here's an example of using Response.Body(stream:) by Giovanni Luigi: server sent events - How to stream data in Vapor Swift? - Stack Overflow, as well as this (somewhat older) gist by @JUSTINMKAUFMAN: Swift SSE EventSource Server for Vapor · GitHub

EDIT: This works for me for streaming text through SSE:

// Relays the OpenAI completion stream to the client as Server-Sent Events.
let body = Response.Body(stream: { writer in
    Task {
        do {
            for try await chunk in req.openAI.chatsStream(query: query) {
                guard let next = chunk.choices.first?.delta.content else { continue }
                // SSE framing: every payload line is prefixed with "data: " and the
                // event ends with a blank line. Raw text is not a valid event for
                // a text/event-stream consumer such as EventSource.
                let event = next
                    .split(separator: "\n", omittingEmptySubsequences: false)
                    .map { "data: \($0)" }
                    .joined(separator: "\n") + "\n\n"
                // Await each write. This gives backpressure, and a failed write (for
                // example, the client went away) stops consuming the upstream stream.
                try await writer.write(.buffer(.init(string: event))).get()
            }
        } catch {
            req.logger.error("Stream Error: \(error)")
            // Writing .error(error) here was reported to crash the app, so the
            // stream is terminated with .end below instead.
        }
        _ = writer.write(.end)
    }
})
let res = Response(status: .ok, body: body)
res.headers.replaceOrAdd(name: "Content-Type", value: "text/event-stream")
return res