TITEDIOS 편한 코딩

[MLOps] 9강 - 첫 Dag 작성하기 (Airflow Multi-Node System 구축 가이드 및 사례) 본문

MLOps

[MLOps] 9강 - 첫 Dag 작성하기 (Airflow Multi-Node System 구축 가이드 및 사례)

TitediosKW 2025. 1. 14. 19:00
반응형

목차

  1. DAG 란?
  2. DAG 구조
  3. Dag Tutorial 작성하기
  4. Airflow에서 Dag 실행해 보기
  5. 결론

1. Dag 란?

DAG(Directed Acyclic Graph)는 데이터를 처리하거나 워크플로우를 관리하기 위해 사용되는 중요한 개념으로, 방향성이 있고 사이클이 없는 그래프 구조를 의미합니다. 간단히 말해, DAG는 작업 간의 순서를 명확히 하고 종속성을 정의하는 데 사용됩니다. 이 구조는 작업의 병렬 실행과 순차 실행을 효과적으로 관리할 수 있게 해 줍니다.

DAG의 특성은 다음과 같습니다:

  • 방향성(Directed): 작업 간의 의존 관계는 방향을 가지고 있습니다.
  • 비순환성(Acyclic): 순환 구조가 없어 작업이 무한히 반복되지 않습니다.

DAG는 주로 데이터 엔지니어링, 머신러닝 파이프라인, ETL(Extract, Transform, Load) 작업 등에서 널리 사용됩니다.


2. DAG 구조

DAG의 구조를 분석하려면 다음과 같은 요소를 주의 깊게 살펴봐야 합니다:

  • 태스크(Task): DAG의 기본 구성 요소로, 개별 작업 단위를 의미합니다. PythonOperator, BashOperator 등 다양한 타입이 있습니다.
  • 태스크 간의 의존성(Task Dependencies): 작업 실행 순서를 결정합니다. >>, <<, .set_downstream(), .set_upstream() 등 다양한 방식으로 의존성을 정의할 수 있습니다.
  • 스케줄(Schedule): DAG의 실행 주기를 나타냅니다. 예를 들어, @daily, @hourly, 또는 크론 표현식을 사용할 수 있습니다.
  • 파라미터(Parameter)]: 사용자 정의 파라미터를 설정하여 DAG의 동작을 동적으로 제어할 수 있습니다.
반응형

3. Dag Tutorial 작성하기

Python으로 DAG를 정의하는 기본 코드는 아래와 같습니다. Airflow에서 태스크는 개별 작업 단위이며, Operator를 사용하여 정의합니다. 예를 들어, DummyOperator는 단순히 태스크 실행을 나타냅니다. 태스크 간의 실행 순서를 사용자가 정의해야 합니다. >> 연산자를 사용하여 순서를 지정합니다.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

# DAG 생성
with DAG(
    dag_id='example_dag',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    start = DummyOperator(task_id='start')
    task001 = DummyOperator(task_id='task001')
    task002 = DummyOperator(task_id='task002')
    task003 = DummyOperator(task_id='task003')
    end = DummyOperator(task_id='end')

    start >> [task001, task002] >> task003 >> end
  • (dag_id): DAG의 고유한 이름을 정의합니다.
  • (schedule_interval): DAG가 실행될 주기를 설정합니다.
  • (start_date): DAG 시작 날짜를 지정합니다.
  • (catchup): 이전 실행 날짜의 작업을 실행할지 여부를 결정합니다.

4. Airflow에서 DAG 실행해 보기

1) Airflow 실행

이전 글에서 docker-compose.yaml을 작성했던 Airflow 작업위치에 접근합니다. 저의 경우 /home/user/works/dags입니다. 해당 위치에서 Airflow를 실행합니다.

docker compose up -d

 

실행 후에는 아래와 같이 docker ps명령어로 정상적으로 실행되었는지 확인이 가능합니다.

2) dag 파일 작성

Airflow 작업위치에서 dag/ 디렉토리 하위에 my_first_dag.py를 생성해 줍니다. 저의 경우 /home/user/works/dags/my_first_dag.py입니다. 위에서 작성한 코드를 복사하여 붙여 넣어 줍니다.

3) Airflow web에서 확인하기

Airflow web에 접속해 봅니다. 로그인을 해야 하는데 기본 설정을 변경하지 않으셨다면 airflow/airflow로 로그인이 가능하십니다. 로그인을 하고 나면 우리가 작성한 DAG가 보입니다. 화면에는 Airflow에서 제공하는 예제가 잔뜩 있습니다. 이러한 예제를 보고 싶지 않으시다면 docker-compose.yaml 파일에서 AIRFLOW__CORE__LOAD_EXAMPLES 설정을 False로 변경하시면 보이지 않게 됩니다.

 

Dag를 클릭하면 조금 더 상세한 화면을 확인하실 수 있습니다. 이제 Play 버튼을 클릭하여 DAG를 실행할 수 있습니다. 사실 Dummy Operator로 작성되어 있으므로 특별한 동작은 하지 않습니다. Web 화면에서 시각적으로 우리가 작성한 Task의 순서를 확인하고 의존성을 확인할 수 있습니다. 이러한 시각화 등은 Airflow로 머신러닝 학습 등을 수행 시 장점으로 작용합니다.

 

Play 버튼을 눌러 작업을 실행하면 아래와 같은 화면으로 보입니다. 성공적으로 작업을 한 기록이 추가된 것을 확인하실 수 있습니다.


결론

DAG는 복잡한 작업 흐름을 관리하기 위한 강력한 도구로, 데이터 파이프라인, ETL 작업, 머신러닝 모델 트레이닝 등에 필수적입니다. Airflow는 DAG를 쉽게 작성하고 실행할 수 있는 환경을 제공하며, 시각화와 모니터링 기능을 통해 워크플로우 관리의 생산성을 극대화합니다.

 

DAG를 활용하려면 기본 개념을 이해하고, 적절한 Operator와 의존성 설정 방법을 숙지하는 것이 중요합니다. Airflow와 함께 DAG를 잘 활용하면 작업의 자동화와 효율성을 극대화할 수 있습니다.

 

궁금한 점이나 추가로 다루었으면 하는 주제가 있다면 댓글로 남겨 주세요.

 

도움이 되셨다면 공감 부탁드리겠습니다. 여러분의 공감이 정말 큰 힘이 됩니다.

 

감사합니다! 🙌

반응형