How to use NIOAsyncChannel and reuse connections for an HTTP Client

Hello,

I'm writing an HTTP client for a library. I can only use Swift-NIO as a dependency.

The client will poll events from a server (GET /next). Once an event has been delivered, it should reuse the same TCP connection to poll for the next event (by issuing another GET /next).

I'm trying to implement this with NIOAsyncChannel.executeThenClose().
My initial idea was to loop until shutdown is requested and, on each pass through the loop, use outbound to send the GET /next and iterate over inbound to read my event.

However, it appears that inbound can produce only one iterator; my second attempt to iterate over inbound generates an error:

NIOThrowingAsyncSequenceProducer allows only a single AsyncIterator to be created

How can I reuse the same TCP connection in an HTTP client using NIOAsyncChannel?

Create the iterator once and then reuse it for each request.

This is something I have been using in tests:

enum SimpleClient {
    /// Closure that drives one connection: it is handed the stream of decoded response
    /// parts and the writer for request parts, and produces the caller's result.
    typealias ClientHandler<Result> = (
        NIOAsyncChannelInboundStream<HTTPResponsePart>,
        NIOAsyncChannelOutboundWriter<HTTPRequestPart>
    ) async throws -> Result

    /// Connects to `host`:`port`, installs the HTTP client handlers followed by
    /// `HTTP1ToHTTPClientCodec`, wraps the channel in a `NIOAsyncChannel`, and passes
    /// `work` to `executeThenClose`.
    ///
    /// - Parameters:
    ///   - host: Host to connect to.
    ///   - port: Port to connect to.
    ///   - work: Closure that receives the inbound stream and outbound writer.
    /// - Returns: Whatever `work` returns.
    /// - Throws: Any error raised while connecting, configuring the pipeline, or running `work`.
    static func execute<Result>(host: String, port: Int, _ work: @escaping ClientHandler<Result>) async throws -> Result {
        let reuseAddress = ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR)
        let reusePort = ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEPORT)
        let bootstrap = ClientBootstrap(group: MultiThreadedEventLoopGroup.singleton)
            .channelOption(reuseAddress, value: 1)
            .channelOption(reusePort, value: 1)

        let connection: NIOAsyncChannel = try await bootstrap.connect(host: host, port: port) { rawChannel in
            rawChannel.pipeline.addHTTPClientHandlers()
                // For wire-level debugging, chain a `.flatMap` here that adds `DebugClientHandler()`.
                .flatMap {
                    rawChannel.pipeline.addHandler(HTTP1ToHTTPClientCodec())
                }
                .flatMapThrowing {
                    try NIOAsyncChannel(
                        wrappingChannelSynchronously: rawChannel,
                        configuration: .init(
                            inboundType: HTTPResponsePart.self,
                            outboundType: HTTPRequestPart.self
                        )
                    )
                }
        }

        return try await connection.executeThenClose(work)
    }
}


extension NIOAsyncChannelInboundStream<HTTPResponsePart>.AsyncIterator {
    /// Pulls parts from this iterator until one complete response has been read.
    ///
    /// Body chunks are appended into a single buffer. If more than one `.head` part
    /// arrives before `.end`, the most recent one is the one returned.
    ///
    /// - Returns: The response head, the accumulated body, and the trailers carried by `.end`.
    /// - Throws: `HTTPError.unexpectedHTTPPart` with the `.end` part when no head preceded it,
    ///   or with `nil` when the stream finishes before `.end` is seen; also rethrows
    ///   any error thrown by `next()`.
    mutating func readFullResponse() async throws -> (HTTPResponse, ByteBuffer, HTTPFields?) {
        var pendingHead: HTTPResponse?
        var collectedBody = ByteBuffer()

        while let part = try await next() {
            switch part {
            case let .end(trailers):
                // An end marker is only meaningful once a head has been received.
                guard let head = pendingHead else {
                    throw HTTPError.unexpectedHTTPPart(part)
                }
                return (head, collectedBody, trailers)
            case .body(var chunk):
                collectedBody.writeBuffer(&chunk)
            case let .head(received):
                pendingHead = received
            }
        }

        // The stream ran dry without delivering `.end`.
        throw HTTPError.unexpectedHTTPPart(nil)
    }
}

extension NIOAsyncChannelOutboundWriter<HTTPRequestPart> {
    /// Writes a GET request for `path`: the request head followed immediately by `.end(nil)`.
    func get(_ path: String) async throws {
        let requestHead = HTTPRequest(method: .get, scheme: nil, authority: nil, path: path)
        try await write(.head(requestHead))
        try await write(.end(nil))
    }

    /// Writes a POST request for `path`: the request head, one `.body` part holding `body`,
    /// then `.end(nil)`.
    func post(_ path: String, body: ByteBuffer) async throws {
        let requestHead = HTTPRequest(method: .post, scheme: nil, authority: nil, path: path)
        try await write(.head(requestHead))
        try await write(.body(body))
        try await write(.end(nil))
    }

    /// Writes a HEAD request for `path`: the request head followed immediately by `.end(nil)`.
    func head(_ path: String) async throws {
        let requestHead = HTTPRequest(method: .head, scheme: nil, authority: nil, path: path)
        try await write(.head(requestHead))
        try await write(.end(nil))
    }
}

And then at the use site:

@Test
    func testMethodNotAllowed() async throws {
        // Only GET is registered for this route, so HEAD is expected to be rejected.
        var router = Router()
        router.get("/user/{id}/posts", handler: AnyHandler { req, res in 
            try await res.plainText("OK: /users/\(req.pathParameters[required: "id", as: Int.self])/posts")
        })

        try await withServer(handler: router.handle) { inbound, outbound in
            // The inbound stream hands out a single iterator; create it once and
            // reuse it for every response read in this closure.
            var responses = inbound.makeAsyncIterator()

            // First exchange: HEAD on a GET-only route.
            try await outbound.head("/user/123/posts")
            let (headResponse, headBody, _) = try await responses.readFullResponse()
            #expect(headResponse.status == .methodNotAllowed)
            let headText = String(decoding: headBody.readableBytesView, as: UTF8.self)
            #expect(headText == "") // bodies for head requests are ignored

            // Second exchange: GET on the same route, read through the same iterator.
            try await outbound.get("/user/123/posts")
            let (getResponse, getBody, _) = try await responses.readFullResponse()
            #expect(getResponse.status == .ok)
            let getText = String(decoding: getBody.readableBytesView, as: UTF8.self)
            #expect(getText == "OK: /users/123/posts")
        }
    }

3 Likes

This worked for me. Thank you for helping! It was great seeing you yesterday. I hope you enjoyed FOSDEM!

2 Likes