Apache Kafka in Python
Kafka Producers and Consumers with Python
When it comes to data event streaming, Apache Kafka is the de facto standard. It is an open-source distributed system consisting of servers and clients. Apache Kafka is used primarily to build real-time data streaming pipelines.
Apache Kafka is used by thousands of the world’s leading organizations for high-performance data pipelines, streaming analytics, data integration and many other vital applications.
I am assuming Apache Kafka is already set up on your system; if not, you can set it up from the following link.
For the sake of this article, you need to be aware of 4 main Kafka concepts.
- Topic: A Kafka topic is a category or stream name to which messages are published. Topics in Kafka are similar to tables in a database: each topic consists of one or more partitions, and each partition can be thought of as a log of records. All Kafka messages pass through topics; a topic is simply a way to organize and group a collection of messages (a small sketch after this list shows how a topic can be created from Python).
- Consumers: In Kafka, a consumer is a client application that reads data from Kafka topics. Consumers subscribe to one or more Kafka topics, and consume messages from one or more partitions of those topics.
- Producers: In Kafka, a producer is a client application that writes data to Kafka topics. Producers publish messages to one or more Kafka topics, and the messages are then stored in partitions within those topics. Producers in Kafka can be implemented using either the low-level Kafka Producer API or the higher-level Kafka Streams API. The Kafka Producer API provides fine-grained control over how messages are produced, while the Kafka Streams API provides higher-level abstractions for building stream processing applications on top of Kafka.
- Consumer Groups: In Kafka, a consumer group is a group of one or more consumers that work together to consume messages from one or more partitions of a topic. When multiple consumers are part of a consumer group, Kafka automatically assigns partitions to each consumer within the group, ensuring that each partition is consumed by only one consumer at a time. This allows for horizontal scaling of consumers and high availability of data processing.
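To make the first of these concrete, here is a minimal, hedged sketch that creates the mec-xdr topic used later in this article with kafka-python's admin client. The partition count and replication factor are my own choices for a single-broker development setup; depending on your broker configuration, topics may also be auto-created on first use.

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker used throughout this article
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# One partition and replication factor 1 are assumptions for a single-broker
# development setup; production topics usually use more of both.
admin.create_topics([NewTopic(name='mec-xdr', num_partitions=1, replication_factor=1)])
admin.close()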
Note that the ZooKeeper and Kafka server processes need to be running in their own terminals in the background.
Kafka Producer in Python
Here is my producer code using the kafka-python library. Open up the producer_from_api.py file, and you're ready to roll.
The Kafka producer needs to know where Kafka is running. Yours is probably on localhost:9092 if you haven't changed the port during the configuration phase. In addition, the KafkaProducer class needs to know how the values will be serialized. You know the answer to both.
And that’s all for the Kafka producer. Here’s the full source code:
I am fetching stream data from an API every 2 seconds, reshaping the response data, and then passing it to the Kafka stream with the producer.send method.
import json
import time
import requests
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from kafka import KafkaProducer

# Set up Kafka producer: values are serialized to UTF-8 encoded JSON
def serializer(message):
    return json.dumps(message).encode('utf-8')

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer
)

def get_push_records():
    # Define function to flatten client records
    def flatten_record(server, client):
        # Drop fields that should not be published
        client.pop('mac', None)
        client.pop('deviceHash', None)
        client.pop('lon', None)
        client.pop('lat', None)
        client.pop('ip4', None)
        if 'ip' in client:
            client['client_ip'] = client.pop('ip')
        client['application'] = ip_to_name.get(server['ip'], '')
        client['timestamp'] = int(time.time() * 1000)
        flat_record = json.dumps({
            "timestamp": client['timestamp'],
            "application": client['application'],
            "client_ip": client['client_ip'],
            "server_ip": server['ip'],
            "server_latlon": server['latlon'],
            **client
        })
        return flat_record

    # Fetch nested JSON record from Dropbox link
    response = requests.get("https://www.dropbox.com/s/iwcpg1oo59i4yrn/exfoCustosTest%20%283%29.json?dl=1")
    json_data = json.loads(response.content)

    # Extract servers and services arrays
    servers = json_data['servers']
    services = json_data['services']

    # Create dictionary mapping server IP addresses to service names
    ip_to_name = {server['ip']: service['name'] for service in services for server in service['servers']}

    # Output a flattened JSON record for each client record using threads
    debug = True  # Set to True to enable debug output
    with ThreadPoolExecutor(max_workers=16) as executor:
        future_results = []
        for server in servers:
            for client in server['clients']:
                future = executor.submit(flatten_record, server, client)
                future_results.append(future)
        for future in future_results:
            flat_record = future.result()
            if debug:
                data = json.loads(flat_record)
                print(f"Sending record to Kafka stream: {data}")
                print(data["timestamp"])
            producer.send('mec-xdr', flat_record)
    producer.flush()  # Wait for messages to be delivered

def main():
    while True:
        get_push_records()
        sleep(2)

if __name__ == "__main__":
    main()
Run python3 producer_from_api.py in the terminal.
Here is the output of the continuous stream being sent:
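A side note on delivery: producer.send is asynchronous and returns a future, so the flush call is what actually waits for the broker in the code above. If you want per-record confirmation instead, here is a minimal sketch along the same lines; the example payload is just a placeholder.

import json
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8')
)

# send() returns a future; get() blocks until the broker acknowledges the record
future = producer.send('mec-xdr', {'ping': 'hello'})
try:
    metadata = future.get(timeout=10)
    print(f"Delivered to partition {metadata.partition} at offset {metadata.offset}")
except KafkaError as exc:
    # Raised if the record could not be delivered in time
    print(f"Delivery failed: {exc}")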
Kafka Consumer in Python
Here's my consumer.py.
The Kafka consumer is much easier to code. When the consumer starts, it will read all the messages from the 'mec-xdr' topic and print them out. Of course, you're not limited to printing the messages; you can do whatever you want. In my case, I am ingesting all the stream data into a Postgres database in real time, into the raw_data table of the app_data schema.
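The target table has to exist before the consumer runs. The exact column layout is not shown in this article, so the single text column below is my assumption; a minimal sketch to create it with psycopg2:

import psycopg2

# Same connection details as the consumer below; adjust for your setup
conn = psycopg2.connect(host="localhost", database="postgres",
                        user="robert", password="Admin123")
cur = conn.cursor()

# Assumed layout: a single text column holding each raw record
cur.execute("CREATE SCHEMA IF NOT EXISTS app_data")
cur.execute("CREATE TABLE IF NOT EXISTS app_data.raw_data (raw text)")
conn.commit()

cur.close()
conn.close()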
The auto_offset_reset parameter, set to earliest, ensures the consumer starts reading from the oldest available messages.
Once you read the data in your consumer, you can do anything with it. For example, you can extract some variables from this data and pass it on to another function for further transformation.
You could write some logic and send an alert if certain conditions are met. You could also plot the data on a web page to get a real-time graph. What you can do is limited only by your imagination.
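As a tiny illustration of the alerting idea, here is a hedged sketch of a check you could call on each deserialized record inside the consumer loop below. The send_alert helper and the 60-second staleness threshold are placeholders of my own; the application, server_ip, client_ip, and timestamp fields come from the producer above.

import time

# Placeholder alert sink: in a real setup this could post to Slack, email, etc.
def send_alert(text):
    print(f"ALERT: {text}")

def check_record(record):
    # The producer leaves 'application' empty when a server IP has no service mapping
    if record.get("application") == "":
        send_alert(f"Unmapped server IP {record.get('server_ip')}")
    # Arbitrary example threshold: flag records older than 60 seconds
    if time.time() * 1000 - record.get("timestamp", 0) > 60_000:
        send_alert(f"Stale record from {record.get('client_ip')}")

The full consumer, which simply prints each record and writes it to Postgres, follows.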
import json
from kafka import KafkaConsumer
import psycopg2

if __name__ == '__main__':
    # Postgres connection used to persist the stream
    conn = psycopg2.connect(
        host="localhost",
        database="postgres",
        user="robert",
        password="Admin123"
    )
    # Create a cursor object
    cur = conn.cursor()

    # Kafka Consumer
    consumer = KafkaConsumer(
        'mec-xdr',
        bootstrap_servers='localhost:9092',
        max_poll_records=100,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest'
    )

    for message in consumer:
        record = json.loads(message.value)
        print(record)
        # Persist each record into the app_data.raw_data table
        cur.execute("INSERT INTO app_data.raw_data (raw) VALUES (%s)", (str(record),))
        conn.commit()
Run python3 consumer.py in the terminal.
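If one consumer cannot keep up, you can run several copies of this script as a consumer group, as described in the concepts section: give the consumer a group_id and Kafka will split the topic's partitions across the running instances (a group can only scale up to the number of partitions in the topic). A minimal sketch of the changed constructor, where the group name is my own choice:

import json
from kafka import KafkaConsumer

# Every consumer started with the same group_id joins the same group,
# and each partition of mec-xdr is read by exactly one member at a time.
consumer = KafkaConsumer(
    'mec-xdr',
    bootstrap_servers='localhost:9092',
    group_id='mec-xdr-ingest',   # assumed group name, pick your own
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)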
To stop the execution in all the terminals, you can use Ctrl + C.
Note: If you notice any error in my articles, feel free to reach out and let me know, and I will update the blog post accordingly.