Proper use of NIOThrowingAsyncProducer

I just want to start off by saying thanks for the hard work in making this API. It is truly taking the NIO/Swift Concurrency bridge closer to a happier place.

I started trying to implement this API into my Channel Handler in order to keep messages in order and receive them one at a time to feed to my other Swift Concurrent handlers. This particular Handler is designer to take IRC protocol lines and parse them before feeding them to be handled else where. It becomes sticky while spawning tasks with in an event loop and also working with the context to write and flush. So this API really is perfect to use in the given situation as coded bellow. I am not sure if any other Libraries have adopted it yet, but here is a first pass at my implementation working with the Test Case code.

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        self.logger.info("AsyncMessageChannelHandler Read")
        var buffer = self.unwrapInboundIn(data)
        guard let lines = buffer.readString(length: buffer.readableBytes) else { return }

        guard !lines.isEmpty else { return }
        let messages = lines.components(separatedBy: "\n")
            .map { $0.replacingOccurrences(of: "\r", with: "") }
            .filter{ $0 != ""}
        _ = self.source.yield(contentsOf: messages)

        _ = context.eventLoop.executeAsync {
            func runIterator() async throws {
                guard let result = try await self.iterator?.next() else { throw NeedleTailError.parsingError }
                let parsedMessage = try await AsyncMessageTask.parseMessageTask(task: result, messageParser: self.parser)
                try await self.writer.yield(parsedMessage)
            }

            try await runIterator()
            if self.delegate.produceMoreCallCount != 0 {
               try await runIterator()
            }
        }
        channelRead(context: context)
    }
    
    private func channelRead(context: ChannelHandlerContext) {
        let promise = context.eventLoop.makePromise(of: Deque<IRCMessage>.self)
        self.writerDelegate.didYieldHandler = { deq in
            promise.succeed(deq)
        }
        promise.futureResult.whenSuccess { messages in
            for message in messages {
                let wioValue = self.wrapInboundOut(message)
                context.fireChannelRead(wioValue)
            }
        }
    }

I am curious to know in the use case as mentioned above, do I need to be concerned about handling the yield result or something else that I may seem to not understand about the API?

Thank you for your expertise.

Cole

1 Like

Great, to hear that you are trying to adopt our new Concurrency bridges. I want to prepare a more detailed post about them and how they ought to be used but happy to also provide feedback for your specific code.

In general, what you are trying to achieve here is the correct use-case for the NIOAsyncSequenceProducer. It allows you to bridge a channel pipeline into an AsyncSequence which can be consumed from a Task. However, I think there are a few important details which you need to adhere to do this safely and correctly.

1. Backpressure

One of the main goals of NIOAsyncSequenceProducer is to properly communicate the back-pressure from the AsyncSequence to the channel pipeline. To do this the NIOAsyncSequenceProducer returns a YieldResult when calling any of the yield methods on the NIOAsyncSequenceProducer.Source. Furthermore, the NIOAsyncSequenceProducerDelegate has a method produceMore.

Now let's talk about how these should be used to propagate back-pressure between a ChannelHandler and the NIOAsyncSequenceProducer. First, we recommend to buffer all the incoming data that you get in channelRead until you get a channelReadComplete. In channelReadComplete you should forward your whole buffer at once to the NIOAsyncSequenceProducer.Source.yield method. Importantly here, depending on the returned YieldResult you should issue a context.read() to request more data if the YieldResult.produceMore was returned.
The other side of the back-pressure propagation is to implement the NIOAsyncSequenceProducerDelegate.produceMore. If this method gets called you also should issue a context.read() to request more data.

Implementing, both of these makes sure that depending on how fast you are consuming the AsyncSequence new data is requested from the channel pipeline.

2. Consuming the AsyncSequence

An important characteristic of the NIOAsyncSequenceProducer is that it is a unicast AsyncSequence. That means it only allows to create a single AsyncIterator and that AsyncIterator is not Sendable. In practice, that means only a single Task can consume the AsyncSequence.
From looking at your code it is not exactly clear how many Tasks are involved here but from my guessing of the executeAsync method this probably spawns a Task.
We really recommend to only spawn a single Task best a child task to consume the AsyncSequence.

On another note, I think you are also using the NIOAsyncWriter here which is the bridge for writes in the async world back into the NIO channel pipeline. Interestingly, enough it looks like on every channel read you are also getting the processed data from your Task back and feed it back down the channel pipeline.
In general, I would recommend here to not bridge back and forth in the middle of the channel pipeline into Concurrency land. Rather, I would do it at the very end where you feed all the reads into a NIOAsyncSequenceProducer and get all the writes back by using a NIOAsyncWriter.

On the NIOAsyncWriter you want to simply feed all the yielded writes into context.write() and you want to implement the channelWritabilityChanged(context:) method to toggle the writability of the NIOAsyncWriter.Sink.

@lukasa also has an open PR that shows how to bridge a channel into an AsyncSequence. That PR is a bit outdated but it might give you a good idea how it can be done. In the future, we probably want to provide an out-of-box solution to convert a channel (pipeline) into the Concurrency world using the primitives (NIOAsyncSequenceProducer & NIOAsyncWriter).

