Implement Kafka Producers in .NET

Ongoing digitization places increasing demands on software applications: they should be efficient, scalable, and fault-tolerant, and exchange their data in real time. This creates challenges in software development that are not easy to overcome. Apache Kafka® (or Kafka for short) is an excellent tool for meeting exactly these requirements: Kafka is a distributed event streaming platform that offers scalability, high availability, and real-time communication between applications, and is extremely efficient.

This article explains how data is written to Kafka and what needs to be considered beforehand. First, it introduces the basic concepts you need to understand in order to master this tool. Next, we look at an important factor in Kafka's success: its efficiency. The article concludes with .NET code samples that show how to implement various producer strategies.

History

Kafka was designed by Jay Kreps in 2010 while he was working at LinkedIn, to meet the challenges arising from the growing amount of data and users the platform was confronted with. One year later, the source code was placed under an open source license; the project graduated from the Apache Incubator program in 2012 and has since been developed further by the Apache Software Foundation. In 2014, the company Confluent was founded by Jay Kreps and former LinkedIn colleagues to offer commercial services around Kafka. In 2015, various large tech companies, such as Uber, Spotify, and Netflix, were already using the technology to optimize their data flows. In 2021, release 3.0 introduced the Raft consensus algorithm (KRaft) to remove the dependency on ZooKeeper.

Basic Concepts

Kafka is basically a platform for data exchange. At the same time, it is a distributed system consisting of nodes that provide storage and replicate their data to other nodes, so that a node can be replaced seamlessly in the event of a failure. These nodes are called brokers; accordingly, the whole platform is called a "broker cluster". The data exchanged are messages that consist of a key and a value. Messages are created by producers and received by consumers. They are organized into topics and persisted sequentially in a commit log. This means they are appended and stored similar to an application log or a database transaction log. They can therefore be read several times, at least until the so-called retention time is reached; after that, they are deleted.

Topics are divided into partitions, whereby the order of the messages is only guaranteed within a single partition. Inside a partition, a message is accessed by its offset, i.e. its position in the partition.

A feature called log compaction can be enabled at the topic level; it ensures that only the most recent message per message key is retained. This speeds up consumers that have to read through a topic from the beginning and also reduces storage demands. Be aware, though, that it does not guarantee that only one message per key exists at any moment in time, because the timing of compaction is non-deterministic.
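As a minimal sketch, such a compacted topic could be created with the AdminClient of the Confluent.Kafka package (TopicSpecification lives in the Confluent.Kafka.Admin namespace; topic name, partition count, and replication factor are purely illustrative):

using var adminClient = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = "host1:9092,host2:9092" }).Build();

await adminClient.CreateTopicsAsync(new[]
{
    new TopicSpecification
    {
        Name = "user-profiles", // hypothetical topic name
        NumPartitions = 3,
        ReplicationFactor = 2,
        Configs = new Dictionary<string, string>
        {
            { "cleanup.policy", "compact" } // enable log compaction for this topic
        }
    }
});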

In fact, the brokers physically only know about partitions, while topics are a logical concept. For replication, the partitions are distributed to the desired number of replicas (the replication factor) based on a leader/follower design. This means that messages for a particular partition can only be created via the corresponding leader. The followers that hold the same data as the leader are called in-sync replicas, or ISRs for short.

As already mentioned, the recipients of the messages are referred to as "consumers" and subscribe to selected topics. They identify themselves via a self-defined "client id" and use the message offset to fetch the desired messages. The current position of a consumer is usually stored in Kafka itself, in a special "metadata topic": the consumer reports its last read position as the committed offset. Consumers retrieve messages through an infinite polling loop, with each iteration followed by a predefined wait interval.

Several consumers can be combined into a consumer group to make consumption fault-tolerant: exactly one consumer of the group is responsible for reading the messages of a given partition, and as soon as that consumer crashes, consumption is delegated to the next group member.

For this purpose, one broker is chosen as group coordinator: it is responsible for determining the group leader and, in the event of the leader's failure, for choosing a successor. The consumers report their availability with regular heartbeats. The leader, in turn, is responsible for assigning each partition to a specific consumer. A partition can be assigned to the leader itself or to any other group member.
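A minimal consumer loop, sketched with the Confluent.Kafka client, could look as follows (the group id is a placeholder, and the topic and cancellationToken variables are assumed to exist; offsets are committed automatically by default):

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId = "my-consumer-group", // consumers sharing this id form a consumer group
    AutoOffsetReset = AutoOffsetReset.Earliest // start from the beginning if no offset is committed yet
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);
try
{
    while (true)
    {
        // blocks until a message is available or the token is cancelled
        var result = consumer.Consume(cancellationToken);
        Console.WriteLine($"{result.Message.Key}: {result.Message.Value} @ {result.TopicPartitionOffset}");
    }
}
catch (OperationCanceledException)
{
    // shutdown was requested
}
finally
{
    consumer.Close(); // leave the group cleanly so its partitions are reassigned quickly
}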

Efficiency through Batching

An important factor for Kafka’s success is its efficiency: due to various optimizations, even large amounts of data can be exchanged in a very short time.

On the one hand, data is converted by the producer into an optimized binary format, which is first sent to the brokers and later passed on to the consumers in the same form. This means the data chunks can be forwarded without further processing.

On the other hand, messages are grouped by the producer into batches before they are sent to the brokers. This results in larger packets being transferred over the network, which reduces network overhead. It also results in fewer but bigger I/O operations.

For special requirements, such as synchronizing two geographically distant data centers, Kafka also offers the possibility to compress message blocks. Here, too, the entire compressed block is stored unchanged to avoid expensive I/O operations within the cluster.
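Both batching and compression can be tuned via the producer configuration of the Confluent.Kafka client; the values in this sketch are purely illustrative:

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    BatchSize = 64 * 1024, // maximum batch size in bytes (illustrative value)
    LingerMs = 50, // wait up to 50 ms for a batch to fill before sending
    CompressionType = CompressionType.Lz4 // compress whole message batches
};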

Producer Implementation

The producer is responsible for creating messages and for sending them to the cluster. By default, messages are first grouped locally before being forwarded to the cluster as a batch. This local batching can be adapted to your own needs by either defining the batch size or specifying the wait time. As a developer you can also opt out of this strategy by using the ProduceAsync() method and awaiting its result, which allows sending one message at a time:

await producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = value });

Another option is to use the Produce() method, which uses local batching; this reduces the number of I/O operations within the cluster and therefore increases throughput. Just make sure to call Flush() before terminating the application, so that any message not yet sent to the cluster is still delivered:

foreach ((var key, var value) in keyPairs)
{
    // enqueue messages locally so that Kafka can create message batches to optimize throughput;
    // responses are handled in the callback
    var message = new Message<string, string>
    {
        Key = key,
        Value = value
    };
    producer.Produce(topic, message, Handler);
}
// wait for in-flight messages to be delivered before closing the application
producer.Flush();

void Handler(DeliveryReport<string, string> report)
{
    // inspect the report to detect failed deliveries
    if (report.Error.IsError)
        Console.WriteLine($"Delivery failed: {report.Error.Reason}");
}

When building an application on top of a messaging system, it is always important to check your delivery guarantee requirements and to make sure they are fulfilled by the chosen messaging system. Luckily, Kafka covers all common requirements and even supports exactly-once semantics.

To implement a fire-and-forget scenario with the lowest possible latency, a producer does not await any acknowledgement (Acks.None). In this scenario messages can of course be lost in case of a system failure, so this approach only provides an at-most-once delivery guarantee. You could also await the acknowledgement of just the leader broker (Acks.Leader), thereby not waiting for the responses from the followers, and accept a slightly higher latency in favor of better durability. But in either case the message is stored at most once, as the leader could fail while the message is being replicated to the followers.
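A corresponding fire-and-forget configuration is sketched below (broker addresses are placeholders):

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    Acks = Acks.None // lowest latency, at-most-once delivery
    // Acks.Leader would instead wait for the partition leader's acknowledgement
};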

Kafka also supports at-least-once semantics: the producer awaits the acknowledgement of all ISRs (Acks.All) and, in case of a failure, re-sends the message (or message batch). If the producer transfers batches instead of single messages, an edge case needs to be considered: while the leader is replicating the batch to a follower, it could crash, and the follower would then have stored only part of the batch. Because the leader is unavailable, the former follower is now elected as the new leader. The producer re-sends the batch with all its messages because it did not receive the expected acknowledgement, and the new leader appends all messages from the batch, which results in some messages being duplicated (here m1 and m2).

To prevent duplicates in the above situation, the idempotent writes feature was introduced. It guarantees that the same message is stored only once, and in the right order, by de-duplicating messages in the broker based on a sequence number provided by the producer: the producer adds a sequence number to every message, and when storing messages the leader asserts that a message with the same sequence number is stored only once. In this example the new leader recognizes that the messages m1 and m2 have already been stored and therefore adds only m3 to its log after the producer re-sent the same message batch because of the failure during the first run (the previous leader crashed while replicating m3):

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    ClientId = Dns.GetHostName(),
    Acks = Acks.All, // get an acknowledgement of all ISRs
    EnableIdempotence = true, // remove duplicates and assert message order
    // ...
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    // ...
}

Kafka even allows you to implement exactly-once delivery guarantees by providing transaction support. If, for example, a producer wants to store data in different partitions, and its consumers should only be able to access the data once all write operations have succeeded, a Kafka transaction can be used.
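The following sketch outlines such a transaction with the Confluent.Kafka client; the transactional id and topic names are hypothetical:

var config = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    TransactionalId = "order-import-1" // hypothetical; must be unique per producer instance
};

using var producer = new ProducerBuilder<string, string>(config).Build();
producer.InitTransactions(TimeSpan.FromSeconds(10));

producer.BeginTransaction();
try
{
    producer.Produce("orders", new Message<string, string> { Key = key, Value = value });
    producer.Produce("order-audit", new Message<string, string> { Key = key, Value = value });
    // both writes become visible to transactional consumers atomically
    producer.CommitTransaction(TimeSpan.FromSeconds(10));
}
catch (Exception)
{
    producer.AbortTransaction(TimeSpan.FromSeconds(10)); // neither message becomes visible
    throw;
}

Note that consumers only honor these transaction boundaries when they are configured with IsolationLevel.ReadCommitted.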

By default, Kafka provides at-least-once semantics. By disabling automatic retries in the producer and committing the offset before processing the message in the consumer, you can switch to at-most-once. Ultimately, your requirements determine which strategy to choose.
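As a rough sketch, this switch to at-most-once could look as follows (Process() is a hypothetical handler, and the consumer is assumed to run with EnableAutoCommit = false):

// producer side: never re-send a message
var producerConfig = new ProducerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    MessageSendMaxRetries = 0 // a failed send is not retried, so no duplicates can occur
};

// consumer side: commit the offset before processing
var result = consumer.Consume(cancellationToken);
consumer.Commit(result); // committed first: a crash during processing loses the message
Process(result.Message); // hypothetical processing logic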
