I'm trying to create a simple HTTP server to test raw data streaming in Alamofire. I've created an endpoint in Vapor which works, but gives me <10% the raw TCP throughput seen by iperf
on localhost.
app.on(.GET, "infinite") { request -> Response in
Response(body: .init(stream: { writer in
let buffer = request.application.allocator.buffer(repeating: 1, count: 10_000_000)
func writeBuffer(_ buffer: ByteBuffer, promise: EventLoopPromise<Void>) {
writer.write(.buffer(buffer)).whenComplete { result in
switch result {
case .success:
request.eventLoop.execute { writeBuffer(buffer, promise: promise) }
case let .failure(error):
request.application.logger.error("Infinite stream finished with error: \(error)")
promise.completeWith(writer.write(.end))
}
}
}
writeBuffer(buffer, promise: request.eventLoop.makePromise())
}))
}
So I'm trying to create a raw NIO version to see where the performance issue may lie. However, that appears to be easier said than done. I've adapted the simple example from TechEmpower, coming up with my own simplified server.
import Foundation
import NIO
import NIOHTTP1
private final class HTTPHandler: ChannelInboundHandler {
public typealias InboundIn = HTTPServerRequestPart
public typealias OutboundOut = HTTPServerResponsePart
let allocator = ByteBufferAllocator()
let dateCache: RFC1123DateCache
init(channel: Channel) {
self.dateCache = .on(channel.eventLoop)
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch unwrapInboundIn(data) {
case let .head(request):
switch request.uri {
case "/infinite":
processInfiniteRequest(in: context)
default:
context.close(promise: nil)
}
case .body:
break
case .end:
context.write(wrapOutboundOut(.end(nil)), promise: nil)
}
}
func channelReadComplete(context: ChannelHandlerContext) {
context.flush()
context.fireChannelReadComplete()
}
private func processInfiniteRequest(in context: ChannelHandlerContext) {
let headers = responseHeaders()
context.write(wrapOutboundOut(.head(headers)), promise: nil)
let buffer = allocator.buffer(repeating: 1, count: 20_000_000)
func writeBuffer() {
let promise = context.eventLoop.makePromise(of: Void.self)
context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
promise.futureResult.whenComplete { _ in
writeBuffer()
}
}
writeBuffer()
}
private func responseHeaders() -> HTTPResponseHead {
var headers = HTTPHeaders()
headers.add(name: "content-type", value: "application/octet-stream")
headers.add(name: "server", value: "SuperStream")
headers.add(name: "date", value: dateCache.currentTimestamp())
return HTTPResponseHead(
version: .http1_1,
status: .ok,
headers: headers
)
}
}
let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let bootstrap = ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.backlog, value: 8192)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false, withErrorHandling: false).flatMap {
channel.pipeline.addHandler(HTTPHandler(channel: channel))
}
}
.childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 16)
defer {
try! group.syncShutdownGracefully()
}
let channel = try bootstrap.bind(host: "0.0.0.0", port: 8080).wait()
guard let localAddress = channel.localAddress else {
fatalError("Address was unable to bind. Please check that the socket was not closed or that the address family was understood.")
}
try channel.closeFuture.wait()
However, this example, no matter how I adjust the buffer sending loop, either sends little data or blows the stack. So either my HTTP response is malformed and the receiver isn't responding to allow more data and the frontend isn't detecting it, or I'm simply buffering so much up that it blows up. The second case doesn't make much sense since I thought sending a buffer would support backpressure, but something's going wrong.
Any ideas for the fastest way to send an unending stream of bytes over an HTTP body?