Great, to hear that you are trying to adopt our new Concurrency bridges. I want to prepare a more detailed post about them and how they ought to be used but happy to also provide feedback for your specific code.
In general, what you are trying to achieve here is the correct use-case for the NIOAsyncSequenceProducer. It allows you to bridge a channel pipeline into an AsyncSequence which can be consumed from a Task. However, I think there are a few important details which you need to adhere to do this safely and correctly.
1. Backpressure
One of the main goals of NIOAsyncSequenceProducer is to properly communicate the back-pressure from the AsyncSequence to the channel pipeline. To do this the NIOAsyncSequenceProducer returns a YieldResult when calling any of the yield methods on the NIOAsyncSequenceProducer.Source. Furthermore, the NIOAsyncSequenceProducerDelegate has a method produceMore.
Now let's talk about how these should be used to propagate back-pressure between a ChannelHandler and the NIOAsyncSequenceProducer. First, we recommend to buffer all the incoming data that you get in channelRead until you get a channelReadComplete. In channelReadComplete you should forward your whole buffer at once to the NIOAsyncSequenceProducer.Source.yield method. Importantly here, depending on the returned YieldResult you should issue a context.read() to request more data if the YieldResult.produceMore was returned.
The other side of the back-pressure propagation is to implement the NIOAsyncSequenceProducerDelegate.produceMore. If this method gets called you also should issue a context.read() to request more data.
Implementing, both of these makes sure that depending on how fast you are consuming the AsyncSequence new data is requested from the channel pipeline.
2. Consuming the AsyncSequence
An important characteristic of the NIOAsyncSequenceProducer is that it is a unicast AsyncSequence. That means it only allows to create a single AsyncIterator and that AsyncIterator is not Sendable. In practice, that means only a single Task can consume the AsyncSequence.
From looking at your code it is not exactly clear how many Tasks are involved here but from my guessing of the executeAsync method this probably spawns a Task.
We really recommend to only spawn a single Task best a child task to consume the AsyncSequence.
On another note, I think you are also using the NIOAsyncWriter here which is the bridge for writes in the async world back into the NIO channel pipeline. Interestingly, enough it looks like on every channel read you are also getting the processed data from your Task back and feed it back down the channel pipeline.
In general, I would recommend here to not bridge back and forth in the middle of the channel pipeline into Concurrency land. Rather, I would do it at the very end where you feed all the reads into a NIOAsyncSequenceProducer and get all the writes back by using a NIOAsyncWriter.
On the NIOAsyncWriter you want to simply feed all the yielded writes into context.write() and you want to implement the channelWritabilityChanged(context:) method to toggle the writability of the NIOAsyncWriter.Sink.
@lukasa also has an open PR that shows how to bridge a channel into an AsyncSequence. That PR is a bit outdated but it might give you a good idea how it can be done. In the future, we probably want to provide an out-of-box solution to convert a channel (pipeline) into the Concurrency world using the primitives (NIOAsyncSequenceProducer & NIOAsyncWriter).
Let me know if you have more questions!