# Dagster Integration Guide
Baselinr provides comprehensive integration with Dagster to enable scalable profiling and drift detection within your Dagster workflows.
## Overview

The Dagster integration includes:

- **Dynamic Assets**: Automatically create Dagster assets from your Baselinr configuration
- **Profiling Jobs**: Create jobs that run profiling for all configured tables
- **Sensors**: Intelligent sensors that use Baselinr's planning system to determine which tables need profiling
- **Resources**: `BaselinrResource` for accessing Baselinr functionality within Dagster ops
- **RCA Collector**: Automatic collection of Dagster run metadata for root cause analysis
## Installation

Install Baselinr with Dagster support:

```bash
pip install baselinr[dagster]
```

Or install Dagster separately:

```bash
pip install baselinr
pip install dagster dagster-webserver
```
## Quick Start

### Basic Integration

The simplest way to integrate Baselinr with Dagster is `build_baselinr_definitions`:

```python
from baselinr.integrations.dagster import build_baselinr_definitions

defs = build_baselinr_definitions(
    config_path="config.yml",
    asset_prefix="baselinr",
    job_name="baselinr_profile_all",
    enable_sensor=True,  # optional
)
```

This creates:

- One asset per table in your configuration
- A job to run all profiling assets
- An optional sensor that uses Baselinr's planning system
### Complete Example

```python
"""
Dagster repository example for Baselinr.

This file defines Dagster assets and jobs for profiling tasks.
It demonstrates how to integrate Baselinr with Dagster for
orchestration and scheduling.
"""
import os
from pathlib import Path

from dagster import Definitions, ScheduleDefinition

from baselinr.integrations.dagster import (
    BaselinrResource,
    baselinr_plan_sensor,
    create_profiling_assets,
    create_profiling_job,
)

# Determine the config path, allowing an environment-variable override
CONFIG_PATH = os.getenv("BASELINR_CONFIG", str(Path(__file__).parent / "config.yml"))

# Create profiling assets from the configuration
profiling_assets = create_profiling_assets(
    config_path=CONFIG_PATH,
    asset_name_prefix="baselinr",
)

# Create a job that materializes all profiling assets
profiling_job = create_profiling_job(
    assets=profiling_assets,
    job_name="profile_all_tables",
)

# Create a sensor driven by Baselinr's planning system
plan_sensor = baselinr_plan_sensor(
    config_path=CONFIG_PATH,
    job_name="profile_all_tables",
    asset_prefix="baselinr",
    sensor_name="baselinr_plan_sensor",
)

# Create a schedule to run profiling daily at midnight
daily_profiling_schedule = ScheduleDefinition(
    name="daily_profiling",
    job=profiling_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
    description="Run Baselinr profiling daily",
)

defs = Definitions(
    assets=profiling_assets,
    jobs=[profiling_job],
    schedules=[daily_profiling_schedule],
    sensors=[plan_sensor],
    resources={"baselinr": BaselinrResource(config_path=CONFIG_PATH)},
)
```
## Core Components

### build_baselinr_definitions

The simplest way to get started. Creates all necessary Dagster components from your Baselinr configuration.

Parameters:

- `config_path` (str, required): Path to the Baselinr configuration file
- `asset_prefix` (str, default: `"baselinr"`): Prefix for asset keys
- `job_name` (str, default: `"baselinr_profile_all"`): Name of the profiling job
- `enable_sensor` (bool, default: `True`): Whether to create the planning sensor
- `group_name` (str, default: `"baselinr_profiling"`): Asset group name
- `default_tags` (dict, optional): Default tags for assets
- `default_metadata` (dict, optional): Default metadata for assets

Returns: a `Definitions` object ready to use in your Dagster repository.

Example:

```python
defs = build_baselinr_definitions(
    config_path="config.yml",
    asset_prefix="data_quality",
    job_name="profile_warehouse",
    enable_sensor=True,
)
```
### create_profiling_assets

Creates Dagster assets dynamically from your Baselinr configuration.

Parameters:

- `config_path` (str, required): Path to the Baselinr configuration file
- `asset_name_prefix` (str, default: `"baselinr"`): Prefix for asset keys
- `group_name` (str, default: `"baselinr_profiling"`): Asset group name
- `default_tags` (dict, optional): Default tags for assets
- `default_metadata` (dict, optional): Default metadata for assets

Returns: a list of Dagster assets (one per table in the configuration).

Example:

```python
from baselinr.integrations.dagster import create_profiling_assets

assets = create_profiling_assets(
    config_path="config.yml",
    asset_name_prefix="baselinr",
    group_name="data_quality",
)
```
### create_profiling_job

Creates a Dagster job to run all profiling assets.

Parameters:

- `assets` (list, required): List of profiling assets
- `job_name` (str, default: `"baselinr_profile_all"`): Name of the job

Returns: a Dagster job definition.

Example:

```python
from baselinr.integrations.dagster import create_profiling_job

job = create_profiling_job(
    assets=profiling_assets,
    job_name="profile_all_tables",
)
```
### baselinr_plan_sensor

Creates a sensor that uses Baselinr's planning system to intelligently determine which tables need profiling.

Parameters:

- `config_path` (str, required): Path to the Baselinr configuration file
- `job_name` (str, required): Name of the job to trigger
- `asset_prefix` (str, required): Prefix used for asset keys
- `sensor_name` (str, default: `"baselinr_plan_sensor"`): Name of the sensor

Returns: a Dagster sensor definition.

Example:

```python
from baselinr.integrations.dagster import baselinr_plan_sensor

sensor = baselinr_plan_sensor(
    config_path="config.yml",
    job_name="profile_all_tables",
    asset_prefix="baselinr",
    sensor_name="baselinr_plan_sensor",
)
```
### BaselinrResource

A Dagster resource that provides access to Baselinr functionality within ops.

Parameters:

- `config_path` (str, required): Path to the Baselinr configuration file

Example:

```python
from dagster import op

from baselinr.integrations.dagster import BaselinrResource


@op(required_resource_keys={"baselinr"})
def custom_profiling_op(context):
    baselinr = context.resources.baselinr
    # Use the Baselinr client here
    results = baselinr.profile()
    return results
```
## Advanced Usage

### Custom Asset Definitions

You can create custom assets with additional logic:

```python
from dagster import AssetExecutionContext, asset

from baselinr.integrations.dagster import BaselinrResource


@asset(
    key=["baselinr", "customers"],
    group_name="data_quality",
    required_resource_keys={"baselinr"},
)
def profile_customers(context: AssetExecutionContext):
    """Profile the customers table with custom logic."""
    baselinr = context.resources.baselinr

    # Run profiling
    results = baselinr.profile(table="customers", schema="public")

    # Custom post-processing
    if results:
        # Send a notification, update metadata, etc.
        pass

    return results
```
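The post-processing step in the asset above is deliberately left open. One possibility is a small helper that flags columns whose null rate exceeds a threshold before any notification goes out. This is only a sketch: the shape of `results` used here (a list of per-column dicts with `column` and `null_fraction` keys) is an assumption for illustration, not Baselinr's documented output format.

```python
from typing import Dict, List


def columns_over_null_threshold(
    results: List[Dict], threshold: float = 0.1
) -> List[str]:
    """Return names of columns whose null fraction exceeds the threshold.

    Assumes each result row looks like {"column": "email",
    "null_fraction": 0.25} -- a hypothetical shape, not Baselinr's
    actual profiling output.
    """
    return [
        row["column"]
        for row in results
        if row.get("null_fraction", 0.0) > threshold
    ]


# Example: two columns profiled, one over the 10% threshold
sample = [
    {"column": "id", "null_fraction": 0.0},
    {"column": "email", "null_fraction": 0.25},
]
print(columns_over_null_threshold(sample))  # ['email']
```

A check like this could replace the `pass` branch, gating notifications on whether the returned list is non-empty.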
## Incremental Profiling with Sensors

The `baselinr_plan_sensor` integrates with Baselinr's incremental profiling system:

```python
sensor = baselinr_plan_sensor(
    config_path="config.yml",
    job_name="profile_all_tables",
    asset_prefix="baselinr",
)
```

This sensor:

- Uses Baselinr's planning system to determine which tables need profiling
- Only triggers profiling for tables that have changed or need updates
- Reduces unnecessary work and cost
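To make the planning behavior concrete, the selection logic can be pictured as "profile a table only when its last profile is older than its configured interval". Everything below (the `TablePlan` shape and the `select_stale_tables` helper) is illustrative pseudologic, not Baselinr's internal planning API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class TablePlan:
    """Hypothetical planning record for one table."""
    name: str
    last_profiled: Optional[datetime]  # None means never profiled
    interval: timedelta


def select_stale_tables(plans: List[TablePlan], now: datetime) -> List[str]:
    """Return tables due for profiling: never profiled, or past their interval."""
    return [
        p.name
        for p in plans
        if p.last_profiled is None or now - p.last_profiled >= p.interval
    ]


now = datetime(2024, 1, 2, 0, 0)
plans = [
    TablePlan("customers", datetime(2024, 1, 1), timedelta(days=1)),   # due again
    TablePlan("orders", datetime(2024, 1, 1, 23), timedelta(days=1)),  # still fresh
    TablePlan("events", None, timedelta(hours=6)),                     # never profiled
]
print(select_stale_tables(plans, now))  # ['customers', 'events']
```

On each tick, a sensor built this way would request runs only for the stale subset, which is where the cost savings come from.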
## Schedules

Create schedules to run profiling on a regular cadence:

```python
from dagster import ScheduleDefinition

daily_profiling = ScheduleDefinition(
    name="daily_profiling",
    job=profiling_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
    description="Run Baselinr profiling daily",
)

hourly_profiling = ScheduleDefinition(
    name="hourly_profiling",
    job=profiling_job,
    cron_schedule="0 * * * *",  # Every hour
    description="Run Baselinr profiling hourly",
)
```
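Dagster validates `cron_schedule` when definitions load, so a malformed expression fails at deploy time rather than silently never running. A rough sanity check like the one below (a simplified five-field validator written for this guide, not Dagster's actual parser) can catch obvious mistakes even earlier, for example in a config linter:

```python
def looks_like_cron(expr: str) -> bool:
    """Loose check that an expression has the five standard cron fields.

    Simplified for illustration: it accepts digits, '*', '/', ',' and '-'
    per field, and does not cover names or shortcuts like '@daily'.
    """
    fields = expr.split()
    if len(fields) != 5:
        return False
    allowed = set("0123456789*/,-")
    return all(set(f) <= allowed for f in fields)


print(looks_like_cron("0 0 * * *"))  # True  (daily at midnight)
print(looks_like_cron("0 * * * *"))  # True  (hourly)
print(looks_like_cron("every day"))  # False
```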
## Root Cause Analysis Integration

Baselinr automatically collects Dagster run metadata for root cause analysis:

```python
from baselinr.rca.collectors import DagsterRunCollector

# Configure in your Baselinr config or code
collector = DagsterRunCollector(
    instance_path="/path/to/.dagster",  # Optional; falls back to the DAGSTER_HOME env var
    graphql_url="http://localhost:3000/graphql",  # Optional
)
```

This enables Baselinr to correlate data quality issues with Dagster pipeline runs.
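The correlation step can be pictured as matching each drift event against pipeline runs that finished shortly before it was detected. The helper below sketches that idea under simple assumptions: run records are plain `(name, finished_at)` pairs and the lookback window is fixed, neither of which reflects Baselinr's actual RCA data model.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Hypothetical run record: (run name, completion time)
Run = Tuple[str, datetime]


def runs_preceding(
    event_time: datetime,
    runs: List[Run],
    window: timedelta = timedelta(hours=2),
) -> List[str]:
    """Return runs that completed within `window` before the drift event."""
    return [
        name
        for name, finished_at in runs
        if event_time - window <= finished_at <= event_time
    ]


drift_detected = datetime(2024, 1, 1, 12, 0)
runs = [
    ("load_customers", datetime(2024, 1, 1, 11, 30)),  # inside the 2h window
    ("load_orders", datetime(2024, 1, 1, 8, 0)),       # too early to be a suspect
]
print(runs_preceding(drift_detected, runs))  # ['load_customers']
```

In practice the collector pulls these run records from the Dagster instance or its GraphQL endpoint, so no manual bookkeeping like this is required.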
## Configuration

### Basic Configuration

Your Baselinr configuration file (`config.yml`) defines which tables to profile:

```yaml
source:
  type: postgres
  host: localhost
  database: mydb
  username: user
  password: password

storage:
  connection:
    type: postgres
    host: localhost
    database: baselinr
    username: user
    password: password

profiling:
  tables:
    - table: customers
      schema: public
    - table: orders
      schema: public
```
### Environment Variables

You can override the config path using an environment variable:

```bash
export BASELINR_CONFIG=/path/to/config.yml
```
## Best Practices

1. **Use sensors for incremental profiling**: The `baselinr_plan_sensor` integrates with Baselinr's planning system to only profile tables that need updates.

2. **Group assets**: Use asset groups to organize profiling assets:

   ```python
   assets = create_profiling_assets(
       config_path="config.yml",
       group_name="data_quality",
   )
   ```

3. **Tag assets**: Add tags for better organization and filtering:

   ```python
   assets = create_profiling_assets(
       config_path="config.yml",
       default_tags={"team": "data-engineering", "domain": "analytics"},
   )
   ```

4. **Separate jobs for different environments**: Create separate jobs for dev/staging/prod:

   ```python
   dev_job = create_profiling_job(
       assets=dev_assets,
       job_name="profile_dev",
   )

   prod_job = create_profiling_job(
       assets=prod_assets,
       job_name="profile_prod",
   )
   ```

5. **Use resources for custom logic**: Use `BaselinrResource` when you need custom profiling logic within ops.
## Troubleshooting

### Assets Not Appearing

- Check that your config file path is correct
- Verify that tables are properly configured under `profiling.tables`
- Check the Dagster logs for errors during asset creation
### Sensor Not Triggering

- Ensure the sensor is included in your `Definitions`
- Check that the job name matches between the sensor and the job
- Verify that the asset prefix matches
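When chasing a prefix mismatch, it helps to reason about how asset keys are assembled from the prefix and table identifiers. The helper below is a guess at one plausible convention (prefix, then schema, then table) written purely for illustration; the authoritative source is the keys Baselinr actually emits, which you can inspect in the Dagster UI's asset catalog.

```python
from typing import List


def build_asset_key(prefix: str, schema: str, table: str) -> List[str]:
    """Assemble an asset key path the way a prefix-based integration might.

    Hypothetical convention: [prefix, schema, table]. The real layout is
    whatever create_profiling_assets emits -- verify in the Dagster UI.
    """
    return [prefix, schema, table]


# The sensor's asset_prefix must match the leading key component:
print(build_asset_key("baselinr", "public", "customers"))
# ['baselinr', 'public', 'customers']
```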
### Import Errors

- Ensure Dagster is installed: `pip install baselinr[dagster]`
- Check that you're using compatible versions (Dagster 1.0+)
## See Also
- Airflow Integration - Similar integration for Airflow
- Incremental Profiling - How the planning system works
- Root Cause Analysis - Using Dagster metadata for RCA
- Data Lineage - Dagster lineage provider