I'm building a Swift-native replacement for HTTPBin for Alamofire testing using Vapor. Things have gone pretty well so far, but I've run into an issue. I need to stream multiple payloads in a single response, similar to how a streaming log service would work. I have my Reply type and just want to send multiple, separate encodings of the same instance over the connection. I've tried streaming the encoded values using BodyStreamWriter, but I get no response from the server.
app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count <= 100 else {
        return Response(status: .badRequest)
    }

    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let response = Response(body: .init(stream: { writer in
        for _ in 0..<count {
            let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
            _ = writer.write(.buffer(buffer))
        }
    }, count: count))

    return response
}
It also seems strange to me that the streaming response requires the count of writes as part of its initialization, as a true stream may not be able to provide that.
In any case, what am I doing wrong with the BodyStreamWriter here?
Turns out the core issue was that I just wasn’t calling writer.write(.end), which is obvious when you think about it. However, this doesn’t do what I need, as it’s still a single response with a body of fixed size. What I need is chunked responses of unknown size over time. It appears Vapor used to support chunked encoding but no longer does. @tanner0101 is that correct, and is there any replacement?
Yeah Vapor 3 used to only support chunked encoding for streaming responses. Vapor 4 replaced that with BodyStreamWriter which is more generally useful and supports back pressure. Chunked encoding can be built on top of this by adding the transfer-encoding: chunked header and writing to the BodyStreamWriter using the chunked encoding. The count being a non-optional Int is unfortunate but you can hack around that by setting it to 0. Vapor 4 should definitely provide helpers to do this. Those not being there already is an oversight, sorry about that.
I'll try setting the count to 0, but my attempts to manually set the Transfer-Encoding were unsuccessful: all content still came back to the client as a single blob. If I want to replicate a log stream that sends a bit of content over time, can I just schedule write calls to the stream writer on the EventLoop? I've tried this, but it didn't work; perhaps the body size is the issue:
let response = Response(body: .init(stream: { writer in
    let start = writer.write(.buffer(buffer))
    var next = start
    for _ in 0..<count {
        next = next.flatMap {
            request.eventLoop.scheduleTask(in: .milliseconds(500)) {
                _ = writer.write(.buffer(buffer))
            }.futureResult
        }
    }
    _ = next.flatMap { writer.write(.end) }
}, count: buffer.readableBytes * count))
Yeah, my guess would be that the content-length header being set is causing the client to accumulate the entire body first. Hard to say without seeing that code, though.
If you want to implement chunked encoding yourself, try something like:
let response = Response()
response.body = .init(stream: { stream in
    // Write bytes to stream.
}, count: 0)
response.headers.remove(name: .contentLength)
response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
Then verify that the response only has the transfer-encoding header and not the content-length header by doing print(response).
Once the headers are correct, just write the data to the stream in the chunked encoding. The format is: each chunk's size in hexadecimal, followed by \r\n, then the chunk's bytes, then another \r\n; the body ends with a zero-length chunk, i.e. 0\r\n\r\n.
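To make the framing concrete, here's a minimal pure-Foundation sketch of it (chunkFrame and chunkTerminator are my own illustrative names, not Vapor APIs):

```swift
import Foundation

// Illustrative helper: wraps one payload in HTTP/1.1 chunked framing.
// Each chunk is "<size in hexadecimal>\r\n<bytes>\r\n".
func chunkFrame(_ payload: Data) -> Data {
    var framed = Data(String(payload.count, radix: 16).utf8) // e.g. 26 bytes -> "1a"
    framed.append(Data("\r\n".utf8))
    framed.append(payload)
    framed.append(Data("\r\n".utf8))
    return framed
}

// The terminating zero-length chunk every chunked body must end with.
let chunkTerminator = Data("0\r\n\r\n".utf8)
```

A two-payload body would then be chunkFrame(first) + chunkFrame(second) + chunkTerminator.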
It's important to note though that whether this will give you the bytes in chunks depends on the client. Even if you send the data using chunked encoding from the server, the client may still choose to accumulate the data into a single blob first.
For example, Vapor's Client protocol always accumulates the response into a single buffer. The underlying AsyncHTTPClient however exposes a HTTPClientResponseDelegate parameter that you can set to get the response back part by part. Regardless of whether you use chunked encoding or not, setting HTTPClientResponseDelegate will give you back the body bytes as they are received.
In other words, all of this stuff is pretty finicky and implementation-dependent. If you want to reliably send packets of information asynchronously, WebSockets is the way to go.
Right. I'm not using this for a real service; it's just a test server for Alamofire to ensure our streaming (non-WebSocket) connectivity works correctly. I'll give the fully manual chunked encoding a try, since I didn't do that before, even though I did set the encoding and remove the Content-Length. Since Alamofire uses URLSession, I think the encoded chunks should work correctly, as I've manually tested against real logging servers.
If Alamofire supports reading in HTTP response bodies part by part, then that code should work. When you call writer.write(.buffer(...)) that results in channel.writeAndFlush being called in NIO which should result in the bytes being sent over the network. The only thing count is doing is setting the content-length header on the response.
Was there anything like a proxy in between the client and the server? Which APIs in Alamofire / URLSession were you using to handle the streaming response?
Alamofire's newer DataStreamRequest uses URLSessionDataTask under the hood, relying on URLSession's parsing of incoming Data to call its streaming handlers. This is largely non-deterministic, as URLSession doesn't guarantee behavior there, but chunked responses spread over time seem to trigger the appropriate behavior right now, though that may require the chunked encoding I wasn't producing. I was just sending multiple JSON payloads into the stream, and the client was seeing them as a single blob of multiple responses. If possible I'd like to trigger the separate handlers, which may require both the chunked encoding and time between the writes, but I'll see.
I see. Yeah, I guess it comes down to how URLSessionDataTask works internally. It seems strange that they only call the handlers separately for chunk-encoded responses; ideally you'd call the data handlers whenever you get bytes in from the OS. Are the JSON payloads relatively small? If they all fit in a single TCP frame (~64K IIRC) then maybe there's an optimization happening somewhere that merges them together. Since chunked encoding needs to be parsed, it would make sense that even if multiple chunks arrive together in a single frame, they would result in separate calls to the URLSessionDataTask handlers, since each chunk of bytes must be parsed out of the incoming data.
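To illustrate that parsing step, here's a minimal pure-Swift sketch that splits a fully accumulated chunked body back into its individual payloads (decodeChunks is an illustrative helper, not a URLSession or Vapor API; it assumes well-formed input, whereas a real parser must handle partial data):

```swift
import Foundation

// Illustrative helper: splits a complete chunked-encoded body into its payloads.
// Assumes well-formed input terminated by a zero-length chunk.
func decodeChunks(_ body: String) -> [String] {
    var chunks: [String] = []
    var rest = Substring(body)
    while let sizeEnd = rest.range(of: "\r\n") {
        // Each chunk starts with its size in hexadecimal.
        guard let size = Int(rest[..<sizeEnd.lowerBound], radix: 16), size > 0 else {
            break // a zero-length chunk ("0\r\n\r\n") terminates the body
        }
        let dataStart = sizeEnd.upperBound
        let dataEnd = rest.index(dataStart, offsetBy: size)
        chunks.append(String(rest[dataStart..<dataEnd]))
        rest = rest[rest.index(dataEnd, offsetBy: 2)...] // skip the trailing \r\n
    }
    return chunks
}
```

Each handler call on the client side would then correspond to one parsed-out chunk rather than to a raw read from the socket.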
I tested against the Clash proxy server's logs to validate that our chunked streaming was working, since it was mentioned in the initial feature request. In that case, the URLSession callback is triggered once for each small JSON payload. But yes, I've always assumed there are limitations here as you mention, like framing or even MTU sizes. And as you've mentioned, if users want strict framing here, websockets are the way to go, and supporting URLSessionWebSocketTask is on my roadmap for Alamofire 5.3 (5.2 will be Combine). In the end, this server will serve as a replacement for HTTPBin for our test suite which we can customize for higher performance, reliability, and more testing scenarios, like websockets, which HTTPBin doesn't support.
It seems that attempting to pass 0 for the Content-Length doesn't work, as the client still receives the header with value 0, even after removing the header on the Vapor side. I've updated my server code:
app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count > 0, count <= 100 else {
        return Response(status: .badRequest)
    }

    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
    let response = Response(body: .init(stream: { writer in
        let start = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
        var next = start
        for _ in 0..<count {
            next = next.flatMap {
                request.eventLoop.scheduleTask(in: .milliseconds(500)) {
                    _ = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
                }.futureResult
            }
        }
        _ = next.flatMap { _ -> EventLoopFuture<Void> in
            // Terminate the chunked body with the final zero-length chunk.
            var end = app.allocator.buffer(capacity: 5)
            end.writeString("0\r\n\r\n")
            return writer.write(.buffer(end)).flatMap { writer.write(.end) }
        }
    }, count: 0))
    response.headers.replaceOrAdd(name: .contentEncoding, value: "chunked")
    response.headers.remove(name: .contentLength)

    return response
}
...
extension ByteBuffer {
    func chunked(allocator: ByteBufferAllocator) -> ByteBuffer {
        // Chunk sizes must be written in hexadecimal per the chunked coding.
        let size = String(readableBytes, radix: 16)
        var buffer = allocator.buffer(capacity: readableBytes + size.count + 4)
        buffer.writeString("\(size)\r\n")
        buffer.writeBytes(getBytes(at: readerIndex, length: readableBytes) ?? [])
        buffer.writeString("\r\n")
        return buffer
    }
}
It works in that I receive data, but the chunked framing is not stripped (not that I'm sure URLSession does that anyway), as my Transfer-Encoding is being set back to identity.
If you do print(response) before returning it from the route handler, what do you see? I don't see the string "identity" anywhere in Vapor or NIO, so I'm not sure where that's coming from.