Sources: https://kafka.apache.org/quickstart
Prerequisites:
zookeeper
Downloads:
Configuration:
single node single broker
single node multiple broker
multiple node multiple broker
Start:
#Create topics:
kafka-topics.sh --zookeeper <ZOOKEEPER_IP_PORT> --create --topic <TOPIC_NAME> --partitions 1 --replication-factor 1
#Increase number of partitions:
kafka-topics.sh --zookeeper <ZOOKEEPER_IP_PORT> --alter --topic <TOPIC_NAME> --partitions 3
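Kafka only lets the partition count of a topic grow, so an --alter with a smaller or equal number is rejected. A minimal sketch of a guard around the command above, assuming a hypothetical helper named alter_partitions_cmd (not part of Kafka) that only prints the command; the current count must be supplied by the caller, for example after reading it from kafka-topics.sh --describe.

```shell
# Hypothetical helper (not part of Kafka): print the --alter command only if
# the requested partition count is larger than the current one.
# Usage: alter_partitions_cmd <zookeeper_ip_port> <topic> <current> <new>
alter_partitions_cmd() {
  local zk="$1" topic="$2" current="$3" new="$4"
  local n
  # Both counts must be plain non-negative integers.
  for n in "$current" "$new"; do
    case "$n" in
      ''|*[!0-9]*)
        echo "partition counts must be non-negative integers" >&2
        return 2
        ;;
    esac
  done
  # Partitions can only be added, never removed.
  if [ "$new" -le "$current" ]; then
    echo "new count ($new) must exceed current count ($current)" >&2
    return 1
  fi
  echo "kafka-topics.sh --zookeeper $zk --alter --topic $topic --partitions $new"
}
```

Calling alter_partitions_cmd localhost:2181 mytopic 1 3 prints the command to run; swapping the two counts prints an error and returns a non-zero status instead.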
#Delete topic:
1. Stop the Kafka server.
2. Delete the topic's data directories (under log.dirs) with the rm -rf command.
3. Connect to the ZooKeeper instance.
4. List the registered topics with ls /brokers/topics.
5. Remove the topic node from ZooKeeper using rmr /brokers/topics/yourtopic.
6. Restart the Kafka server.
7. Confirm that the topic is gone with kafka-topics.sh --list --zookeeper yourip:port.
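The manual procedure above can be sketched as a dry run that only prints the commands, so they can be reviewed before being run by hand. print_delete_steps and its arguments are hypothetical names; kafka-server-stop.sh, kafka-server-start.sh and zookeeper-shell.sh are the scripts shipped in Kafka's bin directory. The sketch assumes partition data lives in <log.dirs>/<topic>-<partition> directories. On newer ZooKeeper versions rmr is deprecated in favour of deleteall.

```shell
# Hypothetical dry-run helper: prints the manual topic-deletion steps.
# It does not execute anything.
# Usage: print_delete_steps <zookeeper_ip_port> <log_dir> <topic> <server_properties>
print_delete_steps() {
  local zk="$1" log_dir="$2" topic="$3" props="$4"
  if [ -z "$zk" ] || [ -z "$log_dir" ] || [ -z "$topic" ] || [ -z "$props" ]; then
    echo "usage: print_delete_steps <zk> <log_dir> <topic> <server_properties>" >&2
    return 2
  fi
  echo "kafka-server-stop.sh"
  # Assumption: partition directories are named <topic>-<partition>.
  # Check what the glob matches before running it; topics that share the
  # same prefix can match as well.
  echo "rm -rf $log_dir/$topic-[0-9]*"
  echo "zookeeper-shell.sh $zk ls /brokers/topics"
  echo "zookeeper-shell.sh $zk rmr /brokers/topics/$topic"
  echo "kafka-server-start.sh -daemon $props"
  echo "kafka-topics.sh --list --zookeeper $zk"
}
```

For example, print_delete_steps localhost:2181 /tmp/kafka-logs yourtopic config/server.properties prints six commands, one per manual step (steps 3 and 4 collapse into a single zookeeper-shell.sh call).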
Consume Messages:
Produce Message:
#Using console-producer. This sends every line already in <INPUT_FILE> and then keeps producing a message for each new line appended to it:
tail -n+1 -f <INPUT_FILE> | kafka-console-producer.sh --broker-list <KAFKA_SERVER_IP>:<PORT> --topic <TOPIC_NAME> $1
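In the pipeline above, -n +1 makes tail start at the first line of the file and -f keeps it running to pick up appended lines. A minimal sketch of a wrapper, assuming a hypothetical function feed_topic and two hypothetical environment variables: PRODUCER replaces the console producer (for example with cat) so the pipeline can be tried without a broker, and FOLLOW=no drops -f so the command ends at end of file.

```shell
# Hypothetical wrapper around the tail | console-producer pipeline.
# PRODUCER and FOLLOW are illustration-only knobs, not Kafka settings.
# Usage: feed_topic <input_file> <broker_ip:port> <topic>
feed_topic() {
  local input_file="$1" broker="$2" topic="$3"
  local follow_flag="-f"
  if [ "${FOLLOW:-yes}" = "no" ]; then
    follow_flag=""
  fi
  if [ ! -r "$input_file" ]; then
    echo "cannot read $input_file" >&2
    return 1
  fi
  # -n +1 starts at line 1, so existing lines are sent before new ones.
  if [ -n "${PRODUCER:-}" ]; then
    tail -n +1 $follow_flag "$input_file" | $PRODUCER
  else
    tail -n +1 $follow_flag "$input_file" |
      kafka-console-producer.sh --broker-list "$broker" --topic "$topic"
  fi
}
```

Running FOLLOW=no PRODUCER=cat feed_topic input.txt localhost:9092 mytopic prints the lines that would have been sent; without those two variables the function behaves like the original one-liner.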
Stop:
Problems:
#1. The following exception was thrown while consuming data from a topic to which an older producer (Nagios) had sent test messages, using spring-kafka client 2.0.1:
Exception in thread "" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord key = 0, value = This is a producer-consumer test message from HariSekhon:check_kafka.pl:ip-172-31-29-160 at epoch 1571063100.84196 (2019-10-14 14:25:00) with random token: AlRccLSOoN68Tpk3PTre) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
Soln:
#2.
See zookeeper page, section "Running zookeeper on win-10", for download and zookeeper configuration steps.
Created paths E:\ProgramData\kafka\logs, E:\ProgramData\kafka\config and E:\ProgramData\kafka\data
Copied server.properties from the installation path to E:\ProgramData\kafka\config\server.properties and modified -
log.dirs=E:\\ProgramData\\kafka\\logs
Started kafka server in cmd -
E:\>kafka-server-start E:\ProgramData\kafka\config\server.properties
Create topic in another cmd -
C:\Users\rohit>kafka-topics --create --replication-factor 1 --zookeeper localhost:2181 --partitions 2 --topic quizzdata
See created topic -
C:\Users\rohit>kafka-topics --list --zookeeper localhost:2181
Run console consumer in cmd -
C:\Users\rohit>kafka-console-consumer --bootstrap-server localhost:9092 --topic quizzdata
Run producer in cmd -
C:\Users\rohit>kafka-console-producer --broker-list localhost:9092 --topic quizzdata
Run consumer to read messages from an offset (earliest, latest or a specific offset) of a specific partition -
C:\Users\rohit>kafka-console-consumer --bootstrap-server localhost:9092 --topic quizzdata --offset earliest --partition 1
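The --offset option of the console consumer accepts earliest, latest or a non-negative number, and it has to be combined with --partition. A minimal sketch of a check for those values, assuming a hypothetical helper consumer_cmd that only prints the command; it uses the Linux script name kafka-console-consumer.sh, while the Windows command above takes the same options.

```shell
# Hypothetical helper: validate partition and offset, then print the command.
# Usage: consumer_cmd <broker_ip:port> <topic> <partition> <offset>
consumer_cmd() {
  local broker="$1" topic="$2" partition="$3" offset="$4"
  case "$partition" in
    ''|*[!0-9]*)
      echo "partition must be a non-negative integer" >&2
      return 2
      ;;
  esac
  case "$offset" in
    earliest|latest) ;;
    ''|*[!0-9]*)
      echo "offset must be earliest, latest or a non-negative integer" >&2
      return 2
      ;;
  esac
  echo "kafka-console-consumer.sh --bootstrap-server $broker --topic $topic --partition $partition --offset $offset"
}
```

For example, consumer_cmd localhost:9092 quizzdata 1 5 prints a command that starts reading partition 1 at offset 5, and consumer_cmd localhost:9092 quizzdata 1 oldest is rejected.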