Multiple NIO client connections for multipart upload

I have a question regarding simultaneously sending and receiving data with a NIO client and the proper way to use it.

I have a NIO TCP client that uses NIOAsyncChannel. I make a single connection to the TCP server, and I am successfully sending messages to and from clients via the server. I want to send a pong back to the server every 7 seconds or so: when I receive a Ping message, I sleep the ping child task and then send the Pong after 7 seconds. The Ping/Pong runs at background task priority. Other messages are sent at medium priority when they are small and simple, say anything under 10 MB. I also upload and download large blobs as multipart packets, which is resource intensive (say 30 packets at 10 MB each); I set that priority to utility.

My problem is that I cannot really receive messages at the same time I upload or download media. There is a huge amount of lag in receiving messages, with at least a 7-second delay due to the Ping/Pong dance plus however long it takes to flush a queued multipart packet. Would it be proper to open a second connection and send the uploads and downloads through that new NIO channel, or do I potentially have a huge bottleneck in my code? What would be a typical approach in a use case like this?

Thanks,

Cole

Generally I would not expect pings or pongs to meaningfully slow down data transfer. It's not really clear from your code exactly how this is structured, but my best guess is that your ping/pong code is blocking the write loop.
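To illustrate what I mean (a sketch only; inbound, outbound, message.isPing, and pongBuffer are stand-ins, not your code): if the 7-second sleep happens inline on the task that owns the writer, every other write queues behind it, whereas spawning a child task for the delayed pong keeps the loop free.

    // Blocking: the sleep suspends this loop, so nothing else is
    // written for the full 7 seconds.
    for try await message in inbound {
        if message.isPing { // hypothetical check
            try await Task.sleep(for: .seconds(7))
            try await outbound.write(pongBuffer)
        }
        // ... other writes queue up behind the sleep ...
    }

    // Non-blocking: the delayed pong becomes a child task and the
    // loop keeps servicing other messages immediately.
    try await withThrowingDiscardingTaskGroup { group in
        for try await message in inbound {
            if message.isPing {
                group.addTask(priority: .background) {
                    try await Task.sleep(for: .seconds(7))
                    try await outbound.write(pongBuffer)
                }
            }
        }
    }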

Thanks for the reply. I removed all the ping/pong code and the client is writing the messages super fast. However, the problem seems to be with my server setup. For some reason, when I send these multipart messages, and only these multipart messages, the channel inbound handler receives them only every 18 seconds or so. It receives all of them, but it just takes an unreasonable amount of time. My handler setup looks like this:

    let serverChannel: NIOAsyncChannel<NIOAsyncChannel<ByteBuffer, ByteBuffer>, Never> = try await ServerBootstrap(group: group)
        // Specify backlog and enable SO_REUSEADDR for the server itself
        .serverChannelOption(ChannelOptions.backlog, value: Int32(configuration.backlog))
        .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        // Enable TCP_NODELAY for the accepted child channels
        .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
        .bind(to: address, childChannelInitializer: { channel in
            channel.eventLoop.makeCompletedFuture {
                try channel.pipeline.syncOperations.addHandler(
                    ByteToMessageHandler(LineBasedFrameDecoder())
                )
                return try NIOAsyncChannel(wrappingChannelSynchronously: channel)
            }
        })

    try await withThrowingTaskGroup(of: Void.self) { group in
        try Task.checkCancellation()
        group.addTask {
            try await withThrowingTaskGroup(of: Void.self) { _ in
                try await serverChannel.executeThenClose { inbound in
                    try await withThrowingTaskGroup(of: Void.self) { group in
                        try Task.checkCancellation()
                        for try await childChannel in inbound {
                            group.addTask {
                                do {
                                    try await self.handleChildChannel(childChannel: childChannel)
                                } catch {
                                    logger.error("Caught Child Channel Error: \(error)")
                                }
                            }
                        }
                    }
                }
            }
        }
    }



    private func handleChildChannel(childChannel: NIOAsyncChannel<ByteBuffer, ByteBuffer>) async throws {
        try await childChannel.executeThenClose { [weak self] inbound, outbound in
            guard let self else { return }
            do {
                let sessionHandler = await self.initializeSession(childChannel: childChannel)
                
                
                try await withThrowingDiscardingTaskGroup { group in
                    let (_outbound, outboundContinuation) = AsyncStream<NIOAsyncChannelOutboundWriter<ByteBuffer>>.makeStream()
                    
                    outboundContinuation.onTermination = { status in
                        self.logger.info("Writer Stream Terminated with status: \(status)")
                    }
                    
                    let (_inbound, inboundContinuation) = AsyncStream<NIOAsyncChannelInboundStream<ByteBuffer>>.makeStream()
                    inboundContinuation.onTermination = { status in
                        self.logger.info("Inbound Stream Terminated with status: \(status)")
                    }
                    
                    outboundContinuation.yield(outbound)
                    inboundContinuation.yield(inbound)
                    
                    _ = group.addTaskUnlessCancelled {
                        for await writer in _outbound {
                            await sessionHandler.setWriter(writer: writer)
                        }
                    }
                    
                    for await stream in _inbound {
                        for try await buffer in stream {
                            group.addTask(priority: priority) {
                                // parsing/decoding/processing elided; the full body is shown further down
                            }
                        }
                        self.logger.info("Closing child channel...")
                        return
                    }
                }
            } catch {
                logger.error("\n\n There was an error in the child channel...\n The Error is: \(error)\n Closing the child channel...\n")
                return
            }
            self.logger.info("Closed child channel...")
        }
    }

If I make the packets smaller and send more of them, then the packets are processed a bit faster. But on the client side it will send, say, 25 packets instantly, and then the writer seems to suspend until the server stream processes more packets. From my investigation it really seems like there is a bottleneck on the server side in how much data and how many packets can be accepted at once. Each packet looks like this at the ByteBuffer level:

    ByteBuffer { readerIndex: 0, writerIndex: 14369797, readableBytes: 14369797, capacity: 14369797, storageCapacity: 16777216, slice: _ByteBufferSlice { 0..<14369797 }, storage: 0x000000011a200000 (16777216 bytes) }

14 MB is a decent-sized chunk, but it shouldn't be a problem from my understanding.

If I change the server-side watermark from .init(lowWatermark: 2, highWatermark: 10) to .init(lowWatermark: 1, highWatermark: 5), then the client side will not write everything at once (say 8 out of 10 packets); after the server slowly receives and processes packets, the client finishes writing the remaining packets (2 packets).

If I double the low/high watermark, then we end up back at the same behavior as square one.
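For reference, the watermark I am tweaking is the back pressure strategy handed to the NIOAsyncChannel when the child channel is wrapped, roughly like this (a sketch, assuming a recent swift-nio where NIOAsyncChannel.Configuration takes a backPressureStrategy):

    return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(
        wrappingChannelSynchronously: channel,
        configuration: .init(
            // Reading pauses once highWatermark messages are buffered and
            // resumes when the buffer drains to lowWatermark.
            backPressureStrategy: .init(lowWatermark: 2, highWatermark: 10)
        )
    )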

I think there is definitely an issue with the server-side handling of things, but I have no idea what would be causing this behavior.

Thanks for your thoughts in advance,

Cole

Let's start by removing .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1), as this value is unnecessarily low and will act as a performance drag.

Next, is that really the whole body of your consuming loop? It looks like you're reading messages from the server and then dropping them; is that accurate?

Ok, thanks, I removed the childChannelOption. I don't actually drop the packets when received, although it does not seem to make a difference if I break from the loop when the IRCMessage is an upload message. For instance, if I parse the message out of the string and then just break out of the loop when it is an upload message, we still have poor performance.

When I add the child task inside of the _inbound loop, I am parsing, decoding, and processing different IRCMessages:

    for try await buffer in stream {
        var buffer = buffer
        if let message = buffer.readString(length: buffer.readableBytes) {
            let messages = message.components(separatedBy: Constants.cLF.rawValue)
                .map { $0.replacingOccurrences(of: Constants.cCR.rawValue, with: Constants.space.rawValue) }
                .filter { !$0.isEmpty }
            for message in messages {
                guard let parsedMessage = MessageTask.parseMessageTask(task: message, messageParser: self.parser) else { break }

                var priority: _Concurrency.TaskPriority = .medium
                switch parsedMessage.command.commandAsString {
                case Constants.multipartMediaUpload.rawValue, Constants.multipartMediaDownload.rawValue:
                    priority = .utility
                case Constants.ping.rawValue, Constants.pong.rawValue:
                    priority = .background
                default:
                    break
                }
                group.addTask(priority: priority) {
                    await sessionHandler.needleTailIRCMessage(parsedMessage)
                    switch parsedMessage.command {
                    case .QUIT(_):
                        outboundContinuation.finish()
                        inboundContinuation.finish()
                        return
                    default:
                        break
                    }
                }
            }
        } else {
            // handle
        }
    }

Inside the sessionHandler I switch on an enum containing IRCCommands. The following is the code path a multipartMediaUpload message takes.

    func needleTailIRCMessage(_ message: IRCMessage) async {
        let delegate = self.sessionInfo.delegate
        do {
            switch message.command {
            case .otherCommand(Constants.multipartMediaUpload.rawValue, let payload):
                try await delegate?.doMultipartMessageUpload(payload)
            default:
                break
            }
        } catch {
            print(error)
        }
    }
    func doMultipartMessageUpload(_ packet: [String]) async {
        do {
            // Only contains data after all the packets have been received.
            // Until then we just return after the processPacket() method has returned nil.
            if let data = try processPacket(packet) {
                guard let first = packet.first else { return }
                removePacket(first)
                let messagePacket = try BSONDecoder().decode(MessagePacket.self, from: Document(data: data))
                guard let multipartMessage = messagePacket.multipartMessage else { fatalError() }
                guard let dtfp = multipartMessage.dtfp else { fatalError() }
                if let thumbnailBlobName = messagePacket.multipartMessage?.usersThumbnailName {
                    guard let thumbnailBlob = dtfp.thumbnailBlob else { fatalError() }
                    guard let mediaId = messagePacket.multipartMessage?.dtfp?.mediaId else { fatalError() }

                    let packet = FilePacket(
                        mediaId: mediaId,
                        mediaType: .thumbnail,
                        name: thumbnailBlobName,
                        data: thumbnailBlob
                    )

                    try await handleMultipart(packet: packet, name: thumbnailBlobName)

                    guard let recipient = multipartMessage.recipient else { fatalError() }

                    _ = try await self.doAcknowledgment(
                        .multipartUploadComplete(
                            MultipartUploadAckPacket(
                                name: thumbnailBlobName,
                                mediaId: mediaId,
                                size: thumbnailBlob.count
                            )
                        ),
                        recipients: [
                            .nick(recipient)
                        ],
                        sender: multipartMessage.sender
                    )

                    if let fileBlobName = messagePacket.multipartMessage?.usersFileName {
                        guard let fileBlob = dtfp.fileBlob else { return }
                        guard let mediaId = messagePacket.multipartMessage?.dtfp?.mediaId else { return }

                        let packet = FilePacket(
                            mediaId: mediaId,
                            mediaType: .file,
                            name: fileBlobName,
                            data: fileBlob
                        )

                        try await handleMultipart(packet: packet, name: fileBlobName)

                        guard let recipient = multipartMessage.recipient else { return }
                        _ = try await self.doAcknowledgment(
                            .multipartUploadComplete(
                                MultipartUploadAckPacket(
                                    name: fileBlobName,
                                    mediaId: mediaId,
                                    size: fileBlob.count
                                )
                            ),
                            recipients: [
                                .nick(recipient)
                            ],
                            sender: multipartMessage.sender
                        )
                    }
                }
            }
        } catch {
            print(error)
        }
    }

    func processPacket(_ packet: [String]) throws -> Data? {
        precondition(packet.count == 4)
        let firstItem = packet[0]
        let secondItem = packet[1]
        let thirdItem = packet[2]
        let fourthItem = packet[3]

        try createPacket([
            firstItem,
            secondItem,
            thirdItem,
            fourthItem
        ])

        guard let packets = findPackets(firstItem) else { return nil }
        guard packets.count == Int(thirdItem) else { return nil }
        var totalData = Data()
        totalData.append(contentsOf: packets.compactMap({ $0.chunk }).joined())
        return totalData
    }

This is basically all the code for an upload task on the server. I really wouldn't think that the parsing of the message would cause this lag, but I suppose it could be an issue.

There's a lot of code here in fragments, which makes it very hard for me to follow exactly what is being done where. Can I suggest that you time how long each iteration of the NIOAsyncChannelInboundStream takes to complete the loop body? That is, time from the top of the for loop body to the bottom?
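Something like this, as a minimal sketch (ContinuousClock is in the Swift standard library; the commented line is wherever your existing loop body lives):

    let clock = ContinuousClock()
    for try await buffer in stream {
        let start = clock.now
        // ... existing loop body: parse, decode, dispatch ...
        let duration = start.duration(to: clock.now)
        logger.info("STREAM_DURATION \(duration)")
    }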

Thanks for the great idea; I think this narrowed it down a bit. I wrapped a ContinuousClock around the inbound stream and another around the parsing task. The parsing task seems to be taking a long time, which surprises me since it is a synchronous job.

    PARSE_DURATION 12.994349542 seconds
    2024-03-13T22:20:13+0800 info NeedleTailSessionHandler : [NeedleTailServer]
    Received multipart packet with id C25ACB14-0C22-4ACB-AD7F-D1B87098F58F:
    Packet: 4 of 10
    Number of items in Multipart Packet: 4
    2024-03-13T22:20:13+0800 info NeedleTailSessionHandler : [NeedleTailServer] Creating packet
    STREAM_DURATION 13.224296249999998 seconds
    2024-03-13T22:20:13+0800 info NeedleTailSessionHandler : [NeedleTailServer] Created packet

Clearly the parsing code is working way too hard for some reason.

So the problem is how large an IRCMessage can be and how I need to parse the multipart message. A 10 MB chunk that is base64-encoded into a string grows by about a third (base64 encodes every 3 bytes as 4 characters), so it arrives as roughly 14 MB of text, and parsing a chunk that large simply takes time. Not sure if anything can be done about that. But the solution is to move the parsing work inside the child task so that the iterator can process the next item without waiting on the last one, which should be done regardless of how long parsing takes. I suppose the next task is making parsing take less time, or handling that bit of parsing outside of message parsing, but that will take a bit more thought.
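In sketch form, the fix is just hopping off the iterator before doing the heavy synchronous work (same names as the loop above; the CR stripping and QUIT handling are omitted for brevity):

    for try await buffer in stream {
        group.addTask(priority: .utility) {
            // The multi-second parse of a ~14 MB buffer now runs in a child
            // task, so the iterator can immediately await the next read.
            var buffer = buffer
            guard let message = buffer.readString(length: buffer.readableBytes) else { return }
            for line in message.components(separatedBy: Constants.cLF.rawValue) where !line.isEmpty {
                guard let parsedMessage = MessageTask.parseMessageTask(task: line, messageParser: self.parser) else { continue }
                await sessionHandler.needleTailIRCMessage(parsedMessage)
            }
        }
    }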

Thanks again for your help @lukasa

Cole