BitSwan Training #1

BitSwan Training #1

The following article serves as an introduction for software engineers, who are getting familiar with the BitSwan data pump (“BSPump”), which is used for real-time stream analytic tasks. It briefly reflects the first lesson of the BitSwan training program that is focused on making experts in the area of real-time stream analysis and BitSwan.

The BSPump enables software developers to dynamically collect data from various sources such as MySQL databases, Apache Kafka, file system and many more, to further process, parse and enrich data with relevant information and finally store them in a desired output system (that can also be some form of a database or file system). In the case of output, the usually selected destination is a running ElasticSearch cluster, that, if properly configured, can store the data using a great speed limited only by connectivity, query them responsively and use visualization tools such as Kibana to create Business Intelligence dashboards, visualizations, timelines and so on.

In this lesson you will learn how to build a simple BSPump application with MySQL database as input and ElasticSearch as output. Let us assume we have a simple table in our MySQL database that is called users, with thousands of records we need to filter, parse, enrich with relevant information, so we can store it in the schema-less ElasticSearch to quickly create graphs with users that have certain roles or flags, such as administrator users, blocked users, users based on activity etc.

Installing dependencies

When we have the MySQL database with the users table running and know the credentials, we can start creating a simple BSPump application. Since we are going to write a Python code, it is necessary to have Python version 3.5+ installed first. Then we can install the BSPump via pip:

pip install git+https://github.com/LibertyAces/BitSwanPump

However, BSPump itself requires additional dependencies, such as asab, asyncio and aiohttp libraries for asynchronous programming and aiomysql for MySQL connection.

pip install asab asyncio aiohttp aiomysql

The more features of the BSPump you use, the more dependencies you need to resolve. If you use, let us say, Apache Kafka connection, you will also need to install aiokafka library. However, in our situation the already mentioned dependencies should be enough.

Creating custom pipeline

Now create a Python file and name it, for instance, bspump-training-1.py. Include the following content, that we are going to extend and modify later:

import bspump
import bspump.common
import bspump.elasticsearch
import bspump.mysql
import bspump.trigger

class NameEnricher(bspump.Processor):
    def process(self, context, event):
        if "name" in event:
            firstname_lastname = event["name"].split(" ")
            event["firstname"] = firstname_lastname.get(0)
            event["lastname"] = firstname_lastname.get(-1)
        return event

class SamplePipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                bspump.trigger.PubSubTrigger(app, "run_me!")
            ),
            NameEnricher(app, self),
            bspump.common.NullSink(app, self)           
        )

The imports at the top specify modules we are going to use in our application. As you can see, the file now contains only two classes with no application object, which we are going to add later. The first class is a simple Processor object, which in the method called process expects two parameters: event and context. The event can be generally of any type, the processor itself should however define the type of the event object, either explicitly via asserts or in the class description. As you can see in the code, the NameEnricher class expects event to be a Python dictionary. The process method then checks for the name key in the event and if there is one, the method takes the value, splits it by spaces and stores first name and last name of users in separate fields. Since the processor adds new information to the event, we call it Enricher. There is also context variable, which is shared across events and that can store information such as file name. In our case, every row in the MySQL table will be automatically parsed into a dictionary event, while in the context we can store temporary variables such as ElasticSearch IDs, versions etc.

The second class is our custom Pipeline class. Pipelines are the top BSPump objects, that can be registered inside the application’s BSPump service to process load, process and store data. Every pipeline includes processors such as parsers and enrichers in an order that defines the processing steps. The first processors in the pipelines, that are connected to various sources of data such as databases or file systems, are called Source objects. There can be one or more of such objects in a pipeline. Sources are followed by processors ordered by processing of the event, i. e. parsers, enrichers, generators and so on. The final processor, which is connected to the data destination, are called Sink objects. Every pipeline has only one Sink objects, that is derived from the abstract class defined in BSPump. If there is a need to push data to various destinations, the internal routing mechanism of the BSPump can be used to push events to other pipeline(s). For more information about routing and the full description of processors, enrichers, parsers, generator etc., please follow the BitSwan documentation: https://docs.libertyaces.com/

See that in the pipeline there is now MySQLSource object, which depends on another type of top BSPump objects, which are called Connections and which define the connectivity between BSPump application and the desired system, in this case the MySQL database. At the end of the processing, there is NullSink object, which will simply throw all events away when they come to it. We can also use PPrintSink instead of it to print processed events in the terminal.

Creating an application object

When we have the pipeline object prepared, we can create desired connections and the application object itself, where we are going to register all necessary top BSPump objects. Let us now also assume we have a running ElasticSearch node or cluster that we can connect to via URL. For more information about ElasticSearch and its setup, please refer to the official documentation: https://www.elastic.co/

Add the following snippet into your bspump-training-1.py file:

if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Create connection
    svc.add_connection(
        bspump.mysql.MySQLConnection(app, "MySQLConnection")
    )
    svc.add_connection(
        bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    )

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    # This is how pipeline is triggered:
    app.PubSub.publish("run_me!", asynchronously=True)
    app.run()

