Hi, sorry for the late reply, I was mostly away last week.
Here are some ideas based on my experience reworking SwiftKafka.
- `func sendSync(message: KafkaProducerMessage) async throws -> KafkaProducerMessage`

It was one of the first functions I added to the producer API, since it fits very naturally with Swift concurrency. But then I found it a bit useless (at least in my case). The reason is that librdkafka has a producer queue and doesn't send messages instantly, but batches them. There are a number of options to control this (like queue.buffering.max.kbytes and queue.buffering.max.ms). That basically means `sendSync` will be suspended for an uncertain amount of time: the sending task can stay suspended for approximately queue.buffering.max.ms.
In general I agree that such a method can be useful in certain cases, for example when strict synchronisation is required. In that case I would recommend using rd_kafka_flush and carefully documenting the behaviour, with an explicit example of why this API should be used with extreme care and not as the default one.
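To make the suggested usage concrete, here is a minimal sketch of the pattern: enqueue fire-and-forget, then flush only where strict synchronisation is actually needed. All names here (`Producer`, `flush(timeoutMilliseconds:)`, `queuedMessages`) are assumptions for illustration, not the real API; the mock only mimics where rd_kafka_flush would sit.

```swift
// Hypothetical sketch, not the real API: `flush(timeoutMilliseconds:)`
// stands in for a wrapper around rd_kafka_flush, which blocks until the
// producer's internal queue has drained (or the timeout expires).
final class Producer {
    private(set) var queuedMessages = 0

    func send(_ message: String) {
        // librdkafka only enqueues here; actual delivery happens later,
        // after up to queue.buffering.max.ms of batching.
        queuedMessages += 1
    }

    func flush(timeoutMilliseconds: Int) throws {
        // A real implementation would call rd_kafka_flush(handle, timeout)
        // and throw if messages were still outstanding afterwards.
        queuedMessages = 0
    }
}

// Enqueue fire-and-forget, then flush only at the points where strict
// synchronisation is actually required (e.g. before shutdown).
let producer = Producer()
producer.send("payload")
try producer.flush(timeoutMilliseconds: 10_000)
```

The point of the shape is that the blocking cost is paid once, at an explicit call site, instead of being hidden inside every `sendSync`.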
- Acknowledgments

I handle acks from the producer differently, via an AsyncStream rather than a completionHandler. I found this quite nice and very much in line with the Swift concurrency model.
Basically, when I create a producer I can specify a continuation which should be used to deliver the acks (alternatively, the continuation can be specified in `func send` instead of a completionHandler). Then I use the stream to listen for the acks and handle them properly.
I see the following pros of this approach:
- I can have complex ack-handling logic without keeping it in a closure or trailing closure; it's completely separated.
- I can consume acks from multiple producers in a single stream (by providing the same continuation to each of them).
- And vice versa: I can handle acks in parallel across a few async streams and avoid blocking the librdkafka callback thread.
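A minimal sketch of the pattern, with all names (`Ack`, `Producer`, the continuation-injecting initializer) being assumptions rather than the real SwiftKafka API:

```swift
import Foundation

// Sketch only: `Ack` and injecting a continuation into the producer are
// illustrations of the pattern, not the actual SwiftKafka API.
struct Ack {
    let topic: String
    let offset: Int
}

final class Producer {
    private let acks: AsyncStream<Ack>.Continuation

    // The continuation is injected at creation time, so ack handling lives
    // outside the producer and outside any completion closure.
    init(acks: AsyncStream<Ack>.Continuation) {
        self.acks = acks
    }

    func send(_ topic: String, offset: Int) {
        // In the real client this yield would happen on librdkafka's
        // delivery-report callback thread; yielding is cheap and non-blocking.
        acks.yield(Ack(topic: topic, offset: offset))
    }

    func close() { acks.finish() }
}

// One stream; the same continuation could be shared by several producers,
// funnelling all their acks into it.
var continuation: AsyncStream<Ack>.Continuation!
let stream = AsyncStream<Ack> { continuation = $0 }
let producer = Producer(acks: continuation)

final class Collector { var acks: [Ack] = [] }
let collector = Collector()
let finished = DispatchSemaphore(value: 0)

// Ack-handling logic lives in its own task, fully separated from `send`.
Task {
    for await ack in stream { collector.acks.append(ack) }
    finished.signal()
}

producer.send("events", offset: 42)
producer.close()
finished.wait()
```

Because AsyncStream buffers yields, the callback thread never waits on the consumer, which is what keeps librdkafka's delivery-report thread unblocked.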
- `struct KafkaConsumerMessage`

In my case it's quite efficient to have the consumer message as a class, since the message is shared between multiple clients (over AsyncStream/Continuation) and the overall lifetime of the message is not predictable. ARC helps here: it lets me process the message and be sure it will be released automatically.
It could certainly be a struct and just be copied, but if the message is quite big, I expect the overhead of ARC to be much lower than the cost of copying.
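As a small sketch of the class-based shape (illustrative names, not the actual SwiftKafka types): sharing the instance across consumers only bumps a retain count, and ARC frees it once the last consumer lets go.

```swift
// Sketch: a reference-type consumer message; names are illustrative.
final class KafkaConsumerMessage {
    let topic: String
    let value: [UInt8] // potentially large payload

    init(topic: String, value: [UInt8]) {
        self.topic = topic
        self.value = value
    }
}

// Handing the same instance to several consumers (e.g. over
// AsyncStream/Continuation) only retains a reference; ARC releases the
// message automatically once the last consumer is done with it.
let message = KafkaConsumerMessage(topic: "events",
                                   value: Array(repeating: 0, count: 1_000_000))
let shared = message // +1 retain count, same underlying instance
```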
- Configurations

I would consider making Configuration copyable, or creating the config as a plain struct with a dictionary (without actually creating and managing an rd_kafka_conf_t under the hood).
This is useful to avoid copy-pasting when you need to configure some common options but then create a few clients (like a few consumers) with the same settings and only a different group id or so. In SwiftKafka this is done in a not-so-nice way: KafkaConfig is a class and not copyable, so in order to actually copy a config you need to create a new KafkaConfig and copy all settings over.
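A minimal sketch of the plain-struct variant (the `KafkaConfig` shape here is an assumption; the real rd_kafka_conf_t would only be materialised when a client is created): value semantics give you the copy for free.

```swift
// Sketch of a plain, copyable config struct backed by a dictionary.
struct KafkaConfig {
    var properties: [String: String] = [:]

    subscript(key: String) -> String? {
        get { properties[key] }
        set { properties[key] = newValue }
    }
}

// Common settings once...
var base = KafkaConfig()
base["bootstrap.servers"] = "localhost:9092"
base["auto.offset.reset"] = "earliest"

// ...then value-type copies for each client, varying only the group id.
var consumerA = base
consumerA["group.id"] = "group-a"

var consumerB = base
consumerB["group.id"] = "group-b"
```

Since the struct is just data, copying it never touches librdkafka; each client can build its own rd_kafka_conf_t from `properties` at creation time.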
- Result builders for configuration

Another small thing to consider is using result builders for configuration:
```swift
let config = KafkaConfig {
    QueueBufferingMaxMs(milliseconds: 10)
    MessageSendMaxRetries(count: 10)
    ...
}
```
I haven't done this here, but I use result builders in my app configuration and find them very useful; it's nice to have things like `if` statements as part of the configuration.
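For illustration, here is a self-contained sketch of what such a builder could look like. Everything in it (`ConfigOption`, `ConfigBuilder`, the option types, this `KafkaConfig`) is hypothetical; the interesting part is `buildOptional`, which is what makes `if` statements work inside the configuration block.

```swift
// Hypothetical result-builder configuration DSL.
protocol ConfigOption {
    var key: String { get }
    var value: String { get }
}

struct QueueBufferingMaxMs: ConfigOption {
    let milliseconds: Int
    var key: String { "queue.buffering.max.ms" }
    var value: String { String(milliseconds) }
}

struct MessageSendMaxRetries: ConfigOption {
    let count: Int
    var key: String { "message.send.max.retries" }
    var value: String { String(count) }
}

@resultBuilder
enum ConfigBuilder {
    static func buildExpression(_ option: ConfigOption) -> [ConfigOption] { [option] }
    static func buildBlock(_ parts: [ConfigOption]...) -> [ConfigOption] { parts.flatMap { $0 } }
    // Enables `if` (and `if`/`else`) inside the configuration block.
    static func buildOptional(_ part: [ConfigOption]?) -> [ConfigOption] { part ?? [] }
    static func buildEither(first: [ConfigOption]) -> [ConfigOption] { first }
    static func buildEither(second: [ConfigOption]) -> [ConfigOption] { second }
}

struct KafkaConfig {
    let properties: [String: String]

    init(@ConfigBuilder _ options: () -> [ConfigOption]) {
        properties = Dictionary(uniqueKeysWithValues: options().map { ($0.key, $0.value) })
    }
}

let lowLatency = true
let config = KafkaConfig {
    MessageSendMaxRetries(count: 10)
    if lowLatency {
        QueueBufferingMaxMs(milliseconds: 1)
    }
}
```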
- Strict typing for configuration

In general I heavily vote for strict typing in configuration rather than having the config as [String: String]. I have seen misconfigurations, typos, and mixed-up value units so many times...
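One small sketch of the idea (names are assumptions, not the real API): encode the unit in the type, and only render librdkafka's string form at the boundary, so a value in seconds can't accidentally land in a milliseconds key.

```swift
// Sketch: typed settings instead of [String: String].
struct ProducerConfig {
    // The type carries the unit, so ms vs. s can't be mixed up,
    // and an Int can't be mistyped as "1O" the way a string can.
    var queueBufferingMax: Duration = .milliseconds(5)
    var messageSendMaxRetries: Int = 2

    // Rendered to librdkafka's stringly-typed form only at the boundary.
    var rdkafkaProperties: [String: String] {
        let ms = queueBufferingMax.components.seconds * 1000
            + queueBufferingMax.components.attoseconds / 1_000_000_000_000_000
        return [
            "queue.buffering.max.ms": String(ms),
            "message.send.max.retries": String(messageSendMaxRetries),
        ]
    }
}
```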