Dag-Dependencies

Airflow에서 DAG(Directed Acyclic Graph)는 데이터 파이프라인을 정의하고 실행하는 핵심 요소입니다. DAG 간의 의존성을 관리하고, 다양한 실행 방식을 적용하는 것은 효율적인 데이터 파이프라인 운영에 필수적입니다. 본 글에서는 Airflow의 DAG 실행 방식과 Jinja Template 활용법, 그리고 다양한 트리거 및 센서에 대해 정리하겠습니다.
DAG 실행 방법
1) 주기적 실행 (Scheduled Execution)
- schedule을 설정하여 주기적으로 실행
- 예: schedule='0 0 * * *' → 매일 00:00에 실행
2) 다른 DAG에 의해 트리거되는 방식
- Explicit Trigger (TriggerDagOperator)
- DAG A가 명확하게 DAG B를 트리거하는 방식
- TriggerDagOperator 사용
- Reactive Trigger (ExternalTaskSensor)
- DAG B가 DAG A의 완료를 대기하는 방식
- ExternalTaskSensor 사용
3) 실행 조건에 따른 분기 처리
- BranchPythonOperator: 특정 조건에 따라 다른 태스크 실행
- LatestOnlyOperator: 과거 데이터 Backfill 시 불필요한 태스크 실행 방지
- Trigger Rules: 앞단 태스크의 실패 여부에 따라 실행할지 결정
Jinja Template과 Airflow
1) Jinja Template이란?
- Python 기반의 템플릿 엔진
- HTML, SQL 등 다양한 파일에서 동적 변수를 활용 가능
- {{ 변수 }} 또는 {% 제어문 %} 형식으로 사용
2) Airflow에서의 Jinja Template 활용
- DAG 내 변수, 작업 이름, SQL 쿼리 등을 동적으로 정의 가능
- 대표적인 예시:
- {{ ds }}: Execution Date (YYYY-MM-DD 형식)
- {{ var.value.get('my_var', 'fallback') }}: Variable 값 불러오기
- {{ dag }}, {{ task }}: 현재 DAG 및 Task 정보 출력
3) Jinja Template 예제 (BashOperator 활용)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# DAG 정의
dag = DAG(
"Learn_Jinja",
schedule='0 0 * * *', # 매일실행
start_date=datetime(2023, 6, 1),
catchup=False
)
# BashOperator을 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"', # 지금 실행한 execution_date
dag=dag
)
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{params.name}}!"', #John출력
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
task3 = BashOperator(
task_id='task3',
bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url) }}" """,
## dag명, task명, variable의 csv_url키에 해당하는 값을 가져옴
dag=dag
)
task1 >> task2 >> task3
DAG 간 트리거 (TriggerDagRunOperator)
TriggerDagRunOperator 사용법
- conf={'path': 'value1'} → DAG B로 전달할 값 설정
- execution_date='{{ ds }}' → 실행 날짜 설정
- reset_dag_run=True → 기존 실행 이력이 있어도 다시 실행
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
dag = DAG(
dag_id="SourceDag",
start_date=datetime(2023, 6, 19),
schedule='@daily'
)
trigger_task = TriggerDagRunOperator(
task_id='tirgger_task',
trigger_dag_id='TargetDag',
conf={'path':'value1'},
execution_date='{{ds}}',
reset_dag_run=True,
dag=dag
)
## TargetDag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
'TargetDag',
schedule='@once',
start_date=datetime(2023, 6, 1),
)
task1 = BashOperator(
task_id='task1',
bash_command="""echo '{{ds}}, {{dag_run.conf.get("path","none") }}' """,
dag=dag
)
Sensor란 무엇인가?
Sensor는 특정 조건이 충족될 때까지 대기하는 Operator로, 외부 리소스의 가용성이나 특정 조건의 완료 여부를 확인하는 데 유용합니다. Airflow는 몇 가지 내장 Sensor를 제공합니다.
주요 Sensor 종류
- FileSensor: 지정된 위치에 파일이 생성될 때까지 대기
- HttpSensor: HTTP 요청을 수행하고 지정된 응답이 도착할 때까지 대기
- SqlSensor: SQL 데이터베이스에서 특정 조건을 만족할 때까지 대기
- ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
Sensor는 주기적으로 poke를 수행하며, 실행 모드는 mode 파라미터로 설정됩니다.
- poke (기본값): Worker를 하나 점유한 채 일정 간격으로 상태를 확인
- reschedule: Worker를 해제한 후 다시 잡아서 상태를 확인
ExternalTaskSensor
ExternalTaskSensor는 DAG B에서 DAG A의 특정 태스크 완료 여부를 체크하는 Sensor입니다.
사용 조건
- 동일한 schedule_interval 사용: DAG A와 DAG B의 실행 주기가 같아야 실행 타이밍이 일치합니다.
- Execution Date 일치 필요: 실행 날짜가 일치하지 않으면 태스크가 매칭되지 않음
다른 실행 주기인 경우 해결 방법
- execution_delta: 특정 시간 간격을 조정하여 execution date를 맞춤
- execution_date_fn: 더 정교한 조정이 필요할 경우 사용
- DAG 간 실행 빈도가 다르면 ExternalTaskSensor 사용 불가
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG_이름',
external_task_id='end',
timeout=5*60,
mode='reschedule'
)
BranchPythonOperator
- 실행 상황에 따라 다음에 실행할 태스크를 동적으로 결정하는 Operator
- 특정 조건에 따라 미리 정해둔 태스크 중 하나를 선택하여 실행
- Variable.get("mode", "dev") 값이 dev이면 trigger_b 태스크를 실행하지 않음
- 실행되지 않은 태스크는 skipped 상태로 Web UI에서 분홍색으로 표시됨
from airflow.operators.python import BranchPythonOperator
from airflow.models import Variable
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ['trigger_b']
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
LatestOnlyOperator