Let me know if you have more questions!

5 Likes

Thank you very much for the detailed explanation. I do have a question in regards to the mentioning of forwarding the whole buffer to the channelReadComplete method once buffering has completed. If we are buffering to determine how many readable bytes have been receive and if there are anymore we need to wait for until we give them to the AsyncStequence, what would be the recommended way of passing them along to channelReadComplete. Does the back pressure API have a method for doing this?

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        self.logger.info("AsyncMessageChannelHandler Read")
        let buffer = self.unwrapInboundIn(data)

        self.backPressureStrategy.didYieldHandler = { bufferDepth in
            bufferDepth < buffer.readableBytes
        }
        self.backPressureStrategy.didNextHandler = { bufferDepth in
            bufferDepth < 0
        }

//Give to channelReadComplete
}

    public func channelReadComplete(context: ChannelHandlerContext) {
        self.logger.info("AsyncMessageChannelHandler Read Complete")
//Get Back pressured buffer
            guard let lines = buffer.readString(length: buffer.readableBytes) else { return }

            guard !lines.isEmpty else { return }
            let messages = lines.components(separatedBy: "\n")
                .map { $0.replacingOccurrences(of: "\r", with: "") }
                .filter{ $0 != ""}

// Handle yield result and produce more if needed
            let result = self.source.yield(contentsOf: messages)
            switch result {
            case .produceMore:
                logger.trace("Produce More")
                context.read()
            case .stopProducing:
                logger.trace("Stop Producing")
            case .dropped:
                logger.trace("Dropped Yield Result")
            }
}

It does, but you don't want to include the pending buffer in channelRead in your calculations. You should set a buffer depth inside the backpressure strategy itself irrespective of what is about to be put into the NIOAsyncSequenceProducer. In general it's acceptable to tolerate a little buffer slop here.

1 Like

In particular, check out the HighLowWatermark strategy: swift-nio/NIOAsyncSequenceProducerStrategies.swift at 41f0b2d3983038ae68bbb902ea43e123a5466494 · apple/swift-nio · GitHub.

2 Likes

@lukasa Already pointed you in the right direction with the HighLowWatermark strategy. In general, we just recommend to go with that and you can tune the marks depending on your use-case. There might be some value down the road in implementing an adaptive buffer strategy.

On another note, I would recommend you to just buffer the ByteBuffer that you unwrap in every channelRead into a Deque and then just yield the whole Deque into the NIOAsyncSequenceProducer.Source.

2 Likes

Thanks again for your help. So I have a question in regards to NIOAsyncWriter.Sink How exactly do I pass the deque in the writer to channelWritabilityChanged(context:) in order to pass it to the next handler? My understanding is that when the HighLowWatermark strategy starts to overflow it should trigger the writability changed method? From there I should be able to get whatever I put in the writer and then call context.fireChannelRead(). From there I can feed the data to the next handler. But I am stuck on how to get from A to B?

My understanding is NIOAsyncWriter takes Swift Concurrent data and yields it, in order to feed it into NIO contexts by sinking the yielded data from the writer? Am I missing something?

//private method called from channel read after parsing and yielding
        private func channelRead(context: ChannelHandlerContext) {

            self.writerDelegate.didYieldHandler = { deq in
//deq is a Deque<IRCMessage>

            }

//Context wants the InboundIn handler which is a ByteBuffer, but I want to forward a IRCMessage Type to channelWritabilityChanged(context:) 
            _ = context.write(NIOAny(ByteBuffer()))
        }
    
    public func channelWritabilityChanged(context: ChannelHandlerContext) {
        print("CHANGED____")

//How to sink the writerDelegate's yield? 
// Then call 

for message in messages {
                   let wioValue = self.wrapInboundOut(message)
                    context.fireChannelRead(wioValue)
}
        context.fireChannelWritabilityChanged()
    }

The whole purpose of my handler is to have a client/server handler that parses IRCMessages from the BB -> String which then passes it to another client or server handler which in turn does very different things. That is why I am using NIOAsyncWriter. I need to go from NIO -> Swift Concurrent world for parsing -> NIO -> Give to Client or server to do different stuff.

Thanks again for the consideration.

Cole

So the idea behind the writer is that it should be tied 1:1 to a single channel handler. The implementation of the writer's delegate should basically take the Deque that gets yielded and for each element in the Deque call context.write(). After everything is written you should flush the writes.

In the channelWritabilityChanged you should call the NIOAsyncWriter.Sink.changeWritability.

If I understand you correctly you actually have two Channels in play here one server and one client right? If that is the case you should have an AsyncSequence and writer for each of them and then in the Concurrency world forward from the sequences to the correct writer.

