Routing events to different Kafka topics in real-time with BitSwan

The BitSwan pump (“BSPump”) is not only a fast and scalable stream analyzer; it also allows software engineers to parse, transform, enrich and route events to different destinations based on the value of an attribute contained within the event. This article presents an example of how this functionality can be achieved with Kafka as the output system.

This is useful when we want to classify data based on their type. For instance, let us assume data events contain an attribute titled color. Based on its value, we can push “red” events to the red_topic Kafka topic and “blue” events to the blue_topic Kafka topic.
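
For illustration, such a data event might look like the following (a hypothetical JSON message; the attribute names are examples only):

{"id": 42, "color": "red"}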

For more information about stream analyzers and BitSwan in general, please refer to the official documentation at: https://docs.libertyaces.com/

Kafka producer and consumer

Let’s start by creating a simple pipeline that loads events from a Kafka topic named input (you can use any other topic with some messages present), prints them and sends them back to Kafka, to a topic named output. The following code will be stored in the app.py file.

import bspump
import bspump.common
import bspump.kafka


class KafkaPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Consume raw messages from the "input" topic
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'input'}),
            # Decode the bytes and parse the JSON body into a dict
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            # Pretty-print every event to the terminal
            bspump.common.PPrintProcessor(app, self),
            # Produce the events to the "output" topic
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'output'}),
        )

As you can see, the code is quite simple. The next step is to create the application in which we register the pipeline, along with all necessary lookups, connections and other top-level BitSwan objects. In our case, the only such object is the KafkaConnection, which the pipeline then locates under the ID KafkaConnection.

if __name__ == '__main__':
    app = bspump.BSPumpApplication()

    svc = app.get_service("bspump.PumpService")

    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )

    svc.add_pipeline(
        KafkaPipeline(app, "KafkaPipeline")
    )

    app.run()

The last thing is to specify a configuration file with credentials for the KafkaConnection. You can also redefine any other default configuration values for the connection, the pipeline etc. Let’s name the file bspump.conf to make explicit that the configuration is specific to a given site (or environment).

[connection:KafkaConnection]
bootstrap_servers=YOUR_KAFKA_SERVER:9092

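Other defaults can be overridden in the same file. As a sketch of what that could look like, the source topic might also be set here rather than in the code, assuming BSPump’s usual [pipeline:<pipeline id>:<component id>] section convention:

# hypothetical override, assuming the [pipeline:<pipeline id>:<component id>] convention
[pipeline:KafkaPipeline:KafkaSource]
topic=input
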
Since everything is ready now, let’s run the application to see the messages from the Kafka topic input (or any other topic you have chosen) printed in the terminal. All of them should also appear in the output topic.

python3 app.py -c bspump.conf
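
If your input topic is empty, you can feed it a few test messages first, for instance with the console producer that ships with Kafka (the script name and broker address are assumptions; adjust them to your installation):

echo '{"color": "red", "weight": 15}' | kafka-console-producer.sh --broker-list YOUR_KAFKA_SERVER:9092 --topic input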

Producing Kafka messages to different topics

In order to route events to different Kafka topics on the output, we need to add a processor to the pipeline that informs the KafkaSink about the topic it should route each event to. KafkaSink expects the target topic name in the kafka_topic attribute inside the event’s context. We are going to utilize this ability.

class KafkaTopicSelector(bspump.Processor):
    def process(self, context, event):
        # Default to 0 so events without a weight attribute are not dropped
        if event.get("weight", 0) > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"
        return event

In the processor, we select the output topic based on the weight attribute inside the event (but you can use any other attribute present in your testing dataset). If the weight is larger than 10, the output topic will be named heavy; otherwise it will be titled light. Events without a weight attribute default to 0 and therefore end up in light.
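
For example, with hypothetical input messages like these, the first event would be produced to heavy and the second to light:

{"name": "anvil", "weight": 42}
{"name": "feather", "weight": 1}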

It is also necessary to include the processor in the pipeline before the final KafkaSink:

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'input'}),
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            bspump.common.PPrintProcessor(app, self),
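            # Newly added processor that selects the target topic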
            KafkaTopicSelector(app, self),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'output'}),
        )

The output topic configured in the KafkaSink will be used as the default if no topic is specified in the kafka_topic attribute of the context dictionary. There is no other change you need to make. Simply run the application again and observe the heavy and light topics inside your Kafka.

python3 app.py -c bspump.conf

You can create a new pipeline that consumes data from the heavy and light topics and prints them in the terminal, outputs them to CSV files, a MySQL database, a Slack channel etc. Feel free to utilize all the possibilities the open-source BSPump offers. If you think some feature is missing, please comment and/or contribute to our GitHub repository: https://github.com/LibertyAces/BitSwanPump
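
As a minimal sketch of such a follow-up pipeline, the snippet below consumes the heavy topic and pretty-prints each event; the class name and single-topic setup are illustrative choices, and an analogous pipeline could be added for light:

class HeavyPrintPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Consume the routed events back from the "heavy" topic
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'heavy'}),
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            # PPrintSink prints each event and terminates the pipeline
            bspump.common.PPrintSink(app, self),
        )

Register it with svc.add_pipeline(...) in the same way as KafkaPipeline above.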

About the Author

Premysl Cerny

Premysl is a senior developer at LibertyAces.
