Hi everybody!
I need to re-stream a chunked payload from another web service through my Vapor endpoint, with some intermediate processing of the chunks before resending them. So the question is: how do I make this work?
As I understand it, there is only one way to stream data out of a Vapor endpoint:
app.get("testStream") { req throws -> Response in
    let response = Response()
    response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
    response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
    response.headers.remove(name: .contentLength)
    response.body = .init(stream: { writer in
        writer.write(.buffer(...)) // Need to wait somehow for completion (seems we can't use wait() on the route's event loop)
        ...
        writer.write(.end) // Need to wait somehow for completion (seems we can't use wait() on the route's event loop)
    })
    return response
}
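On the "need to wait somehow" comments: in Vapor 4 each BodyStreamWriter.write returns an EventLoopFuture<Void>, so the writes can be chained with flatMap instead of blocked on with wait(). A minimal sketch (the chunk strings here are placeholders of my own):

    response.body = .init(stream: { writer in
        // Chain the writes via their EventLoopFutures instead of wait().
        writer.write(.buffer(ByteBuffer(string: "first chunk\n")))
            .flatMap { writer.write(.buffer(ByteBuffer(string: "second chunk\n"))) }
            .flatMap { writer.write(.end) }
            .whenFailure { error in
                req.logger.error("stream failed: \(error)")
            }
    })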
To receive the streamed data I have found two options.
The first is to create our own HTTPClient instance:
// Ideally we would use req.client.post() here, but I didn't find a way to do that for a streaming payload
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew)
do {
    var request = HTTPClientRequest(url: "https://api.openai.com/v1/chat/completions")
    request.method = .POST
    request.headers.add(name: "Content-Type", value: "application/json")
    request.headers.add(name: "Authorization", value: "Bearer <Token>")
    let dict: [String: Any] = [
        "model": "gpt-3.5-turbo",
        "stream": true,
        "messages": [
            ["role": "user", "content": "Text of query"]
        ]
    ]
    let data = try JSONSerialization.data(withJSONObject: dict)
    request.body = .bytes(ByteBuffer(data: data))
    let response = try await httpClient.execute(request, timeout: .seconds(30))
    print("HTTP head", response)
    var receivedBytes = 0
    for try await buffer in response.body {
        let readableBytes = buffer.readableBytes
        if let chunk = buffer.getData(at: buffer.readerIndex, length: readableBytes) {
            print("Chunk received: \(String(data: chunk, encoding: .utf8)!)")
        }
        receivedBytes += readableBytes
    }
    print("did receive \(receivedBytes) bytes")
} catch {
    print("request failed:", error)
}
try await httpClient.shutdown()
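One way to combine this receiving side with the streaming-out endpoint above is to drive the HTTPClient async body from inside the Response.Body stream closure, awaiting each write before reading the next chunk. This is only a sketch under my own assumptions: the "proxyStream" path and the processing step are placeholders, and httpClient is assumed to be a long-lived client rather than one created per request:

    app.get("proxyStream") { req throws -> Response in
        let response = Response()
        response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
        response.body = .init(stream: { writer in
            Task {
                do {
                    var request = HTTPClientRequest(url: "https://api.openai.com/v1/chat/completions")
                    request.method = .POST
                    // ... same headers and body as in the snippet above ...
                    let upstream = try await httpClient.execute(request, timeout: .seconds(30))
                    for try await buffer in upstream.body {
                        // Intermediate processing of each chunk would go here.
                        try await writer.write(.buffer(buffer)).get()
                    }
                } catch {
                    req.logger.error("upstream request failed: \(error)")
                }
                _ = writer.write(.end)
            }
        })
        return response
    }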
The second option is URLSession with tasks. For my purpose I found a Swift library for working with OpenAI; it provides regular callbacks for the received data:
let openAI = OpenAI(apiToken: "<Token>")
openAI.chatsStream(
    query: .init(
        model: .gpt3_5Turbo,
        messages: [
            .init(role: .user, content: "Text of query")
        ]
    ),
    onResult: { chunkResult in
        switch chunkResult {
        case .success(let result):
            break // write result to the output
        case .failure(let error):
            break // handle error
        }
    },
    completion: { error in
        // handle the completion error
    }
)
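Bridging this callback API into Vapor's stream writer might look roughly like the following sketch. It reuses the onResult/completion signatures above and assumes each successful chunk exposes its text via choices.first?.delta.content:

    response.body = .init(stream: { writer in
        openAI.chatsStream(
            query: .init(
                model: .gpt3_5Turbo,
                messages: [.init(role: .user, content: "Text of query")]
            ),
            onResult: { chunkResult in
                // Forward each successful chunk's text into the response stream.
                if case .success(let result) = chunkResult,
                   let text = result.choices.first?.delta.content {
                    _ = writer.write(.buffer(ByteBuffer(string: text)))
                }
            },
            completion: { error in
                if let error { req.logger.error("stream completion error: \(error)") }
                _ = writer.write(.end)
            }
        )
    })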
Can anybody help me combine these two parts, keeping all the non-blocking and multithreading concerns in mind?
abegehr (Anton Begehr):
I ran into the same issue/task and found an open PR that addresses SSE for Vapor by @Joannis_Orlandos: Implement Server-Sent Events by Joannis · Pull Request #2960 · vapor/vapor · GitHub. It's currently blocked by Add support for asynchronous body stream writing by Joannis · Pull Request #2998 · vapor/vapor · GitHub, where the last comment, by @0xTim, says it will be prioritized soon.
Until then, here's an example of using Response.Body(stream:) by Giovanni Luigi: server sent events - How to stream data in Vapor Swift? - Stack Overflow, as well as this (slightly older) gist by @JUSTINMKAUFMAN: Swift SSE EventSource Server for Vapor · GitHub
EDIT: This works for me, for streaming text through SSE:
let body = Response.Body(stream: { writer in
    Task {
        do {
            for try await res in req.openAI.chatsStream(query: query) {
                if let next = res.choices.first?.delta.content {
                    _ = writer.write(.buffer(.init(string: next)))
                }
            }
        } catch {
            req.logger.error("Stream Error: \(error)")
            //_ = writer.write(.error(error)) // writing the error would crash the app
        }
        _ = writer.write(.end)
    }
})
let res = Response(status: .ok, body: body)
res.headers.replaceOrAdd(name: "Content-Type", value: "text/event-stream")
return res