Hi everybody!
I need to re-stream a chunked payload from another web service through my own Vapor endpoint, with some intermediate processing of the chunks before resending them. So the question is: how do I make this work?
As I understand it, there is only one way to stream data out of a Vapor endpoint:
app.get("testStream") { req throws -> Response in
let response = Response()
response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
response.body = .init(stream: { writer in
writer.write(.buffer(...)) // Need to wait somehow for completion (seems we can't use wait() on route's event loop)
...
writer.write(.end) // Need to wait somehow for completion (seems we can't use wait() on route's event loop)
}
response.headers.remove(name: .contentLength)
return response
}
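One thing I noticed while digging: as far as I can tell, BodyStreamWriter has a write(_:) variant that returns EventLoopFuture<Void>, so instead of wait()-ing the writes can be sequenced by chaining the futures. A sketch, with a placeholder chunk:

response.body = .init(stream: { writer in
    // Sequence the writes by chaining the futures returned by write(_:)
    // instead of blocking with wait(). The chunk content is a placeholder.
    let chunk = ByteBuffer(string: "data: hello\n\n")
    writer.write(.buffer(chunk)).flatMap {
        writer.write(.end)
    }.whenFailure { error in
        writer.write(.error(error), promise: nil)
    }
})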
To receive the streamed data I have found two options.
The first is to create my own HTTPClient instance:
// Ideally we would use req.client.post() here, but I didn't find a way to do that for a streaming payload
let httpClient = HTTPClient(eventLoopGroupProvider: .createNew)
do {
    var request = HTTPClientRequest(url: "https://api.openai.com/v1/chat/completions")
    request.method = .POST
    request.headers.add(name: "Content-Type", value: "application/json")
    request.headers.add(name: "Authorization", value: "Bearer <Token>")
    let dict: [String: Any] = [
        "model": "gpt-3.5-turbo",
        "stream": true,
        "messages": [
            ["role": "user", "content": "Text of query"]
        ]
    ]
    let data = try JSONSerialization.data(withJSONObject: dict)
    request.body = .bytes(ByteBuffer(data: data))
    let response = try await httpClient.execute(request, timeout: .seconds(30))
    print("HTTP head", response)
    var receivedBytes = 0
    for try await buffer in response.body {
        let readableBytes = buffer.readableBytes
        if let chunk = buffer.getData(at: buffer.readerIndex, length: readableBytes) {
            print("Chunk received: \(String(data: chunk, encoding: .utf8) ?? "<non-UTF-8 chunk>")")
        }
        receivedBytes += readableBytes
    }
    print("did receive \(receivedBytes) bytes")
} catch {
    print("request failed:", error)
}
try await httpClient.shutdown()
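A side note on this option: creating a brand-new HTTPClient (with its own event loop group) per request is expensive, and as far as I can tell Vapor already exposes the AsyncHTTPClient instance it manages, so it can be reused. A sketch (app.http.client.shared is my understanding of where Vapor keeps it; the route name is made up):

app.get("reuseClient") { req async throws -> String in
    // Reuse Vapor's managed HTTPClient; no shutdown() needed on our side.
    let httpClient = req.application.http.client.shared
    let request = HTTPClientRequest(url: "https://api.openai.com/v1/models") // would need the Authorization header in practice
    let response = try await httpClient.execute(request, timeout: .seconds(30))
    return "upstream status: \(response.status)"
}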
The second is URLSession with tasks. For my purposes I found a Swift library for working with OpenAI; it provides plain callbacks for the received data:
let openAI = OpenAI(apiToken: "<Token>")
openAI.chatsStream(
    query: .init(
        model: .gpt3_5Turbo,
        messages: [
            .init(role: .user, content: "Text of query")
        ]
    ),
    onResult: { chunkResult in
        switch chunkResult {
        case .success(let result):
            // write result to the output
            break
        case .failure(let error):
            // handle the error
            break
        }
    },
    completion: { error in
        // handle an error reported on completion
    }
)
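If I go with this library, I think its callbacks can be bridged into an AsyncThrowingStream so the chunks become consumable with for-await, just like the AsyncHTTPClient variant. A sketch, assuming the chatsStream signature shown above; ChatStreamResult is my guess at the library's chunk type:

// Bridge the callback-based client into an AsyncThrowingStream.
func chatChunks(openAI: OpenAI, query: ChatQuery) -> AsyncThrowingStream<ChatStreamResult, Error> {
    AsyncThrowingStream { continuation in
        openAI.chatsStream(
            query: query,
            onResult: { chunkResult in
                switch chunkResult {
                case .success(let chunk):
                    continuation.yield(chunk)
                case .failure(let error):
                    continuation.finish(throwing: error)
                }
            },
            completion: { error in
                // error == nil means the stream finished normally
                continuation.finish(throwing: error)
            }
        )
    }
}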
Can anybody help me combine these two parts, keeping all the non-blocking and multithreading concerns in mind?
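For reference, here is the shape of the combination I have in mind, as an untested sketch. It assumes a Vapor 4 release that includes the async streaming body initializer (Response.Body(asyncStream:) with AsyncBodyStreamWriter); if that's not available, the future-chaining approach above should apply instead. processChunk is a hypothetical placeholder for the intermediate processing step:

import Vapor
import AsyncHTTPClient
import NIOCore
import Foundation

// Hypothetical placeholder for the intermediate processing of each chunk.
func processChunk(_ chunk: ByteBuffer) -> ByteBuffer {
    chunk // pass-through; the real transformation goes here
}

app.get("testStream") { req -> Response in
    let response = Response()
    response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
    response.body = .init(asyncStream: { writer in
        // Reuse the AsyncHTTPClient instance Vapor manages instead of
        // creating (and shutting down) one per request.
        let httpClient = req.application.http.client.shared
        do {
            var upstream = HTTPClientRequest(url: "https://api.openai.com/v1/chat/completions")
            upstream.method = .POST
            upstream.headers.add(name: "Content-Type", value: "application/json")
            upstream.headers.add(name: "Authorization", value: "Bearer <Token>")
            let dict: [String: Any] = [
                "model": "gpt-3.5-turbo",
                "stream": true,
                "messages": [["role": "user", "content": "Text of query"]]
            ]
            upstream.body = .bytes(ByteBuffer(data: try JSONSerialization.data(withJSONObject: dict)))
            let upstreamResponse = try await httpClient.execute(upstream, timeout: .seconds(30))
            // Forward each upstream chunk after processing; every write is
            // awaited, so no event loop is ever blocked.
            for try await chunk in upstreamResponse.body {
                try await writer.write(.buffer(processChunk(chunk)))
            }
            try await writer.write(.end)
        } catch {
            try await writer.write(.error(error))
        }
    })
    return response
}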