Airflow

주별 집계 데이터 적재 DAG 만들기

Developer trainee_J ^~^ 2023. 6. 19. 18:34

이번 포스팅은 매주 월요일마다 지난주의 일별 데이터를 집계하여 DB에 저장하는 DAG를 만드는 과정에 대해 작성해보려고한다.  지난 포스팅에서 만들었던 로직을 그대로 DAG에 옮기는 거라서 큰 어려움은 없을듯하다. 

 

일자별 데이터를 주별 데이터로 변환하기

이번 포스팅은 일자별로 적재된 타자, 투수 데이터를 주별 최우수 선수를 가려내기 위한 주별 데이터로 변환해보려고한다. 주별 집계를 하기전 필요한 지표부터 추려봤다. 타자의 경우는 타석,

developer-trainee-j.tistory.com

 

이번 DAG는 일배치 DAG와 마찬가지로 하나의 PythonOperator를 사용할 것이고, 모든 주차 집계가 아니라 지난주 데이터만 집계를 해야하기 때문에 약간의 쿼리 수정이 있다. 

 

# # 필요 모듈 import
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pymysql
import yaml
import pandas as pd
from sqlalchemy import create_engine
from pytz import timezone

with open('yamls/sql_info.yaml') as f:

    info = yaml.load(f, Loader=yaml.FullLoader)

host = info['MARIADB']['IP']
user = info['MARIADB']['USER']
passwd=info['MARIADB']['PASSWD']
db = info['MARIADB']['DB']
port = info['MARIADB']['PORT']

# DAG 정의
dag = DAG(
    'data_agg_weekly',        
    default_args={                  
        'retries': 1,               
        },
    schedule_interval='0 21 * * 1',      
    start_date=datetime(2023,4, 3),
    catchup=False  
)

 

우선 DAG 정의 및 DB info를 가져오기 위한 부분이다. 'data_agg_weekly' DAG는 매주 월요일 오후 9시에 실행된다. 

daily DAG는 기존 오후 9시 45분에서 8시 45분으로 변경했다. 주별 배치가 동작하기 전 일요일 데이터가 적재되어야 했기 때문이다. 이부분은 나중에 trigger DAG 형태로 묶어서 time dependency를 잡아 둘 예정이다. 

아무튼 DAG 선언을 완료하고 코드작성을 시작했는데, 이번 DAG에 가장 중요한 추출 기간 설정 작업을 진행했다. 

# 추출 기간 설정 (지난주 월요일 ~ 금주 월요일)
today = datetime.now(timezone('Asia/Seoul')) 
s_date = (today - timedelta(7)).strftime("%Y%m%d")
e_date = (today - timedelta(1)).strftime("%Y%m%d")
week = (today - timedelta(7)).isocalendar()[1]
print(f'{week}주차 ({s_date} ~ {e_date}) 데이터 집계 시작')

배치가 시작된 날짜(today)기준으로 7일을 빼줘서 지난주 월요일을 s_date로 지정했고, e_date는 일요일로 지정했다. 

원래는 e_date를 today 변수로 사용하려했는데 시즌 막바지에는 월요일도 경기가 있기 때문에 일요일로 잡아뒀다. 

이제 s_date와 e_date 변수를 사용해서 쿼리를 작성해줬다. 

conn = pymysql.connect(host = host, user = user, passwd=passwd, db = db, charset='utf8', port = port,cursorclass=pymysql.cursors.DictCursor)
cur = conn.cursor()
sql = f'''select * from batting_info
where 1=1
and yyyymmdd between {s_date} and {e_date}
'''
cur.execute(sql)

result = cur.fetchall()
batting = pd.DataFrame(result)

sql = f'''select * from pitching_info
where 1=1
and yyyymmdd between {s_date} and {e_date}
'''
cur.execute(sql)

result = cur.fetchall()
pitching = pd.DataFrame(result)
conn.close()

크게 바뀐것 없이 그냥 날짜만 between으로 잡아주고 끝..! 이제 남은 부분은 지난 포스팅에서 작성한 로직을 그대로 복붙해서 완성했다. 

 

마치며

이번 포스팅은 지난번에 작성한 로직을 그대로 옮기는거라서 큰 어려움은 없었다. 

확실히 django 배포는 aws를 이용해서 배포하다보니 너무 편한데 airflow는 서버 메모리의 한계로 메인컴퓨터에서 사용하고있다. 그렇다보니 내가 PC를 꺼두면 배치가 돌지 않아서 data에 구멍이 생기는데 이 문제를 해결하기 위해서 airflow context 파라미터를 사용하여 스케줄링된 날짜를 받아와서 적재하는 DAG를 만들고 싶었다. 이렇게 구성하면 배치를 다른 날짜에 돌려도 스케줄링된 날짜를 파라미터로 받아오기때문에 수동으로 돌려줄 필요가 없어지기 때문인데 생각보다 잘 되지 않아서 일단 보류했다. 아무리 생각해봐도 airflow를 aws에 올리지못한게 너무 아쉽다..

다음 포스팅은 daily DAG와 weeklyDAG를 trigger로 묶어서 time dependency를 잡아주는 작업을 다뤄보려고한다.