
Constructing a Web Server Using Kafka

Kafka is an open-source distributed event streaming platform. The Confluent Platform used in this project is an enhanced distribution of Kafka designed for community and commercial use. Using Python, JavaScript, and a Dockerized Kafka installation, we can produce data and then create a web server that initiates the data consumption process.

Setting Up the Kafka Docker Container

Running Kafka in Docker

We will first install Kafka in a Docker container. From the directory containing docker-compose.yaml, we can run the following console command to start the containers.

C:\Users\VP1050\OneDrive\Documents> docker-compose up

[Screenshot: Docker Desktop]
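The docker-compose.yaml file itself is not reproduced in this post. A minimal sketch modeled on Confluent's cp-all-in-one example might look like the following; the image versions and environment values here are assumptions, not the exact file used in this project:

# docker-compose.yaml -- minimal sketch modeled on Confluent's cp-all-in-one
# example; versions and environment values are assumptions
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    depends_on:
      - broker
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:29092
      CONTROL_CENTER_REPLICATION_FACTOR: 1

The broker advertises localhost:9092 to clients on the host machine, which is the address the Python and Node.js clients below connect to.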

Creating a New Kafka Topic

Within the Confluent Control Center web page running on localhost:9021, under “Topics” we can add a new topic called “vehicle-coordinates” to send data to. This topic can be created with default settings:

[Screenshot: Confluent UI]

In the picture below we see the new “vehicle-coordinates” topic:

[Screenshot: Confluent UI]
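Alternatively, the topic can be created from the command line with the kafka-topics tool inside the broker container (assuming the container is named broker, as in Confluent's example compose files):

C:\Users\VP1050\OneDrive\Documents> docker exec broker kafka-topics --create --topic vehicle-coordinates --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092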

Creating a kafka-python Producer to Send Sample Data

The Kafka platform operates via producers and consumers. A producer creates data and sends it to the Kafka broker; a consumer reads that data back from the broker.

kafka-python Setup

First, we need to install kafka-python.

C:\Users\VP1050\OneDrive\Documents> pip install kafka-python

Next, we will create a Python script to send data to the Kafka broker. In the PublishVehicleCoordinates.py file, we first define the variables needed to connect to the Kafka server.

'''PublishVehicleCoordinates.py'''
import json
from time import sleep

import numpy as np
import kafka
# KafkaAdminClient replaces the legacy ensure_topic_exists API,
# which was removed from recent kafka-python releases
from kafka.admin import KafkaAdminClient, NewTopic

# Define the connection settings and the payload template
sensor_data = {'longitude': 0, 'latitude': 0}
topic_name = "vehicle-coordinates"
bootstrap_servers = ['localhost:9092']

admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_servers,
                               value_serializer=lambda x:
                               json.dumps(x).encode('utf-8'))
consumer = kafka.KafkaConsumer(bootstrap_servers=bootstrap_servers)

Next, in the same file, we define delivery callbacks and a main block that do the following:

  1. Print a message on successful or failed delivery
  2. Check whether the topic we want to send data to exists, and create it if not
  3. Generate example data in a while loop
def on_send_success(record_metadata):
    print("Message produced to {0} [partition {1}]".format(
        record_metadata.topic, record_metadata.partition))

def on_send_error(err):
    print("Failed to deliver message: {0}".format(err))

try:
    # Create the topic if it does not already exist
    if topic_name in consumer.topics():
        print(topic_name + " exists")
    else:
        admin.create_topics([NewTopic(name=topic_name,
                                      num_partitions=1,
                                      replication_factor=1)])

    consumer.close()
    admin.close()

    # Generate random coordinates and publish them every 3 seconds
    while True:
        longitude = np.random.randint(-180, 180)
        latitude = np.random.randint(-90, 90)

        print(f"longitude: {longitude} latitude: {latitude}")
        sensor_data['longitude'] = longitude
        sensor_data['latitude'] = latitude
        producer.send(topic_name, value=sensor_data) \
                .add_callback(on_send_success) \
                .add_errback(on_send_error)
        sleep(3)
except KeyboardInterrupt:
    # Flush any buffered messages before exiting
    producer.flush()
    producer.close()

Let’s run the script to produce data and send it to the Kafka broker.

C:\Users\VP1050\OneDrive\Documents> python3 PublishVehicleCoordinates.py

Below is an example of correct output.

[Screenshot: Confluent UI]

Using Node.js to Initiate Data Consumption

Ensure that Node.js is installed on your machine to run the web server, then install node-rdkafka to create the consumer.

C:\Users\VP1050\OneDrive\Documents> npm install node-rdkafka

Defining the Kafka Consumer

We will borrow from Confluent's example code to define a function that subscribes to a topic and consumes data. We will add one line to the myconsumer function to return the response, and another line outside the function so that it can be imported by a JavaScript file in the same directory.

//myconsumer.js
const myconsumer = function myconsumer(topic){
    const Kafka = require('node-rdkafka');
    const bootstrapServers = 'localhost:9092';
    var response = "";
    //Code
    //...
    return response;
}
exports.myconsumer = myconsumer;
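The elided //Code section is where the Confluent example logic goes. As a rough sketch of what that body might contain, using node-rdkafka's flowing-mode consumer (this is not the exact Confluent sample, and the group.id value is an assumption):

//Possible body for the //Code section above -- a sketch, not the exact
//Confluent sample
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'vehicle-coordinates-group',   //assumed consumer group name
    'metadata.broker.list': bootstrapServers
}, {});

consumer.connect();

consumer
    .on('ready', () => {
        consumer.subscribe([topic]);
        consumer.consume();   //start flowing-mode consumption
    })
    .on('data', (data) => {
        //each message value arrives as a Buffer holding the JSON payload
        console.log(`Consumed message: ${data.value.toString()}`);
        response += data.value.toString();
    });

Because node-rdkafka consumes asynchronously, the response string is returned before any messages arrive; the consumed coordinates therefore appear in the server console rather than in the HTTP response, which matches the behavior described below.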

Next, the server.js file will complete the following:

  1. Create a UI button
  2. Call the myconsumer function in the myconsumer.js file when the button is clicked
  3. Give the user feedback once consumption starts.
//server.js
const express = require('express');
const consumer = require("./myconsumer");
const app = express();

app.use(express.json());
let options = {
    dotfiles: "ignore",
    redirect: false
};

app.use(express.static('public', options));

app.get("/", (req, res) => {
    var response = `
    <html>
        <h1>Confluent Kafka REST</h1>
        <div>
        <form action="/consumer" method="get" >
            <input type="submit" value="Consume Vehicle Coordinates"/>
        </form>
        </div>
    </html>`;
    res.send(response);
});

app.get("/consumer", (req, res) => {
    let topic = "vehicle-coordinates";
    //myconsumer takes the topic name; consumption runs asynchronously
    consumer.myconsumer(topic);
    res.send("Success- Consumed: " + topic);
});

app.listen(5000, () => console.log('Listening on 5000'));

Initiating Data Consumption

Next, let’s start the Node.js server.

C:\Users\VP1050\OneDrive\Documents> node server.js

The app is listening on localhost:5000. Data is already being produced if the Python script from the first step is running. We can now open the app and consume data by clicking the “Consume Vehicle Coordinates” button:

[Screenshot: Consume Vehicle Coordinates button]
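The /consumer endpoint can also be triggered without the browser, for example with curl:

C:\Users\VP1050\OneDrive\Documents> curl http://localhost:5000/consumer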

We can view the consumed data in the console window running server.js. The data is also visible in the Confluent dashboard:

[Screenshot: consumer output]

Conclusion

With Kafka, Python, and Node.js we can create a data streaming process that controls data flow through a broker. Adjustments could be made to pull the Python data from another source, or to consume the data on a schedule rather than upon the user interaction defined in JavaScript (a sketch of the scheduled variant appears below). The project files can be downloaded from GitHub.
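For example, a hypothetical schedule.js could start consumption on a timer instead of a button click; the file name and interval here are assumptions:

//schedule.js -- hypothetical scheduled variant; file name and interval
//are assumptions
const consumer = require('./myconsumer');

//kick off a consumer for the topic once per minute instead of on a button
//click; in practice a single long-lived consumer is usually preferable
setInterval(() => {
    consumer.myconsumer('vehicle-coordinates');
}, 60000);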

This post is licensed under CC BY 4.0 by the author.