Embulk multiple filtering
Embulk is an open-source bulk data loader that helps transfer data between various databases, storage systems, file formats, and cloud services. We have used Embulk extensively in combination with Airflow (although any alternative workflow scheduling software, such as Rundeck, works just as well) to transfer various data sources to our enterprise data lake with great success.
Embulk has a number of advantages that made us select it for bulk data loading. In particular, we liked:
- Parallel & distributed execution to deal with big data sets
- Transaction control to guarantee All-or-Nothing
Two other really important aspects, which we had not fully appreciated at first, are the following:
- A rich set of open-source, ready-to-use plugins for nearly every use case.
- The ability to easily create new plugins or extend the functionality of existing ones.
I think it is hard to appreciate the full potential of Embulk as a data loading tool, since there are not many elaborate examples that solve real-world problems. Even the official documentation contains only one “recipe”, as Embulk calls it, showcasing how to transfer CSV data to Elasticsearch.
Let’s examine a different example that uses several Embulk plugins: moving Graphite CSV stats from its API to Parquet files that can be exposed as external Redshift tables in our data lake.
The plugins that we will use are the following (a minimal pipeline outline follows the list):
- input: http
- parser: csv
- filter: split_column
- filter: column
- output: parquet
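All of these plugins, except the csv parser that ships with Embulk itself, can be installed with embulk gem install. Before filling in the real options, here is a bare outline (structure only, with every required option omitted, so not runnable as-is) of how the in, filters and out sections of an Embulk YAML file chain together for this recipe:

in:
  type: http            # fetch the CSV payload from the Graphite render API
  parser:
    type: csv           # parse the payload into rows and columns
filters:                # filters run in order, each one reshaping the schema
- type: split_column    # 1st pass: break the target column apart on '.'
- type: split_column    # 2nd pass: break the resulting stats_metric column on ','
- type: column          # keep only the columns we actually need
out:
  type: parquet         # write the final rows as Parquet files on S3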
Before we start defining the Embulk YAML file that will load the data, let's first look at an example of a Graphite response and the desired target table in Redshift.
We get the input data as CSV:
"summarize(stats.timers.system.metrics.LoginPage.ClientSide.AU.chrome.desktop.TTFB.count, ""4h"", ""*"")",2020-05-10 08:00:00,162.0
"summarize(stats.timers.system.metrics.LoginPage.ClientSide.AU.chrome.desktop.TTFB.count, ""4h"", ""*"")",2020-05-10 12:00:00,154.0
"summarize(stats.timers.system.metrics.RegistrationPage.ClientSide.GR.chrome.mobile.TTFB.count_75, ""4h"", ""*"")",2020-05-10 08:00:00,11.708333333333336
"summarize(stats.timers.system.metrics.RegistrationPage.ClientSide.GR.chrome.mobile.TTFB.count_75, ""4h"", ""*"")",2020-05-10 12:00:00,13.725
"summarize(stats.timers.system.metrics.SearchUI.ClientSide.US.firefox.desktop.domContentLoaded.count_90, ""4h"", ""*"")",2020-05-10 08:00:00,11.833333333333336
"summarize(stats.timers.system.metrics.SearchUI.ClientSide.US.firefox.desktop.domContentLoaded.count_90, ""4h"", ""*"")",2020-05-10 12:00:00,13.725
and we want to save it in a target table like the following:
page, country, browser, device, metric, stats, date, value
LoginPage, AU, chrome, desktop, TTFB, count, 2020-05-10 08:00:00, 162.0
LoginPage, AU, chrome, desktop, TTFB, count, 2020-05-10 12:00:00, 154.0
RegistrationPage, GR, chrome, mobile, TTFB, count_75, 2020-05-10 08:00:00, 11.708333333333336
RegistrationPage, GR, chrome, mobile, TTFB, count_75, 2020-05-10 12:00:00, 13.725
SearchUI, US, firefox, desktop, domContentLoaded, count_90, 2020-05-10 08:00:00, 11.833333333333336
SearchUI, US, firefox, desktop, domContentLoaded, count_90, 2020-05-10 12:00:00, 13.725
1st Version
Let's first consume the input CSV data and create the Parquet files with minimum effort:
in:
  type: http
  url: http://graphite.com/render/?from={{env.from_date}}&until={{env.to_date}}&target=summarize(stats.timers.system.metrics.*.ClientSide.*.*.*.*.*%2C%224h%22%2C%22avg%22)&format=csv
  method: GET
  params:
  request_body:
  request_interval: 0
  retry_interval: 10000
  basic_auth:
  parser:
    type: csv
    stop_on_invalid_record: true
    columns:
    - {name: target, type: string}
    - {name: stattime, type: string}
    - {name: stats, type: double}
out:
  type: parquet
  path_prefix: s3a://bucket/keys
  overwrite: true
  addUTF8: true
  extra_configurations:
    fs.s3a.access.key: 'your_access_key'
    fs.s3a.secret.key: 'your_secret_access_key'
The above YAML template will fetch the CSV data and convert it to Parquet. Note that {{env.from_date}} and {{env.to_date}} are Liquid template variables resolved from environment variables, so the configuration file needs to be saved with a .yml.liquid extension for Embulk to process them. If we check the output file, we will see:
target, stattime, stats
"summarize(stats.timers.system.metrics.LoginPage.ClientSide.AU.chrome.desktop.TTFB.count, ""4h"", ""*"")",2020-05-10 08:00:00,162.0
Let’s break the first column into smaller parts to get the inner details.
2nd Version
In the second version, we will use the Embulk split_column filter to split the target column, using '.' as the delimiter:
in:
  type: http
  url: http://graphite.com/render/?from={{env.from_date}}&until={{env.to_date}}&target=summarize(stats.timers.system.metrics.*.ClientSide.*.*.*.*.*%2C%224h%22%2C%22avg%22)&format=csv
  method: GET
  params:
  request_body:
  request_interval: 0
  retry_interval: 10000
  basic_auth:
  parser:
    type: csv
    stop_on_invalid_record: true
    columns:
    - {name: target, type: string}
    - {name: stattime, type: string}
    - {name: stats, type: double}
filters:
- type: split_column
  delimiter: '.'
  target_key: target
  output_columns:
  - {name: summarize, type: string}
  - {name: timers, type: string}
  - {name: system, type: string}
  - {name: metrics, type: string}
  - {name: page, type: string}
  - {name: client, type: string}
  - {name: country, type: string}
  - {name: browser, type: string}
  - {name: device, type: string}
  - {name: metric, type: string}
  - {name: stats_metric, type: string}
out:
  type: parquet
  path_prefix: s3a://bucket/keys
  overwrite: true
  addUTF8: true
  extra_configurations:
    fs.s3a.access.key: 'your_access_key'
    fs.s3a.secret.key: 'your_secret_access_key'
The result of this YAML template will be:
summarize, timers, system, metrics, page, client, country, browser, device, metric, stats_metric, stattime, stats
summarize(stats, timers, system, metrics, LoginPage, ClientSide, AU, chrome, desktop, TTFB, 'count, ""4h"", ""*"")', 2020-05-10 08:00:00, 162.0
We need to make two kinds of improvements to the schema above:
- Break 'stats_metric' down to retrieve only the metric, e.g. from 'count, ""4h"", ""*"")' we only need 'count'
- Remove the unneeded columns
3rd Version
In order to break up the stats_metric column, we are going to use the split_column filter again, as follows:
in:
  type: http
  url: http://graphite.com/render/?from={{env.from_date}}&until={{env.to_date}}&target=summarize(stats.timers.system.metrics.*.ClientSide.*.*.*.*.*%2C%224h%22%2C%22avg%22)&format=csv
  method: GET
  params:
  request_body:
  request_interval: 0
  retry_interval: 10000
  basic_auth:
  parser:
    type: csv
    stop_on_invalid_record: true
    columns:
    - {name: target, type: string}
    - {name: stattime, type: string}
    - {name: stats, type: double}
filters:
- type: split_column
  delimiter: '.'
  target_key: target
  output_columns:
  - {name: summarize, type: string}
  - {name: timers, type: string}
  - {name: system, type: string}
  - {name: metrics, type: string}
  - {name: page, type: string}
  - {name: client, type: string}
  - {name: country, type: string}
  - {name: browser, type: string}
  - {name: device, type: string}
  - {name: metric, type: string}
  - {name: stats_metric, type: string}
- type: split_column
  delimiter: ','
  target_key: stats_metric
  output_columns:
  - {name: stat, type: string}
  - {name: hours, type: string}
  - {name: asterisk, type: string}
out:
  type: parquet
  path_prefix: s3a://bucket/keys
  overwrite: true
  addUTF8: true
  extra_configurations:
    fs.s3a.access.key: 'your_access_key'
    fs.s3a.secret.key: 'your_secret_access_key'
The result of running the above template is:
summarize, timers, system, metrics, page, client, country, browser, device, metric, stat, hours, asterisk, stattime, stats
summarize(stats, timers, system, metrics, LoginPage, ClientSide, AU, chrome, desktop, TTFB, count, ""4h"", ""*""), 2020-05-10 08:00:00, 162.0
4th Version
Finally, let's remove the undesired columns using the column filter plugin, which can easily add or remove columns.
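To back up the "add" part of that claim, here is a minimal sketch, assuming the add_columns option of the embulk-filter-column plugin; the column name and value are made up for illustration:

filters:
- type: column
  add_columns:
  # append a constant column to every row (name and default value invented for this example)
  - {name: environment, type: string, default: 'production'}

Back to our recipe, here is the full fourth version of the configuration: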
in:
  type: http
  url: http://graphite.com/render/?from={{env.from_date}}&until={{env.to_date}}&target=summarize(stats.timers.system.metrics.*.ClientSide.*.*.*.*.*%2C%224h%22%2C%22avg%22)&format=csv
  method: GET
  params:
  request_body:
  request_interval: 0
  retry_interval: 10000
  basic_auth:
  parser:
    type: csv
    stop_on_invalid_record: true
    columns:
    - {name: target, type: string}
    - {name: stattime, type: string}
    - {name: stats, type: double}
filters:
- type: split_column
  delimiter: '.'
  target_key: target
  output_columns:
  - {name: summarize, type: string}
  - {name: timers, type: string}
  - {name: system, type: string}
  - {name: metrics, type: string}
  - {name: page, type: string}
  - {name: client, type: string}
  - {name: country, type: string}
  - {name: browser, type: string}
  - {name: device, type: string}
  - {name: metric, type: string}
  - {name: stats_metric, type: string}
- type: split_column
  delimiter: ','
  target_key: stats_metric
  output_columns:
  - {name: stat, type: string}
  - {name: hours, type: string}
  - {name: asterisk, type: string}
- type: column
  drop_columns:
  - {name: summarize}
  - {name: timers}
  - {name: system}
  - {name: metrics}
  - {name: client}
  - {name: hours}
  - {name: asterisk}
out:
  type: parquet
  path_prefix: s3a://bucket/keys
  overwrite: true
  addUTF8: true
  extra_configurations:
    fs.s3a.access.key: 'your_access_key'
    fs.s3a.secret.key: 'your_secret_access_key'
The result of the above YAML template is as below:
page, country, browser, device, metric, stat, stattime, stats
LoginPage, AU, chrome, desktop, TTFB, count, 2020-05-10 08:00:00, 162.0
Conclusion
We saw a complete, production-ready example (recipe) that loads data from the CSV response of an API to S3 in Parquet format (to be exposed as an external Redshift table).
What I really want to emphasize is the ability to chain multiple filters and convert the input data to the expected schema in a declarative and elegant way. This capability is not stressed enough in the existing Embulk examples and documentation.
Bonus exercise: convert the schema produced by Version 4 to the following schema:
page, country, browser, device, metric, stat, stat_timestamp, stats
LoginPage, AU, chrome, desktop, TTFB, count, 1589961600, 162.0
Tip: check the timestamp filter and the column filter.
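A possible sketch of one solution, assuming the embulk-filter-timestamp_format plugin for the string-to-epoch conversion and the rename (src) support of the column filter; the exact option names below are assumptions and should be checked against each plugin's documentation:

filters:
# ...the two split_column filters and the drop_columns filter from Version 4 stay as they are...
- type: timestamp_format
  default_from_timezone: 'UTC'                          # assumption: stat times are in UTC
  default_from_timestamp_format: ['%Y-%m-%d %H:%M:%S']
  columns:
  # parse the stattime string and emit it as a unix epoch in seconds (long);
  # 'to_unit: sec' is an assumption, verify the option name and value in the plugin README
  - {name: stattime, type: long, to_unit: sec}
- type: column
  columns:
  # keep the final schema and rename stattime to stat_timestamp ('src' copies from another column)
  - {name: page}
  - {name: country}
  - {name: browser}
  - {name: device}
  - {name: metric}
  - {name: stat}
  - {name: stat_timestamp, src: stattime}
  - {name: stats}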