How to merge `NIOAsyncChannelInboundStream` into another stream without losing backpressure?

i had naïvely tried merging NIOAsyncChannelInboundStream into another AsyncThrowingStream, as i mentioned in another thread. but then i wondered what would happen if an attacker spammed a ton of HTTP/2 frames, or even if a legitimate user had a long-running file upload.

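```swift
extension AsyncSequence where Element:Sendable
{
    consuming
    func forward<T>(
        to stream:AsyncThrowingStream<T, any Error>.Continuation,
        by transform:(Element) throws -> T ) async rethrows
    {
        do
        {
            for try await element:Element in self
            {
                //  `yield` returns immediately and never suspends, so this loop
                //  pulls from `self` as fast as elements arrive.
                stream.yield(try transform(element))
            }

            stream.finish()
        }
        catch let error
        {
            //  Errors from `self` and from `transform` both end up here.
            stream.finish(throwing: error)
        }
    }
}
```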

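as i understand it, draining NIOAsyncChannelInboundStream into an unbounded AsyncThrowingStream like this destroys any backpressure that might have been in effect for that stream, because yield never waits for the consumer. as a short term fix, i could apply .bufferingOldest(10) as the stream's buffering policy, but that would probably break the file uploads, since anything yielded while the buffer is full gets dropped.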
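To make the trade-off concrete, here is a minimal sketch using only the standard library. It assumes a producer that runs far ahead of its consumer; the element type and the counts are made up for illustration and have nothing to do with NIO itself.

```swift
// A stream that keeps only the 10 oldest buffered elements.
let (stream, continuation) = AsyncThrowingStream<Int, any Error>.makeStream(
    bufferingPolicy: .bufferingOldest(10))

var dropped = 0
for i in 0 ..< 100 {
    // `yield` is synchronous: it returns right away even though nothing is
    // consuming yet, so the producer is never slowed down.
    if case .dropped = continuation.yield(i) {
        dropped += 1
    }
}
continuation.finish()

// Only now does a consumer show up and drain whatever survived in the buffer.
var received: [Int] = []
for try await value in stream {
    received.append(value)
}

print(received.count, dropped) // 10 90
```

With `.unbounded` instead, nothing would be dropped, but all 100 elements would sit in memory until the consumer caught up, which is the denial-of-service concern.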

so maybe this is not the way to inject additional events into a NIOAsyncChannelInboundStream. in which case, what should i be doing instead?

while researching the subject, i considered using AsyncChannel. but according to this post AsyncChannel doesn’t really have great performance for this sort of thing. AsyncChannel also belongs to the swift-async-algorithms package, which is very new and has not yet gone through the process of dividing its code into smaller modules. notably, adding a dependency to it would not only pull in all of the AsyncAlgorithms module, but also all three constituent modules of the swift-collections package, only one of which is a constituent of swift-nio-http itself.

You are right that your snippet here is dropping the backpressure on the floor; hence, it opens the door for potential denial of service attacks.

To tackle this problem, I previously pitched adding external backpressure support to the standard library's AsyncStream; however, that proposal was returned for revision, and I have since converted it into a PR to swift-async-algorithms: "Add `AsyncBackpressuredStream` proposal and implementation" by FranzBusch (Pull Request #305, apple/swift-async-algorithms).
The reason it isn't merged yet is that I haven't found the time lately to get it over the line. I want to make a few small changes and then it should be good to go.

You should totally adopt swift-async-algorithms for merging streams though. It is the canonical package that will include any transformational algorithm on top of AsyncSequence. To me it sounds like you want to use either merge or zip here. Those two uphold the backpressure!
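As a rough sketch of what the `merge` suggestion could look like, assuming the injected events can be modelled as a second `AsyncSequence` with the same element type. `Event`, `mergedEvents`, and the two `AsyncStream`s are hypothetical stand-ins (in a real server the first sequence would be the NIO inbound stream); only `merge` comes from the AsyncAlgorithms module.

```swift
import AsyncAlgorithms

// Hypothetical common element type for both sources.
enum Event: Sendable {
    case inbound(Int)       // stand-in for a frame read from the network
    case injected(String)   // stand-in for an event produced elsewhere
}

func mergedEvents() async throws -> [Event] {
    // Stand-in sources, pre-filled and finished so the example terminates.
    let (frames, frameSource) = AsyncStream<Int>.makeStream()
    let (extras, extraSource) = AsyncStream<String>.makeStream()

    for i in 0 ..< 3 {
        frameSource.yield(i)
    }
    frameSource.finish()
    extraSource.yield("ping")
    extraSource.finish()

    // `merge` needs both sequences to share one element type, hence the maps.
    var events: [Event] = []
    for try await event in merge(
        frames.map { Event.inbound($0) },
        extras.map { Event.injected($0) })
    {
        events.append(event)
    }
    return events
}

let events = try await mergedEvents()
print(events.count)
```

The order in which the two sources interleave is not specified, so the sketch only counts the events rather than asserting a particular sequence.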

Now to the other problem that you brought up, dividing the package into smaller modules: I don't see that as a goal for libraries. Having each package ship hundreds of modules isn't great and reduces discoverability. Instead, the compiler and linker should strip away any unused code so it doesn't make it into your binary.


merge and zip weren’t really the right tools here, because the injected events aren’t a second sequence. instead, i ended up using AsyncThrowingChannel against the advice of the other thread.

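```swift
extension AsyncSequence where Element:Sendable
{
    consuming
    func forward<T>(to channel:AsyncThrowingChannel<T, any Error>,
        by transform:(Element) throws -> T ) async rethrows
    {
        do
        {
            for try await element:Element in self
            {
                //  `send` suspends until a consumer has taken the element, so
                //  the next element is not pulled from `self` before then.
                await channel.send(try transform(element))
            }

            channel.finish()
        }
        catch let error
        {
            //  Errors from `self` and from `transform` both end up here.
            channel.fail(error)
        }
    }
}
```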
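For readers who have not used the type, here is a small sketch of the channel-based approach with two producers feeding one consumer. `Event`, `collect`, and `run` are hypothetical names made up for this example; `AsyncThrowingChannel`, `send`, and `finish` come from the AsyncAlgorithms module.

```swift
import AsyncAlgorithms

// Hypothetical element type shared by both producers.
enum Event: Sendable {
    case inbound(Int)       // stand-in for a frame read from the network
    case injected(String)   // stand-in for an event produced elsewhere
}

// Drains the channel until it is finished (or rethrows if it fails).
func collect(_ channel: AsyncThrowingChannel<Event, any Error>) async throws -> [Event] {
    var events: [Event] = []
    for try await event in channel {
        events.append(event)
    }
    return events
}

func run() async throws -> [Event] {
    let channel = AsyncThrowingChannel<Event, any Error>()

    // Start the consumer concurrently with the producers.
    async let collected = collect(channel)

    await withTaskGroup(of: Void.self) { group in
        group.addTask {
            for i in 0 ..< 3 {
                // Suspends until the consumer has taken the element.
                await channel.send(.inbound(i))
            }
        }
        group.addTask {
            // A second producer can inject events into the same channel.
            await channel.send(.injected("ping"))
        }
    }

    // Both producers are done, so end the sequence for the consumer.
    channel.finish()
    return try await collected
}

let events = try await run()
print(events.count)
```

Because every `send` waits for the consumer, neither producer can run ahead of it, which is the property the `AsyncThrowingStream` version lacked.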

the idea that the compiler and linker should simply strip away unused code is a concerning statement to me. the swift compiler has always struggled to strip dead code; this is one of the oldest criticisms of the language. to give the library the benefit of the doubt, i did a small experiment with the Swiftinit server, a real open source server application from the swift-unidoc project.

i compared a version of the server that depends on a vendored copy of AsyncThrowingChannel from swift-async-algorithms to a version that imports the AsyncAlgorithms module from the canonical package.

/swift/swift-unidoc$ ls -l .build/release/ | grep SwiftinitServer
-rwxrwxr-x  1 ec2-user ec2-user  86283048 Feb 27 20:58 SwiftinitServer
drwxrwxr-x  9 ec2-user ec2-user      4096 Feb 22 20:03 SwiftinitServer.build
drwxrwxr-x  2 ec2-user ec2-user      4096 Feb 22 19:54 SwiftinitServer.product
-rw-rw-r--  1 ec2-user ec2-user       380 Feb 22 20:02 SwiftinitServer.swiftdoc
-rw-rw-r--  1 ec2-user ec2-user    213308 Feb 22 20:02 SwiftinitServer.swiftmodule
-rw-rw-r--  1 ec2-user ec2-user     28224 Feb 27 02:01 SwiftinitServer.swiftsourceinfo
/swift/swift-unidoc$ ls -l .build/release/ | grep SwiftinitServer
-rwxrwxr-x  1 ec2-user ec2-user  95346776 Feb 27 21:01 SwiftinitServer
drwxrwxr-x  9 ec2-user ec2-user      4096 Feb 22 20:03 SwiftinitServer.build
drwxrwxr-x  2 ec2-user ec2-user      4096 Feb 22 19:54 SwiftinitServer.product
-rw-rw-r--  1 ec2-user ec2-user       380 Feb 22 20:02 SwiftinitServer.swiftdoc
-rw-rw-r--  1 ec2-user ec2-user    213308 Feb 22 20:02 SwiftinitServer.swiftmodule
-rw-rw-r--  1 ec2-user ec2-user     28224 Feb 27 02:01 SwiftinitServer.swiftsourceinfo

compared to the version that uses the vendored copy of the type, adding a dependency on AsyncAlgorithms grows the compiled binary from 86,283,048 to 95,346,776 bytes, a whopping 9.1 MB. perhaps i am not building it correctly. but all i did was follow the instructions in the README.
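The arithmetic behind that figure, as a quick sketch. It assumes the first `ls` listing is the build with the vendored copy and the second is the build that imports AsyncAlgorithms, which is how the surrounding text reads.

```swift
// Sizes of the SwiftinitServer executable, in bytes, from the two listings.
let vendored = 86_283_048     // assumed: build with the vendored AsyncThrowingChannel
let canonical = 95_346_776    // assumed: build that imports AsyncAlgorithms

let delta = canonical - vendored
print(delta) // 9063728

// The same difference in decimal megabytes, binary mebibytes,
// and as a percentage of the smaller binary.
let megabytes = Double(delta) / 1_000_000
let mebibytes = Double(delta) / 1_048_576
let percent = 100 * Double(delta) / Double(vendored)
print(megabytes, mebibytes, percent)
```

That is roughly 9.1 MB in decimal units, or about 8.6 MiB, and a bit over 10% of the smaller binary.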

the changes to swift-async-algorithms that would pare down this binary bloat are not extensive. the AsyncAlgorithms module is already well-partitioned internally, and the only additional files i needed to copy to extract the AsyncThrowingChannel type were Locking.swift and Rethrow.swift.

it is understandable that an incubating package may not have had the time to address binary size concerns. but i would really encourage you to reconsider dismissing it as a “non-goal” for server libraries. swift-async-algorithms will need to be partitioned before it can be reasonably called production-ready. this work doesn’t have to happen today, but it will have to be done eventually.


9.1 MB seems like a failure of specialization to me. I'm kind of taken aback, because it isn't really that complex of a library: it doesn't have data tables or anything of that sort that would be a "smoking gun" implying that large of a footprint. There is probably something else going on here that should be fixed at a more systemic level.


maybe try LTO: `swift build --experimental-lto-mode full`

i tried using this, and ran into an LLVM crash in lld:

Building for production...
LLVM ERROR: sleb128 and uleb128 expressions must be absolute
PLEASE submit a bug report to https://github.com/llvm/llvm-project/issues/ and include the crash backtrace.
Stack dump:
0.      Program arguments: /usr/bin/ld.lld @/tmp/response-2cd5a3.txt
Stack dump without symbol names (ensure you have llvm-symbolizer in your PATH or set the environment var `LLVM_SYMBOLIZER_PATH` to point to it):
/usr/bin/ld.lld(_ZN4llvm3sys15PrintStackTraceERNS_11raw_ostreamEi+0x23)[0x57dccf561fb3]
/usr/bin/ld.lld(_ZN4llvm3sys17RunSignalHandlersEv+0xee)[0x57dccf55ff6e]
/usr/bin/ld.lld(+0x7d75fa)[0x57dccf5625fa]
/lib64/libc.so.6(+0x54dd0)[0x7ff6c69d3dd0]
/lib64/libc.so.6(+0xa153c)[0x7ff6c6a2053c]
/lib64/libc.so.6(raise+0x16)[0x7ff6c69d3d26]
/lib64/libc.so.6(abort+0xd3)[0x7ff6c69a77f3]
/usr/bin/ld.lld(_ZN4llvm18report_fatal_errorERKNS_5TwineEb+0x1cb)[0x57dccf50a93b]
/usr/bin/ld.lld(+0x77f766)[0x57dccf50a766]
/usr/bin/ld.lld(+0x2acb8db)[0x57dcd18568db]
/usr/bin/ld.lld(_ZN4llvm11MCAssembler10layoutOnceERNS_11MCAsmLayoutE+0x25e)[0x57dcd1855a3e]
/usr/bin/ld.lld(_ZN4llvm11MCAssembler6layoutERNS_11MCAsmLayoutE+0x153)[0x57dcd18554b3]
/usr/bin/ld.lld(_ZN4llvm11MCAssembler6FinishEv+0x28)[0x57dcd1855ee8]
/usr/bin/ld.lld(_ZN4llvm13MCELFStreamer10finishImplEv+0xbd)[0x57dcd18738bd]
/usr/bin/ld.lld(_ZN4llvm10AsmPrinter14doFinalizationERNS_6ModuleE+0x16ca)[0x57dcd02b17ea]
/usr/bin/ld.lld(_ZN4llvm13FPPassManager14doFinalizationERNS_6ModuleE+0x31)[0x57dcd1a92f51]
/usr/bin/ld.lld(_ZN4llvm6legacy15PassManagerImpl3runERNS_6ModuleE+0x1411)[0x57dcd1a8d321]
/usr/bin/ld.lld(+0x18c9364)[0x57dcd0654364]
/usr/bin/ld.lld(_ZN4llvm3lto7backendERKNS0_6ConfigESt8functionIFNS_8ExpectedISt10unique_ptrINS_16CachedFileStreamESt14default_deleteIS7_EEEEjEEjRNS_6ModuleERNS_18ModuleSummaryIndexE+0x122)[0x57dcd0653562]
/usr/bin/ld.lld(_ZN4llvm3lto3LTO13runRegularLTOESt8functionIFNS_8ExpectedISt10unique_ptrINS_16CachedFileStreamESt14default_deleteIS5_EEEEjEE+0x4e1)[0x57dcd0647f61]
/usr/bin/ld.lld(_ZN4llvm3lto3LTO3runESt8functionIFNS_8ExpectedISt10unique_ptrINS_16CachedFileStreamESt14default_deleteIS5_EEEEjEES2_IFNS3_ISB_EEjNS_9StringRefEEE+0x2df)[0x57dcd06477df]
/usr/bin/ld.lld(_ZN3lld3elf15BitcodeCompiler7compileEv+0x47e)[0x57dccf6d036e]
/usr/bin/ld.lld(_ZN3lld3elf12LinkerDriver19compileBitcodeFilesIN4llvm6object7ELFTypeILNS3_7support10endiannessE1ELb1EEEEEvb+0xc6)[0x57dccf658fe6]
/usr/bin/ld.lld(_ZN3lld3elf12LinkerDriver4linkERN4llvm3opt12InputArgListE+0x157e)[0x57dccf6552fe]
/usr/bin/ld.lld(_ZN3lld3elf12LinkerDriver10linkerMainEN4llvm8ArrayRefIPKcEE+0x1543)[0x57dccf649b63]
/usr/bin/ld.lld(_ZN3lld3elf4linkEN4llvm8ArrayRefIPKcEERNS1_11raw_ostreamES7_bb+0x6d1)[0x57dccf6485c1]
/usr/bin/ld.lld(+0x7666bb)[0x57dccf4f16bb]
/usr/bin/ld.lld(main+0xf7)[0x57dccf4f0f37]
/lib64/libc.so.6(+0x3feb0)[0x7ff6c69beeb0]
/lib64/libc.so.6(__libc_start_main+0x80)[0x7ff6c69bef60]
/usr/bin/ld.lld(_start+0x25)[0x57dccf4f0ab5]

for reference, here are the lld and ld versions:

$ /usr/bin/ld.lld --version
LLD 15.0.0 (compatible with GNU linkers)
$ /usr/bin/ld.gold --version
GNU ld version 2.39-6.amzn2023.0.9