BitSwan Training #1
The following article serves as an introduction for software engineers who are getting familiar with the BitSwan data pump (“BSPump”), which is used for real-time stream analytics. It briefly covers the first lesson of the BitSwan training program, which is focused on building expertise in real-time stream analysis and BitSwan.
BSPump enables software developers to dynamically collect data from various sources such as MySQL databases, Apache Kafka, the file system and many more, to process, parse and enrich the data with relevant information, and finally to store it in a desired output system (which can again be a database, a file system and so on). The most commonly selected output destination is a running ElasticSearch cluster, which, if properly configured, can ingest the data at a speed limited mostly by connectivity, answer queries responsively and serve visualization tools such as Kibana for building Business Intelligence dashboards, visualizations, timelines and so on.
In this lesson you will learn how to build a simple BSPump application with a MySQL database as input and ElasticSearch as output. Let us assume we have a simple table in our MySQL database called users, with thousands of records that we need to filter, parse and enrich with relevant information, so that we can store them in the schema-less ElasticSearch and quickly create graphs of users with certain roles or flags, such as administrators, blocked users, users grouped by activity and so on.
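Throughout the lesson, the examples assume that the users table contains at least the Id and name columns referenced below. A minimal hypothetical schema could look like this; your real table will of course contain more columns:

CREATE TABLE users (
    Id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255)
);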
Installing dependencies
Once the MySQL database with the users table is running and we know the credentials, we can start creating a simple BSPump application. Since we are going to write Python code, it is necessary to have Python 3.5+ installed first. Then we can install BSPump via pip:
pip install git+https://github.com/LibertyAces/BitSwanPump
However, BSPump itself requires additional dependencies, such as asab, asyncio and aiohttp libraries for asynchronous programming and aiomysql for MySQL connection.
pip install asab asyncio aiohttp aiomysql
The more features of BSPump you use, the more dependencies you need to resolve. If you use, say, an Apache Kafka connection, you will also need to install the aiokafka library. In our situation, however, the dependencies mentioned above are enough.
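To quickly verify that the installation succeeded, you can try importing the libraries from the command line; this is just a sanity check, not an official part of the setup:

python3 -c "import bspump, aiomysql, aiohttp, asab"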
Creating custom pipeline
Now create a Python file and name it, for instance, bspump-training-1.py. Include the following content, which we are going to extend and modify later:
import bspump
import bspump.common
import bspump.elasticsearch
import bspump.mysql
import bspump.trigger


class NameEnricher(bspump.Processor):

    def process(self, context, event):
        if "name" in event:
            # Split e.g. "John Doe" into separate firstname/lastname fields
            firstname_lastname = event["name"].split(" ")
            event["firstname"] = firstname_lastname[0]
            event["lastname"] = firstname_lastname[-1]
        return event


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                bspump.trigger.PubSubTrigger(app, "run_me!")
            ),
            NameEnricher(app, self),
            bspump.common.NullSink(app, self)
        )
The imports at the top specify the modules we are going to use in our application. As you can see, the file so far contains only two classes and no application object, which we are going to add later. The first class is a simple Processor object, whose process method expects two parameters: context and event. The event can generally be of any type; the processor itself should, however, define the expected type of the event object, either explicitly via asserts or in the class description. As you can see in the code, the NameEnricher class expects the event to be a Python dictionary. The process method checks for the name key in the event and, if it is present, takes its value, splits it by spaces and stores the user's first name and last name in separate fields. Since the processor adds new information to the event, we call it an Enricher. There is also the context variable, which is shared across events and can store information such as a file name. In our case, every row in the MySQL table will automatically be parsed into a dictionary event, while in the context we can store temporary variables such as ElasticSearch IDs, versions etc.
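To illustrate what the enricher does, here is a minimal standalone sketch of the same transformation applied to one sample event; the values are made up for demonstration and are not part of the application code:

# Illustration only: the split performed by NameEnricher, applied to a hypothetical row
event = {"Id": 42, "name": "John Doe"}

firstname_lastname = event["name"].split(" ")
event["firstname"] = firstname_lastname[0]   # "John"
event["lastname"] = firstname_lastname[-1]   # "Doe"

print(event)
# {'Id': 42, 'name': 'John Doe', 'firstname': 'John', 'lastname': 'Doe'}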
The second class is our custom Pipeline class. Pipelines are the top-level BSPump objects that can be registered inside the application's BSPump service to load, process and store data. Every pipeline includes processors such as parsers and enrichers in an order that defines the processing steps. The first processors in a pipeline, which are connected to various sources of data such as databases or file systems, are called Source objects; there can be one or more of them in a pipeline. Sources are followed by processors ordered by the processing of the event, i.e. parsers, enrichers, generators and so on. The final processor, which is connected to the data destination, is called a Sink object. Every pipeline has exactly one Sink object, derived from the abstract class defined in BSPump. If there is a need to push data to multiple destinations, the internal routing mechanism of BSPump can be used to push events to other pipeline(s). For more information about routing and the full description of processors, enrichers, parsers, generators etc., please follow the BitSwan documentation: https://docs.libertyaces.com/
Note that the pipeline contains a MySQLSource object, which depends on another type of top-level BSPump object called a Connection. Connections define the connectivity between the BSPump application and the desired system, in this case the MySQL database. At the end of the processing there is a NullSink object, which simply throws away all events that reach it. We can also use PPrintSink instead to print the processed events in the terminal.
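For instance, during development the sink in the build() call can be swapped like this; it is a handy debugging step rather than a required part of the tutorial:

            NameEnricher(app, self),
            bspump.common.PPrintSink(app, self)  # prints every processed event instead of discarding it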
Creating an application object
When we have the pipeline object prepared, we can create the desired connections and the application object itself, where we register all the necessary top-level BSPump objects. Let us also assume we have a running ElasticSearch node or cluster that we can connect to via URL. For more information about ElasticSearch and its setup, please refer to the official documentation: https://www.elastic.co/
Add the following snippet to your bspump-training-1.py file:
if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Create and register the connections
    svc.add_connection(
        bspump.mysql.MySQLConnection(app, "MySQLConnection")
    )
    svc.add_connection(
        bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    )

    # Construct and register the pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    # This is how the pipeline is triggered:
    app.PubSub.publish("run_me!", asynchronously=True)

    app.run()
The code creates a BSPump application object and obtains the PumpService, where all the top-level objects, i.e. pipelines, connections and lookups, may be registered. First we add and register the connections to MySQL and ElasticSearch, which we will configure later. Then we register our custom pipeline and tell it to run, which is done via the publish-subscribe mechanism using a message titled run_me!. When the publish-subscribe object, which in this case is part of the application object, receives the message, it notifies all subscribers, here the PubSubTrigger, which is connected to the Source (see the pipeline code above). Triggers are used to start certain source objects, for example per message or periodically. Message systems such as Apache Kafka, on the other hand, do not require triggers, since they have an internal notification mechanism that simply starts the pipeline when a new bulk of data appears in the system.
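If you prefer the source to run periodically instead of publishing the run_me! message yourself, you can subscribe the trigger to one of ASAB's built-in periodic tick messages. The snippet below is a small variation of the pipeline code above and assumes that your asab version publishes the Application.tick/60! message (roughly once a minute):

            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                # Re-run the query whenever ASAB publishes its per-minute tick message
                bspump.trigger.PubSubTrigger(app, "Application.tick/60!")
            ),

With this variant, the app.PubSub.publish("run_me!", asynchronously=True) line is no longer needed.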
In our custom pipeline we can now replace the NullSink with an ElasticSearchSink, so that all events are loaded into ElasticSearch. Replace
bspump.common.NullSink(app, self)
… with …
bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
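The build() call of the SamplePipeline then looks like this:

        self.build(
            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                bspump.trigger.PubSubTrigger(app, "run_me!")
            ),
            NameEnricher(app, self),
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
        )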
Adding configuration
That’s it as far as the code is concerned. We also need to add configuration, so that the connections and other objects such as processors are properly pointed to the desired systems or configured in some other way. Create a file named site.conf with the following lines, replacing the values with your own credentials:
[connection:MySQLConnection]
host=YOUR_HOST
user=YOUR_USER
password=YOUR_PASSWORD
db=YOUR_DB

[connection:ESConnection]
url=YOUR_ES_URL (such as http://127.0.0.1:9200/)

[pipeline:SamplePipeline:ElasticSearchSink]
index_prefix=test_
rollover_mechanism=size
The configuration of the MySQL and ElasticSearch connections is self-explanatory. We also configure the ElasticSearchSink object in our SamplePipeline. You can configure any processor in the pipeline based on the pipeline ID and the processor ID (which is implicitly the same as its class name, yet you can override it). The index_prefix option specifies which ElasticSearch index the data should be stored in. The full index name is then constructed by the so-called rollover mechanism, which appends a number after the index prefix based on time, size etc. In this configuration we use the size-based rollover mechanism.
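The same [pipeline:&lt;pipeline ID&gt;:&lt;processor ID&gt;] pattern works for any other processor as well. For example, the query of the MySQLSource could be supplied from the configuration file instead of being hard-coded, assuming you then drop the config dictionary from the source in the code:

[pipeline:SamplePipeline:MySQLSource]
query=SELECT Id, name FROM users;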
Running the application
Now we can finally run the code with the configuration file passed to it:
python bspump-training-1.py -c site.conf
Congratulations! You have successfully implemented and run your first BSPump application. You should now see the data in ElasticSearch and be able to create index patterns, visualizations and dashboards in Kibana. You can also create an index template to help Kibana with proper field formatting. Please refer to the official Kibana documentation to learn how to set it up, connect it to ElasticSearch and create Kibana objects there: https://www.elastic.co/guide/en/kibana/
Adding other custom objects
If you want to further enhance the code, you can, for example, add a mechanism that utilizes the internal ElasticSearch versioning system, which makes sure that within one index there is only one document with a given ID.
First, we need to create a processor that will store the ID and version in the pipeline's context. Add the following code to the bspump-training-1.py file:
import time  # add this import to the top of the file


class IDVersionEnricher(bspump.Processor):

    def __init__(self, app, pipeline, id_postfix='', id=None, config=None):
        super().__init__(app, pipeline, id, config)
        self.IdPostfix = id_postfix

    def process(self, context, event):
        Id = event.get("Id")
        if Id is None:
            return None

        # Use the MySQL Id (plus an optional postfix) as the ElasticSearch document ID
        id_base = str(Id) + self.IdPostfix
        context["_id"] = id_base

        # Use the current time as the external version number
        context["_version"] = int(time.time())
        return event
The IDVersionEnricher enriches the context with an ID based on the Id column from the original MySQL table and a version based on the current time, so that only the latest record with a specific ID will be kept. It is further necessary to tell ElasticSearch to use the versioning system and the variables we stored in the pipeline's context. We need to override the bspump.elasticsearch.ElasticSearchSink class, or, more precisely, its process method. Add the following code to the bspump-training-1.py file:
import json  # add this import to the top of the file as well


class ElasticSearchSink(bspump.elasticsearch.ElasticSearchSink):

    def process(self, context, event):
        assert self._rollover_mechanism.Index is not None

        _id = context.get("_id")
        _version = context.get("_version")

        # Bulk API action line with external versioning, followed by the document itself
        data = '{{"index": {{ "_index": "{}", "_type": "{}", "_id": "{}", ' \
               '"version_type": "external", "_version": {} }} }}\n{}\n' \
            .format(
                self._rollover_mechanism.Index,
                self._doctype,
                _id,
                _version,
                json.dumps(event)
            )

        self._connection.consume(data)
The code utilizes the ElasticSearch Bulk API to pass the insert request to ElasticSearch via the connection object. In the code we take the ID and the version from the context and pass them to ElasticSearch together with the document itself.
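For a single event, the generated bulk request body then consists of two lines, the action metadata followed by the document itself; the values below are purely illustrative:

{"index": { "_index": "test_1", "_type": "doc", "_id": "42", "version_type": "external", "_version": 1569851000 } }
{"Id": 42, "name": "John Doe", "firstname": "John", "lastname": "Doe"}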
The last thing to do before running the application is to modify our custom pipeline and add both new processors to it. The result should look like this:
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.mysql.MySQLSource(app, self, "MySQLConnection",
                config={'query': 'SELECT Id, name FROM users;'}
            ).on(
                bspump.trigger.PubSubTrigger(app, "run_me!")
            ),
            NameEnricher(app, self),
            IDVersionEnricher(app, self),
            ElasticSearchSink(app, self, "ESConnection")
        )
Now, when you run the application twice on the same data, only the latest document for each ID should be visible in Kibana.
I hope you liked the tutorial. Next time we will focus on routing events among multiple pipelines and on creating Lookup objects, which help enrichers find relevant information based on values in events. Such relevant information includes geo coordinates, identifications, labels etc. Happy coding!