NIOAsyncChannel: executeThenClose is too restrictive

I am trying to use the SwiftNIO async APIs (i.e. NIOAsyncChannel) to create a client library that communicates with a server using a proprietary TCP protocol. The inbound and outbound properties of NIOAsyncChannel are perfect for this, but have been deprecated in favor of executeThenClose(_:).

I don't want to use executeThenClose(_:) because it locks the lifetime of the channel to the scope of the closure supplied to executeThenClose(_:). This is a problem for my use case because the consumer of my library may have its own event loop or callback mechanism (e.g. for processing users' requests) that requires a channel remain open longer.

Any suggestions? Would it be possible for the inbound and outbound properties to be un-deprecated to support this scenario? Thanks!

The desire is to push users strongly towards a structured concurrency pattern: one where it isn't possible to leak the underlying connections or to have them close at unexpected times. That requires the scoped access of a with-style block.

The best solution to what you want here is to use a structured concurrency approach. In that case, you'd launch your NIOAsyncChannel in a task, and then communicate between that task and the user's code using AsyncSequences. You could use AsyncStream for that purpose, but any AsyncSequence will do.
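A minimal sketch of that pattern might look like the following. It assumes a raw ByteBuffer protocol, and the names runConnection and example, as well as the host and port, are hypothetical and not part of SwiftNIO. One AsyncStream carries requests into the task that owns the channel and another carries responses back out, so the caller never touches the channel directly.

```swift
import NIOCore
import NIOPosix

/// Hypothetical helper (not part of SwiftNIO). It owns the connection until
/// `requests` finishes, the server closes the connection, or the task is cancelled.
func runConnection(
    host: String,
    port: Int,
    requests: AsyncStream<ByteBuffer>,
    responses: AsyncStream<ByteBuffer>.Continuation
) async throws {
    // Whatever happens, tell the consumer that no more responses will arrive.
    defer { responses.finish() }

    let channel = try await ClientBootstrap(group: NIOSingletons.posixEventLoopGroup)
        .connect(host: host, port: port) { channel in
            channel.eventLoop.makeCompletedFuture {
                try NIOAsyncChannel<ByteBuffer, ByteBuffer>(wrappingChannelSynchronously: channel)
            }
        }

    // The channel lives exactly as long as this closure runs.
    try await channel.executeThenClose { inbound, outbound in
        try await withThrowingTaskGroup(of: Void.self) { group in
            // Forward the caller's requests to the server.
            group.addTask {
                for await request in requests {
                    try await outbound.write(request)
                }
            }
            // Forward the server's bytes to the caller.
            group.addTask {
                for try await chunk in inbound {
                    responses.yield(chunk)
                }
            }
            // As soon as either direction ends, stop the other one.
            _ = try await group.next()
            group.cancelAll()
        }
    }
}

/// Hypothetical usage: the connection runs as a child task of the caller.
func example() async throws {
    let (requests, requestInput) = AsyncStream.makeStream(of: ByteBuffer.self)
    let (responses, responseOutput) = AsyncStream.makeStream(of: ByteBuffer.self)

    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            // "localhost" and 9000 are placeholders for a real server.
            try await runConnection(
                host: "localhost", port: 9000,
                requests: requests, responses: responseOutput
            )
        }
        requestInput.yield(ByteBuffer(string: "hello"))
        for await chunk in responses {
            print(String(buffer: chunk))
            requestInput.finish()  // graceful shutdown after the first reply
        }
        try await group.waitForAll()
    }
}
```

Finishing the request stream is the graceful shutdown path, and cancelling the task that runs runConnection is the hard stop. In both cases executeThenClose closes the channel on the way out.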

Thank you. I will try the approach you described.

Do you happen to have a minimal code sample to share? I'm struggling to put all the pieces together :-)

I'm building an HTTP client that keeps a long-lived connection to a server that streams events. I want the connection to be cancellable and to allow for a graceful shutdown.

Before I provide a more detailed example, is there a reason async-http-client isn't suitable for this use-case?

For my use case (the Swift Lambda Runtime library) we want to keep the number of dependencies to a minimum and produce the smallest possible executables.

I found a solution for my use case. I'm not sure it's the best one or that it follows best practices, but it works.

I'm on public transport now. I'll share more details once I'm properly connected.

Here is my solution.

  1. Create the bootstrap
        self.clientBoostrap = ClientBootstrap(
            group: NIOSingletons.posixEventLoopGroup
        )
        .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        .channelInitializer { channel in
            channel.eventLoop.makeCompletedFuture {
                try channel.pipeline.syncOperations.addHTTPClientHandlers()
            }
        }
  2. Create the client channel. In my case, the client talks HTTP to the server, so I use the HTTPClientRequestPart and HTTPClientResponsePart types from the swift-nio project.
        try await clientBoostrap.connect(
            host: self.ip,
            port: self.port
        )
        .flatMapThrowing { channel in
            try NIOAsyncChannel(
                wrappingChannelSynchronously: channel,
                configuration: NIOAsyncChannel.Configuration(
                    inboundType: HTTPClientResponsePart.self,
                    outboundType: HTTPClientRequestPart.self
                )
            )
        }
        .get()
  3. I use executeThenClose(_:) to read requests and queue them in a shared data structure that another thread can handle
try await clientChannel.executeThenClose { inbound, outbound in

    var inboundIterator = inbound.makeAsyncIterator()

    // send a request and wait for the response
    try await outbound.get("/next")
    let (headers, body) = try await inboundIterator.readFullResponse() // this func joins the .head .body and .end

    let response = await withCheckedContinuation {
        (continuation: CheckedContinuation<ByteBuffer, Never>) in

        // enqueue the invocation in a shared structure on which our caller can iterate
        self.invocationsPool.push(headers, body, continuation)

        // the consumer of the invocation will call continuation.resume()
    }

    // now we have a response
    try await outbound.post("/post", body: response)

    // read the server's reply to our POST request
    let (respHeaders, respBody) = try await inboundIterator.readFullResponse()
}

// anything below this line might not be executed in case of Task cancellation
print("server closed the connection - exited executeThenClose")

Finally, another thread processes the requests received.

for try await invocation in self.invocationsPool {

    // invocation contains the request + the continuation object

    let response = .... // do something with the invocation 

    // notify the HTTPClient that a response is ready to send
    invocation.continuation.resume(returning: response)
}
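For readers wondering what a shared structure like invocationsPool could look like, here is a self-contained sketch of the same handoff using only the standard library. It is not the actual implementation from the post above: the Invocation type and the roundTrip function are hypothetical, String stands in for the headers and body, and uppercasing stands in for the real work. The producer parks on a CheckedContinuation while the invocation travels through an AsyncStream, and the consumer resumes it with the result.

```swift
/// Hypothetical invocation type: a payload plus the continuation
/// on which the producer is waiting for the response.
struct Invocation: Sendable {
    let body: String
    let continuation: CheckedContinuation<String, Never>
}

/// Sends one invocation through the pool and returns the consumer's response.
func roundTrip(_ body: String) async -> String {
    // The pool is an AsyncStream, the input is the continuation that feeds it.
    let (pool, poolInput) = AsyncStream.makeStream(of: Invocation.self)

    // Consumer: handles every invocation and resumes its continuation exactly once.
    let worker = Task {
        for await invocation in pool {
            let response = invocation.body.uppercased()  // stand-in for real work
            invocation.continuation.resume(returning: response)
        }
    }

    // Producer: enqueue the invocation, then suspend until the consumer answers.
    let response = await withCheckedContinuation {
        (continuation: CheckedContinuation<String, Never>) in
        poolInput.yield(Invocation(body: body, continuation: continuation))
    }

    // Closing the pool ends the consumer's loop.
    poolInput.finish()
    await worker.value
    return response
}
```

Each continuation must be resumed exactly once. If the consumer can stop early, for example on cancellation, any invocation still sitting in the pool needs to be resumed as well, otherwise the producer stays suspended.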