Airflow Dag-Dependencies

2025. 2. 6.·학습 주제/데이터파이프라인과 Airflow
 

Dag-Dependencies

 

 

 

Airflow에서 DAG(Directed Acyclic Graph)는 데이터 파이프라인을 정의하고 실행하는 핵심 요소입니다. DAG 간의 의존성을 관리하고, 다양한 실행 방식을 적용하는 것은 효율적인 데이터 파이프라인 운영에 필수적입니다. 본 글에서는 Airflow의 DAG 실행 방식과 Jinja Template 활용법, 그리고 다양한 트리거 및 센서에 대해 정리하겠습니다.


DAG 실행 방법

1) 주기적 실행 (Scheduled Execution)

  • schedule을 설정하여 주기적으로 실행
  • 예: schedule='0 0 * * *'  →  매일 00:00에 실행

2) 다른 DAG에 의해 트리거되는 방식

  • Explicit Trigger (TriggerDagOperator)
    • DAG A가 명확하게 DAG B를 트리거하는 방식
    • TriggerDagOperator 사용
  • Reactive Trigger (ExternalTaskSensor)
    • DAG B가 DAG A의 완료를 대기하는 방식
    • ExternalTaskSensor 사용

3) 실행 조건에 따른 분기 처리

  • BranchPythonOperator: 특정 조건에 따라 다른 태스크 실행
  • LatestOnlyOperator: 과거 데이터 Backfill 시 불필요한 태스크 실행 방지
  • Trigger Rules: 앞단 태스크의 실패 여부에 따라 실행할지 결정

Jinja Template과 Airflow

1) Jinja Template이란?

  • Python 기반의 템플릿 엔진
  • HTML, SQL 등 다양한 파일에서 동적 변수를 활용 가능
  • {{ 변수 }} 또는 {% 제어문 %} 형식으로 사용

2) Airflow에서의 Jinja Template 활용

  • DAG 내 변수, 작업 이름, SQL 쿼리 등을 동적으로 정의 가능
  • 대표적인 예시:
    • {{ ds }}: Execution Date (YYYY-MM-DD 형식)
    • {{ var.value.get('my_var', 'fallback') }}: Variable 값 불러오기
    • {{ dag }}, {{ task }}: 현재 DAG 및 Task 정보 출력

3) Jinja Template 예제 (BashOperator 활용)

더보기
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# DAG 정의
dag = DAG(
    "Learn_Jinja",
    schedule='0 0 * * *', # 매일실행
    start_date=datetime(2023, 6, 1),
    catchup=False
)

# BashOperator을 사용하여 템플릿 작업 정의
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "{{ ds }}"', # 지금 실행한 execution_date
    dag=dag
)

# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
    task_id='task2',
    bash_command='echo "안녕하세요, {{params.name}}!"', #John출력
    params={'name': 'John'},	# 사용자 정의 가능한 매개변수
    dag=dag
)

task3 = BashOperator(
    task_id='task3',
    bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url) }}" """,
    ## dag명, task명, variable의 csv_url키에 해당하는 값을 가져옴
    dag=dag
)

task1 >> task2 >> task3

DAG 간 트리거 (TriggerDagRunOperator)

TriggerDagRunOperator 사용법

  • conf={'path': 'value1'} → DAG B로 전달할 값 설정
  • execution_date='{{ ds }}' → 실행 날짜 설정
  • reset_dag_run=True → 기존 실행 이력이 있어도 다시 실행
더보기
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

dag = DAG(
    dag_id="SourceDag",
    start_date=datetime(2023, 6, 19),
    schedule='@daily'
)

trigger_task = TriggerDagRunOperator(
    task_id='tirgger_task',
    trigger_dag_id='TargetDag',
    conf={'path':'value1'},
    execution_date='{{ds}}',
    reset_dag_run=True,
    dag=dag
)

## TargetDag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'TargetDag',
    schedule='@once',
    start_date=datetime(2023, 6, 1),
)

task1 = BashOperator(
    task_id='task1',
    bash_command="""echo '{{ds}}, {{dag_run.conf.get("path","none") }}' """,
    dag=dag
)

Sensor란 무엇인가?

Sensor는 특정 조건이 충족될 때까지 대기하는 Operator로, 외부 리소스의 가용성이나 특정 조건의 완료 여부를 확인하는 데 유용합니다. Airflow는 몇 가지 내장 Sensor를 제공합니다.

주요 Sensor 종류

  • FileSensor: 지정된 위치에 파일이 생성될 때까지 대기
  • HttpSensor: HTTP 요청을 수행하고 지정된 응답이 도착할 때까지 대기
  • SqlSensor: SQL 데이터베이스에서 특정 조건을 만족할 때까지 대기
  • ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기

