Streaming multiple payloads through a response

I'm building a Swift-native replacement for HTTPBin for Alamofire testing using Vapor. Things have gone pretty well so far but I've run into an issue. I need to stream multiple payloads in a single response, similar to how a streaming log service would work. I have my Reply type and just want to send multiple but separate encodings of the same instance over the connection. I've tried this using the BodyStreamWriter to stream in encoded values but I get no response from the server.

app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count <= 100 else {
        return Response(status: .badRequest)
    }

    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let response = Response(body: .init(stream: { writer in
        for _ in 0..<count {
            let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
            writer.write(.buffer(buffer))
        }
    }, count: count))
    
    return response
}

It also seems strange to me that the streaming response requires the count of writes as part of its initialization, as a true stream may not be able to provide that.

In any case, what am I doing wrong with the BodyStreamWriter here?

Turns out the core issue was that I just wasn't calling writer.write(.end), which is obvious when you think about it. However, this doesn't do what I need, as it's still a single response with a body of fixed size. What I need is chunked responses of unknown size over time. It appears Vapor used to support chunked encoding but no longer does. @tanner0101 is that correct, and is there any replacement?

Yeah Vapor 3 used to only support chunked encoding for streaming responses. Vapor 4 replaced that with BodyStreamWriter which is more generally useful and supports back pressure. Chunked encoding can be built on top of this by adding the transfer-encoding: chunked header and writing to the BodyStreamWriter using the chunked encoding. The count being a non-optional Int is unfortunate but you can hack around that by setting it to 0. Vapor 4 should definitely provide helpers to do this. Those not being there already is an oversight, sorry about that.

I'll try setting the count to 0, but my attempts to manually set the Transfer-Encoding were unsuccessful, as all content still came back as a single blob to the client. If I want to replicate a log stream that sends a bit of content over time, can I just schedule write calls to the stream writer on the EventLoop? I've tried this, but it didn't work; perhaps the body size is the issue:

let response = Response(body: .init(stream: { writer in
    let start = writer.write(.buffer(buffer))
    var next = start
    for _ in 0..<count {
        next = next.flatMap {
            request.eventLoop.scheduleTask(in: .milliseconds(500)) {
                _ = writer.write(.buffer(buffer))
            }.futureResult
        }
    }
    _ = next.flatMap { writer.write(.end) }
}, count: buffer.readableBytes * count))

Yeah my guess would be the content-length header being set is causing the client to accumulate the entire body first. Hard to say without seeing that code though.

If you want to implement chunked encoding yourself, try something like:

let response = Response()
response.body = .init(stream: { stream in
    // Write bytes to stream.
}, count: 0)
response.headers.remove(name: .contentLength)
response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")

Then verify that the response only has the transfer-encoding header and not the content-length header by doing print(response).

Once the headers are correct, just write the data to stream in the chunked encoding. Note that <length> is the chunk size in hexadecimal. The format is:

<length>\r\n
<bytes>\r\n

You can see an example here:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n
\r\n
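As a concrete sketch of that framing (my own illustration, not code from this thread; note the length prefix is hexadecimal, which the example above can't show since every size there happens to be a single digit):

```swift
import Foundation

// Frames one payload as an HTTP/1.1 chunk: the size in hex, CRLF,
// the bytes, CRLF. Assumes the payload is UTF-8 text.
func chunkFrame(_ payload: String) -> String {
    let byteCount = payload.utf8.count
    return String(byteCount, radix: 16) + "\r\n" + payload + "\r\n"
}

// A chunked body ends with a zero-length chunk.
let chunkTerminator = "0\r\n\r\n"
```

For example, chunkFrame("Mozilla") produces "7\r\nMozilla\r\n", while a 16-byte payload gets the prefix "10\r\n" rather than "16\r\n".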

It's important to note though that whether this will give you the bytes in chunks depends on the client. Even if you send the data using chunked encoding from the server, the client may still choose to accumulate the data into a single blob first.

For example, Vapor's Client protocol always accumulates the response into a single buffer. The underlying AsyncHTTPClient however exposes a HTTPClientResponseDelegate parameter that you can set to get the response back part by part. Regardless of whether you use chunked encoding or not, setting HTTPClientResponseDelegate will give you back the body bytes as they are received.

In other words, all of this stuff is pretty finicky and implementation-dependent. If you want to reliably send packets of information asynchronously, WebSockets is the way to go.

Right. I'm not using this for a real service, just a test server for Alamofire to ensure our streaming (non-WebSocket) connectivity works correctly. I'll give the fully manual chunked encoding a try, since I didn't do that before, even though I did set the encoding and remove the Content-Length. Since Alamofire uses URLSession I think the encoded chunks should work correctly, as I've manually tested on real logging servers.

Ah I see, that makes sense then. It's strange that the code you posted here didn't work: Streaming multiple payloads through a response - #4 by Jon_Shier.

If Alamofire supports reading in HTTP response bodies part by part, then that code should work. When you call writer.write(.buffer(...)) that results in channel.writeAndFlush being called in NIO which should result in the bytes being sent over the network. The only thing count is doing is setting the content-length header on the response.

Was there anything like a proxy in between the client and the server? Which APIs in Alamofire / URLSession were you using to handle the streaming response?

Alamofire's newer DataStreamRequest uses URLSessionDataTask under the hood, relying on URLSession's parsing of incoming Data to call its streaming handlers. This is largely non-deterministic, as URLSession doesn't guarantee behavior there, but chunked responses delivered over time seem to trigger the appropriate behavior right now; that may require the chunked encoding I wasn't producing. I was just sending multiple JSON payloads into the stream and the client was seeing them as a single blob containing multiple responses. If possible I'd like to trigger the separate handlers, which may require both the chunked encoding and time between the writes, but I'll see.

I see. Yeah, I guess it comes down to how URLSessionDataTask works internally. It seems strange that they'd only call the handlers separately for chunk-encoded responses; ideally you'd call the data handlers whenever you get bytes in from the OS. Are the JSON payloads relatively small? If they all fit in a single TCP frame (~64K IIRC) then maybe there's an optimization happening somewhere that merges them together. Since chunked encoding needs to be parsed, it would make sense that even if multiple chunks arrive together in a single frame, they would result in separate calls to the URLSessionDataTask handlers, since each chunk of bytes must be parsed out of the incoming data.
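To make that concrete, here's a toy splitter (my own sketch, not URLSession's actual parser; assumes well-formed, ASCII-only chunks) showing that even when several chunks arrive in one read, each one has to be parsed out individually:

```swift
import Foundation

// Splits a chunked-encoded body into its payloads. Each iteration
// reads one hex size line, then exactly that many bytes, mirroring
// why a chunk-aware client can surface each payload separately.
func splitChunks(_ body: String) -> [String] {
    var chunks: [String] = []
    var rest = Substring(body)
    while let sizeLineEnd = rest.range(of: "\r\n") {
        let sizeHex = rest[..<sizeLineEnd.lowerBound]
        // A zero-length chunk terminates the body.
        guard let size = Int(sizeHex, radix: 16), size > 0 else { break }
        let dataStart = sizeLineEnd.upperBound
        let dataEnd = rest.index(dataStart, offsetBy: size)
        chunks.append(String(rest[dataStart..<dataEnd]))
        // Skip the CRLF that trails each chunk's data.
        rest = rest[rest.index(dataEnd, offsetBy: 2)...]
    }
    return chunks
}
```

Feeding it the Mozilla/Developer/Network body from the example earlier yields three separate strings, which is the per-chunk delivery you'd hope the client surfaces.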

I tested against the Clash proxy server's logs to validate that our chunked streaming was working, since it was mentioned in the initial feature request. In that case, the URLSession callback is triggered once for each small JSON payload. But yes, I've always assumed there are limitations here as you mention, like framing or even MTU sizes. And as you've mentioned, if users want strict framing here, websockets are the way to go, and supporting URLSessionWebSocketTask is on my roadmap for Alamofire 5.3 (5.2 will be Combine). In the end, this server will serve as a replacement for HTTPBin for our test suite which we can customize for higher performance, reliability, and more testing scenarios, like websockets, which HTTPBin doesn't support.

It seems that passing 0 for the count doesn't work, as the client still receives the Content-Length header with value 0, even after removing the header on the Vapor side. I've updated my server code:

app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count > 0, count <= 100 else {
        return Response(status: .badRequest)
    }
    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
    let response = Response(body: .init(stream: { writer in
        let start = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
        var next = start
        for _ in 0..<count {
            next = next.flatMap {
                request.eventLoop.scheduleTask(in: .milliseconds(500)) {
                    _ = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
                }.futureResult
            }
        }
        _ = next.flatMap { writer.write(.end) }
    }, count: 0))
    response.headers.replaceOrAdd(name: .contentEncoding, value: "chunked")
    response.headers.remove(name: .contentLength)
    
    return response
}
...

extension ByteBuffer {
    func chunked(allocator: ByteBufferAllocator) -> ByteBuffer {
        var buffer = allocator.buffer(capacity: readableBytes + 3)
        buffer.writeString("\(readableBytes)\r\n")
        buffer.writeBytes(getBytes(at: 0, length: readableBytes) ?? [])
        buffer.writeString("\r\n")
        
        return buffer
    }
}

Response headers:

Date: Mon, 13 Apr 2020 20:13:30 GMT
Connection: keep-alive
Content-Encoding: chunked
Content-Length: 0

Oop, that's a bug. Looks like content-length is always set to 0 even if the header doesn't exist. I've put up a fix for that here: Improve body handling for HEAD responses by tanner0101 · Pull Request #2310 · vapor/vapor · GitHub. Would you mind testing against .branch("tn-head-res-body") to verify the fix works?

It works in that I receive data, but the chunked encoding is not stripped (not that I'm sure URLSession does that anyway), as my Transfer-Encoding is being set back to Identity.

Transfer-Encoding: Identity
Connection: keep-alive
Date: Mon, 13 Apr 2020 20:58:59 GMT

app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count > 0, count <= 100 else {
        return Response(status: .badRequest)
    }
    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
    let response = Response(body: .init(stream: { writer in
        let start = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
        var next = start
        for _ in 1..<count {
            next = next.flatMap {
                request.eventLoop.scheduleTask(in: .milliseconds(500)) {
                    _ = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
                }.futureResult
            }
        }
        _ = next.flatMap { writer.write(.end) }
    }, count: 0))
    response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
    response.headers.remove(name: .contentLength)
    
    return response
}

I was using Content-Encoding before, but it should be Transfer-Encoding, right? My code for the chunk encoding is in my earlier post.

I'm pretty sure my chunked encoding is wrong anyway so I need to fix that and try again.

I've fixed my encoding, but that seems to have had no effect on the header.

@Jon_Shier try using .transferEncoding instead of .contentEncoding.

I am. That snippet is from my current version.

Here's the latest version of my route, with what I hope is proper chunk encoding:

app.on(.GET, "stream", ":count") { request -> Response in
    guard let count = request.parameters.get("count", as: Int.self), count > 0, count <= 100 else {
        return Response(status: .badRequest)
    }
    let encoder = JSONEncoder()
    let rep = try reply(to: request)
    let buffer = try! encoder.encodeAsByteBuffer(rep, allocator: app.allocator)
    let response = Response(body: .init(stream: { writer in
        let start = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
        var next = start
        for _ in 1..<count {
            next = next.flatMap {
                request.eventLoop.scheduleTask(in: .milliseconds(1000)) {
                    _ = writer.write(.buffer(buffer.chunked(allocator: app.allocator)))
                }.futureResult
            }
        }
        
        var buffer = app.allocator.buffer(capacity: 5)
        buffer.writeString("0\r\n\r\n")
        next = next.flatMap { writer.write(.buffer(buffer)) }
        
        _ = next.flatMap { writer.write(.end) }
    }, count: 0))
    response.headers.replaceOrAdd(name: .transferEncoding, value: "chunked")
    response.headers.remove(name: .contentLength)
    
    return response
}

If you do print(response) before returning it from the route handler what do you see? I don't see the string "identity" anywhere in Vapor or NIO so I'm not sure where that is coming from.

It prints:

[ INFO ] GET /stream/2
HTTP/1.1 200 OK
transfer-encoding: chunked
<stream>

I'll see if I can see the traffic in the middle.

Proxying through Charles, the response headers are what I'd expect:

HTTP/1.1 200 OK
date: Tue, 14 Apr 2020 14:37:27 GMT
Transfer-Encoding: chunked
Proxy-Connection: keep-alive

So perhaps URLSession doesn't like my encoding and sets something locally?