The code creates a BSPump application object and obtains the PumpService, where all the top objects, i. e. pipelines, connections and lookups, may be registered. First, we will add and register connections to MySQL and ElasticSearch, which we will configure later. Then we also register our custom pipeline and inform it to run itself, which is done via publish-subscribe mechanism, using a message titled run_me!. When the publish-subscribe object, which is in this case part of the application object, receives the message, it notifies all subscribers, in this case the PubSubTrigger, which is connected to the Source (see the pipeline code above). Triggers are used to start certain source objects, either per message, once per some time etc. On the other hand, the message systems such as Apache Kafka do not require triggers to be used, since there is an internal notification mechanism, which will simply start the pipeline when new bulk of data appear in the system.

In our custom pipeline, we can replace the NullSink with ElasticSearchSink, so that all events are loaded there. Replace

bspump.common.NullSink(app, self)

… with …

bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")

Adding configuration

That’s it when it comes to the code. We also need to add configuration, so the connections and other objects such as processors are properly pointed to desired systems or configured in some other way. Now create a file named site.conf with the following lines, while replacing values with your own credentials:

[connection:MySQLConnection]
host=YOUR_HOST
user= YOUR_USER
password= YOUR_PASSWORD
db= YOUR_DB

[connection:ESConnection]
url=YOUR_ES_URL (such as http://127.0.0.1:9200/)

[pipeline:SamplePipeline:ElasticSearchSink]
index_prefix=test_
rollover_mechanism=size

The configuration of MySQL and ElasticSearch connections are obvious. We also configure the ElasticSearchSink object in our SamplePipeline. You can configure any processor in the pipeline based on the pipeline ID and processor ID (which is implicitly the same as its name, yet you can override it). The index_prefix specifies which ElasticSearch index the data should be stored in. The name of the index is also constructed by the so-called rollover mechanism, which will add a number after the index prefixed based either on time, size etc. In this configuration we are using size-based rollover mechanism.

Running the application

Now we can finally run the code with the configuration linked to it:

python bspump-training-1.py -c site.conf

Congratulations! You have successfully implemented and ran your first BSPump application. You should see the data in ElasticSearch and create index patterns, visualizations and dashboards in Grafana. You can also create an index template to help Kibana with proper formatting. Please refer to the official Kibana website to learn how to set it up, connect to the ElasticSearch and create Kibana objects there: https://www.elastic.co/guide/en/kibana/

Simple table visualization in Kibana

Adding other custom objects

If you want to further enhance the code, you can, for example, add a mechanism that will utilize internal ElasticSearch versioning system, which will make sure that within one index there is only one document with the specified ID.

First, we need to create a processor that will store the ID and version in the pipeline’s context. Add the following code to the bspump-training-1.py file.

class IDVersionEnricher(Processor):

    def __init__(self, app, pipeline, id_postfix='', id=None, config=None):
        super().__init__(app, pipeline, id, config)
        self.IdPostfix = id_postfix

    def process(self, context, event):
        Id = event.get("Id")
        if Id is None:
            return None
        id_base = str(Id) + self.IdPostfix
        context["_id"] = id_base
        import time
        context["_version"] = int(time.time())
        return event

The IDVersionEnricher enriches the context variable with ID based on ID from the original MySQL table and version based on time, so only the latest record with the specific ID will be kept. It is further necessary to inform ElasticSearch to use the versioning system and the variables we stored in the pipeline’s context. We need to override the bspump.elasticsearch.ElasticSearchSink class, or, more precisely, its process method. Add the following code to the bspump-training-1.py file: class ElasticSearchSink(bspump.elasticsearch.ElasticSearchSink):

def process(self, context, event):
    assert self._rollover_mechanism.Index is not None

    _id = context.get("_id")
    _version = context.get("_version")

    data = '{{"index": {{ "_index": "{}", "_type": "{}", "_id": "{}",' \
            '"version_type": "external", "_version": {} }}\n{}\n' \
            .format(
                self._rollover_mechanism.Index,
                self._doctype,
                _id,
                _version,
                json.dumps(event)
            )

    ret = self._connection.consume(data)

The code utilizes the ElasticSearch Bulk API to pass the insert query to ElasticSearch via connection object. Here in the code we take the ID and the version from the context and pass them to ElasticSearch with the query itself.

The last thing before running the application is to modify our custom pipeline and add both processors there. The result should look like this:

class SamplePipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                bspump.trigger.PubSubTrigger(app, "run_me!")
            ),
            NameEnricher(app, self),
            IDVersionEnricher(app, self),
            ElasticSearchSink(app, self, "ESConnection")
        )

Now when you run the application twice on the same data, only the latest documents with specific IDs should be seen in Kibana.

I hope you liked the tutorial. Next time we will focus on routing events among more pipelines and creating Lookup objects, which help enrichers to find relevant information based on values in events. Such relevant information includes geo coordinates, identifications, labels etc. Happy coding!

About the Author

Premysl Cerny

Premysl is senior developer in LibertyAces