Proper use of NIOThrowingAsyncSequenceProducer

I just want to start off by saying thanks for the hard work in making this API. It is truly taking the NIO/Swift Concurrency bridge closer to a happier place.

I started adopting this API in my ChannelHandler in order to keep messages in order and receive them one at a time to feed to my other Swift Concurrency handlers. This particular handler is designed to take IRC protocol lines and parse them before feeding them on to be handled elsewhere. Things get sticky when spawning tasks within an event loop while also working with the context to write and flush, so this API really is perfect for the situation coded below. I am not sure if any other libraries have adopted it yet, but here is a first pass at my implementation, based on the test case code.

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        self.logger.info("AsyncMessageChannelHandler Read")
        var buffer = self.unwrapInboundIn(data)
        guard let lines = buffer.readString(length: buffer.readableBytes) else { return }

        guard !lines.isEmpty else { return }
        let messages = lines.components(separatedBy: "\n")
            .map { $0.replacingOccurrences(of: "\r", with: "") }
            .filter{ $0 != ""}
        _ = self.source.yield(contentsOf: messages)

        _ = context.eventLoop.executeAsync {
            func runIterator() async throws {
                guard let result = try await self.iterator?.next() else { throw NeedleTailError.parsingError }
                let parsedMessage = try await AsyncMessageTask.parseMessageTask(task: result, messageParser: self.parser)
                try await self.writer.yield(parsedMessage)
            }

            try await runIterator()
            if self.delegate.produceMoreCallCount != 0 {
               try await runIterator()
            }
        }
        channelRead(context: context)
    }
    
    private func channelRead(context: ChannelHandlerContext) {
        let promise = context.eventLoop.makePromise(of: Deque<IRCMessage>.self)
        self.writerDelegate.didYieldHandler = { deq in
            promise.succeed(deq)
        }
        promise.futureResult.whenSuccess { messages in
            for message in messages {
                let wioValue = self.wrapInboundOut(message)
                context.fireChannelRead(wioValue)
            }
        }
    }

In the use case described above, do I need to be concerned about handling the yield result, or is there something else about the API that I have misunderstood?

Thank you for your expertise.

Cole


Great to hear that you are trying to adopt our new Concurrency bridges. I want to prepare a more detailed post about them and how they ought to be used, but I am happy to also provide feedback on your specific code.

In general, what you are trying to achieve here is the correct use-case for the NIOAsyncSequenceProducer. It allows you to bridge a channel pipeline into an AsyncSequence which can be consumed from a Task. However, I think there are a few important details which you need to adhere to in order to do this safely and correctly.

1. Backpressure

One of the main goals of NIOAsyncSequenceProducer is to properly communicate the back-pressure from the AsyncSequence to the channel pipeline. To do this the NIOAsyncSequenceProducer returns a YieldResult when calling any of the yield methods on the NIOAsyncSequenceProducer.Source. Furthermore, the NIOAsyncSequenceProducerDelegate has a method produceMore.

Now let's talk about how these should be used to propagate back-pressure between a ChannelHandler and the NIOAsyncSequenceProducer. First, we recommend buffering all the incoming data that you get in channelRead until you get a channelReadComplete. In channelReadComplete you should forward your whole buffer at once to the NIOAsyncSequenceProducer.Source.yield method. Importantly, if YieldResult.produceMore was returned, you should issue a context.read() to request more data.
The other side of the back-pressure propagation is implementing NIOAsyncSequenceProducerDelegate.produceMore. If this method gets called, you should also issue a context.read() to request more data.

Implementing both of these makes sure that new data is requested from the channel pipeline at a rate that depends on how fast you are consuming the AsyncSequence.
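
To make the two signals concrete, here is a minimal sketch in plain Swift. It does not use NIO at all: ToySource, ToyHandler, the limit parameter and the readsIssued counter are all hypothetical stand-ins, and the local YieldResult enum only mirrors the three cases named in this thread. It is meant to show the shape of the handshake, not how the real types are implemented.

```swift
/// Local stand-in mirroring the three outcomes named in the thread (not NIO's type).
enum YieldResult { case produceMore, stopProducing, dropped }

/// Hypothetical stand-in for the producer's source plus its internal buffer.
struct ToySource {
    private(set) var buffered: [String] = []
    let limit: Int

    init(limit: Int) { self.limit = limit }

    /// Buffers everything it is given, then reports whether more is wanted.
    mutating func yield(contentsOf elements: [String]) -> YieldResult {
        buffered.append(contentsOf: elements)
        return buffered.count < limit ? .produceMore : .stopProducing
    }

    /// Hands one element to the consumer, or nil when the buffer is empty.
    mutating func next() -> String? {
        buffered.isEmpty ? nil : buffered.removeFirst()
    }
}

/// Hypothetical handler: buffers in channelRead, yields once in channelReadComplete.
struct ToyHandler {
    var source: ToySource
    private var pending: [String] = []
    private var stopped = false
    private(set) var readsIssued = 0   // counts simulated context.read() calls

    init(source: ToySource) { self.source = source }

    mutating func channelRead(_ line: String) { pending.append(line) }

    mutating func channelReadComplete() {
        let result = source.yield(contentsOf: pending)
        pending.removeAll()
        switch result {
        case .produceMore: readsIssued += 1      // signal 1: the yield result
        case .stopProducing: stopped = true
        case .dropped: break
        }
    }

    /// Consumer side. Once the buffer drains after a stop, demand is signalled
    /// again; this models the delegate's produceMore callback (signal 2).
    mutating func consume() -> String? {
        let element = source.next()
        if stopped && source.buffered.isEmpty {
            stopped = false
            readsIssued += 1
        }
        return element
    }
}

var handler = ToyHandler(source: ToySource(limit: 2))
handler.channelRead("PING :a")
handler.channelReadComplete()          // 1 buffered, below the limit: read issued
handler.channelRead("PING :b")
handler.channelReadComplete()          // 2 buffered, limit reached: no read
let readsWhileFull = handler.readsIssued
_ = handler.consume()
_ = handler.consume()                  // buffer drained: read issued again
print(readsWhileFull, handler.readsIssued)
```

In this toy, reads stop as soon as the buffer reaches its limit and resume only after the consumer has caught up, which is the behaviour the two signals are meant to give you.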

2. Consuming the AsyncSequence

An important characteristic of the NIOAsyncSequenceProducer is that it is a unicast AsyncSequence. That means it only allows a single AsyncIterator to be created, and that AsyncIterator is not Sendable. In practice, that means only a single Task can consume the AsyncSequence.
From looking at your code it is not exactly clear how many Tasks are involved here, but my guess is that the executeAsync method spawns a Task.
We really recommend spawning only a single Task, ideally a child task, to consume the AsyncSequence.
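
As a rough illustration of the single-consumer pattern, the sketch below uses the standard library's AsyncStream as a stand-in for the producer's sequence (an assumption made so the example runs without NIO; consumeInOrder is a hypothetical name). One child task owns the only iterator and drains it with a for-await loop, so elements are handled strictly in order.

```swift
/// Consumes a stream from exactly one child task and returns what it saw.
/// AsyncStream is only a stand-in here for the NIO-provided sequence.
func consumeInOrder(_ lines: [String]) async -> [String] {
    let (stream, continuation) = AsyncStream.makeStream(of: String.self)

    // Producer side: yield everything up front, then finish the stream.
    for line in lines { continuation.yield(line) }
    continuation.finish()

    return await withTaskGroup(of: [String].self) { group in
        // The for-await loop creates the single iterator implicitly.
        group.addTask {
            var seen: [String] = []
            for await line in stream {
                seen.append(line.uppercased())   // placeholder for real parsing
            }
            return seen
        }
        return await group.next() ?? []
    }
}

let seen = await consumeInOrder(["ping", "pong", "quit"])
print(seen)
```

The loop ends on its own when the producer finishes the stream, so there is no need to call next() by hand or to hold on to an iterator between reads.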

On another note, I think you are also using the NIOAsyncWriter here, which is the bridge for writes from the async world back into the NIO channel pipeline. Interestingly enough, it looks like on every channel read you are also getting the processed data back from your Task and feeding it back down the channel pipeline.
In general, I would recommend here to not bridge back and forth in the middle of the channel pipeline into Concurrency land. Rather, I would do it at the very end where you feed all the reads into a NIOAsyncSequenceProducer and get all the writes back by using a NIOAsyncWriter.

On the NIOAsyncWriter you want to simply feed all the yielded writes into context.write() and you want to implement the channelWritabilityChanged(context:) method to toggle the writability of the NIOAsyncWriter.Sink.

@lukasa also has an open PR that shows how to bridge a channel into an AsyncSequence. That PR is a bit outdated but it might give you a good idea how it can be done. In the future, we probably want to provide an out-of-box solution to convert a channel (pipeline) into the Concurrency world using the primitives (NIOAsyncSequenceProducer & NIOAsyncWriter).

Let me know if you have more questions!


Thank you very much for the detailed explanation. I do have a question regarding forwarding the whole buffer to the channelReadComplete method once buffering has completed. If we are buffering to determine how many readable bytes have been received, and whether there are any more we need to wait for before we give them to the AsyncSequence, what would be the recommended way of passing them along to channelReadComplete? Does the back-pressure API have a method for doing this?

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        self.logger.info("AsyncMessageChannelHandler Read")
        let buffer = self.unwrapInboundIn(data)

        self.backPressureStrategy.didYieldHandler = { bufferDepth in
            bufferDepth < buffer.readableBytes
        }
        self.backPressureStrategy.didNextHandler = { bufferDepth in
            bufferDepth < 0
        }

        // Give to channelReadComplete
    }

    public func channelReadComplete(context: ChannelHandlerContext) {
        self.logger.info("AsyncMessageChannelHandler Read Complete")
        // Get back-pressured buffer
        guard let lines = buffer.readString(length: buffer.readableBytes) else { return }

        guard !lines.isEmpty else { return }
        let messages = lines.components(separatedBy: "\n")
            .map { $0.replacingOccurrences(of: "\r", with: "") }
            .filter { $0 != "" }

        // Handle yield result and produce more if needed
        let result = self.source.yield(contentsOf: messages)
        switch result {
        case .produceMore:
            logger.trace("Produce More")
            context.read()
        case .stopProducing:
            logger.trace("Stop Producing")
        case .dropped:
            logger.trace("Dropped Yield Result")
        }
    }

It does, but you don't want to include the pending buffer in channelRead in your calculations. You should set a buffer depth inside the backpressure strategy itself irrespective of what is about to be put into the NIOAsyncSequenceProducer. In general it's acceptable to tolerate a little buffer slop here.


In particular, check out the HighLowWatermark strategy: swift-nio/NIOAsyncSequenceProducerStrategies.swift at 41f0b2d3983038ae68bbb902ea43e123a5466494 · apple/swift-nio · GitHub.


@lukasa already pointed you in the right direction with the HighLowWatermark strategy. In general, we just recommend going with that, and you can tune the watermarks depending on your use-case. There might be some value down the road in implementing an adaptive buffer strategy.
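
For intuition about what tuning the watermarks means, here is a toy model of a high/low watermark policy in plain Swift. ToyHighLowWatermark is a hypothetical type written for this illustration and is not the NIO source; the assumption is simply that the producer keeps being asked for more until the buffer reaches the high watermark, and is asked again only once the buffer has drained below the low watermark.

```swift
/// Toy high/low watermark policy (hypothetical; not the NIO implementation).
struct ToyHighLowWatermark {
    let lowWatermark: Int
    let highWatermark: Int

    init(lowWatermark: Int, highWatermark: Int) {
        precondition(lowWatermark <= highWatermark, "low must not exceed high")
        self.lowWatermark = lowWatermark
        self.highWatermark = highWatermark
    }

    /// Called after elements were added: keep producing while below the high mark.
    func didYield(bufferDepth: Int) -> Bool {
        bufferDepth < highWatermark
    }

    /// Called after an element was consumed: resume only once below the low mark.
    func didConsume(bufferDepth: Int) -> Bool {
        bufferDepth < lowWatermark
    }
}

let strategy = ToyHighLowWatermark(lowWatermark: 2, highWatermark: 5)

// Buffer depth grows from 1 to 6 as elements are yielded.
let afterYields = (1...6).map { strategy.didYield(bufferDepth: $0) }

// Buffer depth shrinks from 5 to 1 as elements are consumed.
let afterConsumes = [5, 4, 3, 2, 1].map { strategy.didConsume(bufferDepth: $0) }

print(afterYields)
print(afterConsumes)
```

The gap between the two marks is what prevents the producer from flapping between "stop" and "go" on every single element; a wider gap means fewer, larger bursts of reads.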

On another note, I would recommend that you just buffer the ByteBuffer that you unwrap in every channelRead into a Deque and then yield the whole Deque into the NIOAsyncSequenceProducer.Source.


Thanks again for your help. I have a question regarding NIOAsyncWriter.Sink. How exactly do I pass the deque in the writer to channelWritabilityChanged(context:) in order to pass it to the next handler? My understanding is that when the HighLowWatermark strategy starts to overflow, it should trigger the writability changed method. From there I should be able to get whatever I put in the writer and then call context.fireChannelRead(), and so feed the data to the next handler. But I am stuck on how to get from A to B.

My understanding is NIOAsyncWriter takes Swift Concurrent data and yields it, in order to feed it into NIO contexts by sinking the yielded data from the writer? Am I missing something?

    // Private method called from channelRead after parsing and yielding
    private func channelRead(context: ChannelHandlerContext) {

        self.writerDelegate.didYieldHandler = { deq in
            // deq is a Deque<IRCMessage>

        }

        // Context wants the InboundIn handler which is a ByteBuffer, but I want to forward an IRCMessage type to channelWritabilityChanged(context:)
        _ = context.write(NIOAny(ByteBuffer()))
    }

    public func channelWritabilityChanged(context: ChannelHandlerContext) {
        print("CHANGED____")

        // How to sink the writerDelegate's yield?
        // Then call

        for message in messages {
            let wioValue = self.wrapInboundOut(message)
            context.fireChannelRead(wioValue)
        }
        context.fireChannelWritabilityChanged()
    }

The whole purpose of my handler is to have a client/server handler that parses IRCMessages from the BB -> String which then passes it to another client or server handler which in turn does very different things. That is why I am using NIOAsyncWriter. I need to go from NIO -> Swift Concurrent world for parsing -> NIO -> Give to Client or server to do different stuff.

Thanks again for the consideration.

Cole

So the idea behind the writer is that it should be tied 1:1 to a single channel handler. The implementation of the writer's delegate should basically take the Deque that gets yielded and for each element in the Deque call context.write(). After everything is written you should flush the writes.

In the channelWritabilityChanged you should call the NIOAsyncWriter.Sink.changeWritability.

If I understand you correctly, you actually have two Channels in play here, one server and one client, right? If that is the case, you should have an AsyncSequence and a writer for each of them, and then in the Concurrency world forward from the sequences to the correct writer.

OK, thank you for the information. In this scenario the handler is used on one channel, running in 2 different programs: one is up on a VPS and the other is on an iOS device. I think I figured out a solution, not much different from what I had.

        private func channelRead(context: ChannelHandlerContext) {
            let promise = context.eventLoop.makePromise(of: Deque<IRCMessage>.self)
            self.writerDelegate.didYieldHandler = { deq in
                promise.succeed(deq)
            }
            promise.futureResult.whenSuccess { messages in
                for message in messages {
                    let wioValue = self.wrapInboundOut(message)
                    context.fireChannelRead(wioValue)
                    context.flush()
                }
            }
        }

I don't want to write the context to a byte buffer; I want to pass the IRCMessage value to the next handler. Will there be a problem reading the value and then flushing it like I have done above?

@FranzBusch So I think I have something working, however I am seeing something interesting. channelReadComplete() seems to be called twice after a channelRead(). This triggers the error about creating an AsyncIterator more than once, which is raised in the next() method. I am not sure why this is happening, since I am calling self.iterator = self.sequence?.makeAsyncIterator() only if it is nil, and only in the initializer.

Here is another snippet of my code. Any thoughts on why the fatal error is thrown in this situation?
fatalError("NIOThrowingAsyncSequenceProducer allows only a single AsyncIterator to be created")

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let buffer = self.unwrapInboundIn(data)
        bufferDeque.append(buffer)
    }
    
    
    public func channelReadComplete(context: ChannelHandlerContext) {
        let result = self.source.yield(contentsOf: bufferDeque)
        switch result {
        case .produceMore:
            logger.trace("Produce More")
            context.read()
        case .stopProducing:
            logger.trace("Stop Producing")
            return
        case .dropped:
            logger.trace("Dropped Yield Result")
            break
        }
        
        Task {
            try await withThrowingTaskGroup(of: Void.self, body: { group in
                group.addTask {
                    guard var buffer = try await self.iterator?.next() else { throw MyError.parsingError }
                    guard let lines = buffer.readString(length: buffer.readableBytes) else { return }
                    guard !lines.isEmpty else { return }
                    let messages = lines.components(separatedBy: Constants.cLF)
                        .map { $0.replacingOccurrences(of: Constants.cCR, with: Constants.space) }
                        .filter { $0 != ""}
                    
                    for message in messages {
                        let parsedMessage = try await AsyncMessageTask.parseMessageTask(task: message, messageParser: self.parser)
                        try await self.writer.yield(parsedMessage)
                    }
                }
                try await group.waitForAll()
            })
        }
        self.channelRead(context: context)
        self.bufferDeque.removeAll()
    }
    
    private func channelRead(context: ChannelHandlerContext) {
        
        func processWrites() {
            context.eventLoop.execute {
                if self.writes.count >= 1 {
                    guard let message = self.writes.popLast() else { return }
                    let wioValue = self.wrapInboundOut(message)
                    context.fireChannelRead(wioValue)
                    context.fireChannelReadComplete()
                    context.flush()
                    processWrites()
                } else {
                    return
                }
            }
        }
        
        self.writerDelegate.didYieldHandler = { deq in
            self.writes.append(contentsOf: deq)
            processWrites()
        }
    }

You are still holding the types a bit wrong in a few places, which leads to this odd behaviour. Before I dive into more details, I just want to point you to this PR from @lukasa again. He recently updated it to adopt the NIOAsyncSequenceProducer, and it shows nicely how a complete Channel should be bridged to structured Concurrency without the need to reach for unstructured Concurrency via Task {}. It still needs a bit of work and the writer side is missing, but it shows the general setup quite nicely.

Now, to your specific code. I will try to focus on each individual method and provide you some hints and guidance.

Your channelRead is perfect 👍

There are a few problems in the channelReadComplete above:

  1. We don't recommend calling context.read() directly; let that be handled by auto-read on the channel instead. In the PR from Cory you can see how he keeps a PendingReadState that he updates in channelReadComplete and in the delegate's produceMore method. He then makes a decision in read to determine if he should actually issue a read. (Note: the naming here is something that we still want to improve, but the rough concept stands.)
  2. You are spawning a new unstructured Task for every channelReadComplete. This poses two problems. First, you are off into unstructured Concurrency, which ought to be avoided. Secondly, this can mess up the ordering of your written messages, since a Task spawned by a later channelReadComplete can finish before one spawned by an earlier call. (On a side note: as far as I can see, the withThrowingTaskGroup is not needed here, since you only have a single child task anyhow.)

The private channelRead(context:) is also a bit problematic, since it sets the didYieldHandler multiple times. It really should only be set once, on construction of the NIOAsyncWriter. The writer itself should then be passed to the consuming/producing child task, and the NIOAsyncWriter.Sink should be retained by the channel handler. Furthermore, you are potentially calling context.fireChannelReadComplete multiple times here, since didYield can be called more than once. I would recommend calling it just once, at the end of channelReadComplete.

I am also a bit confused by the context.fireChannelRead(). Are there more channel handlers in the pipeline?

Another thing: why are you buffering the writes? In the didYield of the writer's delegate you really should only call context.write for all the writes you just got and then call context.flush.

I hope this helps a bit. We really want to provide you with an easier way to do all of this with NIO, so that you can just set up your channel pipeline and then bridge to the async world once.

Thank you for your help @FranzBusch. I have one question about when channelReadComplete is called. In the channelRead method I am appending messages to a deque. At times only 1 message is appended and channelReadComplete is never called. How can I ensure channelReadComplete is always called? What causes channelReadComplete to be called in the ChannelHandler?

Thanks,

Cole

It depends on what Channel you are using here but assuming you use a BaseSocketChannel under the hood then channelReadComplete is called when there is no more data left to pull from the socket. So the normal flow is a read is issued to the channel, the channel tries to read from the socket, it pulls data and sends channelRead down the channel pipeline. Once there is no more data left, it sends a channelReadComplete.
So it is totally expected to see only a single channelRead before a channelReadComplete but there can be more. However, there should always be a channelReadComplete at some point. What kind of channel are you using?

I would assume it also is a BaseSocketChannel. I am building on NIOClientTCPBootstrap so nothing really special, just a TCP connection.

I am still confused as to

  1. How I am supposed to call try await iterator.next() while not in a Task {}, because that is unstructured.

  2. How to actually get the list of items in the iterator, not just 1 item at a time, without creating the iterator more than once. I cannot seem to find the typical for-in logic of normal iterators, or a way to count the number of items in the iterator.

I have tried a variety of things, like allowing the buffer to handle the deque and then just iterating over the deque, which I am sure is wrong because I seem to get weird behavior such as the task spawning twice.

I have looked at the example in the PR which has helped. I am just unclear on the 2 points above.

Thank you and sorry for my lack of understanding.

Cole

The whole purpose of me doing this is to use 1 NIO ChannelDuplexHandler to parse information using Swift Concurrency, then feed it to another ChannelInboundHandler (client or server) which will then take the messages and handle them accordingly with Swift Concurrency.

I feel like I am working way too hard on this so I would like to be smarter. Any help is much appreciated.

Cole

If I understand you correctly you have two separate Channels and you basically want to connect them through Swift Concurrency. This is tricky to do right now. You need to spawn one unstructured Task in the inbound channel handler once and pass it the NIOAsyncSequenceProducer. Inside that task you then consume the values and hand them to the other channel handler.
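
A minimal sketch of the "spawn one Task, once" idea follows, again with the standard library's AsyncStream standing in for the NIO sequence so that it runs without NIO. Bridge, Outbox, received, finish and demo are all hypothetical names; in real code the forwarding closure would hand values to whatever the second channel exposes for writing.

```swift
/// Hypothetical bridge: created once, owns the single forwarding task.
final class Bridge: Sendable {
    private let continuation: AsyncStream<String>.Continuation
    private let task: Task<Void, Never>

    init(forward: @escaping @Sendable (String) async -> Void) {
        let (stream, continuation) = AsyncStream.makeStream(of: String.self)
        self.continuation = continuation
        // The only unstructured Task, spawned at construction time
        // rather than once per channelReadComplete.
        self.task = Task {
            for await value in stream {
                await forward(value)
            }
        }
    }

    /// Called by the inbound side for every received value.
    func received(_ value: String) {
        continuation.yield(value)
    }

    /// Ends the stream and waits for the forwarding task to drain it.
    func finish() async {
        continuation.finish()
        await task.value
    }
}

/// Stand-in for the other channel's write side.
actor Outbox {
    private(set) var written: [String] = []
    func write(_ value: String) { written.append(value) }
}

func demo() async -> [String] {
    let outbox = Outbox()
    let bridge = Bridge { value in await outbox.write("echo: \(value)") }
    bridge.received("NICK cole")
    bridge.received("JOIN #swift")
    await bridge.finish()
    return await outbox.written
}

let written = await demo()
print(written)
```

Because a single task drains the stream, values reach the second side in the order they were received, which is the ordering guarantee that is lost when a new Task is spawned per read.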

However, all of this can be way easier once we fully embrace Concurrency in NIO and allow you to create NIOAsyncChannels from a Channel. This would allow you to do something like this in the end

    let channel1 = NIOClientBootstrap.makeAsyncChannelAndBind()
    let channel2 = NIOServerBootstrap.makeAsyncChannelAndBind()

    for await value in channel1.inboundStream {
        // Do some transformation/work here if required with the value
        channel2.writer.write(value)
    }

The above is hypothetical and we are still working out some of the spellings. So the only way to really achieve this right now is doing your unstructured Task dance and passing it from one ChannelHandler to the other.

If your setup is not based on two different Channels being connected but rather you want to use Swift Concurrency inside a ChannelHandler and then pass the output of Concurrency further down the pipeline, then I would recommend not doing that. Our general guidance is, stay inside the Channel pipeline for network protocol work and switch into async land for business logic. Just don't switch back and forth inside a single pipeline.

Ok, thank you that is helpful. I noticed the NIOAsyncAwaitServerExample. Is there a client equivalent in a repo out there somewhere?

I look forward to the official release of this code! Thanks a bunch.

Cole

@FranzBusch So I have adopted NIOAsyncChannelAdapter Stream/Writer successfully in 2 of my projects. Thanks a bunch for your continued help. I have one more question. My client is using the AsyncWriter to send string-based messages to the server to be handled. If I have a very long message, then when the server's NIOAsyncStream reads the channel it doesn't receive the entire message in the readable bytes.

For example:

    let message = "SOME LONG MESSAGE....."
    let buffer = ByteBuffer(string: message)
    try await channel.writeAndFlush(buffer)

When the server reads the message on the stream

    for try await buffer in asyncChannel.inboundStream {
        var buffer = buffer
        let message = buffer.readString(length: buffer.readableBytes)
        print(message)
        // SOME LONG
    }

This of course is an example and the actual message is quite a bit longer

It seems a ByteBuffer's capacity is 2048 on the server's child NIOAsyncChannel, but my client's NIOAsyncWriter is sending a buffer with a capacity of 4096.

Any ideas of how I can fix this? Not sure what I am doing wrong here.

The protocol I am using is text based, so I must send strings. Currently I am creating data from the message and then base64-encoding it to a string to follow the protocol's requirement.

Thank you for your thoughts,

Cole

Great to hear that you had success in adopting our new async types. We are still actively working on finishing them up. There are a few more challenges that we have to solve to make them both performant but also usable in scenarios such as protocol negotiation.

To your concrete problem: what you are seeing here is that an individual packet can get chunked into various smaller packets by your networking stack. The same can happen on the server side where you read.
To solve this problem you can do a few things, but the easiest is length prefixing your messages. ByteBuffer has a method called writeLengthPrefixed that allows you to write a length-prefixed String. The important bit here is that you now need to handle reads that don't contain a full length-prefixed message. The good thing is that we already provide helpers to make this a lot easier to do. Namely, you should be looking at NIOSingleStepByteToMessageDecoder, ByteToMessageDecoder and lastly ByteToMessageHandler.
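
To show what handling a partial read involves, here is a small sketch of length-prefixed framing over plain byte arrays. It deliberately avoids NIO: frame and FrameDecoder are hypothetical helpers, and the 4-byte big-endian prefix is an arbitrary choice for the example. The NIO decoder helpers named above are the place to put this logic in a real pipeline.

```swift
/// Encodes one message as a 4-byte big-endian length followed by its UTF-8 bytes.
func frame(_ message: String) -> [UInt8] {
    let payload = Array(message.utf8)
    let length = UInt32(payload.count)
    let header: [UInt8] = [24, 16, 8, 0].map { UInt8((length >> $0) & 0xFF) }
    return header + payload
}

/// Accumulates reads and emits only complete messages (hypothetical helper).
struct FrameDecoder {
    private var buffer: [UInt8] = []

    /// Appends one network read and returns every message that is now complete.
    mutating func append(_ chunk: [UInt8]) -> [String] {
        buffer.append(contentsOf: chunk)
        var messages: [String] = []
        while buffer.count >= 4 {
            let length: Int = buffer[0..<4].reduce(0) { ($0 << 8) | Int($1) }
            // Not enough bytes for the whole message yet: wait for the next read.
            guard buffer.count >= 4 + length else { break }
            messages.append(String(decoding: buffer[4..<(4 + length)], as: UTF8.self))
            buffer.removeFirst(4 + length)
        }
        return messages
    }
}

// Two messages on the wire, delivered in chunks that ignore message boundaries.
let wire = frame("PRIVMSG #swift :hello") + frame("PING :irc")
var decoder = FrameDecoder()
let first = decoder.append(Array(wire[..<10]))    // only part of the first message
let second = decoder.append(Array(wire[10...]))   // the rest of both messages
print(first, second)
```

The first read yields nothing because the declared length has not arrived yet; the second read completes both messages. Note that the sender must write the prefix and the payload exactly once each, and the receiver must use a decoder that matches the framing the sender chose.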

Let me know if that works for you!


Thanks for the prompt reply. I did forget to add the ByteToMessageHandler back in when I converted the code, and I had not adopted the length prefix yet. Thanks for the pointer.

I have added the handler on both the client and the server, and I added writeLengthPrefixed on both as well. Perhaps I am doing it wrong, but now I seem to have extra bytes being sent, which throws off my parsing. This is how I have adopted the code.

//Server
            try await withThrowingTaskGroup(of: Void.self) { taskGroup in
                for try await childChannel in serverChannel.inboundStream {
                    asyncChannel = childChannel
#if RELEASE
                    taskGroup.addTask {
                        try await childChannel.channel.pipeline.addHandler(sslServerHandler)
                    }
#endif
                    taskGroup.addTask {
                        try await childChannel.channel.pipeline.addHandler(ByteToMessageHandler(LineBasedFrameDecoder()))
                    }
                    taskGroup.addTask {
                        await self.handleChildChannel(childChannel, logger: logger)
                    }
                }
            }

//Client
            await withThrowingTaskGroup(of: Void.self) { taskGroup in
                taskGroup.addTask {
                    try await channel.channel.pipeline.addHandler(ByteToMessageHandler(LineBasedFrameDecoder()))
                }
                taskGroup.addTask {
                    await self.handleChildChannel(channel.inboundStream, mechanism: await self.mechanism!, transport: await self.transport!, store: self.store!)
                }
            }

//Buffer writing
        let encodedBuffer = await NeedleTailEncoder.encode(value: message)
        var newBuffer = encodedBuffer
        _ = try await withCheckedThrowingContinuation { continuation in
            do {
                let length = try newBuffer.writeLengthPrefixed(as: Int.self) { buffer in
                    buffer.writeBytes(encodedBuffer.readableBytesView)
                }
                continuation.resume(returning: length)
            } catch {
                continuation.resume(throwing: error)
            }
        }
        try await channel.writeAndFlush(newBuffer)

and I seem to get reads on the stream like so when printing the message

THE MESSAGE____ :WQAAAANuaWNrAE4AAAACbmFtZQAMAAAAd2hpdGV0aXBwZWQAAmRldmljZUlkACUAAAA5ZWNkMGVjZS0zNzJiLTQxYTQtYjVlYi1mZjE5ODg2NDNmOGIAAAA= PRIVMSG newUser:9ecd0ece-372b-41a4-b5eb-ff1988643f8b :rgAAAAJpZAAlAAAAMzQ1MkJFQ0QtOTM4RS00OTNBLUExRUYtRjRDQzA0MjQ5OTFFAAJwdXNoVHlwZQAFAAAAbm9uZQADdHlwZQBQAAAAA2FjawBGAAAABV8wADgAAAAAOAAAAANhY2tub3dsZWRnbWVudAAjAAAAA3JlZ2lzdGVyZWQAEgAAAAJfMAAFAAAAdHJ1ZQAAAAAAAAljcmVhdGVkQXQARngZX4YBAAAA
//The message seems to be trying to fill the rest of the space with the same message again???
�:WQAAAANuaWNrAE4AAAACbmFtZQAMAAAAd2hpdGV0aXBwZWQAAmRldmljZUlkACUAAAA5ZWNkMGVjZS0zNzJiLTQxYTQtYjVlYi1mZjE5ODg2NDNmOGIAAAA= PRIVMSG newUser:9ecd0ece-372b-41a4-b5eb-ff1988643f8b :rgAAAAJpZAAlAAAAMzQ1MkJFQ0QtOTM4RS00OTNBLUExRUYtRjRDQzA0MjQ5OTFFAAJwdXNoVHlwZQAFAAAAbm9uZQADdHlwZQBQAAAAA2FjawBGAAAABV8wADgAAAAAOAAAAANhY2tub3dsZWRnbWVudAAjAAAAA3JlZ2lzdGVyZWQAEgAAAAJfMAAFAAAAdHJ1ZQAAAAAAAAljcmVhdGVkQXQARngZX4YBAAAA

Thanks for your time,

Cole