HxHippy

Data Engineer Cheat Sheet

ETL pipelines, data processing, and analytics essentials.

Last updated: 2025-01-15

Data Engineer Cheat Sheet

ETL pipelines, data processing, and analytics quick reference.

Quick Reference

# Build the orders model together with every upstream (parent) model it depends on
dbt run -s +orders

# Execute all data tests defined in the project
dbt test

# Start a manual run of the daily_etl DAG
airflow dags trigger daily_etl

# Submit a PySpark job to YARN (client deploy mode is spark-submit's default)
spark-submit --master yarn --deploy-mode client job.py

SQL for Data Engineering

Window Functions

-- Running total
-- Explicit ROWS frame: with ORDER BY and no frame, the default is
-- RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which gives rows that tie
-- on date the same cumulative total. If dates can repeat, add a tiebreaker
-- column to ORDER BY so the row-by-row order is deterministic.
SELECT date, revenue,
  SUM(revenue) OVER (
    ORDER BY date
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS running_total
FROM daily_revenue;

-- Lag/Lead: previous / next row in date order
-- LAG returns NULL on the first row and LEAD on the last, so revenue_change
-- is NULL for the first date. The alias avoids CHANGE, a reserved word in MySQL.
SELECT date, revenue,
  LAG(revenue, 1) OVER (ORDER BY date) AS prev_revenue,
  LEAD(revenue, 1) OVER (ORDER BY date) AS next_revenue,
  revenue - LAG(revenue, 1) OVER (ORDER BY date) AS revenue_change
FROM daily_revenue;

-- Rank
-- RANK leaves gaps after ties (1, 1, 3); DENSE_RANK does not (1, 1, 2);
-- ROW_NUMBER is always unique (product breaks ties so the result is deterministic).
-- Aliases avoid RANK / DENSE_RANK / ROW_NUMBER, which are reserved words in MySQL 8.0+.
SELECT product, revenue,
  RANK() OVER (ORDER BY revenue DESC) AS revenue_rank,
  DENSE_RANK() OVER (ORDER BY revenue DESC) AS revenue_dense_rank,
  ROW_NUMBER() OVER (ORDER BY revenue DESC, product) AS revenue_row_number
FROM products;

-- Partition: every employee row keeps its detail and also carries
-- the average salary of that employee's department (no GROUP BY collapse)
SELECT
  e.department,
  e.employee,
  e.salary,
  AVG(e.salary) OVER (PARTITION BY e.department) AS dept_avg
FROM employees AS e;

CTEs and Subqueries

-- CTE: first collapse orders to one total per date, then smooth it
-- with a moving average over the current row and the 6 rows before it.
-- Early rows average fewer than 7 rows, and dates with no orders are
-- simply absent (gaps are not filled in).
WITH daily_totals AS (
  SELECT
    date,
    SUM(amount) AS total
  FROM orders
  GROUP BY date
)
SELECT
  dt.date,
  dt.total,
  AVG(dt.total) OVER (
    ORDER BY dt.date
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
  ) AS moving_avg
FROM daily_totals AS dt;

-- Recursive CTE: walk the org chart from the top down
WITH RECURSIVE hierarchy AS (
  -- Anchor: employees with no manager start at level 1
  SELECT id, name, manager_id, 1 AS level
  FROM employees
  WHERE manager_id IS NULL
  UNION ALL
  -- Recursive step: direct reports of rows already found, one level deeper
  SELECT e.id, e.name, e.manager_id, h.level + 1
  FROM employees AS e
  INNER JOIN hierarchy AS h ON e.manager_id = h.id
)
-- Explicit column list instead of SELECT *; ORDER BY makes the output deterministic
SELECT id, name, manager_id, level
FROM hierarchy
ORDER BY level, id;

dbt

Commands

# Run models
dbt run
dbt run --select model_name
dbt run --select +model_name   # model plus all upstream models (parents)
dbt run --select model_name+   # model plus all downstream models (children)
dbt run --select +model_name+  # both directions
dbt run --select model_name --full-refresh  # rebuild an incremental model from scratch

# Test
dbt test
dbt test --select model_name

# Run models, tests, seeds and snapshots together in DAG order
dbt build --select model_name

# Documentation
dbt docs generate
dbt docs serve

# Debug
dbt debug
dbt compile --select model_name  # render Jinja to plain SQL (written under target/compiled/)

Model Template

-- models/marts/orders.sql
-- Incremental model: the first run (or `dbt run --full-refresh`) builds the whole
-- table; later runs select only recent rows and upsert them on unique_key.
{{ config(
  materialized='incremental',
  unique_key='order_id'
) }}

SELECT
  order_id,
  customer_id,
  order_date,
  total_amount
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
-- >= (not >) re-reads the latest loaded date, so rows that landed after the
-- previous run for that same date are not missed; unique_key makes the re-read
-- rows update in place instead of duplicating.
-- COALESCE stops the filter from becoming NULL (and selecting nothing)
-- when the target table exists but is empty.
WHERE order_date >= (
  SELECT COALESCE(MAX(order_date), '1900-01-01') FROM {{ this }}
)
{% endif %}

Apache Spark

PySpark Basics

from pyspark.sql import SparkSession
# Module alias instead of `import sum, avg`: avoids shadowing Python's built-in sum()
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ETL").getOrCreate()

# Read data (two alternatives -- use the one that matches your source format)
df = spark.read.parquet("s3://bucket/data/")
df_csv = spark.read.csv("data.csv", header=True, inferSchema=True)  # inferSchema scans the data an extra time

# Transformations: keep active rows, then aggregate per category
df_transformed = (df
  .filter(F.col("status") == "active")
  .groupBy("category")
  .agg(
    F.sum("amount").alias("total"),
    F.avg("amount").alias("average")
  )
)

# Write
df_transformed.write.mode("overwrite").parquet("output/")
# partitionBy needs the partition column in the DataFrame being written:
# df_transformed only has category/total/average, so partition the row-level df.
df.write.partitionBy("date").parquet("partitioned/")

Spark SQL

# Register the DataFrame as a session-scoped temp view so SQL can refer to it as "orders"
df.createOrReplaceTempView("orders")

# Query -- a typed DATE literal avoids relying on implicit string-to-date casting.
# spark.sql() is lazy: it returns a DataFrame and nothing executes until an action.
result = spark.sql("""
  SELECT category, SUM(amount) as total
  FROM orders
  WHERE date >= DATE '2024-01-01'
  GROUP BY category
""")
result.show()  # action: triggers execution and prints the first rows

Airflow

DAG Template

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Placeholder task callables -- replace the bodies with real ETL logic.
# They must exist before the DAG is built because the operators reference them.
def extract_data():
  """Placeholder for the extract step."""


def transform_data():
  """Placeholder for the transform step."""


def load_data():
  """Placeholder for the load step."""


# Defaults applied to every task in the DAG (individual tasks can override them)
default_args = {
  'owner': 'data-team',
  'retries': 3,                         # retry a failed task up to 3 times
  'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}

with DAG(
  'daily_etl',
  default_args=default_args,
  # Cron: every day at 06:00 (Airflow's default timezone, UTC unless configured).
  # `schedule` (Airflow 2.4+) replaces the deprecated `schedule_interval`,
  # which Airflow 3 removes.
  schedule='0 6 * * *',
  start_date=datetime(2024, 1, 1),
  catchup=False,  # do not create runs for every missed interval since start_date
) as dag:

  extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
  )

  transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
  )

  load = PythonOperator(
    task_id='load',
    python_callable=load_data,
  )

  # Linear dependency chain: extract -> transform -> load
  extract >> transform >> load

CLI Commands

# Trigger DAG (pass run config with --conf '{"key": "value"}')
airflow dags trigger daily_etl

# List DAGs
airflow dags list

# Run one task locally for a date and print its log to stdout
# (nothing is recorded in the metadata DB). Logs of scheduled runs
# are viewed in the web UI or in the configured task log folder.
airflow tasks test daily_etl transform 2024-01-15

# Show the state of a task instance
airflow tasks state daily_etl transform 2024-01-15

# Backfill a date range (Airflow 2.x syntax)
airflow dags backfill daily_etl -s 2024-01-01 -e 2024-01-31

Data Quality

Great Expectations

import great_expectations as gx

context = gx.get_context()

# Expectations (GX 1.x): expectation classes live in gx.expectations and are
# grouped into an Expectation Suite registered with the Data Context
suite = context.suites.add(gx.ExpectationSuite(name="my_suite"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="id"))
suite.add_expectation(
  gx.expectations.ExpectColumnValuesToBeBetween(column="age", min_value=0, max_value=120)
)
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="email"))

# Validate: run a Checkpoint previously saved under this name
# (GX 1.x replaces the old context.run_checkpoint(checkpoint_name=...) call)
checkpoint = context.checkpoints.get("my_checkpoint")
result = checkpoint.run()
print(result.success)

Common Patterns

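Pattern              Description
Incremental          Process only data that is new or changed since the last run
SCD Type 2           Slowly changing dimension, type 2: track historical changes by adding a new versioned row (with validity dates) for each change
Deduplication        Handle duplicate records, e.g. keep one row per key with ROW_NUMBER()
Late-arriving data   Reprocess a lookback window so records that arrive after their period was loaded are still included
Backfill             Reprocess historical data, e.g. for a new pipeline or after a logic change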
advanced Developer Roles Updated 2025-01-15
  • data engineering
  • etl
  • pipeline
  • spark
  • sql
  • dbt