Airflow

Airflow Sample DAG 생성하기

Developer trainee_J ^~^ 2023. 4. 14. 16:04

이번 포스팅은 지난 포스팅에서 띄운 Airflow에 Hello World를 출력하는 sample DAG를 생성해보려고 한다. 

Airflow DAG는 파이썬 기반으로 작성되기 때문에 기존에  파이썬을 사용하는 사람이라면 큰 어려움은 없을 것 같다. 

일단 docker-compose.yaml로 Airflow를 띄우게 되면 자동적으로 dags, logs, plugins 폴더가 생성된다. 

 

  • dags 폴더

Airflow에서 실행될 파이썬 파일들이 담겨있어야하는 공간이다. 

Airflow는 주기적으로 dags를 폴더를 스캔하며 해당 폴더에 있는 DAG를 읽어 Web UI에 표시한다. 

dags폴더의 경로는 고정이 아닌 수정이 가능하며 수정은 docker-compose.yaml 상단에 있는 volumes에서 가능하다 .

 

  • logs 폴더

Airflow의 로그 파일이 적재되는 경로로 DAG별 실행로그가 쌓인다. 

dags폴더와 마찬가지로 경로 수정이 가능하다. 

 

  • plugins 폴더 

Airflow에서 외부 기능을 코어에 통합할 수 있게 해주는 폴더이다. 

단순히 폴더에 파일을 드롭하여 사용한다고한다. 실제로 사용해본적은 없다.

 

Airflow에 새로운 DAG를 생성하기 위해서는 dags폴더에 파이썬코드를 작성해야한다. 

 

작성 코드는 다음과 같다 

# 필요 모듈 import
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# DAG 정의
dag = DAG(
    'DAG_test',         # DAG_Name
    default_args={                  
        'retries': 1,               # 실패시 재시도 횟수
        },
    schedule_interval='@once',      # 배치 주기
    start_date=datetime(2023,4, 3)  # 배치 시작날짜 설정
)

# 함수 설정
def print_test():
    print('Hello World')

# task 설정
t1 = PythonOperator( 
    task_id = 'print_test', #task 이름 설정
    python_callable=print_test, # 불러올 함수 설정
    dag=dag #dag 정보 
)

# task 진행
t1

간단하게 설명하자면 DAG에 대한 정보를 선언해준 후 PythonOperator에서 사용할 함수를 정의한다. 

그 후 task를 생성하면서 정의된 함수를 입력해주면된다. 

 

코드를 정상적으로 작성했을 경우 Web UI에 DAG가 표시된다. 

 

 

 

이제 DAG를 실행시켜보자 DAG_test이름의 왼쪽 버튼을 클릭해주면 해당 DAG는 활성화된 상태가 되며 

스케쥴에 따라 작동하게된다. sample DAG는 스케쥴을 단 한번만 돌게 잡아놔서 1회만 작동하고 더이상 작동하지 않는다. 

Runs에 초록색 원이 생긴것을 확인할 수 있다.  정상적으로 돌았다는 뜻

이제 DAG_test를 눌러 DAG 정보를 봐보자 

DAG의 시작시간 완료시간 소요시간과 Task의 개수, 사용한 Operator의 개수가 나온다. 

초록색 네모를 클릭해보면

Task의 log를 확인할 수 있고 Hello World가 정상적으로 출력된게 확인된다. 

 

이번에는 여러개의 Task를 만들어 순서대로 실행되게 해보자 

 

# 필요 모듈 import
from datetime import datetime, time
from airflow import DAG
from airflow.operators.python import PythonOperator
from pytz import timezone

KST = timezone('Asia/Seoul')


# DAG 정의
dag = DAG(
    'DAG_test_2',         # DAG_Name
    default_args={                  
        'retries': 1,               # 실패시 재시도 횟수
        },
    schedule_interval='@once',      # 배치 주기
    start_date=datetime(2023,4, 3)  # 배치 시작날짜 설정
)

# 함수 설정
def task1():
    
    print('Task 1 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
    time.sleep(10)
    
def task2():
    
    print('Task 2 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
    time.sleep(10)
    
def task3():
    
    print('Task 3 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))
    time.sleep(10)
    
def task4():
    
    print('Task 4 execute time ', datetime.now().astimezone(KST).strftime('%Y-%m-%D %H-%M-%S'))


# task 설정
t1 = PythonOperator( 
    task_id = 'task1', #task 이름 설정
    python_callable=task1, # 불러올 함수 설정
    dag=dag #dag 정보 
)
t2 = PythonOperator( 
    task_id = 'task2', #task 이름 설정
    python_callable=task2, # 불러올 함수 설정
    dag=dag #dag 정보 
)
t3 = PythonOperator( 
    task_id = 'task3', #task 이름 설정
    python_callable=task3, # 불러올 함수 설정
    dag=dag #dag 정보 
)
t4 = PythonOperator( 
    task_id = 'task4', #task 이름 설정
    python_callable=task4, # 불러올 함수 설정
    dag=dag #dag 정보 
)

# task 진행 
t1 >> t2 >> t3 >> t4

Task 4개를 생성했도 각 Task는 실행되는 시간을 출력하게 만들었다. 

t1 ~ t3 Task는 10초의 딜레이 기간이 존재한다. 

t1 >> t2 >> t3 >> t4  이부분은  Task별 순서를 의미하고 task1부터 task4까지 차례대로 실행되게 지정하였다.

 

이제 DAG를 확인해보자 

4개의 Task가 생성된것이 확인된다 .

DAG를 실행시키면 Task가 순차적으로 실행된다. 상단에 gantt를 클릭하면 시간대를 편리하게 볼 수 있다.