Routing events to different Kafka topics in real time with BitSwan
The BitSwan pump (“BSPump”) is not only a fast and scalable stream analyzer; it also allows software engineers to parse, transform, enrich and route events to different destinations based on the value of some attribute contained within an event. This article presents an example of how this functionality can be achieved with Kafka as the output system.
This is useful when we want to classify data based on their type. For instance, let us assume that data events contain an attribute titled color. Based on its value, we can push "red" events to the red_topic Kafka topic, while "blue" events go to the blue_topic Kafka topic.
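For illustration, the input events might look as follows (hypothetical sample data; any JSON events carrying such a distinguishing attribute will do):

{"color": "red", "id": 1}
{"color": "blue", "id": 2}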
For more information about stream analyzers and BitSwan in general, please refer to the official documentation at: https://docs.libertyaces.com/
Kafka producer and consumer
Let’s start by creating a simple pipeline that will load events from a Kafka topic named input (you can use any other topic with some messages present), print them and send them back to Kafka, to a topic named output. The following code will be stored in the app.py file.
import bspump
import bspump.common
import bspump.kafka


class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Read raw messages from the "input" topic
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'input'}),
            # Decode bytes and parse JSON into a dict
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            # Pretty-print every event to the terminal
            bspump.common.PPrintProcessor(app, self),
            # Write events to the "output" topic
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'output'}),
        )
As you can see, the code is quite simple. The next step is to create the application, where we are going to register the pipeline along with all necessary lookups, connections and other top-level BitSwan objects. In our case, it is only the KafkaConnection, which is then located in the pipeline under the ID KafkaConnection.
if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )
    svc.add_pipeline(
        KafkaPipeline(app, "KafkaPipeline")
    )
    app.run()
The last thing is to specify a configuration file with the connection settings (such as the address of the Kafka server) for KafkaConnection. You can also redefine any other default configuration values for the connection, the pipeline etc. Let’s name the file bspump.conf to explicitly state that the configuration file is specific for a given site (or environment).
[connection:KafkaConnection]
bootstrap_servers=YOUR_KAFKA_SERVER:9092
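If the input topic does not contain any messages yet, you can publish a few test events with Kafka’s console producer (a sketch, not part of the original example; the exact script name and flags vary between Kafka versions and distributions):

kafka-console-producer.sh --broker-list YOUR_KAFKA_SERVER:9092 --topic input

Each line you type, such as {"color": "red", "weight": 15}, is sent as one message.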
Since everything is ready now, let’s run the application to see the messages from the Kafka topic input (or any other topic you have chosen) printed in the terminal. All of them should also be available in the output topic.
python3 app.py -c bspump.conf
Producing Kafka messages to different topics
In order to route events to different Kafka topics on the output, we need to create a processor within the pipeline that informs the KafkaSink about the topic it should route each event to. KafkaSink expects the target topic name in the kafka_topic attribute inside the event’s context. We are going to utilize this ability.
class KafkaTopicSelector(bspump.Processor):

    def process(self, context, event):
        # Events heavier than 10 go to the "heavy" topic, the rest to "light";
        # the default of 0 keeps events without "weight" from raising a TypeError
        if event.get("weight", 0) > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"
        return event
In the processor, we are selecting the output topic based on the weight attribute inside the event (but you can use any other attribute you have in your testing dataset). If the weight is larger than 10, the output topic will be named heavy; otherwise it will be titled light. Note that events missing the weight attribute are treated as having zero weight and are therefore routed to light.
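For example (hypothetical test messages), an event such as {"weight": 42} would be routed to the heavy topic, while {"weight": 3} would end up in light.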
It is also necessary to include the processor in the pipeline before the final KafkaSink:
class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'input'}),
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            bspump.common.PPrintProcessor(app, self),
            # Decide the target topic before the event reaches the sink
            KafkaTopicSelector(app, self),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'output'}),
        )
The output topic will be used as the default if no topic is specified in the kafka_topic attribute of the context dictionary. There is no other change you need to make. Simply run the application again and observe the heavy and light topics inside your Kafka.
python3 app.py -c bspump.conf
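To inspect the routed events, you can read the new topics with Kafka’s console consumer (again a sketch; the exact script name and flags depend on your Kafka version):

kafka-console-consumer.sh --bootstrap-server YOUR_KAFKA_SERVER:9092 --topic heavy --from-beginning
kafka-console-consumer.sh --bootstrap-server YOUR_KAFKA_SERVER:9092 --topic light --from-beginning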
You can create a new pipeline which consumes data from the heavy and light topics and prints the events in the terminal, outputs them to CSV files, a MySQL database, a Slack channel etc. Feel free to utilize all the possibilities the open-source BSPump can offer you. If you think that some feature is missing, feel free to comment and/or contribute to our GitHub repository: https://github.com/LibertyAces/BitSwanPump
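Such a consumer could reuse the pattern from the first pipeline. The following is a minimal sketch rather than part of the original example; it assumes one pipeline instance per topic and the NullSink class from bspump.common as a terminating sink:

class MonitoringPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id, topic):
        super().__init__(app, pipeline_id)
        self.build(
            # Consume from the given topic and pretty-print every event
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': topic}),
            bspump.common.BytesToStringParser(app, self),
            bspump.common.JsonToDictParser(app, self),
            bspump.common.PPrintProcessor(app, self),
            # NullSink (assumed to be available in bspump.common) discards events after printing
            bspump.common.NullSink(app, self),
        )

It would be registered in the application in the same way as the original pipeline:

svc.add_pipeline(MonitoringPipeline(app, "HeavyPipeline", "heavy"))
svc.add_pipeline(MonitoringPipeline(app, "LightPipeline", "light"))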