Airflow Sample DAG 생성하기
이번 포스팅은 지난 포스팅에서 띄운 Airflow에 Hello World를 출력하는 sample DAG를 생성해보려고 한다.
Airflow DAG는 파이썬 기반으로 작성되기 때문에 기존에 파이썬을 사용하는 사람이라면 큰 어려움은 없을 것 같다.
일단 docker-compose.yaml로 Airflow를 띄우게 되면 자동적으로 dags, logs, plugins 폴더가 생성된다.
- dags 폴더
Airflow에서 실행될 파이썬 파일들이 담겨있어야하는 공간이다.
Airflow는 주기적으로 dags를 폴더를 스캔하며 해당 폴더에 있는 DAG를 읽어 Web UI에 표시한다.
dags폴더의 경로는 고정이 아닌 수정이 가능하며 수정은 docker-compose.yaml 상단에 있는 volumes에서 가능하다 .
- logs 폴더
Airflow의 로그 파일이 적재되는 경로로 DAG별 실행로그가 쌓인다.
dags폴더와 마찬가지로 경로 수정이 가능하다.
- plugins 폴더
Airflow에서 외부 기능을 코어에 통합할 수 있게 해주는 폴더이다.
단순히 폴더에 파일을 드롭하여 사용한다고한다. 실제로 사용해본적은 없다.
Airflow에 새로운 DAG를 생성하기 위해서는 dags폴더에 파이썬코드를 작성해야한다.
작성 코드는 다음과 같다
# 필요 모듈 import
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
# DAG 정의
dag = DAG(
'DAG_test', # DAG_Name
default_args={
'retries': 1, # 실패시 재시도 횟수
},
schedule_interval='@once', # 배치 주기
start_date=datetime(2023,4, 3) # 배치 시작날짜 설정
)
# 함수 설정
def print_test():
print('Hello World')
# task 설정
t1 = PythonOperator(
task_id = 'print_test', #task 이름 설정
python_callable=print_test, # 불러올 함수 설정
dag=dag #dag 정보
)
# task 진행
t1
간단하게 설명하자면 DAG에 대한 정보를 선언해준 후 PythonOperator에서 사용할 함수를 정의한다.
그 후 task를 생성하면서 정의된 함수를 입력해주면된다.
코드를 정상적으로 작성했을 경우 Web UI에 DAG가 표시된다.
이제 DAG를 실행시켜보자 DAG_test이름의 왼쪽 버튼을 클릭해주면 해당 DAG는 활성화된 상태가 되며
스케쥴에 따라 작동하게된다. sample DAG는 스케쥴을 단 한번만 돌게 잡아놔서 1회만 작동하고 더이상 작동하지 않는다.
이제 DAG_test를 눌러 DAG 정보를 봐보자
DAG의 시작시간 완료시간 소요시간과 Task의 개수, 사용한 Operator의 개수가 나온다.
초록색 네모를 클릭해보면
Task의 log를 확인할 수 있고 Hello World가 정상적으로 출력된게 확인된다.
이번에는 여러개의 Task를 만들어 순서대로 실행되게 해보자
# 필요 모듈 import
from datetime import datetime, time
from airflow import DAG
from airflow.operators.python import PythonOperator
from pytz import timezone
KST = timezone('Asia/Seoul')
# DAG 정의
dag = DAG(
'DAG_test_2', # DAG_Name
default_args={
'retries': 1, # 실패시 재시도 횟수
},
schedule_interval='@once', # 배치 주기
start_date=datetime(2023,4, 3) # 배치 시작날짜 설정
)
# 함수 설정
def task1():
print('Task 1 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
time.sleep(10)
def task2():
print('Task 2 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
time.sleep(10)
def task3():
print('Task 3 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
time.sleep(10)
def task4():
print('Task 4 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
# task 설정
t1 = PythonOperator(
task_id = 'task1', #task 이름 설정
python_callable=task1, # 불러올 함수 설정
dag=dag #dag 정보
)
t2 = PythonOperator(
task_id = 'task2', #task 이름 설정
python_callable=task2, # 불러올 함수 설정
dag=dag #dag 정보
)
t3 = PythonOperator(
task_id = 'task3', #task 이름 설정
python_callable=task3, # 불러올 함수 설정
dag=dag #dag 정보
)
t4 = PythonOperator(
task_id = 'task4', #task 이름 설정
python_callable=task4, # 불러올 함수 설정
dag=dag #dag 정보
)
# task 진행
t1 >> t2 >> t3 >> t4
Task 4개를 생성했도 각 Task는 실행되는 시간을 출력하게 만들었다.
t1 ~ t3 Task는 10초의 딜레이 기간이 존재한다.
t1 >> t2 >> t3 >> t4 이부분은 Task별 순서를 의미하고 task1부터 task4까지 차례대로 실행되게 지정하였다.
이제 DAG를 확인해보자
DAG를 실행시키면 Task가 순차적으로 실행된다. 상단에 gantt를 클릭하면 시간대를 편리하게 볼 수 있다.