Kafka

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.

Why Kafka?

Connectors available for disparate data sources

Large-scale data movement pipelines

"Big-Data" integration

Kafka Listeners explained in this link - https://rmoff.net/2018/08/02/kafka-listeners-explained/

Event Sourcing

An architectural style or approach to maintain an application's state by capturing all changes as a sequence of time-ordered, immutable events.

A time-ordered sequence of messages.

Each message has a timestamp, a referenceable identifier, and a payload (data).

Message Offset -

A placeholder, like a bookmark, marking the last read message.

Maintained by Kafka consumer.

Corresponds to the message identifier

Message Retention Policy:

retains all published messages regardless of consumption

Retention period is configurable (default is 168 hours, i.e. 7 days)

Retention period is defined on a per-topic basis

Constrained by physical storage resources

Partitions (Partition == Log)

Each topic has one or more partitions

Scalability is determined by the number of partitions being managed by multiple broker nodes.

Say the number of partitions is set to 3: three partitions will be created across 3 brokers. This is to say that messages in the topic will be distributed across the 3 different brokers.

A partition is the physical representation of the commit log, i.e. mytopic_01, mytopic_02 and mytopic_03.

If any partition goes down, or its file system becomes inaccessible, we will lose the messages on that broker. To avoid such losses, we have the 'replication-factor'.

Replication-Factor - When the replication factor is set to 3, the leader for mytopic_01 will identify 2 more brokers to replicate the log (see the topic-creation sketch below).

The more partitions, the greater the ZooKeeper overhead (ZK works in memory). With large partition numbers, ensure proper ZK capacity.

The more partitions, the longer the leader fail-over time.

Message ordering - To get global ordering without consumers maintaining it, go for a single partition.

When the replication factor is set to 'n', it is the leader's job to get peer brokers to participate in a quorum for the purposes of replicating the log to achieve the intended redundancy level. The leader will engage its peers and start copying the partition log. When all the members of the quorum are caught up, the ISR (in-sync replicas) are reported as in sync.
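As a quick illustration of partitions, replication-factor, and the per-topic retention mentioned earlier, here is a minimal sketch using the Java AdminClient. The topic name, broker address, and values are assumptions for illustration only.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, 7-day per-topic retention (values are illustrative)
            NewTopic topic = new NewTopic("mytopic", 3, (short) 3)
                    .configs(Collections.singletonMap("retention.ms", "604800000"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```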

OFFSET

Last committed offset - last message the consumer has read

Offsets are pertinent to partitions (tracked per partition).

Read != committed - offset commit behaviour is configurable

- enable.auto.commit = true (default) - if set to false, you take full control of the commit.

You can use consumer.commitSync() - commit when you're done. Do this in the poll loop after each batch of messages is processed, rather than after each individual message. As this call is synchronous, it might affect performance. If a commit fails, it is retried after the interval configured in retry.backoff.ms (default 100 ms). See the commitSync() sketch after this list.

You can also use consumer.commitAsync() - non-blocking but non-deterministic and no retries. You can, however, register a callback to determine the status of the commit. Avoid this unless you register a callback.

If you are building a system where atomicity is important, e.g. exactly-once processing, it makes more sense to manage the offsets yourself instead of relying on the defaults.

- auto.commit.interval.ms = 5000 (default)

- auto.offset.reset = "latest" (default); can be set to 'earliest' or 'none'
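A minimal sketch of taking control of commits with enable.auto.commit=false and consumer.commitSync() per batch. The topic name, group id, and processing logic are assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");   // take full control of offset commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mytopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // application-specific processing would go here
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync();   // commit once per batch, not per message
            }
        }
    }
}
```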

Consumer-Group

Consumer groups allow consumer-side scale-out.

Independent consumers working as a team with the "group.id" setting.

Sharing the message consumption and processing load

- parallelism and throughput, redundancy and performance

A group coordinator is created automatically, which in turn talks to ZK to assign and monitor specific topics or partitions. Once a consumer group is formed, each consumer in the group sends a heartbeat at the interval defined by the heartbeat.interval.ms property. The group coordinator relies on this to determine whether an individual consumer is alive. The session.timeout.ms setting defines how long the coordinator will wait before the consumer is marked as failed. If a consumer is not available, its partitions will be reassigned to another consumer. This is called a consumer rebalance (see the sketch after this section).

If the number of consumers exceeds the number of partitions, it is considered 'over-provisioning' and the extra consumers sit idle until new partitions are added.
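A sketch of joining a consumer group and observing rebalances via a ConsumerRebalanceListener; the group id, topic name, and timeout values are illustrative assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class GroupMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-processors");   // consumers sharing this id form one group
        props.put("heartbeat.interval.ms", "3000");  // how often the consumer pings the coordinator
        props.put("session.timeout.ms", "10000");    // coordinator marks the consumer failed after this
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("mytopic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Rebalance: partitions revoked " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Rebalance: partitions assigned " + partitions);
            }
        });
        while (true) {
            consumer.poll(Duration.ofMillis(500));   // rebalance callbacks are invoked from within poll()
        }
    }
}
```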

Consumer Performance and efficiency

- fetch.min.bytes - minimum bytes that must be available before the poll returns (reduces the number of fetch cycles)

- fetch.max.wait.ms - maximum time to wait if fetch.min.bytes has not yet been reached

- max.partition.fetch.bytes - maximum number of bytes returned per partition per fetch

- max.poll.records - maximum number of records returned in a single poll()
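A small sketch collecting the fetch-tuning settings above into consumer properties; the values shown are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

public class FetchTuning {
    // Consumer properties with the fetch-tuning knobs above; values are illustrative.
    static Properties fetchTuningProps() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "1024");              // wait for at least 1 KB of data...
        props.put("fetch.max.wait.ms", "500");             // ...or at most 500 ms, whichever comes first
        props.put("max.partition.fetch.bytes", "1048576"); // at most 1 MB per partition per fetch
        props.put("max.poll.records", "500");              // at most 500 records handed back per poll()
        return props;
    }
}
```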

The Process of Sending Messages:

When calling the send method, the producer will reach out to the cluster using the bootstrap servers list to discover the cluster membership. The response comes back as metadata, containing detailed information related to the topics, their partitions, and their managing brokers on the cluster. This metadata is used to instantiate a metadata object in the producer, and throughout the producer's life cycle, it will keep this object fresh with the latest information about the cluster.

Additionally, a pseudo processing pipeline within the Kafka producer is engaged. With the producer now having an actual ProducerRecord to work with, the first step in this pipeline is to pass the message through the configured serializer. Remember, in our case we're just using the string serializer.
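A minimal sketch of this flow: configure bootstrap servers and the string serializers, build a ProducerRecord, and call send(). The topic name and message contents are assumptions.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // used to discover cluster membership
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("mytopic", "key-1", "hello kafka");
            producer.send(record);   // serialized, partitioned, batched, then sent to the partition leader
        }
    }
}
```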

The next step in the pipeline is the partitioner, whose job it is to determine which partition to send the record to. Here the producer can employ different partitioning strategies depending on the values passed to it in the ProducerRecord and the information it has regarding the cluster membership. Between the time the send operation is invoked and the time a message is received by a broker, quite a few things happen, and the target partition is determined by one of four possible strategies.

First, the Kafka producer looks at the ProducerRecord contents, especially the partition field. It checks whether a value was provided for that field. If it was, the next question is whether the proposed partition is actually a valid partition. For example, for the topic being requested, is there a partition that matches the one proposed? For this answer, the producer refers to the metadata object that maintains the cluster metadata, including a list of topics, their partitions, and the leaders for each. If the value proposed does not match a known partition for the topic, or if that partition is unavailable (which is unlikely if replication is enabled), then an exception will be thrown and the send operation will abort. If the proposed partition is valid, the producer will add the ProducerRecord object to the specific partition buffer for the topic, where it will, on a separate thread, await the actual send to the broker leader of that specified partition.

If a partition was not specified in the ProducerRecord, the next question used to determine the routing strategy is whether a key was provided in the ProducerRecord. If the answer is no, as was the case in the last module when using the Kafka producer shell program, the message will be routed using a round-robin strategy that attempts to evenly distribute the messages across all the partitions in the topic. Technically speaking, this scheme is defined in the default partitioner class we'll talk about in a few more steps. If there is a key provided, the next qualifying question is whether a custom, non-default partitioner class was provided as part of the configuration properties used to instantiate the Kafka producer. For this, the producer references the ProducerConfig object and looks for a specific value called PARTITIONER_CLASS_CONFIG, which represents the optional partitioner.class setting provided in the properties object. If nothing is provided, which is the common default scenario, the routing will be done through a key-based partitioning scheme, which Kafka provides as a default implementation of the Partitioner interface.

The default partitioner class takes a murmur hash of the key and then applies a modulus by the total number of partitions for the topic, and that's how it determines which partition to send the record to. I suppose you could call that a fancy way of describing a murmur-based mod hash. Some use cases may call for a custom key-based partitioning scheme, and that's when you would need to develop your own Partitioner implementation, add that implementation class to the classpath, and specify the class type as the partitioner.class property setting. If that has been done, it is that custom scheme that will be used.
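A hypothetical custom Partitioner, as a sketch of what you could register via partitioner.class. The routing rule (keys starting with "vip" pinned to partition 0) is purely an illustrative assumption; the fallback mimics the murmur-hash-modulo scheme described above.

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class VipKeyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: spread messages across partitions (simple stand-in for round robin)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        if (key.toString().startsWith("vip")) {
            return 0;   // illustrative rule: pin "vip" keys to partition 0
        }
        // Otherwise mimic the default: murmur2 hash of the key bytes, modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

You would register it on the producer with props.put("partitioner.class", VipKeyPartitioner.class.getName()).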

With the partitioning scheme established, the producer can now dispatch the ProducerRecord onto an in-memory, queue-like data structure called a RecordAccumulator. The RecordAccumulator is a fairly low-level object with a lot of complexity.

Each time you send, persist, or read a message, resource overhead is incurred. In high throughput systems, this overhead can dramatically impact the performance, reliability, and overall throughput of the system, and the more that overhead is incurred on handling fewer units of work, the less efficient that system is.

Kafka's approach to addressing common inefficiencies in messaging systems - micro-batching. Whether it be on the producing side, the broker side, or the consumer side, Apache Kafka was designed with the means of being able to rapidly queue or batch up requests to send, persist, or read into flexibly bound memory buffers that can take advantage of modern operating system functions, such as the page cache and the Linux sendfile system call. By batching, the cost overhead of transmission, flushing to disk, or doing a network fetch is amortized over the entire batch. The RecordAccumulator gives the producer its ability to micro-batch records intended to be sent at high volumes and high frequencies. When a ProducerRecord has been assigned to a partition through the partitioner, it is handed over to the RecordAccumulator, where it is added to a collection of RecordBatch objects, one for each topic-partition combination needed by the producer instance. Each RecordBatch object, as the name suggests, is a small batch of records that is going to be sent to the broker that owns the assigned partition. There are many factors that determine how many ProducerRecords are accumulated and buffered into a RecordBatch before it is sent off to the brokers. Most of these factors are based on advanced configuration settings defined at the producer level, set using a properties object, similar to the way the other properties were set. Let's take a look at a few of the important settings.
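A sketch of a few of those producer-level batching settings; the values shown are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

public class BatchingConfig {
    // Producer properties controlling how the RecordAccumulator micro-batches records.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("batch.size", "16384");        // max bytes per RecordBatch (per topic-partition)
        props.put("linger.ms", "5");             // wait up to 5 ms to let a batch fill before sending
        props.put("buffer.memory", "33554432");  // total memory available for buffering records
        props.put("compression.type", "snappy"); // compress whole batches to amortize overhead further
        return props;
    }
}
```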

DELIVERY GUARANTEES

Broker acknowledgement ("acks")

- 0 - fire and forget ( fastest)

- 1 - leader acknowledged (leader receipt only) - balances performance and reliability

- all (-1) - replication quorum acknowledged (highest assurance, at the cost of performance)

ORDERING GUARANTEES

Message order is only preserved within a given partition. Messages sent to multiple partitions will not have a global order.

- max.in.flight.requests.per.connection set to 1 - only one message request is in flight at a time
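A small sketch combining the delivery and ordering settings above in producer properties; apart from acks and max.in.flight.requests.per.connection, the values are assumptions.

```java
import java.util.Properties;

public class DeliveryGuaranteeConfig {
    // Producer properties trading throughput for stronger delivery and ordering guarantees.
    static Properties guaranteeProps() {
        Properties props = new Properties();
        props.put("acks", "all");                                // wait for the full ISR quorum to acknowledge
        props.put("max.in.flight.requests.per.connection", "1"); // keep per-partition ordering even on retries
        props.put("retries", "3");                               // retry transient send failures
        return props;
    }
}
```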

Performance Testing (creating data for a performance test)

Send 50 records of 1 byte each at 10 messages/sec

/bin/kafka-producer-perf-test.sh --topic mytopic --num-records 50 --record-size 1 --throughput 10 --producer-props bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

You can consume messages via the subscribe() or assign() methods. subscribe() gets messages from all partitions of a topic, even ones that are added after subscribing. On the other hand, assign() only gets messages from the specific partitions you list.
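A brief sketch contrasting the two approaches; the topic name and partition number are assumptions.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class SubscribeVsAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // subscribe(): dynamic, group-managed; picks up partitions added to the topic later
        KafkaConsumer<String, String> subscriber = new KafkaConsumer<>(props);
        subscriber.subscribe(Collections.singletonList("mytopic"));

        // assign(): static, self-managed; reads only the partitions you list, no rebalancing
        KafkaConsumer<String, String> assigner = new KafkaConsumer<>(props);
        assigner.assign(Collections.singletonList(new TopicPartition("mytopic", 0)));
    }
}
```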

Create New Partition

/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --partitions 4