Are there any good examples of an AsyncSequence powered by a ChannelHandler?

I've been playing around with this in a personal project, and there are some subtle design choices here. For instance, I've been trying to only call context.read() if the sequence is actively being awaited. AsyncStream might be a possible tool to reach for here, though it seems like the latest proposal doesn't provide a straightforward mechanism for back pressure. Have any folks experimented with this, and are the results of those experiments available anywhere?

The case for backpressure is more-so the case for just AsyncIteratorProtocol itself. You can somewhat think of each call to next as a demand of 1. But in reality there are a couple of variations on the back-pressure theme - not all actually map to that easily. The part that does become a bit of an issue is when you need to do that and have a mechanism of interruptible calls that can be cancelled. That is a bit hairy since that will incur reference semantics and require your state to be managed more closely for atomicity. Perhaps you can point a more concrete example than just context.read() in the pre-async-await as simple as you can and that might help me to shine some light on what might be a good solution or not.

A simple example is reading a massive file in line by line (with each line being awaited in a for loop), and performing some non-trivial per-line processing. My intuition is that if the processing is slower than the read, and we don't have a back pressure mechanism, more and more of the file will be read and buffered, needlessly growing memory usage. My hope is that I can create a channel pipeline that may buffer some number of lines, but will stop reading if it gets sufficiently backed up.

Ah, so one that you could take a look at (which is VERY similar to what you are talking about) is this PR: Port the benchmark harness to async mode and add an initial AsyncSequence benchmark by Catfish-Man · Pull Request #36923 · apple/swift · GitHub

Thanks, Philippe! This is an interesting example, but I was more looking for examples of how to expose the last InboundOuts of a NIO ChannelPipeline as an AsyncSequence, and not from a raw file descriptor. (because I may want to do some ChannelHandler processing prior to presenting the result as an AsyncSequence)

is there a small sample bit of code you can point me at; I can take a stab at an example of what it might look like.

1 Like

My code is a bit of a mess right now, but I added a note to post something here once I've cleaned it up.

Currently there are no good examples of this, but we'll aim to ship a first-party one in SwiftNIO. If you've got one already feel free to propose it as a PR against SwiftNIO itself and we can file away any rough edges together.

Hi George, I think we have an example for what you are looking for in PostgresNIO.

Multiple files play together here:

  1. PSQLBackendMessage.Decoder just decodes the messages but does nothing for backpressure
  2. PSQLChannelHandler reads the messages from the channel and drives the ConnectionStateMachine
  3. The ConnectionStateMachine buffers incoming rows into a CircularBuffer in its Substate machine: ExtendedQueryStateMachine
  4. PSQLChannelHandler also catches read events and drives the ConnectionStateMachine with those. If we are in a query and the ExtendedQueryStateMachine has rows in it's buffer the read event is hold...
  5. PSQLRows has method next() -> EventLoopFuture<PSQLRows.Row> that provides the next row. For this it asks the ExtendedQueryStateMachine for the next row from the row buffer. If the row buffer is empty, we should have received a read event before. Now is the time to forward the readevent!

I have a pr with performance improvements up right now, which mainly affect where the rows are buffered. Boost performance by batching Rows by fabianfett · Pull Request #153 · vapor/postgres-nio · GitHub

If you have any detail question please reach out, I'm happy to help.

2 Likes

The PostgresNIO example wasn't exactly what I was looking for, so I wrote my own.

It includes a handler with an async next method (this could theoretically confirm to AsyncIterator directly, but that feels a bit off to me).

I then have a simple AsyncIterator which repeatedly calls next on a handler:

@lukasa If this is something that would be interesting to contribute to NIO (and doesn't have any obvious pitfalls I'm missing). I'm not super familiar with the NIO project structure so if someone could point me in the right direction (for instance, where would be a good place for something like this to live), I'd be happy to work up a PR.

2 Likes

What a coincidence! I actually started creating the exact same thing last night, but in the old fashioned way, without async. For a similar purpose as well: getting the output of a command executed with swift-nio-ssh

I’m not very familiar with NIO either, so I can’t say much about the implementation, but I think it would be great to have something provided by NIO.

I think we can definitely use this as a starting point for getting something into SwiftNIO. The place to put the PR would be against the core NIO repository itself, adding the code to the _NIOConcurrency module.

1 Like

Just letting you know I'm still planning on opening a PR, just waiting for this to be fixed...

Terms of Service

Privacy Policy

Cookie Policy