Sensor는 주기적으로 poke를 수행하며, 실행 모드는 mode 파라미터로 설정됩니다.

  • poke (기본값): Worker를 하나 점유한 채 일정 간격으로 상태를 확인
  • reschedule: Worker를 해제한 후 다시 잡아서 상태를 확인

ExternalTaskSensor

ExternalTaskSensor는 DAG B에서 DAG A의 특정 태스크 완료 여부를 체크하는 Sensor입니다.

사용 조건

  1. 동일한 schedule_interval 사용: DAG A와 DAG B의 실행 주기가 같아야 실행 타이밍이 일치합니다.
  2. Execution Date 일치 필요: 실행 날짜가 일치하지 않으면 태스크가 매칭되지 않음

다른 실행 주기인 경우 해결 방법

  • execution_delta: 특정 시간 간격을 조정하여 execution date를 맞춤
  • execution_date_fn: 더 정교한 조정이 필요할 경우 사용
  • DAG 간 실행 빈도가 다르면 ExternalTaskSensor 사용 불가
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
    task_id='waiting_for_end_of_dag_a',
    external_dag_id='DAG_이름',
    external_task_id='end',
    timeout=5*60,
    mode='reschedule'
)

BranchPythonOperator

  • 실행 상황에 따라 다음에 실행할 태스크를 동적으로 결정하는 Operator
  • 특정 조건에 따라 미리 정해둔 태스크 중 하나를 선택하여 실행
  • Variable.get("mode", "dev") 값이 dev이면 trigger_b 태스크를 실행하지 않음
  • 실행되지 않은 태스크는 skipped 상태로 Web UI에서 분홍색으로 표시됨
from airflow.operators.python import BranchPythonOperator
from airflow.models import Variable

def skip_or_cont_trigger():
    if Variable.get("mode", "dev") == "dev":
        return []
    else:
        return ['trigger_b']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=skip_or_cont_trigger,
)

LatestOnlyOperator

 

과거 데이터의 backfill 실행 시 특정 태스크가 실행되지 않도록 방지하는 역할 수행

동작 방식

  • 현재 실행 시점이 execution_date보다 미래이고, 다음 execution_date보다는 과거이면 실행됨
  • 그렇지 않으면 해당 태스크를 포함한 후속 태스크는 실행되지 않음
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='latest_only_example',
    schedule=timedelta(hours=48),  # 48시간마다 실행
    start_date=datetime(2023, 6, 14),
    catchup=True
) as dag:
    
    t1 = EmptyOperator(task_id='task1')
    t2 = LatestOnlyOperator(task_id='latest_only')
    t3 = EmptyOperator(task_id='task3')
    t4 = EmptyOperator(task_id='task4')
    
    t1 >> t2 >> [t3, t4]

Trigger Rules란?

앞단 태스크들의 실행 결과에 따라 후속 태스크 실행 여부를 결정하는 기능

기본 동작 방식

  • 기본적으로 앞단 태스크 중 하나라도 실패하면 후속 태스크는 실행되지 않음
  • trigger_rule 파라미터를 설정하면 실행 조건을 변경 가능

주요 옵션

  • all_success (기본값): 모든 앞단 태스크가 성공해야 실행됨
  • all_failed: 모든 앞단 태스크가 실패해야 실행됨
  • all_done: 앞단 태스크가 성공, 실패와 관계없이 모두 완료되면 실행됨
  • one_failed: 앞단 중 하나라도 실패하면 실행됨
  • one_success: 앞단 중 하나라도 성공하면 실행됨
  • none_failed_min_one_success: 하나 이상 성공하고, 실패한 태스크가 없으면 실행됨
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.bash import BashOperator

with DAG("trigger_rules", default_args=default_args, schedule=timedelta(days=1)) as dag:
    t1 = BashOperator(task_id="print_date", bash_command="date")
    t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
    t3 = BashOperator(task_id="exit", bash_command="exit 1")
    t4 = BashOperator(
        task_id="final_task",
        bash_command="echo DONE!",
        trigger_rule=TriggerRule.ALL_DONE
    )
    [t1, t2, t3] >> t4

Task Grouping

태스크 그룹핑의 필요성

Airflow에서 DAG의 태스크 수가 많아질 경우, 태스크를 논리적으로 그룹화하여 관리할 필요가 있습니다. Airflow 2.0 이전에는 SubDAG를 사용했지만, 현재는 Task Grouping이 대체하고 있습니다.

Task Grouping의 장점

  • 비슷한 역할을 하는 태스크들을 그룹화하여 관리 가능
  • DAG 내에서 그룹 간의 실행 순서를 쉽게 정의 가능
  • TaskGroup을 사용하면 가독성이 좋아지고 유지보수 용이
  • TaskGroup 내부에 또 다른 TaskGroup을 생성할 수도 있음 (Nesting 지원)

파일 처리 DAG

파일 다운로드, 파일 체크, 데이터 처리 등 다수의 태스크를 관리하는 DAG에서 Task Grouping을 활용하면 다음과 같은 방식으로 구현할 수 있습니다.

