Apache Kafka in Python

What is Apache Kafka?

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It follows the publish-subscribe messaging architecture, which lets you send messages between processes, applications, and servers.

Kafka use cases

Messaging:

Messaging is Kafka's primary job, and it is what Kafka does best. Kafka servers, known as brokers, durably store messages from producers until consumers are ready to process them. Thanks to its fault tolerance and high throughput, Kafka can reliably move very large volumes of messages.

Real-time data processing:

Real-time data processing is another common application of Kafka. IoT devices, for example, are of limited use without the ability to process their data in real time. Kafka fits here because it can move data from producers to data handlers and then on to data storage.
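
As a rough illustration, a consumer could read sensor readings from a topic and act on each one as it arrives. The sketch below is hypothetical: the 'sensor-readings' topic, the localhost broker address, and the temperature threshold are assumptions, not part of the code in this article.

from json import loads
from kafka import KafkaConsumer

# Hypothetical sketch: the 'sensor-readings' topic, the localhost broker
# address, and the temperature threshold are assumptions for illustration.
consumer = KafkaConsumer(
    'sensor-readings',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

for message in consumer:
    reading = message.value  # e.g. {'device': 'pump-1', 'temperature': 74.2}
    if reading.get('temperature', 0) > 70:
        print("High temperature on", reading.get('device'))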

Log Aggregation Solution:

Since logs can be treated as messages, Apache Kafka can be used across an organization to collect logs from multiple services and make them available in a standard format to multiple consumers.
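
A minimal sketch of this idea, assuming a hypothetical 'app-logs' topic, a localhost broker, and an illustrative ship_log helper: each service publishes its log records as JSON so any consumer can read them in one standard format.

from json import dumps
from kafka import KafkaProducer

# Hypothetical sketch: the 'app-logs' topic and the localhost broker are assumptions.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

def ship_log(service, level, message):
    """Publish one log record as JSON so any consumer can read it."""
    producer.send('app-logs', value={'service': service, 'level': level, 'message': message})

ship_log('auth-service', 'ERROR', 'login failed for user 42')
producer.flush()  # make sure the buffered record is actually sent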

Major components of Kafka

  • Kafka Broker - Apache Kafka runs as a cluster on one or more servers that can span multiple data centers. Each server instance in the cluster is a broker.

  • Producers - They send data to the brokers.

  • Consumers - They read data from the brokers.

  • Topic - A feed name to which messages are stored and published. To send a message you publish it to a specific topic, and to read messages you subscribe to that specific topic. (A short topic-creation sketch follows this list.)
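
Topics can also be created programmatically with kafka-python's admin client. The sketch below is illustrative only; the broker address and the topic name are assumptions:

from kafka.admin import KafkaAdminClient, NewTopic

# Minimal sketch: the broker address and the 'testing' topic name are assumptions.
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
admin.create_topics(new_topics=[NewTopic(name='testing', num_partitions=1, replication_factor=1)])
admin.close()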

Kafka implementation in Python

To see the entire code used in this article, check the repo.

KafkaComponents.py

A helper module for working with Kafka services: specifying a topic name, publishing messages, and consuming messages sent to the broker.

from kafka import KafkaProducer, KafkaConsumer
from json import loads, dumps


class Kafka_Consumer:
    def __init__(self, topic='GetEvent', bootstrap_servers=['157.230.219.54:9092'], auto_offset_reset="latest", group_id=None):
        """This function is used to create a kafka consumer."""
        self.consumer = KafkaConsumer(
            topic,  # same topic used to publish
            # default kafka server: ip address and port
            bootstrap_servers=bootstrap_servers,
            # where to start reading when no committed offset exists;
            # 'latest' means only new messages are consumed
            auto_offset_reset=auto_offset_reset,
            group_id=group_id,  # offsets are committed per consumer group
            enable_auto_commit=True,  # commit offsets automatically in the background
            # consumer_timeout_ms=1000,  # stop iterating if no message arrives within 1s
            value_deserializer=lambda x: loads(
                x.decode('utf-8'))  # decode the bytes and parse the JSON payload
        )


class Kafka_Producer:
    def __init__(self, topic='NewEvent', bootstrap_servers=['157.230.219.54:9092']):
        """This function is used to create a kafka producer."""
        self.topic = topic
        self.producer = None
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,  # default kafka server ip address and port
                value_serializer=lambda x: dumps(x).encode(
                    'utf-8')  # convert the data to JSON and encode it as bytes
            )
        except Exception as e:
            print("An error occurred while connecting to Kafka")
            print(e)

    def publish_message(self, data):
        """This function is used to by the producer to publish videoURLs to the topic."""
        topic = self.topic
        try:
            # send the data i.e (topic, data)
            self.producer.send(topic, value=data)
            self.producer.flush()   # makes all buffered records immediately available to send
        except Exception as e:
            print("An error occured while publishing the message")
            print(e)
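
Note that the producer's value_serializer and the consumer's value_deserializer mirror each other: the producer serializes each message to JSON and encodes it as UTF-8 bytes, and the consumer decodes and parses it back, so both sides agree on the wire format.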

producer.py

It iterates over the values 0 to 99, publishing each value to the Kafka broker under a specific topic every 0.5 s.

from KafkaComponents import Kafka_Producer
from time import sleep

kfpro = Kafka_Producer(topic="testing")


def publish_counters():
    for j in range(100):
        print("Iteration", j)
        data = {'counter': j}

        kfpro.publish_message(data)
        sleep(0.5)


if __name__ == "__main__":
    iter()

consumer.py

It listens for messages sent to the Kafka broker under a specific topic, printing each message value and sleeping for 2 s between messages.

from KafkaComponents import Kafka_Consumer
from time import sleep

kfcon = Kafka_Consumer(topic="testing")


def main():
    for message in kfcon.consumer:
        print(message.value)
        sleep(2)

if __name__ == "__main__":
    main()
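
To try it out, start the broker, then run consumer.py in one terminal and producer.py in another; the counter values published by the producer should appear on the consumer side as they arrive.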