Have you ever wondered how to level up your data processing game? If you're transitioning from ad-hoc analytics and researching options, this might be a good starting point.
This project has two main modules:
localwhich shows how to setup a simple data processing pipeline using Luigi, Python, Pandas and Postgres in no time. Though simple, this approach can get you pretty far.cloudwhich illustrates how you can easily swap out:- local storage in favour of durable and distributed Google Cloud Storage,
- local processing power in favour of scalable Google Dataflow,
- local PostgreSQL database that you need to manage in favour of BigQuery which has a familiar SQL interface, but can process TBs of data without breaking a sweat and integrates nicely with GSuite accounts.
There is another module sampledata which is used to generate sample data.
To make it a bit more interesting imagine that the data is from a car renting company called DailyCar.
Specifically, we have the following information (under sampledata/generated/):
users.csvhas information about registered clients of DailyCar.cars.csvhas information about its car park.rents.csvcontains a list of rents, specifically, who and when rented what car.fines.csvis pulled from police database, and help us see all the fines (like speed limit) that are related to company's cars.
Business would like to enrich information about fines, so it's able to understand who was driving a specific car at a particular point in time. More formally, we need to generate a table with the following fields (transposed):
| column | data |
|---|---|
| fine_id | 1 |
| fine_amount | 15 |
| fine_registered_at | 2017-10-01 21:36:00 |
| rent_id | 1 |
| rented_on | 2017-10-01 |
| car_id | 3 |
| car_reg_number | ks2888 |
| car_make | bmw |
| car_model | series_2 |
| user_id | 3 |
| user_name | Dumitru Matei |
| user_passport_no | 482850738 |
| user_birth_date | 1966-06-22 |
| user_driving_permit_since | 1991-10-18 |
We'll demonstrate how to build an ETL pipline around this problem under local and cloud modules.
Also, feel free to tune parameters in sampledata/generate.py to get more or less data to work with.
First, make sure you have python 2.7.
Then, inside project's root folder execute the following commands to install required packages:
$ pip install pipenv
$ pipenv install --skip-lockFor the local part you need to install PostgreSQL and create a database and a user, like this:
> psql postgres
=# create role dwh login password 'dwh';
=# create database data_zero_to_cloud owner dwh;For the cloud part you need to obtain Google Cloud Service credentials and put them under config/credentials.json.
Don't forget to update config/config.ini accordingly.
To run an ETL task use the following command:
$ ./run-luigi.py --local-scheduler --module=MODULE_NAME TASK_NAME --on=DATEReplace TASK_NAME with the name of a defined task, like ProcessFines.
DATE parameter can take any value (for our purposes it doesn't matter much what value), for instance 2017-11-16.
MODULE_NAME can be either local or cloud.
For example:
$ ./run-luigi.py --local-scheduler --module=cloud ProcessFines --on=2017-11-16If you want to go really wild, change runner parameter in config.ini to DataflowRunner and unleash the full power of the cloud, as it will run Apache Beam tasks using Google Dataflow.
After you run a cloud ETL, you may want to see the result.
If you have a Google Cloud account and your own credentials, feel free to go to the web console.
Otherwise, obtain workshop host's credentials and use a ./shell.py script to load an iPython session with some predefined functions, such as gls and gcat.
An example usage is below:
In [5]: gls('2017-11-15')
Out[5]:
[<Blob: warehouse-in-gcs-store, 2017-11-15/cars.csv>,
<Blob: warehouse-in-gcs-store, 2017-11-15/fines.csv>,
<Blob: warehouse-in-gcs-store, 2017-11-15/rents.csv>,
<Blob: warehouse-in-gcs-store, 2017-11-15/rich_fines/_SUCCESS>,
<Blob: warehouse-in-gcs-store, 2017-11-15/rich_fines/data.csv-00000-of-00001>,
<Blob: warehouse-in-gcs-store, 2017-11-15/users.csv>]
In [6]: gcat('2017-11-15/cars.csv')
id,make,model,reg_number
1,nissan,murano,ko2116
2,hyundai,solaris,ct8988
3,bmw,series_2,ks2888
In [7]: gcat('2017-11-15/rich_fines/data.csv-00000-of-00001')
fine_id,fine_amount,fine_registered_at,rent_id,rented_on,car_id,car_reg_number,car_make,car_model,user_id,user_name,user_passport_no,user_birth_date,user_driving_permit_since
8,1,2017-10-03 09:09:00,7,2017-10-03,1,ko2116,nissan,murano,1,Cristina Ciobanu,547345952,1988-02-17,1991-02-27
...Practice makes perfect, so if you'd like to go a little bit deeper, here are some ideas to try:
-
Task
local.LoadRichFineswill not replace contents of the table, which may not be desirable especially if you run your ETL several times a day. Try to implement a task that inherits fromluigi.contrib.postgres.CopyToTable, and disregards whether it was run before or not. -
Similarly,
cloud.LoadRichFineswont't replace a table in BigQuery. Try to fix this. -
There's a bit of a boilerplate in
cloud.ProcessFineswithMaps andCoGroupBys. Try to implement a customJointransform that does SQL-style join on twoPCollections. Example usage is:((rich_rents, fines) | Join( left_on=lambda x: (x['car_reg_number'], x['rented_on']), right_on=lambda x: (x['car_reg_number'], x['registered_on'])))