# Data Lineage Guide

Baselinr provides comprehensive data lineage tracking to help you understand data dependencies and perform root cause analysis when drift is detected.

## Table of Contents

- Overview
- Quick Start
- Lineage Providers
- Configuration
- CLI Usage
- Python SDK Usage
- Use Cases
- Best Practices
- Troubleshooting
- Future Enhancements
## Overview

Data lineage in Baselinr tracks relationships between tables, enabling you to:

- Root Cause Analysis: When drift is detected, traverse upstream to find the source of changes
- Impact Analysis: Understand which downstream tables are affected by changes to upstream sources
- Intelligent Alert Grouping: Group related drift events by their lineage relationships
- LLM Context: Provide lineage information to LLMs for better explanations and recommendations

Baselinr uses a provider-based architecture that supports multiple lineage sources:

- dbt Manifest: Extract lineage from dbt `manifest.json` files
- SQL Parser: Parse SQL queries and view definitions using SQLGlot
- Query History: Extract lineage from warehouse query execution history (Snowflake, BigQuery, PostgreSQL, Redshift, MySQL)
- Dagster: Extract lineage from Dagster assets using the metadata database, code scanning, or the GraphQL API
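Conceptually, each source plugs into the same provider abstraction, which the lineage engine queries and merges. The sketch below is illustrative only — the class and method names are assumptions, not Baselinr's actual internals:

```python
# A minimal sketch of the provider abstraction -- class and method names
# are hypothetical, not Baselinr's actual internal API.
from dataclasses import dataclass
from typing import List


@dataclass
class LineageEdge:
    upstream: str             # e.g. "raw.events"
    downstream: str           # e.g. "staging.events_enriched"
    provider: str             # which provider produced this edge
    confidence_score: float   # e.g. 1.0 (dbt), 0.95 (query history), 0.8 (SQL parser)


class LineageProvider:
    """Common shape each lineage source conforms to (illustrative)."""

    name: str

    def is_available(self) -> bool:
        """True if the provider's requirements (manifest, extension, API) are met."""
        raise NotImplementedError

    def extract_edges(self) -> List[LineageEdge]:
        """Return every table-level lineage edge this provider can see."""
        raise NotImplementedError
```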
## Quick Start

### Enable Lineage Extraction

Add lineage configuration to your `config.yml`:

```yaml
profiling:
  extract_lineage: true

lineage:
  enabled: true
  providers:
    - name: dbt_manifest
      enabled: true
      config:
        manifest_path: "target/manifest.json"  # Optional, auto-detects if not specified
    - name: sql_parser
      enabled: true
```
### Extract Lineage During Profiling

Lineage is automatically extracted when you profile tables:

```bash
baselinr profile --config config.yml
```
### Query Lineage

```bash
# Get upstream dependencies
baselinr lineage upstream --config config.yml --table customers

# Get downstream dependencies
baselinr lineage downstream --config config.yml --table raw_events

# Find the path between two tables
baselinr lineage path --config config.yml --from raw.events --to analytics.revenue

# List available providers
baselinr lineage providers --config config.yml

# Sync lineage from query history (bulk operation)
baselinr lineage sync --config config.yml --provider postgres_query_history

# Sync all query history providers
baselinr lineage sync --config config.yml --all

# Clean up stale lineage edges
baselinr lineage cleanup --config config.yml --provider postgres_query_history
```
## Lineage Providers

### dbt Manifest Provider

The dbt provider extracts lineage from dbt's `manifest.json` file, which contains complete dependency information for all models and sources.

**Configuration:**

```yaml
lineage:
  providers:
    - name: dbt_manifest
      enabled: true
      config:
        manifest_path: "target/manifest.json"  # Optional
        project_path: "."                      # Optional, for auto-detection
```

**How it works:**

- Reads dbt's `manifest.json` after `dbt compile` or `dbt run`
- Extracts `ref()` and `source()` dependencies
- Maps dbt models to database tables using your dbt project configuration
- Provides high-confidence lineage (confidence_score: 1.0)

**Requirements:**

- dbt project must be compiled (`dbt compile` or `dbt run`)
- `manifest.json` must be accessible
- Install with: `pip install baselinr[dbt]`
### SQL Parser Provider

The SQL parser provider extracts table references from SQL queries using SQLGlot.

**Configuration:**

```yaml
lineage:
  providers:
    - name: sql_parser
      enabled: true
```

**How it works:**

- Parses SQL queries to extract table references
- Currently supports explicit SQL parsing (future: view definitions, query history)
- Provides medium-confidence lineage (confidence_score: 0.8)

**Requirements:**

- SQLGlot library (included in Baselinr's dependencies)

**Limitations:**

- Requires SQL to be provided explicitly
- View definition parsing requires database access
- Complex SQL with dynamic table names may not be fully parsed
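For a feel of the kind of extraction SQLGlot enables, here is plain SQLGlot usage (not Baselinr's internal code) that pulls table references out of a query's AST:

```python
import sqlglot
from sqlglot import exp

sql = """
SELECT o.customer_id, SUM(o.amount) AS total
FROM staging.orders AS o
JOIN staging.customers AS c ON c.id = o.customer_id
GROUP BY o.customer_id
"""

# Parse the query and collect every table reference from the AST.
parsed = sqlglot.parse_one(sql, read="postgres")
tables = {
    ".".join(part for part in (t.db, t.name) if part)
    for t in parsed.find_all(exp.Table)
}
print(tables)  # {'staging.orders', 'staging.customers'} (set order may vary)
```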
### Query History Providers

Query history providers extract lineage from actual query execution history in your warehouse. This captures real-world data dependencies based on queries that have been executed, complementing the dbt and SQL parser providers.

**Supported Warehouses:**

- PostgreSQL: Uses the `pg_stat_statements` extension
- Snowflake: Uses `SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY`
- BigQuery: Uses `INFORMATION_SCHEMA.JOBS_BY_PROJECT`
- Redshift: Uses the `STL_QUERY` and `STL_SCAN` tables
- MySQL: Uses `performance_schema.events_statements_history_long`

**Configuration:**

```yaml
lineage:
  enabled: true
  providers: [dbt, sql_parser, postgres_query_history]

  query_history:
    enabled: true
    incremental: true          # Enable incremental updates during profiling
    lookback_days: 30          # Days of history for bulk sync
    min_query_count: 2         # Minimum queries to establish a relationship
    exclude_patterns:
      - ".*INFORMATION_SCHEMA.*"
      - ".*SHOW.*"
    edge_expiration_days: 90   # Auto-cleanup edges not seen for 90+ days (None = never)
    warn_stale_days: 90        # Warn about edges not seen for 90+ days

  postgres:
    require_extension: true    # Fail if pg_stat_statements is not installed
  snowflake:
    use_account_usage: true
  bigquery:
    region: "us"
```

**How it works:**

- Initial Bulk Sync: Run `baselinr lineage sync` to populate lineage from query history (last 30 days by default)
- Incremental Updates: During profiling, query history providers automatically extract new lineage from queries executed since the last sync
- Staleness Detection: The system warns about edges not seen in query history for extended periods
- Cleanup: Optionally remove stale edges that haven't been observed recently
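The `exclude_patterns` and `min_query_count` settings act as a noise filter over raw query history. Below is a minimal sketch of that filtering logic, assuming query history has already been reduced to (upstream, downstream, query text) observations — illustrative, not Baselinr's actual code:

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

EXCLUDE_PATTERNS = [r".*INFORMATION_SCHEMA.*", r".*SHOW.*"]
MIN_QUERY_COUNT = 2  # mirrors min_query_count in the config


def filter_edges(
    observations: Iterable[Tuple[str, str, str]],
) -> List[Tuple[str, str]]:
    """Keep (upstream, downstream) pairs observed in at least
    MIN_QUERY_COUNT queries, skipping excluded query text."""
    counts: Counter = Counter()
    for upstream, downstream, query_text in observations:
        if any(re.match(p, query_text, re.IGNORECASE) for p in EXCLUDE_PATTERNS):
            continue  # system/introspection queries are ignored
        counts[(upstream, downstream)] += 1
    return [edge for edge, seen in counts.items() if seen >= MIN_QUERY_COUNT]
```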
**Workflow:**

```bash
# Initial setup: bulk sync lineage from query history
baselinr lineage sync --provider postgres_query_history

# Regular profiling: automatically updates lineage incrementally
baselinr profile --config config.yml

# Periodic refresh: re-sync to catch any missed queries
baselinr lineage sync --provider postgres_query_history

# Clean up stale edges
baselinr lineage cleanup --provider postgres_query_history
```

**Requirements:**

- PostgreSQL: Requires `CREATE EXTENSION pg_stat_statements;` (run by a DBA)
- Snowflake: Requires the `ACCOUNTADMIN` role or access to `SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY`
- BigQuery: Requires the `bigquery.jobs.listAll` permission
- Redshift: Requires appropriate IAM permissions for `STL_QUERY` access
- MySQL: Requires `performance_schema = ON` in the MySQL configuration

**Benefits:**

- Captures real-world dependencies from actual query execution
- High confidence (0.95) since edges are based on executed queries
- Automatic incremental updates during profiling
- Complements dbt/SQL parsing by capturing ad-hoc queries

**Limitations:**

- Query history may have latency (Snowflake's ACCESS_HISTORY can lag by hours)
- Data retention varies by warehouse (Redshift: 2-5 days; others: longer)
- Requires appropriate permissions/extensions
- PostgreSQL's `pg_stat_statements` doesn't track individual query timestamps (aggregate stats only)
### Dagster Provider

The Dagster provider extracts lineage from Dagster assets using multiple data sources with intelligent fallback logic. It supports both table-level and column-level lineage extraction.

**Data Sources (in priority order):**

1. Metadata Database: Query Dagster's metadata database (PostgreSQL/SQLite) for asset dependencies and materialization events
2. Code Scanning: Scan Python files for `@asset` decorators and extract dependencies from the AST
3. GraphQL API: Query Dagster's GraphQL API for asset lineage information

**Configuration:**

```yaml
lineage:
  enabled: true
  extract_column_lineage: true
  providers: [dbt, sql_parser, dagster]

  dagster:
    # Option 1: Metadata database (recommended for production)
    metadata_db_url: "postgresql://user:pass@localhost:5432/dagster"
    auto_detect_metadata_db: true  # Auto-detect from DAGSTER_POSTGRES_URL env var

    # Option 2: Code scanning (works without Dagster running)
    code_locations:
      - "dagster_definitions/"
      - "src/assets/"

    # Option 3: GraphQL API (requires the Dagster UI to be running)
    graphql_url: "http://localhost:3000/graphql"

    # Optional: explicit asset-to-table mapping
    asset_table_mapping:
      "schema::table_name": ["schema", "table_name"]
      "my_asset": ["public", "my_table"]
```
**How it works:**

- Metadata Database: Queries the `event_log_entries` table for asset dependencies and materialization events. Extracts column-level lineage from `MaterializeResult` metadata.
- Code Scanning: Parses Python files to find `@asset` decorators, extracts `deps` parameters, and maps assets to tables using naming conventions or metadata.
- GraphQL API: Queries Dagster's GraphQL endpoint for asset dependencies and metadata.

**Asset-to-Table Mapping:**

The provider uses multiple strategies to map Dagster assets to database tables:

1. Explicit Mapping: Use `asset_table_mapping` in the config for precise control
2. Metadata-based: Check asset metadata for `table`, `schema`, and `database` keys (see the example below)
3. Naming Convention: Parse AssetKey segments (e.g., `AssetKey(["schema", "table"])` → `schema.table`)
4. Fallback: If no mapping is found, skip lineage extraction for that asset
**Column-Level Lineage:**

Dagster supports column-level lineage via `MaterializeResult` metadata:

```python
from dagster import asset, MaterializeResult, TableColumnLineage, TableColumnDep, AssetKey


@asset
def my_asset():
    # Declare that output_col is derived from source.source_col
    yield MaterializeResult(
        metadata={
            "dagster/column_lineage": TableColumnLineage(
                deps_by_column={
                    "output_col": [
                        TableColumnDep(
                            asset_key=AssetKey("source"),
                            column_name="source_col",
                        )
                    ]
                }
            )
        }
    )
```

The provider extracts this metadata from:

- The latest materialization events in the metadata DB
- Code analysis (if a `MaterializeResult` is returned)
- GraphQL API asset metadata
**Requirements:**

- Install with: `pip install baselinr[dagster]`
- For the metadata DB: access to Dagster's metadata database
- For code scanning: read access to the Python files with asset definitions
- For the GraphQL API: the Dagster UI must be running and accessible

**Benefits:**

- Captures lineage from Dagster's first-class asset model
- Supports multiple data sources with automatic fallback
- High confidence (1.0) for metadata DB and GraphQL sources
- Medium confidence (0.9) for code scanning
- Supports column-level lineage when available

**Limitations:**

- Code scanning may miss dynamic dependencies
- The GraphQL API requires the Dagster UI to be running
- The metadata DB requires appropriate database permissions
- Asset-to-table mapping may need manual configuration for complex setups
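The fallback behavior described above amounts to trying each source in priority order and moving on when one is unavailable or returns nothing. A sketch under that assumption — the function names are hypothetical:

```python
# Illustrative fallback logic -- the three source functions are stubs, and
# none of these names are Baselinr's actual internals.
def query_metadata_db(config):
    raise ConnectionError("metadata DB not configured")  # stub: source unavailable


def scan_code_locations(config):
    return [("raw.events", "analytics.revenue")]  # stub: one discovered edge


def query_graphql_api(config):
    return []  # stub: nothing found


def extract_dagster_lineage(config):
    """Try each source in priority order; fall through on failure or no results."""
    for source in (query_metadata_db, scan_code_locations, query_graphql_api):
        try:
            edges = source(config)
        except Exception:
            continue  # this source is unavailable -- try the next one
        if edges:
            return edges
    return []


print(extract_dagster_lineage({}))  # [('raw.events', 'analytics.revenue')]
```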
## Configuration

### Full Configuration Example

```yaml
profiling:
  extract_lineage: true        # Enable lineage extraction during profiling

lineage:
  enabled: true
  providers: [dbt, sql_parser, postgres_query_history]

  # dbt provider configuration
  dbt:
    manifest_path: "target/manifest.json"

  # Query history provider configuration
  query_history:
    enabled: true
    incremental: true          # Enable incremental updates during profiling
    lookback_days: 30          # Days of history for bulk sync
    min_query_count: 2         # Minimum queries to establish a relationship
    exclude_patterns:
      - ".*INFORMATION_SCHEMA.*"
      - ".*SHOW.*"
      - ".*pg_stat.*"
    edge_expiration_days: 90   # Auto-cleanup edges not seen for 90+ days (None = never)
    warn_stale_days: 90        # Warn about edges not seen for 90+ days

  # Warehouse-specific configs
  postgres:
    require_extension: true    # Fail if pg_stat_statements is not installed
  snowflake:
    use_account_usage: true
  bigquery:
    region: "us"
```
### Provider-Specific Configuration

Each provider can have its own configuration:

```yaml
lineage:
  providers:
    - name: dbt_manifest
      enabled: true
      config:
        manifest_path: "/path/to/manifest.json"
        project_path: "/path/to/dbt/project"
    - name: sql_parser
      enabled: true
      config:
        # Future: SQL parser-specific options
```
## CLI Usage

### Upstream Dependencies

Get all upstream tables that feed into a table:

```bash
# Basic usage
baselinr lineage upstream --config config.yml --table customers

# With schema
baselinr lineage upstream --config config.yml --table customers --schema public

# Limit depth
baselinr lineage upstream --config config.yml --table analytics.revenue --max-depth 2

# JSON output
baselinr lineage upstream --config config.yml --table customers --format json

# Save to file
baselinr lineage upstream --config config.yml --table customers --output upstream.json
```

**Output Example:**

```text
Upstream Lineage: customers
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Schema  ┃ Table           ┃ Depth ┃ Provider     ┃ Type    ┃ Confidence ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━┩
│ raw     │ events          │ 1     │ dbt_manifest │ dbt_ref │ 1.00       │
│ raw     │ users           │ 1     │ dbt_manifest │ dbt_ref │ 1.00       │
│ staging │ events_enriched │ 2     │ dbt_manifest │ dbt_ref │ 1.00       │
└─────────┴─────────────────┴───────┴──────────────┴─────────┴────────────┘
```
### Downstream Dependencies

Get all downstream tables that depend on a table:

```bash
baselinr lineage downstream --config config.yml --table raw.events

# With max depth
baselinr lineage downstream --config config.yml --table raw.events --max-depth 3
```
### Find Path Between Tables

Find whether there is a dependency path between two tables:

```bash
baselinr lineage path \
  --config config.yml \
  --from raw.events \
  --to analytics.revenue

# With max depth
baselinr lineage path \
  --config config.yml \
  --from raw.events \
  --to analytics.revenue \
  --max-depth 5
```

**Output Example:**

```text
Lineage Path: raw.events → analytics.revenue
┏━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓
┃ Step ┃ Schema    ┃ Table           ┃
┡━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩
│ 1    │ raw       │ events          │
│ 2    │ staging   │ events_enriched │
│ 3    │ analytics │ revenue         │
└──────┴───────────┴─────────────────┘
```
### List Providers

Check which lineage providers are available:

```bash
baselinr lineage providers --config config.yml
```

**Output Example:**

```text
Lineage Providers
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Provider               ┃ Status    ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ dbt_manifest           │ Available │
│ sql_parser             │ Available │
│ postgres_query_history │ Available │
└────────────────────────┴───────────┘
```
### Sync Query History Lineage

Sync lineage from warehouse query history (bulk operation):

```bash
# Sync a specific provider
baselinr lineage sync --config config.yml --provider postgres_query_history

# Sync all query history providers
baselinr lineage sync --config config.yml --all

# Override lookback days
baselinr lineage sync --config config.yml --provider snowflake_query_history --lookback-days 60

# Dry run (show what would be extracted)
baselinr lineage sync --config config.yml --provider postgres_query_history --dry-run

# Force a full resync (ignore the last sync timestamp)
baselinr lineage sync --config config.yml --provider postgres_query_history --force
```

**Output Example:**

```text
Syncing lineage from postgres_query_history...
Extracted 142 lineage edges
Sync complete: 142 edges extracted
```
### Cleanup Stale Lineage Edges

Remove lineage edges that haven't been seen in query history for extended periods:

```bash
# Clean up stale edges (uses edge_expiration_days from config)
baselinr lineage cleanup --config config.yml --provider postgres_query_history

# Override expiration days
baselinr lineage cleanup --config config.yml --provider postgres_query_history --expiration-days 60

# Dry run (show what would be deleted)
baselinr lineage cleanup --config config.yml --provider postgres_query_history --dry-run
```

**Output Example:**

```text
Cleaned up 23 stale edges
```
## Python SDK Usage

### Get Upstream Lineage

```python
from baselinr import BaselinrClient

client = BaselinrClient(config_path="config.yml")

# Get upstream dependencies
upstream = client.get_upstream_lineage(
    table="customers",
    schema="public",
    max_depth=3
)

for dep in upstream:
    print(f"{dep['schema']}.{dep['table']} (depth: {dep['depth']})")
    print(f"  Provider: {dep['provider']}")
    print(f"  Confidence: {dep['confidence_score']}")
```
### Get Downstream Lineage

```python
# Get downstream dependencies
downstream = client.get_downstream_lineage(
    table="raw_events",
    schema="raw",
    max_depth=2
)

for dep in downstream:
    print(f"{dep['schema']}.{dep['table']} depends on raw_events")
```
### Find Lineage Path

```python
# Find the path between two tables
path = client.get_lineage_path(
    from_table=("raw", "events"),
    to_table=("analytics", "revenue"),
    max_depth=5
)

if path:
    print("Path found:")
    for step in path:
        print(f"  {step['schema']}.{step['table']}")
else:
    print("No path found")
```
### Get All Lineage

```python
# Get all lineage edges
all_lineage = client.get_all_lineage()

for edge in all_lineage:
    print(f"{edge['upstream_schema']}.{edge['upstream_table']} → "
          f"{edge['downstream_schema']}.{edge['downstream_table']}")
```
### Check Available Providers

```python
# Get available lineage providers
providers = client.get_available_lineage_providers()

for provider in providers:
    print(f"{provider['name']}: {provider['available']}")
```
## Use Cases

### Root Cause Analysis

When drift is detected, trace upstream to find the source:

```python
from baselinr import BaselinrClient

client = BaselinrClient(config_path="config.yml")

# Detect drift
drift_report = client.detect_drift(
    dataset_name="analytics.revenue",
    baseline_run_id="baseline-run-id",
    current_run_id="current-run-id"
)

if drift_report.summary["total_drifts"] > 0:
    # Get upstream dependencies
    upstream = client.get_upstream_lineage(
        table="revenue",
        schema="analytics"
    )

    print("Upstream tables to investigate:")
    for dep in upstream:
        print(f"  - {dep['schema']}.{dep['table']}")
```
### Impact Analysis

Understand which tables are affected by changes:

```python
# Check what depends on a table you're about to modify
downstream = client.get_downstream_lineage(
    table="raw_events",
    schema="raw"
)

print(f"Warning: {len(downstream)} tables depend on raw.events")
for dep in downstream:
    print(f"  - {dep['schema']}.{dep['table']}")
```
### Intelligent Alert Grouping

Group related drift events by lineage:

```python
# Get all drift events from the last 7 days
drift_events = client.query_drift_events(days=7)

# Group events that share upstream dependencies
lineage_groups = {}
for event in drift_events:
    upstream = client.get_upstream_lineage(
        table=event.table_name,
        schema=event.schema_name
    )
    for dep in upstream:
        key = f"{dep['schema']}.{dep['table']}"
        lineage_groups.setdefault(key, []).append(event)

# Report grouped alerts
for upstream_table, events in lineage_groups.items():
    if len(events) > 1:
        print(f"Root cause: {upstream_table}")
        print(f"  Affected {len(events)} downstream tables")
```
## Best Practices

### 1. Enable Multiple Providers

Use multiple providers for comprehensive coverage:

```yaml
lineage:
  enabled: true
  providers: [dbt, sql_parser, postgres_query_history]
  query_history:
    enabled: true
    incremental: true
```

This combination provides:

- dbt: High-confidence lineage from dbt models
- sql_parser: Lineage from view definitions and explicit SQL
- query_history: Real-world dependencies from executed queries
### 2. Keep the dbt Manifest Updated

If you use dbt, make sure the manifest is up to date before profiling:

```bash
# Before profiling, compile dbt
dbt compile

# Or run dbt (which also compiles)
dbt run
```
### 3. Use Confidence Scores

Filter lineage by confidence when needed:

```python
# Only use high-confidence lineage
upstream = client.get_upstream_lineage(table="customers")
high_confidence = [d for d in upstream if d['confidence_score'] >= 0.9]
```
### 4. Limit Depth for Performance

Use `max_depth` to limit traversal:

```python
# Only go 2 levels deep
upstream = client.get_upstream_lineage(
    table="customers",
    max_depth=2
)
```
### 5. Combine with Drift Detection

Use lineage for root cause analysis:

```python
# When drift is detected, check upstream
drift = client.detect_drift(...)
if drift.summary["total_drifts"] > 0:
    upstream = client.get_upstream_lineage(
        table=drift.dataset_name
    )
    # Investigate upstream tables
```
### 6. Use Query History for Real-World Dependencies

Query history providers capture dependencies from actual query execution:

```bash
# Initial bulk sync
baselinr lineage sync --provider postgres_query_history

# Regular profiling automatically updates lineage incrementally
baselinr profile --config config.yml

# Periodic refresh
baselinr lineage sync --provider postgres_query_history
```

This captures ad-hoc queries and real-world usage patterns that may not appear in dbt models or view definitions.

### 7. Monitor Stale Lineage

Query history lineage includes staleness detection:

```python
# The system automatically warns about stale edges: a warning is logged if
# edges haven't been seen in query history for >90 days
upstream = client.get_upstream_lineage(table="customers")
```

Clean up stale edges periodically:

```bash
baselinr lineage cleanup --provider postgres_query_history
```

## Troubleshooting

### No Lineage Extracted

**Problem:** `baselinr lineage upstream` returns no results.

**Solutions:**

- Check that lineage extraction is enabled: `profiling.extract_lineage: true`
- Verify providers are available: `baselinr lineage providers`
- For dbt: ensure `manifest.json` exists and is accessible
- Check the logs for extraction errors

### Provider Not Available

**Problem:** A provider shows as "Unavailable" in `baselinr lineage providers`.

**Solutions:**

- dbt_manifest: Ensure dbt is installed and `manifest.json` exists
- sql_parser: SQLGlot should be installed automatically with Baselinr
- postgres_query_history: Ensure the `pg_stat_statements` extension is installed (`CREATE EXTENSION pg_stat_statements;`)
- snowflake_query_history: Ensure the `ACCOUNTADMIN` role or access to `SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY`
- bigquery_query_history: Ensure the `bigquery.jobs.listAll` permission
- redshift_query_history: Ensure appropriate IAM permissions for `STL_QUERY` access
- mysql_query_history: Ensure `performance_schema = ON` in the MySQL configuration
- Check provider-specific requirements in the logs

### Incomplete Lineage

**Problem:** The lineage graph is incomplete or missing relationships.

**Solutions:**

- Enable multiple providers for better coverage (dbt + sql_parser + query_history)
- For dbt: ensure all models are compiled
- For the SQL parser: provide explicit SQL when possible
- For query history: run an initial bulk sync with `baselinr lineage sync --all`
- Check confidence scores; low confidence may indicate parsing issues
- Query history captures real-world usage but may miss rarely-executed queries
## Future Enhancements

- Column-Level Lineage: Track dependencies at the column level across all providers (currently supported by the Dagster provider)
- Airflow Provider: Extract lineage from Airflow DAGs
- Lineage Visualization: Visual graph representation in the dashboard
## Additional Resources
- CLI Reference - Complete CLI documentation
- Python SDK Guide - SDK usage examples
- dbt Integration Guide - dbt-specific configuration
- Schema Reference - Database schema documentation
Questions? Open an issue on GitHub or check the documentation.