Building ETL Pipelines in BigQuery with Python

In the realm of data analytics, Extract, Transform, Load (ETL) processes play a pivotal role.
They streamline the integration of data from various sources, enabling its cleaning, manipulation, and loading into target systems like BigQuery, Google’s cloud-based data warehouse.
By leveraging Python’s versatility and BigQuery’s scalability, you can construct powerful ETL pipelines to prepare your data for insightful analysis.
Importance of ETL
ETL plays a crucial role in modern data management and analytics for several reasons:
- Data Consolidation
ETL helps gather data from multiple sources and consolidate it into a single, unified repository.
- Data Quality
By cleaning and transforming data, ETL ensures data accuracy, consistency, and reliability.
- Data Accessibility
ETL makes data accessible to business analysts, data scientists, and other stakeholders for analysis and decision-making.
- Data-Driven Insights
ETL enables organizations to extract valuable insights from their data and make informed decisions.
Key Challenges in ETL
While ETL is a powerful tool, it can be complex to implement and manage effectively. Some common challenges include:
- Data Quality Issues
Inconsistent data formats, missing values, and errors can hinder the ETL process.
- Scalability
As data volumes grow, ETL pipelines need to be scalable to handle increasing workloads.
- Data Integration Complexity
Integrating data from diverse sources with varying structures and formats can be challenging.
- Data Security and Privacy
Protecting sensitive data during the ETL process is essential.
Environment settings
# Import libraries
import numpy as np
import pandas as pd
import polars as pl
import gspread
import duckdb
import sqlalchemy as db
import pyodbc
from oauth2client.service_account import ServiceAccountCredentials
from google.oauth2 import service_account
from google.cloud import bigquery
import connectorx as cx
import warnings
warnings.filterwarnings('ignore')
scopes = ['https://www.googleapis.com/auth/spreadsheets',
          'https://www.googleapis.com/auth/drive',
          'https://www.googleapis.com/auth/analytics.readonly']
# Path to the Google service account key file
api = 'creds.json'
# Connect to Google Sheets
gs_credentials = ServiceAccountCredentials.from_json_keyfile_name(api, scopes)
gc = gspread.authorize(gs_credentials)
# Connect to BigQuery
bq_credentials = service_account.Credentials.from_service_account_file(api)
project_id = 'my-repository'
client = bigquery.Client(credentials=bq_credentials, project=project_id)

Extract Phase
The extraction phase entails retrieving data from your source. This may involve interacting with:
- Flat files
- Databases
- XML files
- APIs
- Other sources, such as Google Sheets (shown below)
Flat files
users = (
    pl.read_csv('../users.csv', dtypes={'phone': pl.Utf8,
                                        'id_atg': pl.Utf8})
    .with_columns(
        pl.col('entry_data').str.strptime(pl.Datetime, strict=False)
    )
)
users_profiling = (
    # read csv file
    pl.read_csv('../profiles.csv',
                dtypes={'contact_phone': pl.Utf8, 'post_code': pl.Utf8})
    # change column dtypes
    .with_columns(
        pl.col('entry_data', 'entry_data_gep', 'update_date').str.strptime(
            pl.Datetime,
            strict=False),
        pl.col('contact_phone').cast(pl.Utf8),
    )
)
orders = (
    pl.read_csv('../orders.csv').with_columns(
        pl.col('entry_date', 'delivery_date').str.strptime(
            pl.Datetime,
            strict=False)
    )
)
order_details = (
    pl.read_csv('../order-details.csv')
)
promotions = (
    pl.read_csv('../promotions.csv', dtypes={'short_description': pl.Utf8})
    .with_columns(pl.col('key').cast(pl.Utf8))
).unique(subset='key')
order_status = pl.read_csv('../order-status.csv')
social_networks = (
    pl.read_csv('../social_networks.csv').select(
        pl.col('id_social_network', 'social_network', 'description')
    )
)
Parquet files
types = pl.read_parquet('../types.parquet')
genre = pl.read_parquet('../genre.parquet')
JSON files
warehouses = pl.read_json('../warehouse_catalog.json')
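Google Sheets
The gspread client (gc) authorized in the environment settings can serve as another source. The snippet below is a minimal sketch: the spreadsheet title 'marketing_catalog' and worksheet name 'campaigns' are hypothetical placeholders for your own sheet.
# Open a spreadsheet by title and read one worksheet into Polars
sh = gc.open('marketing_catalog')      # hypothetical spreadsheet title
ws = sh.worksheet('campaigns')         # hypothetical worksheet name
records = ws.get_all_records()         # list of dicts keyed by the header row
campaigns = pl.DataFrame(records)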
Databases
# Connection to MS Access
conn = pyodbc.connect(
    r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
    r'DBQ=C:\Users\user\folder\file.accdb;'
)
# Create cursor
cursor = conn.cursor()
# Write query
query = 'SELECT * FROM sales'
# Convert to pandas dataframe
df = pd.read_sql(query, con=conn)
df.head()
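APIs
Web or REST APIs are another frequent source. The sketch below assumes a hypothetical endpoint (https://api.example.com/v1/products) that returns a JSON array of records; substitute your own URL and authentication.
import requests

# Call the endpoint and load the JSON payload into a Polars DataFrame
url = 'https://api.example.com/v1/products'   # hypothetical endpoint
response = requests.get(url, timeout=30)
response.raise_for_status()                   # fail fast on HTTP errors
products = pl.DataFrame(response.json())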
Transform Phase
Cleanse, validate, and manipulate the extracted data based on your analysis requirements. This might include:
- Data Cleaning
Handle missing values, inconsistent formatting, or errors.
- Data Type Conversion
Ensure consistent data types for columns based on their intended use in BigQuery.
- Filtering/Aggregation
Select or aggregate specific data subsets for targeted analysis.
- Enrichment
Merge extracted data with additional sources to enhance its value.
# join types, orders, order_details, promotions, warehouses
sheet = (
    types.join(orders, on='id_type', how='left')
    .join(order_details, on='id_order', how='left')
    .join(promotions, on='key', how='left')
    .join(warehouses, on='id_warehouse', how='left')
).rename({'id_warehouse': 'id_warehouse_promo', 'active': 'promo_active'})
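The same chain can carry the cleaning, type-conversion, and filtering steps listed above. The snippet below is only a sketch: id_order and short_description come from the joined tables, while quantity is a hypothetical numeric column standing in for your own order fields.
sheet = (
    sheet
    # Data cleaning: drop rows that did not match an order
    .filter(pl.col('id_order').is_not_null())
    .with_columns(
        # Fill missing promotion descriptions with an empty string
        pl.col('short_description').fill_null(''),
        # Data type conversion: cast a numeric field (hypothetical column)
        pl.col('quantity').cast(pl.Int64, strict=False),
    )
)
# Filtering/Aggregation: count orders per type
# (group_by is named groupby in older Polars releases)
orders_per_type = sheet.group_by('id_type').agg(
    pl.col('id_order').count().alias('n_orders')
)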
Load Phase
There are two primary options for loading data:
- Staging Table
Load the transformed data into a staging table for temporary storage, validate or modify it there, and then publish it to the target table (a sketch of this approach follows the direct-load example below).
- Direct Load
Load the data directly into your target table, bypassing the staging step. This approach is simpler but less flexible when validation or further modification is needed:
# create dataset (exists_ok avoids an error if it already exists)
client.create_dataset('transformation', exists_ok=True)
# convert to pandas
sheet = sheet.to_pandas()
# upload to BigQuery
sheet.to_gbq('transformation.catalog',
             project_id=project_id,
             if_exists='replace',
             credentials=bq_credentials)
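For the staging-table option, one possible approach (a sketch, assuming a staging table named catalog_staging in the same dataset; loading a pandas DataFrame this way requires pyarrow) is to load into the staging table with the BigQuery client and then publish the validated rows with a SQL statement.
# Load the transformed DataFrame into a staging table first
staging_table = f'{project_id}.transformation.catalog_staging'   # assumed name
job_config = bigquery.LoadJobConfig(write_disposition='WRITE_TRUNCATE')
client.load_table_from_dataframe(sheet, staging_table, job_config=job_config).result()

# Validate in SQL, then publish the result to the target table
publish_query = f'''
CREATE OR REPLACE TABLE `{project_id}.transformation.catalog` AS
SELECT DISTINCT *
FROM `{project_id}.transformation.catalog_staging`
WHERE id_order IS NOT NULL
'''
client.query(publish_query).result()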
Execute queries from BigQuery
# create sql query
query = '''
SELECT *
FROM `my-repository.transformation.catalog`
'''
# convert query result to a pandas dataframe
catalog = pd.read_gbq(query, project_id=project_id, credentials=bq_credentials)
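The BigQuery client created in the environment settings can also run the same query directly and return a pandas DataFrame (to_dataframe() requires the db-dtypes package):
# Run the query through the BigQuery client and fetch the result
job = client.query(query)
catalog = job.to_dataframe()
catalog.head()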
Conclusions
ETL is a fundamental process for organizations that want to leverage their data to drive business growth and innovation. By understanding the core principles and best practices of ETL, you can build efficient and reliable data pipelines that deliver valuable insights.
We’ve delved into the core concepts of ETL, including data extraction from diverse sources, data transformation for cleaning, validation, and enrichment, and data loading into target systems like data warehouses or data lakes using Python.