
[xy] Support Airflow integration #665

Merged

merged 6 commits into master from xiaoyou/airflow-integration on Jul 15, 2022

Conversation

@wangxiaoyou1993 commented Jul 15, 2022

Summary

This PR makes some changes to the library so that it can run in Airflow:

  • Update dependencies to resolve conflicts.
  • Don't update block status when running a pipeline from the command line.
  • Allow specifying the variables path, to resolve a write-permission issue.

Code for creating a BashOperator that runs the whole pipeline

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x path; airflow.operators.bash_operator is deprecated
from datetime import datetime
from shared_utils.dag import DEFAULT_ARGS, DEFAULT_SETTINGS
from shared_utils.hash import merge_dict
import os


ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
pipeline_path = os.path.join(ABSOLUTE_PATH, 'pipelines', 'pipeline_test')

dag = DAG(
    'test_mage_pipeline',
    default_args=merge_dict(DEFAULT_ARGS, {
        'start_date': datetime(2022, 7, 14),
    }),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    **DEFAULT_SETTINGS,
)

# Run the entire pipeline in a single task via the mage CLI.
task = BashOperator(
    dag=dag,
    task_id='run_pipeline',
    bash_command=f'mage run {pipeline_path} prod_pipeline',
)
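
Note: DEFAULT_ARGS, DEFAULT_SETTINGS, merge_dict, and ignore_keys come from a project-local shared_utils package, not from Airflow or mage-ai. A minimal stand-in sketch so the examples are self-contained, assuming merge_dict is a shallow dict merge and ignore_keys drops the listed keys (the default values are illustrative guesses):

# Hypothetical stand-ins for the project-local shared_utils helpers.
DEFAULT_ARGS = {'owner': 'airflow', 'retries': 1}  # illustrative defaults
DEFAULT_SETTINGS = {'catchup': False}  # illustrative defaults


def merge_dict(base, override):
    # Shallow merge; keys in `override` take precedence.
    return {**base, **override}


def ignore_keys(d, keys):
    # Return a copy of `d` without the given keys.
    return {k: v for k, v in d.items() if k not in keys}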

Code for creating a DAG with one Airflow task per block

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from mage_ai.data_preparation.models.pipeline import Pipeline
from shared_utils.dag import DEFAULT_ARGS, DEFAULT_SETTINGS
from shared_utils.hash import ignore_keys, merge_dict
import os


ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
pipeline_path = os.path.join(ABSOLUTE_PATH, 'pipelines', 'pipeline_test')

dag = DAG(
    'test_mage_dag',
    default_args=merge_dict(DEFAULT_ARGS, {
        'start_date': datetime(2022, 7, 14),
    }),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    **DEFAULT_SETTINGS,
)

# Load the mage pipeline and its blocks, keyed by block UUID.
pipeline = Pipeline('prod_pipeline', repo_path=pipeline_path)

blocks = pipeline.blocks_by_uuid

tasks = []


def build_execute_block(block):
    # Factory so each task's callable binds its own `block` (see note below).
    def _callable(ds, **kwargs):
        block.execute_sync(
            analyze_outputs=False,
            update_status=False,  # don't write block status back from Airflow
        )

    return _callable


for uuid, b in blocks.items():
    # Scratchpad blocks are notebook-only; skip them when scheduling.
    if b.type == 'scratchpad':
        continue
    tasks.append(dict(
        task_id=uuid,
        upstream_task_ids=b.upstream_block_uuids,
        python_callable=build_execute_block(b),
    ))


def initialize_tasks(dag, tasks):
    operators = {}

    # First pass: create a PythonOperator for each block.
    for task_dict in tasks:
        task_operator = PythonOperator(dag=dag, **ignore_keys(task_dict, [
            'upstream_task_ids',
        ]))
        operators[task_operator.task_id] = task_operator

    # Second pass: dependencies must be set after the tasks have been added
    # to the DAG, otherwise Airflow raises an error.
    for task_dict in tasks:
        for task_id in task_dict.get('upstream_task_ids', []):
            dag.set_dependency(task_id, task_dict['task_id'])

    return operators


initialize_tasks(dag, tasks)
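
Either DAG can be exercised locally with Airflow's "airflow dags test" command before putting it on a schedule. One note on the build_execute_block factory above: defining the callable directly inside the for-loop would make every task close over the same loop variable and execute the last block. A standalone sketch of that pitfall and the fix (not part of the PR):

# Late binding: all three callables see the final value of `b`.
callables = [lambda: b for b in [1, 2, 3]]
print([f() for f in callables])  # [3, 3, 3]


# A factory binds the value per iteration, as build_execute_block does.
def make(value):
    return lambda: value


callables = [make(b) for b in [1, 2, 3]]
print([f() for f in callables])  # [1, 2, 3]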

Tests

Tested in Airflow:

  • Executed as a BashOperator (screenshot)
  • Executed as a DAG (screenshot)


@wangxiaoyou1993 wangxiaoyou1993 merged commit e3910c9 into master Jul 15, 2022
@wangxiaoyou1993 wangxiaoyou1993 deleted the xiaoyou/airflow-integration branch July 15, 2022 23:15