Welcome to dag-factory! dag-factory is a library for Apache Airflow® to construct DAGs declaratively via configuration files.
The minimum requirements for dag-factory are:
- Python 3.8.0+
- Apache Airflow® 2.0+
For a gentle introduction, please take a look at our Quickstart Guide. For more examples, please see the examples folder.
Key benefits of dag-factory:
- Construct DAGs without knowing Python
- Construct DAGs without learning Airflow primitives
- Avoid duplicative code
- Everyone loves YAML! ;)
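To illustrate the declarative workflow described above, a small Python file placed in your `dags` folder can load a YAML configuration and register the resulting DAGs. This is only a minimal sketch, assuming a config file at a hypothetical path; see the Quickstart Guide for the full walkthrough.

```python
# Minimal loading sketch; the config path below is purely illustrative.
from airflow import DAG  # noqa: F401  (Airflow's DAG parser looks for this import)

import dagfactory

config_file = "/usr/local/airflow/dags/config_file.yml"  # hypothetical path
example_dag_factory = dagfactory.DagFactory(config_file)

# Clean up stale DAGs, then register the configured DAGs in this module's globals
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
```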
If you want to split your DAG configuration into multiple files, you can do so by using a shared suffix in the configuration file names and passing it to `load_yaml_dags`:
```python
from dagfactory import load_yaml_dags  # load relevant YAML files as Airflow DAGs

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])
```
dag-factory supports scheduling DAGs via Apache Airflow Datasets. To leverage this, specify the Datasets a task produces in the `outlets` key of the configuration file; `outlets` is a list of strings that represent the dataset locations. In the `schedule` key of the consumer DAG, set the Datasets you would like to schedule against; this key is likewise a list of strings that represent the dataset locations. The consumer DAG will run when all of the datasets are available.
```yaml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
dag-factory supports using custom operators. To leverage this, set the path to the custom operator within the `operator` key in the configuration file. You can add any additional parameters that the custom operator requires.
```yaml
...
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    make_bread_1:
      operator: customized.operators.breakfast_operators.MakeBreadOperator
      bread_type: 'Sourdough'
```
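For reference, a custom operator like the one referenced above could look roughly like the sketch below. Only the `MakeBreadOperator` name and the `bread_type` parameter come from the example config; everything else is illustrative.

```python
# customized/operators/breakfast_operators.py (illustrative sketch)
from airflow.models import BaseOperator


class MakeBreadOperator(BaseOperator):
    """Accepts the extra parameter supplied via the YAML config."""

    template_fields = ("bread_type",)

    def __init__(self, bread_type: str, **kwargs):
        super().__init__(**kwargs)
        self.bread_type = bread_type

    def execute(self, context):
        self.log.info("Making %s bread", self.bread_type)
```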
dag-factory also supports using "callbacks" at the DAG, Task, and TaskGroup level. These callbacks can be defined in a few different ways. The first points directly to a Python function that has been defined in the `include/callbacks.py` file.
```yaml
example_dag1:
  on_failure_callback: include.callbacks.example_callback1
  ...
```
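For context, the function referenced above could be as simple as the sketch below. The body is hypothetical; the only requirement is that the callable accepts the Airflow context dictionary.

```python
# include/callbacks.py (illustrative sketch)
def example_callback1(context):
    # Airflow passes the task instance context to the callback
    task_instance = context["task_instance"]
    print(f"Callback fired for {task_instance.dag_id}.{task_instance.task_id}")
```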
In the example below, the `on_success_callback` points first to a file and then to a function name within that file. Notice that this callback is defined using `default_args`, meaning it will be applied to all tasks.
```yaml
example_dag1:
  ...
  default_args:
    on_success_callback_file: /usr/local/airflow/include/callbacks.py
    on_success_callback_name: example_callback1
```
dag-factory users can also leverage provider-built tools when configuring callbacks. In this example, the `send_slack_notification` function from the Slack provider is used to dispatch a message when a DAG failure occurs. This function is passed to the `callback` key under `on_failure_callback`. This pattern allows callback definitions to take parameters (such as `text`, `channel`, and `username`, as shown here).

Note that this functionality is currently only supported for `on_failure_callback`s defined at the DAG level or in `default_args`. Support for other callback types and Task/TaskGroup-level definitions is coming soon.
```yaml
example_dag1:
  on_failure_callback:
    callback: airflow.providers.slack.notifications.slack.send_slack_notification
    slack_conn_id: example_slack_id
    text: |
      :red_circle: Task Failed.
      This task has failed and needs to be addressed.
      Please remediate this issue ASAP.
    channel: analytics-alerts
    username: Airflow
  ...
```
The package `airflow.providers.http.sensors.http` is available for Airflow 2.0+. The following example shows `response_check` logic in a Python file:
```yaml
task_2:
  operator: airflow.providers.http.sensors.http.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_name: check_sensor
  response_check_file: /path/to/example1/http_conn.py
  dependencies: [task_1]
```
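The referenced file then defines the check function. A minimal sketch, assuming the check simply inspects the response body; the function name must match `response_check_name`, and the function receives the HTTP response object:

```python
# /path/to/example1/http_conn.py (illustrative sketch)
def check_sensor(response):
    # Return True to mark the sensor as successful; otherwise it keeps poking
    return "ok" in response.text
```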
The `response_check` logic can also be provided as a lambda:
```yaml
task_2:
  operator: airflow.providers.http.sensors.http.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_lambda: 'lambda response: "ok" in response.text'
  dependencies: [task_1]
```
To learn more about the terms and conditions for use, reproduction and distribution, read the Apache License 2.0.
This project follows Astronomer's Privacy Policy.
For further information, read this.
Check the project's Security Policy to learn how to report security vulnerabilities in DAG Factory and how security issues reported to the DAG Factory security team are handled.