Ok Thanks I removed the childChannelOption. I don't actually drop the packets when received, although it does not seem to make a difference if I break from the loop when the IRCMessage is an Upload Message. For instance, if after I parse the message out of the string and then I just break out of the loop if it is an upload message we still have poor performance.
When I add the childTask inside of the _inbound loop I am parsing, decoding and processing different IRCMessages.
for try await buffer in stream {
var buffer = buffer
if let message = buffer.readString(length: buffer.readableBytes) {
let messages = message.components(separatedBy: Constants.cLF.rawValue)
.map { $0.replacingOccurrences(of: Constants.cCR.rawValue, with: Constants.space.rawValue) }
.filter { !$0.isEmpty }
for message in messages {
guard let parsedMessage = MessageTask.parseMessageTask(task: message, messageParser: self.parser) else { break }
var priority: _Concurrency.TaskPriority = .medium
switch parsedMessage.command.commandAsString {
case Constants.multipartMediaUpload.rawValue, Constants.multipartMediaDownload.rawValue:
priority = .utility
case Constants.ping.rawValue, Constants.pong.rawValue:
priority = .background
default:
break
}
group.addTask(priority: priority) {
await sessionHandler.needleTailIRCMessage(parsedMessage)
switch parsedMessage.command {
case .QUIT(_):
outboundContinuation.finish()
inboundContinuation.finish()
return
default:
break
}
}
}
} else {
//handle
}
}
Inside the sessionHandler I switch on an enum containing IRCCommands. The following is the code where a multipartUpload message code is.
func needleTailIRCMessage(_ message: IRCMessage) async {
let delegate = self.sessionInfo.delegate
do {
switch message.command {
case .otherCommand(Constants.multipartMediaUpload.rawValue, let payload):
try await delegate?.doMultipartMessageUpload(payload)
default:
break
}
} catch {
print(error)
}
}
func doMultipartMessageUpload(_ packet: [String]) async {
do {
// Only contains data after all the packets have been received. Until then we just return after the processPacket() method has returned nil.
if let data = try processPacket(packet) {
guard let first = packet.first else { return }
removePacket(first)
let messagePacket = try BSONDecoder().decode(MessagePacket.self, from: Document(data: data))
guard let multipartMessage = messagePacket.multipartMessage else { fatalError() }
guard let dtfp = multipartMessage.dtfp else { fatalError() }
if let thumbnailBlobName = messagePacket.multipartMessage?.usersThumbnailName {
guard let thumbnailBlob = dtfp.thumbnailBlob else { fatalError() }
guard let mediaId = messagePacket.multipartMessage?.dtfp?.mediaId else { fatalError() }
let packet = FilePacket(
mediaId: mediaId,
mediaType: .thumbnail,
name: thumbnailBlobName,
data: thumbnailBlob
)
try await handleMultipart(packet: packet, name: thumbnailBlobName)
guard let recipient = multipartMessage.recipient else { fatalError() }
_ = try await self.doAcknowledgment(
.multipartUploadComplete(
MultipartUploadAckPacket(
name: thumbnailBlobName,
mediaId: mediaId,
size: thumbnailBlob.count
)
),
recipients: [
.nick(recipient)
],
sender: multipartMessage.sender
)
if let fileBlobName = messagePacket.multipartMessage?.usersFileName {
guard let fileBlob = dtfp.fileBlob else { return }
guard let mediaId = messagePacket.multipartMessage?.dtfp?.mediaId else { return }
let packet = FilePacket(
mediaId: mediaId,
mediaType: .file,
name: fileBlobName,
data: fileBlob
)
try await handleMultipart(packet: packet, name: fileBlobName)
guard let recipient = multipartMessage.recipient else { return }
_ = try await self.doAcknowledgment(
.multipartUploadComplete(
MultipartUploadAckPacket(
name: fileBlobName,
mediaId: mediaId,
size: fileBlob.count
)
),
recipients: [
.nick(recipient)
],
sender: multipartMessage.sender
)
}
}
}
} catch {
print(error)
}
}
func processPacket(_ packet: [String]) throws -> Data? {
precondition(packet.count == 4)
let firstItem = packet[0]
let secondItem = packet[1]
let thirdItem = packet[2]
let fourthItem = packet[3]
try createPacket([
firstItem,
secondItem,
thirdItem,
fourthItem
])
guard let packets = findPackets(firstItem) else { return nil }
guard packets.count == Int(thirdItem) else { return nil }
var totalData = Data()
totalData.append(contentsOf: packets.compactMap({ $0.chunk }).joined())
return totalData
}
This is basically all the code for an upload task on the server. I really wouldn't think that the parsing of the message would cause this lag, but I suppose it could be an issue.