Infinite HTTP bodies with backpressure

I'm trying to create a simple HTTP server to test raw data streaming in Alamofire. I've created an endpoint in Vapor which works, but it achieves less than 10% of the raw TCP throughput that iperf sees on localhost.

app.on(.GET, "infinite") { request -> Response in
    Response(body: .init(stream: { writer in
        let buffer = request.application.allocator.buffer(repeating: 1, count: 10_000_000)
        
        func writeBuffer(_ buffer: ByteBuffer, promise: EventLoopPromise<Void>) {
            writer.write(.buffer(buffer)).whenComplete { result in
                switch result {
                case .success:
                    // Hop through the event loop instead of recursing directly so the stack can't grow.
                    request.eventLoop.execute { writeBuffer(buffer, promise: promise) }
                case let .failure(error):
                    request.application.logger.error("Infinite stream finished with error: \(error)")
                    promise.completeWith(writer.write(.end))
                }
            }
        }
        
        writeBuffer(buffer, promise: request.eventLoop.makePromise())
    }))
}

So I'm trying to create a raw NIO version to see where the performance issue may lie. However, that appears to be easier said than done. I've adapted the simple example from TechEmpower, coming up with my own simplified server.

import Foundation
import NIO
import NIOHTTP1

private final class HTTPHandler: ChannelInboundHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias OutboundOut = HTTPServerResponsePart

    let allocator = ByteBufferAllocator()
    let dateCache: RFC1123DateCache

    init(channel: Channel) {
        self.dateCache = .on(channel.eventLoop)
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch unwrapInboundIn(data) {
        case let .head(request):
            switch request.uri {
            case "/infinite":
                processInfiniteRequest(in: context)
            default:
                context.close(promise: nil)
            }
        case .body:
            break
        case .end:
            context.write(wrapOutboundOut(.end(nil)), promise: nil)
        }
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        // Writes are batched during channelRead; flush once per read burst.
        context.flush()
        context.fireChannelReadComplete()
    }
    
    private func processInfiniteRequest(in context: ChannelHandlerContext) {
        let headers = responseHeaders()
        context.write(wrapOutboundOut(.head(headers)), promise: nil)
        let buffer = allocator.buffer(repeating: 1, count: 20_000_000)
        
        func writeBuffer() {
            let promise = context.eventLoop.makePromise(of: Void.self)
            context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
            promise.futureResult.whenComplete { _ in
                writeBuffer()
            }
        }
        
        writeBuffer()
    }

    private func responseHeaders() -> HTTPResponseHead {
        var headers = HTTPHeaders()
        headers.add(name: "content-type", value: "application/octet-stream")
        headers.add(name: "server", value: "SuperStream")
        headers.add(name: "date", value: dateCache.currentTimestamp())
        return HTTPResponseHead(
            version: .http1_1,
            status: .ok,
            headers: headers
        )
    }
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let bootstrap = ServerBootstrap(group: group)
    .serverChannelOption(ChannelOptions.backlog, value: 8192)
    .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
    .childChannelInitializer { channel in
        channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false, withErrorHandling: false).flatMap {
            channel.pipeline.addHandler(HTTPHandler(channel: channel))
        }
    }
    .childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
    .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 16)

defer {
    try! group.syncShutdownGracefully()
}

let channel = try bootstrap.bind(host: "0.0.0.0", port: 8080).wait()

guard let localAddress = channel.localAddress else {
    fatalError("Address was unable to bind. Please check that the socket was not closed or that the address family was understood.")
}

try channel.closeFuture.wait()

However, no matter how I adjust the buffer-sending loop, this example either sends very little data or blows the stack. So either my HTTP response is malformed, the receiver has stopped accepting more data, and my code isn't detecting that; or I'm simply buffering up so many writes that the stack blows. The second case doesn't make much sense to me, since I thought writing a buffer would participate in backpressure, but something's clearly going wrong.
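
For reference, my understanding is that the backpressure signal NIO exposes at this level is channel writability: Channel.isWritable drops to false once too many outbound bytes are buffered, and channelWritabilityChanged fires whenever it flips. A rough, untested sketch of the pattern I expected to need (the handler shape and names like pump are mine, not from any example):

import NIO
import NIOHTTP1

final class WritabilityGatedStreamer: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias OutboundOut = HTTPServerResponsePart

    private let buffer: ByteBuffer

    init(buffer: ByteBuffer) {
        self.buffer = buffer
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Elided: write the response head on .head. Start streaming once the
        // request has fully arrived.
        if case .end = unwrapInboundIn(data) {
            pump(context: context)
        }
    }

    // Write a bounded batch, then either reschedule (still writable) or go
    // quiet until channelWritabilityChanged says there's room again.
    private func pump(context: ChannelHandlerContext) {
        var batch = 0
        while batch < 16 && context.channel.isWritable {
            context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
            batch += 1
        }
        context.flush()
        if context.channel.isWritable {
            // Still below the high-water mark: yield to the event loop, then continue.
            context.eventLoop.execute { self.pump(context: context) }
        }
    }

    func channelWritabilityChanged(context: ChannelHandlerContext) {
        if context.channel.isWritable {
            pump(context: context)
        }
        context.fireChannelWritabilityChanged()
    }
}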

Any ideas for the fastest way to send an unending stream of bytes over an HTTP body?

I don’t know if it matters, but you can take a look.

Interesting, though I don't know if that has any impact on responses. At this point I'm just trying to do an infinite response without blowing the stack and nothing seems to work, so I've obviously misunderstood how to chain the writes together. I'm looking more at Vapor's HTTP handling to see if I can adapt a simplified version, but this NIO server isn't my goal, just a way to test Vapor overhead, so I'd prefer not to spend so much time on it.

(That PR looks really important, so it's disappointing it hasn't been merged yet. No wonder Vapor does so poorly in benchmarks.)

Don't you have to flush after calling write? Also, you may want to set the Transfer-Encoding to chunked; I don't think leaving out Content-Length is valid HTTP/1.1 otherwise.
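
For instance, in the responseHeaders() helper that would be one extra line, if NIO doesn't already add the framing for you:

headers.add(name: "transfer-encoding", value: "chunked")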

This code has a few issues. The first is the use of whenComplete: this fires if the write errors as well as if it succeeds. In this instance you can get into an infinite recursion: if the write fails, you'll immediately issue another write, which will synchronously fail.
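
Concretely, in the writeBuffer function above, that means splitting the two outcomes rather than recursing from whenComplete; a sketch (closing the channel on failure is just one way to stop):

let promise = context.eventLoop.makePromise(of: Void.self)
context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
// Only keep the loop going when the write actually succeeded...
promise.futureResult.whenSuccess { writeBuffer() }
// ...and stop on failure instead of issuing another write that will
// synchronously fail again.
promise.futureResult.whenFailure { _ in context.close(promise: nil) }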

The second issue, as @Helge_Hess1 noted, is flushing. In general you've followed the right pattern for a server (flush on channelReadComplete). However, because you're sending infinite bytes, not just responding to a single read, you need to add your own flushes in the send path.

Finally, using futures to chunk up your writes is not the most effective way to send an ungodly number of bytes. We can do a better job by remembering that ByteBuffer is copy-on-write, so we can simply write the same ByteBuffer 1,000 times to queue up a huge batch of bytes to send.

An amended processInfiniteRequest function is here:

    private func processInfiniteRequest(in context: ChannelHandlerContext) {
        let headers = responseHeaders()
        context.write(wrapOutboundOut(.head(headers)), promise: nil)
        let buffer = allocator.buffer(repeating: 1, count: 20_000_000)
        
        func writeBuffer() {
            // Write the same buffer 1000 times. This doesn't consume 1000x the memory:
            // we never actually copy these buffers, _unless_ we're also using TLS or some
            // other protocol interposer.
            for _ in 0..<1000 {
                context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
            }

            // Now we send one more message, and attach the promise to it.
            context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenSuccess {
                writeBuffer()
            }
        }
        
        writeBuffer()
    }

Thanks. This matches my later refinements where I used writeAndFlush and the promise to stop blowing the stack. However, now I'm running afoul of HTTPServerProtocolErrorHandler and hasUnterminatedResponse. When I remove the error handler the connection is reset by peer (error 54) when writing data. When it's in place, it just crashes when I attempt to write the body. I'm not sure whether this is a TCP error or whether my HTTP response is malformed by trying to send many bodies at once.

Confusingly, it's only the looping version that suffers from the connection errors. If I simply enqueue a bunch of writes, as you suggested, it works fine. Still, it only maxes out around 4 GB/s, like the Vapor implementation, so there must be some sort of fundamental limit here. Perhaps my test client, wrk, isn't efficient enough for this test? Even with multiple connections, 4 GB/s seems to be the max throughput.

It turns out the result I'm getting isn't as bad as I thought; I was just confused because iperf3 reports Gbps (gigabits per second) while wrk outputs GB/s (gigabytes per second), and 33.1 Gbps ÷ 8 ≈ 4.1 GB/s, which matches the ~4 GB/s wrk was showing. So on both my Intel and M1 Max machines I'm actually getting around 50% of maximum throughput: Intel gets 33.1 Gbps in NIO versus 54.1 Gbps in iperf3 (61.2%), while the M1 gets 57.2 Gbps in NIO versus 125.1 Gbps in iperf3 (45.8%). I'm not sure where the extra overhead is going right now, but at least Vapor isn't the bottleneck for this particular test.

A likely source of extra overhead is that NIO is going to use chunked encoding here: I wonder if iperf3 does. I suspect not.

No, iperf3 is a raw TCP (or UDP) stream, with no HTTP at all, so on localhost it effectively measures the system's maximum bandwidth.

In the end I adopted your suggested structure, but removed the recursive function since it wasn't doing anything. Ideally I would be able to more easily manage local cancellation after a certain amount of time or something, but this was just to verify Vapor wasn't the bottleneck for what I was doing.

private func processInfiniteRequest(in context: ChannelHandlerContext) {
    let headers = responseHeaders()
    _ = context.write(wrapOutboundOut(.head(headers)))
    let buffer = allocator.buffer(repeating: 1, count: 100_000_000)
    
    // Enqueue ~50 GB of the same copy-on-write buffer, then flush once at the end.
    for _ in 0..<500 {
        _ = context.write(wrapOutboundOut(.body(.byteBuffer(buffer))))
    }
    _ = context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(buffer))))
}
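
For the timed cancellation mentioned above, one option would be EventLoop.scheduleTask; a hypothetical sketch that drops the whole connection after 30 seconds:

// Hypothetical: tear the stream down after a fixed interval by closing the
// channel; closing also fails any writes still queued behind the flush.
context.eventLoop.scheduleTask(in: .seconds(30)) {
    context.close(promise: nil)
}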

If you want to keep pushing your throughput up, you can lift the buffer into a static (initialized only once), compute the headers as a static too, pass promise: nil so the promises are never allocated rather than allocated and immediately dropped, and then see where you end up.
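
A sketch of those tweaks (the names are illustrative, and the static head drops the per-event-loop date header from the original):

private enum StreamConstants {
    // Allocated exactly once; ByteBuffer is copy-on-write, so sharing is cheap.
    static let buffer = ByteBufferAllocator().buffer(repeating: 1, count: 100_000_000)
    static let head: HTTPResponseHead = {
        var headers = HTTPHeaders()
        headers.add(name: "content-type", value: "application/octet-stream")
        headers.add(name: "server", value: "SuperStream")
        return HTTPResponseHead(version: .http1_1, status: .ok, headers: headers)
    }()
}

private func processInfiniteRequest(in context: ChannelHandlerContext) {
    // promise: nil means no promise (or future) is ever allocated for these writes.
    context.write(wrapOutboundOut(.head(StreamConstants.head)), promise: nil)
    for _ in 0..<500 {
        context.write(wrapOutboundOut(.body(.byteBuffer(StreamConstants.buffer))), promise: nil)
    }
    context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(StreamConstants.buffer))), promise: nil)
}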

I'm only testing throughput for a single request, not overall throughput of an entire system.

FWIW, my profiling shows about 67% of all time is spent in Socket.writev, with an additional 11% in ByteBuffer.withUnsafeReadableBytesWithStorageManagement (of which, 4 of the 11% is swift_beginAccess), 5% in swift_release, and 2.5% in PendingStreamWritesManager.didWrite.

This should take care of the swift_beginAccess calls entirely:

swift build \
    -c release \
    -Xswiftc -enforce-exclusivity=unchecked

as it disables the runtime exclusivity checks. It'll likely gain you more than just its 4% because the code size should shrink.

promise: nil will still likely lead to a substantial speedup.
