그동안 Airflow를 이용하여 DAG를 작성하면서 불편함을 느낀점이 하나 있었다.
바로 Time 스케줄에 관한 것인데 지금 토이프로젝트에서 사용하는 데이터의 구조상 반드시 선행되어야하는 DAG가 존재했고 예를들면 일자별 Data 크롤링 등등 선행 DAG가 작동한 이후에 신규 선수 추출과 선수의 생년월일 크롤링 DAG가 작동해야했다. 지금까지는 눈대중으로 Time 스케줄을 잡았었지만 비효율적인 부분이 몇개 있었다.
1. 시간적 손실
현재 매일 동작하는 daily DAG는 20시 45분에 작동을 시작하고 소요시간은 대략 3분 정도이다. 넉넉잡아서 5분의 여유를 두고 다음 DAG의 작동시간은 20시 50분으로 잡으면 크게 문제는 없을 것 처럼 보이나, 크롤링 사이트의 변화나 time 딜레이가 있을 경우 5분 안에 DAG가 전부 작동할거라 장담할 수는 없다.
2. 선행 DAG의 작동 에러로 인한 연쇄 에러
또한 선행 DAG에서 에러가 발생해서 제대로 데이터가 쌓이지 않을 경우 신규선수가 있음에도 데이터가 없어 제대로 추출되지 않을 수 있고 이는 연쇄적으로 에러를 유발할 수 있다.
이러한 이유로 Trigger DAG를 만들어 DAG간의 dependency 잡아 두려고 한다. 우선 필요한 Trigger DAG의 개수부터 생각해봤다.
기본적으로 1개가 필요하고 주간, 일간을 나눠서 총 2개의 Trigger DAG를 구성하기로 했다.
첫 번째 Trigger DAG는 일자별 DAG를 묶어두는 형태로 작성했다.
필요한 모듈은 TriggerDagRunOperator로 airflow에 이미 설치되어 있는 모듈이다.
from airflow import DAG
from datetime import datetime, timedelta
from pytz import timezone
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# DAG 정의
dag = DAG(
'daily_trigger',
default_args={
'retries': 1,
},
schedule_interval='45 20 * * *',
start_date=datetime(2023,4, 3),
catchup=False
)
trigger_daily = TriggerDagRunOperator(
task_id="trigger_daily",
trigger_dag_id="n_crawling_daily",
wait_for_completion=True,
dag = dag
)
trigger_newplayer = TriggerDagRunOperator(
task_id="trigger_newplayer",
trigger_dag_id="n_crawling_newplayer",
wait_for_completion=True,
dag = dag
)
trigger_daily >> trigger_newplayer
작성방법은 매우 간단했다. pythonoperator처럼 변수에 TriggerDagRunOperator를 작성해주면 되고 pythonoperator에 있는 python_callable 대신 trigger_dag_id를 입력해주면된다. trigger_dag_id는 작동시킬 DAG의 이름을 적어두면된다.
'wait_for_completion' 은 trigger DAG를 만든 이유중 하나이다. "n_crawling_newplayer"의 DAG는 선행 DAG인 "n_crawling_daily"의 작업이 완료 된 후 시작하라는 의미이다. 글을 작성하면서 생각해보니 하단
"trigger_daily >> trigger_newplayer" 이부분에 선언이 되어있기도 하다... 아무튼 이런식으로 작성하면 선행 DAG가 자동으로 20시 45분에 동작하고 작업을 완료하면 다음 DAG가 작동하게 된다. TriggerDAG는 말그대로 DAG의 Trigger를 당겨주기 때문에 여기에 묶여 있는 DAG의 schedule_interval은 once로 변경해뒀다.
주간 Trigger DAG도 같은 방식으로 작성했으며, Airflow는 DAG dependency를 감지하여 시각화도 해준다.
우측 메뉴에서 Browse -> DAG Dependencies를 눌러보면
이런식으로 시각화된 dependency를 확인할 수 있다.
마치며
이번 포스팅은 DAG의 dependency를 잡기위해 Trigger DAG를 작성해봤다. 사실 토이프로젝트 레벨에서는 time 스케줄을 그렇게 빡빡하게 잡을 필요는 없긴하지만, 실무에서 일할때는 주로 야간에 DAG 스케줄이 몰려있어서 불필요한 시간 손실을 최대한 줄여야했다. 진작에 만들어야했던건데 미루다보니 이제서야 만들게되었다. 그래도 만들고 보니 DAG 2개만 이용하여 나머지 DAG를 관리한다는 점에서 편리한 것 같다.
'Airflow' 카테고리의 다른 글
주별 집계 데이터 적재 DAG 만들기 (0) | 2023.06.19 |
---|---|
Airflow와 DB 연결하기 (0) | 2023.04.17 |
Airflow Sample DAG 생성하기 (0) | 2023.04.14 |