[GSoC 2022] swift-kafka-gsoc Project Kickoff

Hey folks,

I am Felix, 20 years old and currently studying computer science in Munich, Germany. This summer, I am excited to be implementing a Swift Package for Apache Kafka (swift-kafka-gsoc) alongside @FranzBusch as part of Google Summer of Code.

The new Apache Kafka package will set itself apart from its companions through the implementation of asynchronous APIs and the use of Swift's concurrency features in general.

Franz and I are already in the process of laying the foundation for this project, and I will start working on this project full-time for a month beginning on 11th August 2022.

Additionally, I have already created issues on GitHub that provide a better explanation of the tasks I will complete this summer. If this project caught your interest, check out the initial proposal that I have written for this project here.

Furthermore, open-source development thrives through collaboration. Therefore I encourage you, the community, to be part of my journey and participate in this project by providing feedback, creating issues or even contributing in the future.

Lastly, I will use this thread to keep you all updated on my progress once I start working on this project full-time in August.

If you have any questions, feel free to also ask them in this thread!

Best wishes

Felix

13 Likes

Awesome to hear your plans, thanks for sharing and best of luck!

2 Likes

Hello @felixschlegel! Glad to see this project is kicked off!

I have a small question: is there a thread for providing feedback on the API in your proposal? I didn't find a separate thread for this.

I've spent some time reworking an old Swift Kafka package (namely SwiftKafka) and would like to share some findings and thoughts.

3 Likes

Hey @mr-swifter, it is great to hear you are interested in this project!

For now, we plan on having all the project-related discussions in this thread, so feel free to share your thoughts here. Having worked on a modified version of SwiftKafka before, you can undoubtedly help us here a lot!

3 Likes

Hi, sorry for the late reply; I was away last week.

Here are some ideas based on my experience reworking SwiftKafka:

  1. func sendSync(message: KafkaProducerMessage) async throws -> KafkaProducerMessage

It was one of the first functions I added to the producer API, since it fits Swift concurrency very well. But then I found it a bit useless (at least in my case). The reason is that librdkafka has a producer queue and doesn't send messages instantly, but batches them. There are a number of options to control this (like queue.buffering.max.kbytes and queue.buffering.max.ms). That means a task calling sendSync can be suspended for an uncertain amount of time, approximately up to queue.buffering.max.ms.

In general, I agree that such a method can be useful in certain cases, for example when strict synchronisation is required. In that case I would recommend using rd_kafka_flush and carefully documenting the behaviour, with an explicit example of why this API should be used with extreme care and not as the default one.
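
To illustrate the suspension described above, here is a minimal sketch with a mock producer that queues messages and only completes them on flush. MockBatchingProducer and its methods are hypothetical and do not call librdkafka; the point is only that the caller of sendSync stays suspended until the batch is flushed.

```swift
// Hypothetical mock; a real producer would hand messages to librdkafka's queue.
actor MockBatchingProducer {
    private var waiting: [CheckedContinuation<Void, Never>] = []
    private(set) var events: [String] = []

    var queuedCount: Int { waiting.count }

    // Suspends the caller until the batch containing its message is flushed.
    func sendSync(_ value: String) async {
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            events.append("queued \(value)")
            waiting.append(continuation)
        }
        events.append("delivered \(value)")
    }

    // Stands in for the moment the queue is drained (timer expiry or an explicit flush).
    func flush() {
        events.append("flushed")
        let batch = waiting
        waiting.removeAll()
        for continuation in batch {
            continuation.resume()
        }
    }
}

func runBatchingDemo() async -> [String] {
    let producer = MockBatchingProducer()
    let sender = Task { await producer.sendSync("hello") }

    // Wait until the message sits in the queue; the sender is suspended from here on.
    while await producer.queuedCount == 0 {
        await Task.yield()
    }

    await producer.flush()
    await sender.value
    return await producer.events
}
```

In this sketch the sending task only continues after flush() runs, which mirrors why an awaited send can stall for as long as the batching settings allow.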

  2. Acknowledgments.

I handle acks from the producer in a different way than a completionHandler: with an AsyncStream. I found this quite nice and very much in line with the Swift concurrency model.

Basically, when I create a producer I can specify the continuation that should be used to deliver acks (alternatively, the continuation can be specified in func send instead of a completionHandler). Then I use the stream to listen for the acks and handle them properly.
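
A minimal sketch of this idea, assuming Swift 5.9 or later for AsyncStream.makeStream. Ack and MockProducer are hypothetical stand-ins, not the SwiftKafka or swift-kafka-gsoc API, and the mock yields an ack immediately instead of waiting for a delivery report.

```swift
// Hypothetical types for illustration only.
struct Ack: Equatable {
    let producerName: String
    let messageID: Int
}

final class MockProducer {
    let name: String
    private let acks: AsyncStream<Ack>.Continuation
    private var nextID = 0

    init(name: String, acks: AsyncStream<Ack>.Continuation) {
        self.name = name
        self.acks = acks
    }

    // A real producer would yield from librdkafka's delivery report callback.
    // Here the ack is yielded right away to keep the sketch self-contained.
    @discardableResult
    func send(_ value: String) -> Int {
        nextID += 1
        acks.yield(Ack(producerName: name, messageID: nextID))
        return nextID
    }
}

func collectAcks() async -> [Ack] {
    // One stream and one continuation, shared by two producers.
    let (stream, continuation) = AsyncStream.makeStream(of: Ack.self)
    let orders = MockProducer(name: "orders", acks: continuation)
    let payments = MockProducer(name: "payments", acks: continuation)

    orders.send("a")
    payments.send("b")
    orders.send("c")
    continuation.finish()  // ends the for-await loop below

    // Ack handling lives here, separate from the code that sends messages.
    var received: [Ack] = []
    for await ack in stream {
        received.append(ack)
    }
    return received
}
```

The default buffering policy of AsyncStream is unbounded, so acks yielded before the consumer starts iterating are kept in order.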

I see the following pros of this approach:

  • I can have complex ack handling logic without keeping it in a closure or trailing closure. It is totally separated.
  • I can consume acks from different producers in a single stream (by providing the same continuation to multiple producers).
  • And vice versa: I can handle acks in parallel in a few async streams and not block the librdkafka callback thread.

  3. struct KafkaConsumerMessage

In my case it is quite efficient to have the consumer message as a class, since the message is shared between multiple clients (over AsyncStream/Continuation) and the overall lifetime of the message is not predictable. ARC helps here: it allows the message to be processed and makes sure it is released automatically.

Of course it can be a struct and just be copied, but I guess that if the message is quite big, the overhead of ARC is much smaller than the cost of copying.

  4. Configurations

I would consider making the configuration copyable, or creating the config as a plain struct with a dictionary (without actually creating and managing rd_kafka_conf_t under the hood).

This is useful to avoid copy-pasting when you need to configure some common options and then create a few clients (like a few consumers) with the same settings but a different group id or so. In SwiftKafka this is not done in a nice way: KafkaConfig is a class and is not copyable, so in order to actually copy a config you need to create a new KafkaConfig and copy all the settings.

  5. Result Builders for configuration

Another small thing to be considered is to use result builders for configuration.

let config = KafkaConfig() {
    QueueBufferingMaxMs(milliseconds: 10)
    MessageSendMaxRetries(count: 10)
    ...
}

I haven't done this for Kafka, but I use result builders in my app configuration and find it very useful and nice to have things like if statements as part of the configuration.
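
A minimal sketch of how such a builder could look, assuming Swift 5.6 or later. ConfigOption, ConfigBuilder and SketchConfig are hypothetical names; only the librdkafka property keys are real.

```swift
// Hypothetical option types; the names mirror the snippet above, not a real API.
protocol ConfigOption {
    var key: String { get }
    var value: String { get }
}

struct QueueBufferingMaxMs: ConfigOption {
    let milliseconds: Int
    var key: String { "queue.buffering.max.ms" }
    var value: String { String(milliseconds) }
}

struct MessageSendMaxRetries: ConfigOption {
    let count: Int
    var key: String { "message.send.max.retries" }
    var value: String { String(count) }
}

@resultBuilder
enum ConfigBuilder {
    // Every statement in the builder body is lifted into a one-element list.
    static func buildExpression(_ option: any ConfigOption) -> [any ConfigOption] { [option] }
    static func buildBlock(_ parts: [any ConfigOption]...) -> [any ConfigOption] { parts.flatMap { $0 } }
    // Supports `if` without `else`.
    static func buildOptional(_ part: [any ConfigOption]?) -> [any ConfigOption] { part ?? [] }
    // Supports `if`/`else`.
    static func buildEither(first part: [any ConfigOption]) -> [any ConfigOption] { part }
    static func buildEither(second part: [any ConfigOption]) -> [any ConfigOption] { part }
}

struct SketchConfig {
    private(set) var values: [String: String] = [:]

    init(@ConfigBuilder _ options: () -> [any ConfigOption]) {
        for option in options() {
            values[option.key] = option.value
        }
    }
}

let lowLatency = true
let builtConfig = SketchConfig {
    MessageSendMaxRetries(count: 10)
    if lowLatency {
        QueueBufferingMaxMs(milliseconds: 10)
    }
}
```

The builder only collects key-value pairs here, so it could sit on top of any key-value based configuration type.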

  6. Strong typing for configuration

In general I strongly vote for strict typing in configuration rather than having the config as [String: String]. I have seen misconfigurations, typos, and mixed-up units in setting values so many times...

2 Likes

Hey @mr-swifter,

No worries, I am always happy about your advice!

Here is what we have/plan to do regarding your points:

func sendSync(message: KafkaProducerMessage) async throws -> KafkaProducerMessage

Thanks for the input here. I had already gone through the same reasoning with @FranzBusch.

Acknowledgements

Your idea makes sense here! We will consider this.

struct KafkaConsumerMessage

It makes sense for large payloads, but we would still prefer something like a struct with an internal class that incorporates copy-on-write. I have implemented something similar already for KafkaConfig. You can find it in this PR.

Configurations

As mentioned above, we have a PR that features a struct KafkaConfig with an internal class that enables copy-on-write.
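
A minimal sketch of the copy-on-write pattern mentioned here. CowConfig is a hypothetical type that stores a dictionary; it is an assumption that the real KafkaConfig follows the same shape around a librdkafka handle.

```swift
// Hypothetical sketch of a struct backed by a reference-type storage.
struct CowConfig {
    private final class Storage {
        var values: [String: String]
        init(values: [String: String] = [:]) { self.values = values }
        func copy() -> Storage { Storage(values: values) }
    }

    private var storage = Storage()

    init() {}

    func value(forKey key: String) -> String? {
        storage.values[key]
    }

    mutating func set(_ value: String, forKey key: String) {
        // Copy the shared storage only when another struct value still references it.
        if !isKnownUniquelyReferenced(&storage) {
            storage = storage.copy()
        }
        storage.values[key] = value
    }
}

var baseConfig = CowConfig()
baseConfig.set("localhost:9092", forKey: "bootstrap.servers")

// Cheap copy: both values share one storage until the first mutation.
var groupAConfig = baseConfig
groupAConfig.set("group-a", forKey: "group.id")
```

Copying the struct is cheap, and the common settings can be reused for several clients that differ only in a few keys.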

Regarding the strongly typed config parameters: we certainly want to have something like this, though we currently focus on getting things to run first. We can always build something on top of the key-value-based KafkaConfig.
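
One way such a typed layer could sit on top of a key-value config is sketched below. TypedSettings and Milliseconds are hypothetical; only the property keys come from librdkafka, and the values are illustrative.

```swift
// Hypothetical wrapper that carries the unit in the type.
struct Milliseconds {
    var count: Int
}

// Hypothetical typed settings; not part of swift-kafka-gsoc.
struct TypedSettings {
    var bootstrapServers: [String]
    var groupID: String? = nil
    var enableAutoCommit = true
    var queueBufferingMax = Milliseconds(count: 10)

    // Renders the typed values into the string pairs a key-value config expects.
    var keyValuePairs: [String: String] {
        var pairs = [
            "bootstrap.servers": bootstrapServers.joined(separator: ","),
            "enable.auto.commit": enableAutoCommit ? "true" : "false",
            "queue.buffering.max.ms": String(queueBufferingMax.count),
        ]
        if let groupID = groupID {
            pairs["group.id"] = groupID
        }
        return pairs
    }
}

let typedSettings = TypedSettings(
    bootstrapServers: ["localhost:9092"],
    groupID: "example-group",
    enableAutoCommit: false
)
let renderedPairs = typedSettings.keyValuePairs
```

Each pair could then be passed to something like config.set(_:forKey:), so the string keys are written in exactly one place.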

Again, thank you for your advice and the time you invested in this! Your suggestions will move us forward a lot!

2 Likes

Hey folks,

After many discussions and hard work, we finally merged the KafkaProducer into the main branch of the swift-kafka-gsoc repository. While developing the KafkaProducer, I interacted with the Swift on Server community and learned a lot.

At this stage, I came up with the following API for sending messages to a Kafka server:

let logger = Logger(label: "kafka.test")

var config = KafkaConfig()
try config.set("localhost", forKey: "bootstrap.servers")

let producer = try await KafkaProducer(config: config, logger: logger)

let message = KafkaProducerMessage(
  topic: "topic",
  key: "key",
  value: "Hello, World!"
)

let messageID = try await producer.sendAsync(message: message)

for await messageResult in producer.acknowledgements {
  switch messageResult {
  case .success(let acknowledgedMessage):
    assert(acknowledgedMessage.id == messageID)
  case .failure:
    break
  }
}

await producer.shutdownGracefully()

I want to thank @FranzBusch and @lukasa for helping me overcome the hurdles we encountered while integrating the C library's Kafka producer into our package. The most challenging part was the implementation of a workaround that now enables us to pass Swift capturing closures to a C function (not possible by default). We achieved this with the help of Unmanaged object references, something very new to me at the time.
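
The general technique can be sketched as follows. This is not the code from the package: fakeCLibraryCall stands in for a C function that accepts a function pointer plus an opaque context pointer, and ClosureBox is a hypothetical helper.

```swift
// Stand-in for a C API taking a function pointer and an opaque context pointer.
// (Hypothetical; librdkafka's real callbacks have different signatures.)
typealias CCallback = @convention(c) (Int32, UnsafeMutableRawPointer?) -> Void

func fakeCLibraryCall(_ callback: CCallback, _ opaque: UnsafeMutableRawPointer?) {
    callback(42, opaque)
}

// Box that owns the capturing Swift closure.
final class ClosureBox {
    let closure: (Int32) -> Void
    init(_ closure: @escaping (Int32) -> Void) { self.closure = closure }
}

func callWithCapturingClosure(_ closure: @escaping (Int32) -> Void) {
    let box = ClosureBox(closure)
    // Retain the box manually and turn it into a raw pointer the C side can carry.
    let opaque = Unmanaged.passRetained(box).toOpaque()

    // This closure captures nothing, so it converts to a C function pointer.
    let trampoline: CCallback = { value, context in
        guard let context = context else { return }
        let box = Unmanaged<ClosureBox>.fromOpaque(context).takeUnretainedValue()
        box.closure(value)
    }

    fakeCLibraryCall(trampoline, opaque)

    // Balance the earlier passRetained once the C side no longer uses the pointer.
    Unmanaged<ClosureBox>.fromOpaque(opaque).release()
}

func runClosureDemo() -> Int32 {
    var total: Int32 = 0
    let offset: Int32 = 8
    // The closure captures `total` and `offset`, which a plain C function pointer cannot do.
    callWithCapturingClosure { total = $0 + offset }
    return total
}
```

The capturing closure travels through the context pointer, while the function pointer itself stays capture-free.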

Looking forward, my implementation of a KafkaConsumer is currently in review. Feel free to check it out and leave some comments in this thread or in the PR itself.

Furthermore, you are welcome to ask any other questions in this thread. I would be happy about feedback and new ideas!

Best wishes

Felix

7 Likes

Hey folks,

Having merged the KafkaProducer last week, we finally merged the KafkaConsumer this week. I owe special thanks to @FranzBusch and @mr-swifter for the collaboration on this Pull Request. We had some discussions and found aspects of the KafkaConsumer API that can still improve. However, we are happy to have merged an initial version that works well. I also created issues for KafkaConsumer's further development.

For now, we agreed upon the following public interface for the KafkaConsumer (excerpt from our future documentation):

Consumer API

After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

var config = KafkaConfig()
try config.set("localhost:9092", forKey: "bootstrap.servers")

let consumer = try KafkaConsumer(
	topic: "topic-name",
	partition: KafkaPartition(rawValue: 0),
	config: config,
	logger: .kafkaTest
)

for await messageResult in consumer.messages {
	...
}

Consumer Groups

SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group.

var config = KafkaConfig()
try config.set("localhost:9092", forKey: "bootstrap.servers")

let consumer = try KafkaConsumer(
	topics: ["topic-name"],
	groupID: "example-group",
	config: config,
	logger: .kafkaTest
)

for await messageResult in consumer.messages {
	...
}

Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

var config = KafkaConfig()
try config.set("localhost:9092", forKey: "bootstrap.servers")
try config.set("false", forKey: "enable.auto.commit")

let consumer = try KafkaConsumer(
	topics: ["topic-name"],
	groupID: "example-group",
	config: config,
	logger: .kafkaTest
)

for await messageResult in consumer.messages {
	...
	guard case .success(let message) = messageResult else {
		continue
	}
	try await consumer.commitSync(message)
}

Feel free to check out our project repository and the KafkaConsumer Pull Request.

You are always welcome to provide feedback in this thread or ask questions!

Best wishes

Felix

4 Likes

Hey folks,

Sadly, my time as a Google Summer of Code student has ended. However, I will continue contributing to the project, so I would be happy to keep in touch with you all.

GSoC has taught me much about open-source development and made me a more open and experienced developer. I want to thank everybody who accompanied me on this journey!

I want to especially thank @FranzBusch, as he was my mentor during that period. He always gave me valuable advice in our weekly meetings, and I learned a lot from him. I could not have wished for a better mentor!

Moving on, I hope to see swift-kafka-gsoc continue growing in the future. Furthermore, I will still try to post updates on the project in this thread.

Best regards

Felix

6 Likes

Thanks for the great work during GSoC @felixschlegel. It was really great to work with you during all this time, and I think we made significant progress on a concurrency-ready Kafka library for Swift.

I want to thank you for your persistence during GSoC. Even when you faced challenging issues, like the C interop, you pushed through!

There is still some work left to get the library production ready, but a lot of big problems have been solved now!

7 Likes