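If you work in data engineering, you are almost certainly familiar with at least one of these two terms: Airflow data pipeline or DAG.

Two departments, two processes

Bookkeeping of purchases + other expenses (5m).

Here is the DAG code, followed by its representation in the Airflow UI:

```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

# DAG id, start date and schedule are assumed; the original values were not preserved.
with DAG('operations_finances',
         start_date=days_ago(1),
         schedule_interval=timedelta(days=1)) as dag:
    calculate_revenue = DummyOperator(task_id='operations_calculate_revenue')
    calculate_expenses = DummyOperator(task_id='operations_calculate_expenses')
    income_bookkeep = DummyOperator(task_id='finances_income_bookkeep')
    validate_income = DummyOperator(task_id='finances_validate_income')
    outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep')
    operations_a_report = DummyOperator(task_id='operations_a_report')
    finance_a_report = DummyOperator(task_id='finance_a_report')

    calculate_revenue >> income_bookkeep >> validate_income
    # Remaining dependencies inferred from the two split DAGs below.
    calculate_revenue >> calculate_expenses >> outcome_bookkeep
    validate_income >> outcome_bookkeep
    outcome_bookkeep >> [operations_a_report, finance_a_report]
```

As the graph shows, we actually have two processes, with dependencies between them, inside the same DAG.

The goal of this exercise is to split this DAG in two while keeping those dependencies. To do this, we need a specific strategy. In this case, we have chosen the operations DAG as the main one and the finance DAG as the secondary one, because the finance DAG depends first on the operational tasks.

To build the solution, we will use two Airflow operators: TriggerDagRunOperator, which launches the execution of an external DAG, and ExternalTaskSensor, which waits for a task of an external DAG.

Operations Process

```python
# operations_a.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

# Start date and schedule are assumed; the original values were not preserved.
with DAG('operations_a', start_date=days_ago(1), schedule_interval='@daily') as dag:
    calculate_revenue = DummyOperator(task_id='calculate_revenue')
    trigger_finances_a = TriggerDagRunOperator(
        task_id='trigger_finances_a',
        trigger_dag_id='finances_a',
        # Assumed: reuse this run's logical date so both DAGs' sensors look at matching runs.
        execution_date='{{ execution_date }}',
    )
    calculate_expenses = DummyOperator(task_id='calculate_expenses')
    # Waits until finances_a has booked the expenses for this same logical date.
    wait_finances_a_expenses_bookkept = ExternalTaskSensor(
        task_id='wait_finances_a_outcome_bookkeep',
        external_dag_id='finances_a',
        external_task_id='outcome_bookkeep',
    )
    operations_a_report = DummyOperator(task_id='operations_a_report')

    calculate_revenue >> trigger_finances_a >> calculate_expenses
    calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report
```

In the Airflow UI, the operators in charge of launching an external DAG are shown in pink, and the external task sensors in dark blue.

Finances Process

```python
# finances_a.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

# Assumed: no schedule of its own, since operations_a triggers this DAG.
with DAG('finances_a', start_date=days_ago(1), schedule_interval=None) as dag:
    income_bookkeep = DummyOperator(task_id='income_bookkeep')
    validate_income = DummyOperator(task_id='validate_income')
    # Waits until operations_a has calculated the expenses for this same logical date.
    wait_operations_a_calculate_expenses = ExternalTaskSensor(
        task_id='wait_operations_a_calculate_expenses',
        external_dag_id='operations_a',
        external_task_id='calculate_expenses',
    )
    outcome_bookkeep = DummyOperator(task_id='outcome_bookkeep')
    finances_a_report = DummyOperator(task_id='finances_a_report')

    income_bookkeep >> validate_income >> wait_operations_a_calculate_expenses
    # Placement of outcome_bookkeep inferred: it books the expenses just calculated.
    wait_operations_a_calculate_expenses >> outcome_bookkeep >> finances_a_report
```

In this case, we see the external task sensor, in blue.

In this way, when the operations DAG runs, it launches the finance DAG at the right moment, and each department can keep evolving its process independently, taking into account only the dependencies between them.

You can download the complete code from our repository damavis/advanced-airflow.

If you found this post useful, we encourage you to read more similar articles from the Data Engineering category in our blog and to share it with your contacts.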
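One detail the split depends on is which run an ExternalTaskSensor actually inspects. By default it looks for the external task instance with the same logical date as its own run, shifted back by `execution_delta` when one is set. In Airflow 2.x, TriggerDagRunOperator gives the triggered run the current time as its logical date unless `execution_date` is passed. The plain-Python sketch below models only that matching rule, so it runs without Airflow. `sensor_target_date`, `sensor_poke`, the in-memory state dictionary and the dates are hypothetical; real sensors also support `execution_date_fn`, timeouts and reschedule mode, which are left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for Airflow's metadata database:
# (dag_id, task_id, logical_date) -> task instance state.
TaskStates = Dict[Tuple[str, str, datetime], str]


def sensor_target_date(logical_date: datetime,
                       execution_delta: Optional[timedelta] = None) -> datetime:
    """Logical date of the external run the sensor inspects.

    Models ExternalTaskSensor's default rule: the sensor's own logical date,
    shifted back by execution_delta when one is given.
    """
    if execution_delta is not None:
        return logical_date - execution_delta
    return logical_date


def sensor_poke(states: TaskStates, external_dag_id: str, external_task_id: str,
                logical_date: datetime,
                execution_delta: Optional[timedelta] = None,
                allowed_states: Tuple[str, ...] = ("success",)) -> bool:
    """One simplified poke: True once the matching external task is in an allowed state."""
    target = sensor_target_date(logical_date, execution_delta)
    return states.get((external_dag_id, external_task_id, target)) in allowed_states


if __name__ == "__main__":
    run_date = datetime(2021, 6, 1, tzinfo=timezone.utc)
    states: TaskStates = {("operations_a", "calculate_expenses", run_date): "success"}

    # finances_a run created with the same logical date as operations_a: found.
    print(sensor_poke(states, "operations_a", "calculate_expenses", run_date))  # True

    # finances_a run created with "now" as its logical date: no such operations_a run.
    triggered_at = run_date + timedelta(minutes=7)
    print(sensor_poke(states, "operations_a", "calculate_expenses", triggered_at))  # False
```

The second call shows why the two DAGs must agree on the logical date: passing the triggering run's date to TriggerDagRunOperator (its `execution_date` argument in Airflow 2.x) is one way to keep the finance sensor pointed at the operations run that launched it.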
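The strategy in this post can also be read as a rule over the original single-DAG graph: every dependency that crosses departments has to become either a TriggerDagRunOperator in the main DAG or an ExternalTaskSensor in the downstream DAG. The sketch below classifies cross-DAG edges that way using the standard-library `graphlib`. The edge list is an assumption inferred from the split operations and finance DAGs, and the rule itself (trigger when the downstream task is a root of the secondary DAG, sensor otherwise) is this sketch's own simplification, not an Airflow feature.

```python
from graphlib import TopologicalSorter  # standard library since Python 3.9

# Dependencies of the original single DAG as (upstream, downstream) pairs.
# Assumption: inferred from the two split DAGs; the full original list was not preserved.
EDGES = [
    ("calculate_revenue", "income_bookkeep"),
    ("income_bookkeep", "validate_income"),
    ("calculate_revenue", "calculate_expenses"),
    ("validate_income", "outcome_bookkeep"),
    ("calculate_expenses", "outcome_bookkeep"),
    ("outcome_bookkeep", "operations_a_report"),
    ("outcome_bookkeep", "finances_a_report"),
]

# Which DAG each task ends up in after the split.
OWNER = {
    "calculate_revenue": "operations_a",
    "calculate_expenses": "operations_a",
    "operations_a_report": "operations_a",
    "income_bookkeep": "finances_a",
    "validate_income": "finances_a",
    "outcome_bookkeep": "finances_a",
    "finances_a_report": "finances_a",
}

MAIN_DAG = "operations_a"  # the post picks the operations DAG as the main one


def run_order(edges):
    """A valid execution order of the combined graph (raises CycleError if cyclic)."""
    graph = {}
    for up, down in edges:
        graph.setdefault(down, set()).add(up)  # node -> its predecessors
    return list(TopologicalSorter(graph).static_order())


def plan_cross_dag_links(edges, owner, main_dag):
    """Return (kind, upstream, downstream) for every edge that crosses DAGs."""
    links = []
    for up, down in edges:
        if owner[up] == owner[down]:
            continue  # same DAG: an ordinary >> dependency is enough
        # Does anything inside the downstream task's own DAG run before it?
        has_local_upstream = any(
            owner[u] == owner[down] for u, d in edges if d == down
        )
        if owner[down] != main_dag and not has_local_upstream:
            links.append(("trigger", up, down))  # main DAG starts the secondary here
        else:
            links.append(("sensor", up, down))  # downstream DAG waits for upstream task
    return links


if __name__ == "__main__":
    print(run_order(EDGES))
    for link in plan_cross_dag_links(EDGES, OWNER, MAIN_DAG):
        print(link)
```

With these edges it reports one trigger (calculate_revenue before income_bookkeep) and two sensors (calculate_expenses before outcome_bookkeep, and outcome_bookkeep before operations_a_report), which lines up with trigger_finances_a, wait_operations_a_calculate_expenses and wait_finances_a_expenses_bookkept in the split DAGs.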