Ok thank you for the bit of information. In this Scenerio the handler is used on one channel, running in 2 different programs, one is up on a VPS and the other is on an iOS device. I think I figured out a solution, not much different than what I had.

        private func channelRead(context: ChannelHandlerContext) {
            let promise = context.eventLoop.makePromise(of: Deque<IRCMessage>.self)
            self.writerDelegate.didYieldHandler = { deq in
            promise.succeed(deq)
            }
            promise.futureResult.whenSuccess { messages in
                for message in messages {
                    let wioValue = self.wrapInboundOut(message)
                    context.fireChannelRead(wioValue)
                    context.flush()
                }
            }
        }

I don't want to write the context to a byte buffer I want to pass the IRCMessage value to the next handler. Will there be a problem reading the value and then flushing it like I have done above?

@FranzBusch So I think I have something working however I am seeing something interesting. ChannelReadComplete() Seems to be called twice after a ChannelRead(). This causes the error to be thrown that I am creating an Async Iterator more than once which is thrown on the next() method. I am not sure why this is happening since I am calling self.iterator = self.sequence?.makeAsyncIterator() only if it is nil and only in the initializer.

Here is another snippet of my code. Any thoughts on why the fatal error is thrown in this situation?
fatalError("NIOThrowingAsyncSequenceProducer allows only a single AsyncIterator to be created")

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let buffer = self.unwrapInboundIn(data)
        bufferDeque.append(buffer)
    }
    
    
    public func channelReadComplete(context: ChannelHandlerContext) {
        let result = self.source.yield(contentsOf: bufferDeque)
        switch result {
        case .produceMore:
            logger.trace("Produce More")
            context.read()
        case .stopProducing:
            logger.trace("Stop Producing")
            return
        case .dropped:
            logger.trace("Dropped Yield Result")
            break
        }
        
        Task {
            try await withThrowingTaskGroup(of: Void.self, body: { group in
                group.addTask {
                    guard var buffer = try await self.iterator?.next() else { throw MyError.parsingError }
                    guard let lines = buffer.readString(length: buffer.readableBytes) else { return }
                    guard !lines.isEmpty else { return }
                    let messages = lines.components(separatedBy: Constants.cLF)
                        .map { $0.replacingOccurrences(of: Constants.cCR, with: Constants.space) }
                        .filter { $0 != ""}
                    
                    for message in messages {
                        let parsedMessage = try await AsyncMessageTask.parseMessageTask(task: message, messageParser: self.parser)
                        try await self.writer.yield(parsedMessage)
                    }
                }
                try await group.waitForAll()
            })
        }
        self.channelRead(context: context)
        self.bufferDeque.removeAll()
    }
    
    private func channelRead(context: ChannelHandlerContext) {
        
        func processWrites() {
            context.eventLoop.execute {
                if self.writes.count >= 1 {
                    guard let message = self.writes.popLast() else { return }
                    let wioValue = self.wrapInboundOut(message)
                    context.fireChannelRead(wioValue)
                    context.fireChannelReadComplete()
                    context.flush()
                    processWrites()
                } else {
                    return
                }
            }
        }
        
        self.writerDelegate.didYieldHandler = { deq in
            self.writes.append(contentsOf: deq)
            processWrites()
        }
    }

You are still holding the types a bit wrong in a few places which lead to this odd behaviour. Before I dive into more details here I just want to point you to this PR from @lukasa again. He recently updated it to adopt the NIOAsyncSequenceProducer and it shows nicely how a complete Channel should be bridged to structured Concurrency without the need to reach to unstructured Concurrency via Task {}. It still needs a bit of work and the writer side is missing but it shows the general setup quite nicely.

Now, to your specific code. I will try to focus on each individual method and provide you some hints and guidance.

This is perfect :+1:

In the above channelReadComplete are a few problems:

  1. We don't recommend to call context.read() directly but let that get handled by auto-read on the channel. In the PR from Cory you can see how he keeps a PendingReadState that he updates in channelReadComplete and in the delegate's produceMore method. He then makes a decision in read to determine if he should actually issue a read. (Note: The naming here is something that we still want to improve but the rough concept stands)
  2. You are spewing a new unstructured Task for every channelReadComplete. This poses two problems. First, you are off into unstructured Concurrency which ought to be avoided. Secondly, this can mess up ordering of your written messages since later channelReadCompletes can spawn a Task that finishes before previous ones. (On a side note: As far as I can see the usage of a withThrowingTaskGroup here is not needed since you only have a single child task anyhow)

This part here is also a bit problematic since it sets the didYieldHandler multiple times which really only should be set once on construction of the NIOAsyncWriter and then the writer itself should be passed to the consuming/producing child task and the NIOAsyncWriter.Sink should be retained by the channel handler. Furthermore, you are calling context.fireChannelReadComplete potentially multiple times here since didYield can be called more than once. What I would recommend is to just call it once at the end of channelReadComplete.

I am also a bit confused by the context.fireChannelRead(). Are there more channel handlers in the pipeline?

Another thing: Why are you buffering the writes? In the didYield of the writers delegate you really should only call context.write for all the writes you just got and then call context.flush.

I hope this helps a bit and we really want to provide you an easier way to do all of this with NIO so that you can just setup your channel pipeline and then bridge to the async world once.