- What is Kafka?
- Use cases
- Key components
- Kafka APIs
- How Kafka works?
- Real world examples
- Zookeeper – Install & get started
- Live Demo – Getting Tweets in Real Time & pushing in a Kafka topic by Producer
Kafka is a distributed streaming platform. To see what that means, suppose your company runs many applications: web applications, mobile applications, web servers, and so on. All of them write logs or emit data. We need a way to store and move all of this data efficiently, and that is what a distributed streaming platform provides.
What is Kafka?
- Kafka is a distributed streaming platform:
– publish-subscribe messaging system
- A messaging system lets you send messages between processes, applications, and servers.
– Store streams of records in a fault-tolerant durable way.
– Process streams of records as they occur.
- Kafka is used for building real-time data pipelines and streaming apps
- It is horizontally scalable, fault-tolerant, fast and runs in production in thousands of companies.
- Originally developed at LinkedIn and later open-sourced through Apache in 2011.
Use cases of Kafka
- Metrics − Apache Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
- Log Aggregation Solution − Apache Kafka can be used across an organization to collect logs from multiple services and make them available in a standard format to multiple consumers.
- Stream Processing − Popular frameworks such as Storm and Spark Streaming read data from a topic, process it, and write processed data to a new topic where it becomes available for users and applications. Apache Kafka’s strong durability is also very useful in the context of stream processing.
Key components of Kafka
- Consumer Group
Apache Kafka runs as a cluster on one or more servers that can span multiple datacenters. Each server in the cluster is called a broker.
Producer & Consumer:
Producer: It writes data to the brokers.
Consumer: It consumes data from brokers.
An Apache Kafka cluster can run across multiple nodes.
- A topic is a category or feed name under which messages are stored and published.
- To send a message, you send it to a specific topic; to read a message, you read it from a specific topic.
- Why we need topics: data from many different sources (logs, web activity, metrics, etc.) can arrive in the same Kafka cluster at the same time. Topics keep each kind of data separate, so you always know where a given kind of data is stored.
- Producer applications write data to topics and consumer applications read from topics.
- Kafka topics are divided into a number of partitions, each of which contains messages in an unchangeable (immutable) sequence.
- Each message in a partition is assigned, and identified by, its unique offset.
- A topic can have multiple partition logs. This allows multiple consumers to read from a topic in parallel.
- Partitions allow you to parallelize a topic by splitting its data across multiple brokers.
Anatomy of a topic:
Offset: Messages in the partitions are each assigned a unique (per partition) and sequential id called the offset. Consumers track their pointers via (offset, partition, topic) tuples.
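The partition and offset rules above can be sketched as a toy in-memory model. This is not the Kafka client API: the `Partition` and `Topic` classes and the CRC32-based choice of partition are assumptions made for illustration only (real clients use their own partitioner).

```python
import zlib
from dataclasses import dataclass, field


@dataclass
class Partition:
    """An append-only log; a record's offset is its position in the log."""
    records: list = field(default_factory=list)

    def append(self, value):
        self.records.append(value)
        return len(self.records) - 1  # offset of the record just written


@dataclass
class Topic:
    name: str
    num_partitions: int
    partitions: list = field(init=False)

    def __post_init__(self):
        self.partitions = [Partition() for _ in range(self.num_partitions)]

    def publish(self, key, value):
        # Hypothetical partitioner: CRC32 of the key modulo the partition count.
        index = zlib.crc32(key.encode("utf-8")) % self.num_partitions
        offset = self.partitions[index].append(value)
        return index, offset  # together with the topic name: the (offset, partition, topic) tuple


topic = Topic("click", num_partitions=3)
first = topic.publish("user-1", "search")
second = topic.publish("user-1", "login")
print(first, second)
```

Both records share a key, so they land in the same partition, and the second one receives the next offset in that partition. Offsets are only unique within a partition, which is why the partition index is returned alongside the offset.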
Consumer and Consumer Group:
- Consumers can read messages starting from a specific offset and are allowed to read from any offset point they choose.
- This allows consumers to join the cluster at any point in time.
- Consumers can join a group called a consumer group.
- A consumer group includes the set of consumer processes that are subscribing to a specific topic.
- In Kafka, replication is implemented at the partition level, which helps to prevent data loss.
- The redundant unit of a topic partition is called a replica.
- Each partition usually has one or more replicas, meaning that its messages are copied to several Kafka brokers in the cluster. In the pictured example, the click-topic is replicated to Kafka node 2 and Kafka node 3.
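To make partition-level replication concrete, here is a minimal sketch of spreading replicas over brokers. The round-robin placement, the function names, and the broker names are all assumptions for illustration; this is not Kafka's actual assignment algorithm.

```python
def assign_replicas(num_partitions, brokers, replication_factor):
    """Place each partition's replicas on distinct brokers, round-robin."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + r) % len(brokers)] for r in range(replication_factor)]
        for p in range(num_partitions)
    }


def available_partitions(layout, failed_broker):
    """Partitions that still have at least one replica after a broker fails."""
    return [
        p for p, replicas in layout.items()
        if any(broker != failed_broker for broker in replicas)
    ]


brokers = ["node1", "node2", "node3"]  # hypothetical broker names
layout = assign_replicas(num_partitions=3, brokers=brokers, replication_factor=2)
single_copy = assign_replicas(num_partitions=3, brokers=brokers, replication_factor=1)

print(layout)
print(available_partitions(layout, failed_broker="node2"))
print(available_partitions(single_copy, failed_broker="node1"))
```

With two replicas per partition, every partition remains readable when one broker fails. With a single copy, the partition that lived on the failed broker is lost.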
Kafka has four core APIs:
● The Producer API allows an application to publish a stream of records to one or more Kafka topics.
● The Consumer API allows an application to subscribe to one or more topics and process the stream of records.
● The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
● The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
How Kafka Works?
- Producers write data to the topic.
- Each message record written to a partition of the topic is assigned the next offset, which is 1 higher than the offset of the previous record in that partition.
- Consumers consume data from the topic. Each consumer reads data based on the offset value.
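The write and read flow above can be sketched with a plain Python list standing in for one partition. The `LogConsumer` class is a hypothetical model, not a Kafka client: it only shows that each consumer keeps its own position in the log and can start from any offset.

```python
class LogConsumer:
    """Reads from a shared append-only log, tracking its own offset."""

    def __init__(self, log, start_offset=0):
        self.log = log
        self.position = start_offset  # offset of the next record to read

    def poll(self, max_records=10):
        batch = self.log[self.position:self.position + max_records]
        self.position += len(batch)  # advance past the records just read
        return batch


log = ["a", "b", "c"]                        # records at offsets 0, 1, 2
early = LogConsumer(log)                     # starts from the beginning
first_batch = early.poll(max_records=2)      # reads offsets 0 and 1

log.append("d")                              # a producer writes offset 3
second_batch = early.poll()                  # continues from offset 2

late = LogConsumer(log, start_offset=3)      # joins later, from offset 3
late_batch = late.poll()

print(first_batch, second_batch, late_batch)
```

Reading never removes records from the log, so the late consumer and the early consumer see the same data independently of each other.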
Real world example
- Website activity tracking.
- Take Flipkart as an example: when you visit Flipkart and perform an action such as searching, logging in, or clicking on a product, each of these events is captured.
- Each tracked event becomes a message, and the Kafka producer sends it to a specific topic based on the kind of event.
- This kind of activity tracking often requires very high throughput, because a message is generated for each action.
- A user clicks on a button on a website.
- The web application publishes a message to partition 0 in topic “click”.
- The message is appended to its commit log and the message offset is incremented.
- The consumer can pull messages from the click-topic and show monitoring usage in real-time or for any other use case.
- ZooKeeper is used for managing and coordinating Kafka brokers.
- The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka system or the failure of an existing one.
- Based on the notification received from ZooKeeper about the presence or failure of a broker, producers and consumers decide how to proceed and start coordinating their work with another broker.
- The ZooKeeper framework was originally built at Yahoo!
How to install and get started?
- Download Apache Kafka & ZooKeeper
- Start the ZooKeeper server, then start Kafka to run a single broker
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
- Create a topic named test
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
- Run the producer & send some messages
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
- Start a consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
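The same round trip can be sketched in Python. This assumes the kafka-python package (`KafkaProducer` and `KafkaConsumer`) and the broker at `localhost:9092` with the topic `test` from the steps above. The helper names `encode_lines`, `produce`, `consume`, and `main` are hypothetical, and the 5 second consumer timeout is an arbitrary choice.

```python
def encode_lines(lines):
    """Kafka stores bytes, so encode each text line as UTF-8."""
    return [line.encode("utf-8") for line in lines]


def produce(producer, topic, lines):
    """Send each line as one message, then wait until all are delivered."""
    for payload in encode_lines(lines):
        producer.send(topic, value=payload)
    producer.flush()


def consume(consumer):
    """Decode every message the consumer yields before it times out."""
    return [record.value.decode("utf-8") for record in consumer]


def main():
    # Imported here so the helpers above stay usable without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    produce(producer, "test", ["This is a message", "This is another message"])

    consumer = KafkaConsumer(
        "test",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",   # like --from-beginning for a new consumer
        consumer_timeout_ms=5000,       # stop iterating after 5 s without messages
    )
    for text in consume(consumer):
        print(text)
```

With ZooKeeper and the broker running, calling `main()` should print the two messages, much as the console consumer does.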
Live Demo code:
- Live Demo of Getting Tweets in Real Time by Calling Twitter API
- Pushing all the Tweets to a Kafka Topic by Creating Kafka Producer in Real Time
Please see the code in Jupyter below.
# Note: StreamListener (tweepy < 4.0) and SimpleProducer/KafkaClient
# (kafka-python < 2.0) are legacy APIs; this demo needs those older versions.
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import SimpleProducer, KafkaClient
import time

# Twitter API credentials
access_token = "748200460067045376-zYxRRyxiPIywcw2IV50IQiIxzQVN5FZ"
access_token_secret = "c6dRkeRbgPqtbWAOTz0OfOMpBvhZS6KqWFjtEqHBEv7me"
consumer_key = "ukUrCrJdd6MQQd0HQBzCDwcLq"
consumer_secret = "VCf2wU1MhedUFnQeCwffzstdVkF7rbURzoNNDAdPPvbWfDtggP"

# Kafka settings
kafka_endpoint = "ip-20-0-32-4.ap-south-1.compute.internal:9092"
kafka_topic = "rk_hadoop"
twitter_hash_tag = "RamNavami"
time_limit = 10  # seconds to keep streaming


class StdOutListener(StreamListener):
    def __init__(self, time_limit=time_limit):
        self.start_time = time.time()
        self.limit = time_limit
        super(StdOutListener, self).__init__()

    def on_data(self, data):
        # Forward each raw tweet to the Kafka topic until the time limit is reached
        if (time.time() - self.start_time) < self.limit:
            #msg = json.loads(data)
            producer.send_messages(kafka_topic, data.encode('utf-8'))
            print(data)
            return True
        # Time limit reached: stop the program
        exit(0)

    def on_error(self, status):
        print(status)


kafka = KafkaClient(kafka_endpoint)
producer = SimpleProducer(kafka)

l = StdOutListener()
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, l)
# track expects a list of terms; a bare string would be split into single characters
stream.filter(track=[twitter_hash_tag])
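The demo only pushes tweets into the topic. As a complement, here is a hedged sketch of reading them back and pulling out the tweet text. It assumes the kafka-python `KafkaConsumer` class and a broker reachable at `localhost:9092` (substitute your own endpoint). The helper names `tweet_text` and `read_tweets` are hypothetical, and the 10 second timeout is an arbitrary choice.

```python
import json


def tweet_text(raw):
    """Return the tweet body from one raw message, or None if it has none."""
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid JSON, skip it
    if not isinstance(payload, dict):
        return None
    # Stream control messages (e.g. delete notices) carry no "text" field.
    return payload.get("text")


def read_tweets(topic="rk_hadoop", servers="localhost:9092"):
    # Imported here so tweet_text stays usable without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",   # start from the oldest retained tweet
        consumer_timeout_ms=10000,      # stop after 10 s without new messages
    )
    for record in consumer:
        text = tweet_text(record.value)
        if text is not None:
            print(text)


sample = b'{"id": 1, "text": "Happy #RamNavami"}'
print(tweet_text(sample))
```

Keeping the parsing in a small pure function makes it easy to check without a running broker; `read_tweets()` is the part that needs a live cluster.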
This brings us to the end of the blog. If you found this helpful and wish to learn more such concepts, you can join Great Learning Academy’s free online courses today.