과거 데이터의 backfill 실행 시 특정 태스크가 실행되지 않도록 방지하는 역할 수행
동작 방식
- 현재 실행 시점이 execution_date보다 미래이고, 다음 execution_date보다는 과거이면 실행됨
- 그렇지 않으면 해당 태스크를 포함한 후속 태스크는 실행되지 않음
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
with DAG(
dag_id='latest_only_example',
schedule=timedelta(hours=48), # 48시간마다 실행
start_date=datetime(2023, 6, 14),
catchup=True
) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
Trigger Rules란?
앞단 태스크들의 실행 결과에 따라 후속 태스크 실행 여부를 결정하는 기능
기본 동작 방식
- 기본적으로 앞단 태스크 중 하나라도 실패하면 후속 태스크는 실행되지 않음
- trigger_rule 파라미터를 설정하면 실행 조건을 변경 가능
주요 옵션
- all_success (기본값): 모든 앞단 태스크가 성공해야 실행됨
- all_failed: 모든 앞단 태스크가 실패해야 실행됨
- all_done: 앞단 태스크가 성공, 실패와 관계없이 모두 완료되면 실행됨
- one_failed: 앞단 중 하나라도 실패하면 실행됨
- one_success: 앞단 중 하나라도 성공하면 실행됨
- none_failed_min_one_success: 하나 이상 성공하고, 실패한 태스크가 없으면 실행됨
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.bash import BashOperator
with DAG("trigger_rules", default_args=default_args, schedule=timedelta(days=1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id="final_task",
bash_command="echo DONE!",
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
Task Grouping
태스크 그룹핑의 필요성
Airflow에서 DAG의 태스크 수가 많아질 경우, 태스크를 논리적으로 그룹화하여 관리할 필요가 있습니다. Airflow 2.0 이전에는 SubDAG를 사용했지만, 현재는 Task Grouping이 대체하고 있습니다.
Task Grouping의 장점
- 비슷한 역할을 하는 태스크들을 그룹화하여 관리 가능
- DAG 내에서 그룹 간의 실행 순서를 쉽게 정의 가능
- TaskGroup을 사용하면 가독성이 좋아지고 유지보수 용이
- TaskGroup 내부에 또 다른 TaskGroup을 생성할 수도 있음 (Nesting 지원)
파일 처리 DAG

파일 다운로드, 파일 체크, 데이터 처리 등 다수의 태스크를 관리하는 DAG에서 Task Grouping을 활용하면 다음과 같은 방식으로 구현할 수 있습니다.
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
start = EmptyOperator(task_id='start')
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
task_3 = EmptyOperator(task_id="task_3")
task_1 >> [task_2, task_3]
start >> section_1
Dynamic DAGs
Dynamic DAG는 템플릿과 YAML 파일을 기반으로 DAG을 동적으로 생성하는 방식입니다. 동일한 유형의 DAG을 여러 개 수동으로 만들 필요 없이, 템플릿을 사용하여 자동으로 생성할 수 있습니다.
Dynamic DAG의 필요성
- 비슷한 DAG을 반복적으로 개발하는 부담을 줄일 수 있음
- YAML 파일을 사용하여 DAG의 파라미터를 쉽게 변경 가능
- DAG을 확장할 때 관리가 쉬워짐
DAG의 개수를 늘리는 것과, 하나의 DAG에서 태스크 수를 늘리는 것 사이의 균형을 유지하는 것이 중요
Dynamic DAG 생성 흐름
- YAML 파일(config_appl.yml, config_goog.yml) 작성
- 템플릿 파일(templates.py) 작성
- DAG 생성 스크립트(generator.py) 실행하여 DAG 자동 생성
Dynamic DAG 활용 시 주의할 점
- DAG 수와 태스크 수 균형 유지
- 너무 많은 DAG을 생성하는 것은 관리가 어려울 수 있음
- 하나의 DAG에서 태스크가 과도하게 많아지는 것도 성능 문제를 일으킬 수 있음
- YAML 파일 유지보수
- YAML에서 설정을 변경하면 DAG 전체가 바뀌므로, 잘 관리해야 함
- 템플릿 코드 재사용성 고려
- 공통적인 DAG 패턴을 최대한 템플릿화하여 활용할 것
YAML 파일: DAG 설정 정의
# config_appl.yml
# config_goog.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
dag_id: 'GOOG'
schedule: '@daily'
catchup: False
symbol: 'GOOG'
템플릿 파일: DAG 생성 로직
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(
dag_id="get_price_{{ dag_id }}",
start_date=datetime(2023, 6, 15),
schedule={{ schedule }},
catchup={{ catchup or False }}
) as dag:
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process({{ symbol }}))
Generator 파일: YAML을 기반으로 DAG 코드 생성
from jinja2 import Environment, FileSystemLoader
import yaml
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templates.py')
for f in os.listdir(file_dir):
if f.endswith(".yml"):
with open(f"{file_dir}/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))'학습 주제 > 데이터파이프라인과 Airflow' 카테고리의 다른 글
| Airflow 운영과 대안 (0) | 2025.02.06 |
|---|---|
| Airflow 구글 시트 연동 & 모니터링과 API (0) | 2025.02.03 |
| Airflow 환경설정 및 Slack연동 (0) | 2025.02.02 |
| Airflow - OLTP to OLAP(MySQL -> Snowflake) (0) | 2025.01.20 |
| Airflow DAG - Primary Key Uniqueness 및 API 실습 (1) | 2025.01.18 |