
Building ETL Pipelines in BigQuery with Python

In the realm of data analytics, Extract, Transform, Load (ETL) processes play a pivotal role.
They streamline the integration of data from various sources, enabling its cleaning, manipulation, and loading into target systems like BigQuery, Google’s cloud-based data warehouse.
By leveraging Python’s versatility and BigQuery’s scalability, you can construct powerful ETL pipelines to prepare your data for insightful analysis.


Importance of ETL

ETL plays a crucial role in modern data management and analytics for several reasons:

  • Data Consolidation

ETL helps gather data from multiple sources and consolidate it into a single, unified repository.

  • Data Quality

By cleaning and transforming data, ETL ensures data accuracy, consistency, and reliability.

  • Data Accessibility

ETL makes data accessible to business analysts, data scientists, and other stakeholders for analysis and decision-making.

  • Data-Driven Insights

ETL enables organizations to extract valuable insights from their data and make informed decisions.

Key Challenges in ETL

While ETL is a powerful tool, it can be complex to implement and manage effectively. Some common challenges include:

  • Data Quality Issues

Inconsistent data formats, missing values, and errors can hinder the ETL process.

  • Scalability

As data volumes grow, ETL pipelines need to be scalable to handle increasing workloads.

  • Data Integration Complexity

Integrating data from diverse sources with varying structures and formats can be challenging.

  • Data Security and Privacy

Protecting sensitive data during the ETL process is essential.

Environment settings

# Import libraries
import numpy as np
import pandas as pd
import polars as pl
import gspread
import duckdb
import sqlalchemy as db
import pyodbc
from oauth2client.service_account import ServiceAccountCredentials
from google.oauth2 import service_account
from google.cloud import bigquery
import connectorx as cx
import warnings
warnings.filterwarnings('ignore')
scopes = ['https://www.googleapis.com/auth/spreadsheets',
          'https://www.googleapis.com/auth/drive',
          'https://www.googleapis.com/auth/analytics.readonly']

# Read Google credentials
api = 'creds.json'

# Connect to Google Sheets
gs_credentials = ServiceAccountCredentials.from_json_keyfile_name(api, scopes)
gc = gspread.authorize(gs_credentials)

# Connect to BigQuery
bq_credentials = service_account.Credentials.from_service_account_file(api)
project_id = 'my-repository'
client = bigquery.Client(credentials=bq_credentials, project=project_id)

Extract Phase

The extraction phase entails retrieving data from your source. This may involve interacting with:

  • Flat files
  • Databases
  • XML files
  • APIs
  • Other

Flat files

users = (
    pl.read_csv('../users.csv', dtypes={'phone': pl.Utf8,
                                        'id_atg': pl.Utf8})
    .with_columns(
        pl.col('entry_data').str.strptime(pl.Datetime, strict=False)
    )
)

users_profiling = (
    # read csv file
    pl.read_csv('../profiles.csv',
                dtypes={'contact_phone': pl.Utf8, 'post_code': pl.Utf8})
    # change column dtypes
    .with_columns(
        pl.col('entry_data', 'entry_data_gep', 'update_date').str.strptime(
            pl.Datetime, strict=False),
        pl.col('contact_phone').cast(pl.Utf8),
    )
)

orders = (
    pl.read_csv('../orders.csv')
    .with_columns(
        pl.col('entry_date', 'delivery_date').str.strptime(
            pl.Datetime, strict=False)
    )
)

order_details = pl.read_csv('../order-details.csv')

promotions = (
    pl.read_csv('../promotions.csv', dtypes={'short_description': pl.Utf8})
    .with_columns(pl.col('key').cast(pl.Utf8))
    .unique(subset='key')
)

order_status = pl.read_csv('../order-status.csv')

social_networks = (
    pl.read_csv('../social_networks.csv')
    .select(pl.col('id_social_network', 'social_network', 'description'))
)

Parquet files

types = pl.read_parquet('../types.parquet')
genre = pl.read_parquet('../genre.parquet')

JSON files

warehouses = pl.read_json('../warehouse_catalog.json')
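
Google Sheets

Since the environment settings also authorise a gspread client (gc), Google Sheets can serve as another source. A minimal sketch, where the spreadsheet and worksheet names are hypothetical placeholders:

# hypothetical spreadsheet and worksheet names
ws = gc.open('marketing_catalog').worksheet('items')
# first row is treated as the header
records = ws.get_all_records()
catalog_sheet = pl.DataFrame(records)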

Databases

# Connection to MS Access
conn = pyodbc.connect(r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
                      r'DBQ=C:\Users\user\folder\file.accdb;')
# Create cursor
cursor = conn.cursor()
# Write query
query = 'SELECT * FROM sales'
# Convert to pandas dataframe
df = pd.read_sql(query, con=conn)
df.head()
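
The setup section also imports connectorx (cx), which can pull query results straight into a polars frame. A minimal sketch, where the connection string and table are placeholders:

# hypothetical PostgreSQL connection string and table
pg_conn = 'postgresql://user:password@localhost:5432/shop'
customers = cx.read_sql(pg_conn, 'SELECT * FROM customers', return_type='polars')

APIs

The extraction list above also mentions APIs. A minimal sketch with the requests library against a hypothetical endpoint that returns a list of JSON records:

import requests

# hypothetical endpoint and token
response = requests.get(
    'https://api.example.com/v1/stores',
    headers={'Authorization': 'Bearer <token>'},
    timeout=30,
)
response.raise_for_status()
# list of JSON records to a polars DataFrame
stores = pl.DataFrame(response.json())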

Transform Phase

Cleanse, validate, and manipulate the extracted data based on your analysis requirements. This might include:

  • Data Cleaning

Handle missing values, inconsistent formatting, or errors.

  • Data Type Conversion

Ensure consistent data types for columns based on their intended use in BigQuery.

  • Filtering/Aggregation

Select or aggregate specific data subsets for targeted analysis.

  • Enrichment

Merge extracted data with additional sources to enhance its value.

# join types, orders, order_details, promotions, warehouses
sheet = (
    types.join(orders, on='id_type', how='left')
    .join(order_details, on='id_order', how='left')
    .join(promotions, on='key', how='left')
    .join(warehouses, on='id_warehouse', how='left')
    .rename({'id_warehouse': 'id_warehouse_promo', 'active': 'promo_active'})
)
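
The join above covers the enrichment step. For the cleaning, type conversion, and aggregation steps, a minimal sketch on the resulting sheet frame; quantity and unit_price are hypothetical column names used only for illustration:

# illustrative cleaning and aggregation on the joined frame
sheet_clean = (
    sheet
    # drop rows that did not match an order
    .filter(pl.col('id_order').is_not_null())
    # fill missing values in the (hypothetical) numeric columns
    .with_columns(pl.col('quantity', 'unit_price').fill_null(0))
)

# total order value per warehouse
sales_by_warehouse = (
    sheet_clean
    .group_by('id_warehouse_promo')
    .agg((pl.col('quantity') * pl.col('unit_price')).sum().alias('total_sales'))
)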

Load Phase

There are two primary options for loading data:

  • Staging Table

Create a staging table and load the transformed data into it for temporary storage, so you can validate and potentially modify it before publishing (a sketch follows the direct-load example below).

  • Direct Load

Load the data directly into your target table, bypassing the staging step. However, this approach can be less flexible for complex transformations:

# create the target dataset
client.create_dataset('database')
# convert the polars DataFrame to pandas
sheet = sheet.to_pandas()
# upload to BigQuery
sheet.to_gbq('dw-538.transformation.catalog',
             project_id='repository-538',
             if_exists='replace',
             credentials=bq_credentials)
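
For the staging-table option described earlier, one possible sketch using the BigQuery client from the setup section; the staging and target table names are illustrative, not taken from the post:

# illustrative staging-table load; table names are assumptions
staging_table = 'repository-538.transformation.catalog_staging'
target_table = 'repository-538.transformation.catalog'

# load the transformed (already pandas) DataFrame into the staging table
job = client.load_table_from_dataframe(
    sheet,
    staging_table,
    job_config=bigquery.LoadJobConfig(write_disposition='WRITE_TRUNCATE'),
)
job.result()  # wait for the load job to finish

# after validation, publish the staging table to the target table
client.query(
    f'CREATE OR REPLACE TABLE `{target_table}` AS SELECT * FROM `{staging_table}`'
).result()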

Execute queries in BigQuery

# write the SQL query
query = '''
SELECT *
FROM `dw.transformation.catalog`
'''

# run the query and load the result into a pandas DataFrame
catalog = pd.read_gbq(query, credentials=bq_credentials)
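
pd.read_gbq relies on the pandas-gbq package; alternatively, the BigQuery client created in the environment settings can run the same query and return a DataFrame directly:

# same query through the google-cloud-bigquery client
catalog = client.query(query).to_dataframe()
catalog.head()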

Conclusions

ETL is a fundamental process for organizations that want to leverage their data to drive business growth and innovation. By understanding the core principles and best practices of ETL, you can build efficient and reliable data pipelines that deliver valuable insights.

We’ve delved into the core concepts of ETL: extracting data from diverse sources, transforming it through cleaning, validation, and enrichment, and loading it into target systems such as data warehouses or data lakes, all with Python.

