-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathexample_source_rendering.py
47 lines (41 loc) · 1.52 KB
/
example_source_rendering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
An example DAG that uses Cosmos to render a dbt project as an Airflow DAG with source rendering enabled (``SourceRenderingBehavior.ALL``).
"""
import os
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import SourceRenderingBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping
# Root folder of the example dbt projects: the "dbt" directory beside this
# file, unless the DBT_ROOT_PATH environment variable supplies another location.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
# dbt profile settings for the example DAG: profile "default", target "dev",
# with credentials mapped from the "example_conn" connection id
# (presumably an Airflow connection -- confirm it exists in the deployment).
profile_config = ProfileConfig(
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        disable_event_tracking=True,
        profile_args={"schema": "public"},
    ),
    target_name="dev",
    profile_name="default",
)
# [START cosmos_source_node_example]
source_rendering_dag = DbtDag(
    # Regular Airflow DAG arguments
    dag_id="source_rendering_dag",
    # NOTE(review): `schedule_interval` is deprecated in newer Airflow releases
    # in favour of `schedule` -- confirm against the targeted Airflow version.
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 2},
    # Arguments specific to dbt / Cosmos
    project_config=ProjectConfig(DBT_ROOT_PATH / "altered_jaffle_shop"),
    profile_config=profile_config,
    render_config=RenderConfig(
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    ),
    operator_args={
        # install any necessary dependencies before running any dbt command
        "install_deps": True,
        # used only in dbt commands that support this flag
        "full_refresh": True,
    },
    # Callback handed to Cosmos for warnings; here it just prints the context.
    on_warning_callback=lambda context: print(context),
)
# [END cosmos_source_node_example]