Data Engineer Cheat Sheet
ETL pipelines, data processing, and analytics quick reference.
Quick Reference
# Run dbt models
dbt run --select +orders
# Test data quality
dbt test
# Trigger Airflow DAG
airflow dags trigger daily_etl
# Spark submit
spark-submit --master yarn job.py
SQL for Data Engineering
Window Functions
-- Running total
-- Cumulative revenue ordered by date. The frame is spelled out as ROWS
-- because the default frame for an ORDER BY window is RANGE-based: rows that
-- share the same date are treated as peers and would all receive the same
-- cumulative value instead of a row-by-row running sum.
SELECT
    date,
    revenue,
    SUM(revenue) OVER (
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_total
FROM daily_revenue;
-- Lag/Lead
-- Day-over-day revenue change. LAG is evaluated once in a CTE and reused,
-- rather than repeating the same window expression twice.
-- For the earliest date there is no previous row, so prev_revenue and
-- revenue_change are NULL.
-- The delta is aliased revenue_change (not "change") because CHANGE is a
-- reserved word in MySQL and would need quoting there.
WITH revenue_with_prev AS (
    SELECT
        date,
        revenue,
        LAG(revenue, 1) OVER (ORDER BY date) AS prev_revenue
    FROM daily_revenue
)
SELECT
    date,
    revenue,
    prev_revenue,
    revenue - prev_revenue AS revenue_change
FROM revenue_with_prev;
-- Rank
-- Rank products by revenue, highest first.
--   RANK()       leaves gaps after ties (1, 1, 3, ...)
--   DENSE_RANK() does not leave gaps   (1, 1, 2, ...)
-- The aliases avoid the bare names "rank" / "dense_rank": both are reserved
-- words in MySQL 8.0 and reusing a function name as a column name is easy to
-- misread.
SELECT
    product,
    revenue,
    RANK() OVER (ORDER BY revenue DESC) AS revenue_rank,
    DENSE_RANK() OVER (ORDER BY revenue DESC) AS revenue_dense_rank
FROM products;
-- Partition
SELECT department, employee, salary,
AVG(salary) OVER (PARTITION BY department) as dept_avg
FROM employees;
CTEs and Subqueries
-- CTE
-- Step 1 (per_day): sum order amounts for each date.
-- Step 2: average those daily totals over a trailing window.
WITH per_day AS (
    SELECT
        date,
        SUM(amount) AS total
    FROM orders
    GROUP BY date
)
SELECT
    date,
    total,
    -- Window = the current row plus up to 6 earlier rows, ordered by date.
    AVG(total) OVER (
        ORDER BY date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS moving_avg
FROM per_day;
-- Recursive CTE
WITH RECURSIVE hierarchy AS (
SELECT id, name, manager_id, 1 as level
FROM employees WHERE manager_id IS NULL
UNION ALL
SELECT e.id, e.name, e.manager_id, h.level + 1
FROM employees e
JOIN hierarchy h ON e.manager_id = h.id
)
SELECT * FROM hierarchy;
dbt
Commands
# Run models
dbt run
dbt run --select model_name
dbt run --select +model_name # with upstream
dbt run --select model_name+ # with downstream
# Test
dbt test
dbt test --select model_name
# Documentation
dbt docs generate
dbt docs serve
# Debug
dbt debug
dbt compile --select model_name
Model Template
-- models/marts/orders.sql
-- Incremental model built from stg_orders. Rows are matched on order_id
-- (unique_key), so re-selecting a row that is already in the target updates
-- it rather than duplicating it.
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
-- Incremental runs only: pick up rows at or after the latest order_date
-- already loaded.
--   * ">=" (not ">") so rows that arrive later with the same order_date as
--     the previous high-water mark are not skipped; unique_key keeps the
--     overlap from creating duplicates.
--   * COALESCE covers an existing-but-empty target table, where MAX() is
--     NULL and the comparison would otherwise select nothing.
WHERE order_date >= (
    SELECT COALESCE(MAX(order_date), '1900-01-01')
    FROM {{ this }}
)
{% endif %}
Apache Spark
PySpark Basics
from pyspark.sql import SparkSession
# Import the functions module under an alias instead of importing `sum` by
# name: a bare `sum` import would shadow Python's built-in sum() for the rest
# of the module.
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Read data
# Two reader examples; note the second assignment replaces the first `df`.
df = spark.read.parquet("s3://bucket/data/")
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Transformations
# Keep rows whose status is "active", then compute the total and average
# amount per category.
df_transformed = (
    df
    .filter(F.col("status") == "active")
    .groupBy("category")
    .agg(
        F.sum("amount").alias("total"),
        F.avg("amount").alias("average"),
    )
)

# Write
# mode("overwrite") replaces any existing data at the output path.
df_transformed.write.mode("overwrite").parquet("output/")
df_transformed.write.partitionBy("date").parquet("partitioned/")
Spark SQL
# Register temp view
df.createOrReplaceTempView("orders")
# Query
result = spark.sql("""
SELECT category, SUM(amount) as total
FROM orders
WHERE date >= '2024-01-01'
GROUP BY category
""")
Airflow
DAG Template
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Defaults applied to every task in the DAG: each task is retried up to
# 3 times, waiting 5 minutes between attempts.
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# extract_data, transform_data and load_data are not defined in this
# template; supply your own callables before using it.
# NOTE(review): newer Airflow releases spell the schedule argument
# `schedule=`; confirm against the Airflow version you target.
with DAG(
    'daily_etl',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # cron expression: every day at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not run the intervals missed since start_date
) as dag:
    # Operators created inside the `with` block are attached to this DAG.
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )
extract >> transform >> load
CLI Commands
# Trigger DAG
airflow dags trigger daily_etl
# List DAGs
airflow dags list
# Task logs
airflow tasks logs daily_etl transform 2024-01-15
# Backfill
airflow dags backfill daily_etl -s 2024-01-01 -e 2024-01-31
Data Quality
Great Expectations
import great_expectations as gx
context = gx.get_context()
# Validate
result = context.run_checkpoint(checkpoint_name="my_checkpoint")
# Expectations
expect_column_values_to_not_be_null("id")
expect_column_values_to_be_between("age", 0, 120)
expect_column_values_to_be_unique("email")
Common Patterns
| Pattern | Description |
|---|---|
| Incremental | Process only new/changed data |
| SCD Type 2 | Track historical changes |
| Deduplication | Handle duplicate records |
| Late arriving | Reprocess for late data |
| Backfill | Historical data processing |
- data engineering
- etl
- pipeline
- spark
- sql
- dbt