from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator

start = EmptyOperator(task_id='start')

with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
    task_3 = EmptyOperator(task_id="task_3")

    task_1 >> [task_2, task_3]

start >> section_1

Dynamic DAGs

Dynamic DAG는 템플릿과 YAML 파일을 기반으로 DAG을 동적으로 생성하는 방식입니다. 동일한 유형의 DAG을 여러 개 수동으로 만들 필요 없이, 템플릿을 사용하여 자동으로 생성할 수 있습니다.

Dynamic DAG의 필요성

  • 비슷한 DAG을 반복적으로 개발하는 부담을 줄일 수 있음
  • YAML 파일을 사용하여 DAG의 파라미터를 쉽게 변경 가능
  • DAG을 확장할 때 관리가 쉬워짐

DAG의 개수를 늘리는 것과, 하나의 DAG에서 태스크 수를 늘리는 것 사이의 균형을 유지하는 것이 중요

Dynamic DAG 생성 흐름

  1. YAML 파일(config_appl.yml, config_goog.yml) 작성
  2. 템플릿 파일(templates.py) 작성
  3. DAG 생성 스크립트(generator.py) 실행하여 DAG 자동 생성

Dynamic DAG 활용 시 주의할 점

  1. DAG 수와 태스크 수 균형 유지
    • 너무 많은 DAG을 생성하는 것은 관리가 어려울 수 있음
    • 하나의 DAG에서 태스크가 과도하게 많아지는 것도 성능 문제를 일으킬 수 있음
  2. YAML 파일 유지보수
    • YAML에서 설정을 변경하면 DAG 전체가 바뀌므로, 잘 관리해야 함
  3. 템플릿 코드 재사용성 고려
    • 공통적인 DAG 패턴을 최대한 템플릿화하여 활용할 것

YAML 파일: DAG 설정 정의

# config_appl.yml
# config_goog.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'

dag_id: 'GOOG'
schedule: '@daily'
catchup: False
symbol: 'GOOG'

템플릿 파일: DAG 생성 로직

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(
    dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule={{ schedule }},
    catchup={{ catchup or False }}
) as dag:
    
    @task
def process(symbol):
        return symbol

    @task
def store(symbol):
        return symbol
    
    store(process({{ symbol }}))

Generator 파일: YAML을 기반으로 DAG 코드 생성

from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templates.py')

for f in os.listdir(file_dir):
    if f.endswith(".yml"):
        with open(f"{file_dir}/{f}", "r") as cf:
            config = yaml.safe_load(cf)
            with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
                f.write(template.render(config))

'학습 주제 > 데이터파이프라인과 Airflow' 카테고리의 다른 글

Airflow 운영과 대안  (0) 2025.02.06
Airflow 구글 시트 연동 & 모니터링과 API  (0) 2025.02.03
Airflow 환경설정 및 Slack연동  (0) 2025.02.02
Airflow - OLTP to OLAP(MySQL -> Snowflake)  (0) 2025.01.20
Airflow DAG - Primary Key Uniqueness 및 API 실습  (1) 2025.01.18
'학습 주제/데이터파이프라인과 Airflow' 카테고리의 다른 글
  • Airflow 운영과 대안
  • Airflow 구글 시트 연동 & 모니터링과 API
  • Airflow 환경설정 및 Slack연동
  • Airflow - OLTP to OLAP(MySQL -> Snowflake)
굥여9
굥여9
9idryd 님의 블로그 입니다.
  • 굥여9
    문과의 개발
    굥여9
  • 전체
    오늘
    어제
    • 분류 전체보기 (114)
      • 학습 주제 (86)
        • 자료구조와 알고리즘 (8)
        • HTML & 웹크롤링 (4)
        • 데이터 시각화 (4)
        • Django & Django Rest Framew.. (11)
        • AWS 클라우드 (6)
        • SQL & 데이터 웨어하우스 (11)
        • 데이터파이프라인과 Airflow (12)
        • Docker & K8S (8)
        • DBT (4)
        • CI & CD (1)
        • 빅데이터 처리와 Spark (12)
        • Kafka & Spark Streaming (5)
        • 보안 엔지니어링 (0)
      • 구름 프로펙트 클라우드 엔지니어링 (0)
        • [Monolithic] 서비스의 기초와 설계 (0)
        • [MSA & EDA] 비동기 전환과 정합성 (0)
        • [Cloud Native] K8s 기반 인프라와 .. (0)
      • 프로그래머스 데브코스 데이터 엔지니어링 (4)
      • 개발 기록 (24)
        • 일일 (24)
        • 주간 (0)
      • 회고 (0)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

    • 깃허브
  • 공지사항

  • 인기 글

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
굥여9
Airflow Dag-Dependencies
상단으로

티스토리툴바