airflow 8

Airflow Trigger 스케줄링

그동안 Airflow를 이용하여 DAG를 작성하면서 불편함을 느낀점이 하나 있었다. 바로 Time 스케줄에 관한 것인데 지금 토이프로젝트에서 사용하는 데이터의 구조상 반드시 선행되어야하는 DAG가 존재했고 예를들면 일자별 Data 크롤링 등등 선행 DAG가 작동한 이후에 신규 선수 추출과 선수의 생년월일 크롤링 DAG가 작동해야했다. 지금까지는 눈대중으로 Time 스케줄을 잡았었지만 비효율적인 부분이 몇개 있었다. 1. 시간적 손실 현재 매일 동작하는 daily DAG는 20시 45분에 작동을 시작하고 소요시간은 대략 3분 정도이다. 넉넉잡아서 5분의 여유를 두고 다음 DAG의 작동시간은 20시 50분으로 잡으면 크게 문제는 없을 것 처럼 보이나, 크롤링 사이트의 변화나 time 딜레이가 있을 경우 5분..

Airflow 2023.07.27

Docker 환경에서 크롤링하기(동적 웹)

프로야구 데이터 시각화 프로젝트를 진행하면서, statiz.co.kr 웹 페이지의 박스스코어 데이터를 크롤링했었다. 그런데 데이터 무결성 검사를 해보니 대략 10%의 데이터가 비어있는것을 확인했다. 원인 분석을 위해 일단 크롤링 DAG부터 확인했는데 정상적으로 작동하고 있었고, 매일매일 데이터가 적재되고있었다. 비어있는 데이터를 추적해보니 경기 진행일 전체의 데이터가 아니라 경기하나의 데이터만 빠져있는 것을 확인했고, 실제 웹 페이지에서도 확인해보니 박스 스코어 데이터가 비어있었다.. 사실 저번달에도 이런 경우를 확인했는데 월 마감이 될때마다 데이터가 들어오는걸 확인하고 월 재적재 DAG를 만들어 놨었는데 아예 개막전 경기부터 비어있었다니 정말 충격적이었다.. 어쩔 수 없이 데이터 크롤링하는 웹을 바꾸기..

Docker 2023.06.30

주별 집계 데이터 적재 DAG 만들기

이번 포스팅은 매주 월요일마다 지난주의 일별 데이터를 집계하여 DB에 저장하는 DAG를 만드는 과정에 대해 작성해보려고한다. 지난 포스팅에서 만들었던 로직을 그대로 DAG에 옮기는 거라서 큰 어려움은 없을듯하다. 일자별 데이터를 주별 데이터로 변환하기 이번 포스팅은 일자별로 적재된 타자, 투수 데이터를 주별 최우수 선수를 가려내기 위한 주별 데이터로 변환해보려고한다. 주별 집계를 하기전 필요한 지표부터 추려봤다. 타자의 경우는 타석, developer-trainee-j.tistory.com 이번 DAG는 일배치 DAG와 마찬가지로 하나의 PythonOperator를 사용할 것이고, 모든 주차 집계가 아니라 지난주 데이터만 집계를 해야하기 때문에 약간의 쿼리 수정이 있다. # # 필요 모듈 import f..

Airflow 2023.06.19

Docker 네트워크 설정 변경하기

지난 포스팅에서 Docker 네트워크를 잡아주지않고 default설정으로 올렸더니 localhost로 DB접근이 불가능했었다. 결국 내 IP를 입력하여 문제를 해결하긴 했지만, 네트워크설정을 변경하여 문제를 해결하는 방법이 있다고 해서 시도해보려고한다. 우선 Docker 네트워크 종류부터 알아보자 bridge : docker0라는 네트워크 브릿지에 각 컨테이너마다 고유한 네트워크 영역이 하나씩 생성되며, docker0브릿지에 컨테이너의 네트워크가 하나씩 바인딩 되는 구조 host : 컨테이너가 독립적인 네트워크 영역을 갖지 않고 호스트와 네트워크를 함께사용하게 된다. overlay : 네트워크 안에 있는 여러개의 docker daemon간의 통신을 관리하는 가상의 네트워크 내 컨테이너가 bridge 네트..

카테고리 없음 2023.04.19

Airflow와 DB 연결하기

이번 포스팅은 Airflow DAG에서 MariaDB를 연결하여 DB에 있는 데이터들을 출력해보려 한다. Airflow 자체 기능으로 Connection 메뉴가 있지만 그냥 파이썬 코드에서 DB를 연결시켜보려한다. 우선 DB를 설치해야했는데 Docker위에 MariaDB 컨테이너를 생성하기와 그냥 로컬환경에 DB 설치하기 두가지 방식이 있었다. 처음에는 Docker위에 컨테이너를 올리는 방식으로 하려고 했는데 외부접속 세팅이나 Docker 네트워크 세팅을 하는 것 보다 그냥 로컬에 설치하는게 더 빠를 것 같아서 로컬에 설치했다. 그 다음 연결에 필요한 모듈을 불러온다. import pymysql 이제 pymysql 커넥션을 만들면되는데 한가지 문제가 생겼다. conn = pymysql.connect(ho..

Airflow 2023.04.17

Docker 환경에서 크롤링하기

야구 데이터를 지속적으로 얻기 위해서 http://www.statiz.co.kr/main.php 사이트에서 경기 정보를 크롤링하여 데이터를 적재하는 DAG를 작성하는과정중 예상치 못한 문제가 발생했다. 크롤링을 제대로 해본적이 없어서 이것 저것 찾아가면서 찾은 방법중 하나는 selenium 모듈을 이용한 방식이었다. 모듈을 import 한 뒤 크롬 웹드라이버를 이용해서 크롤링 샘플 코드를 작성했고 정상적으로 작동했다. from bs4 import BeautifulSoup import pandas as pd from selenium import webdriver import re from selenium.webdriver.common.by import By from datetime import dateti..

Docker 2023.04.16

Airflow Sample DAG 생성하기

이번 포스팅은 지난 포스팅에서 띄운 Airflow에 Hello World를 출력하는 sample DAG를 생성해보려고 한다. Airflow DAG는 파이썬 기반으로 작성되기 때문에 기존에 파이썬을 사용하는 사람이라면 큰 어려움은 없을 것 같다. 일단 docker-compose.yaml로 Airflow를 띄우게 되면 자동적으로 dags, logs, plugins 폴더가 생성된다. dags 폴더 Airflow에서 실행될 파이썬 파일들이 담겨있어야하는 공간이다. Airflow는 주기적으로 dags를 폴더를 스캔하며 해당 폴더에 있는 DAG를 읽어 Web UI에 표시한다. dags폴더의 경로는 고정이 아닌 수정이 가능하며 수정은 docker-compose.yaml 상단에 있는 volumes에서 가능하다 . lo..

Airflow 2023.04.14

Docker - Airflow

이번에는 Docker위에 Airflow를 띄워보려고한다. 우선 Airflow란? - airbnb에서 만든 워크플로우 오픈소스 플랫폼이다. - python 코드로 구성되어 동적 파이프라인 생성이 가능하다. Airflow 구성요소 - webserver : 웹 UI로 DAG를 시각화하고 DAG를 실행시키거나 중단, 결과 로그를 확인할 수 있는 인터페이스를 제공한다. - scheduler : DAG에 설정된 스케줄에 맞춰 worker에 DAG 실행을 예약해둔다. - worker : DAG의 Task를 실질적으로 실행하는 곳 Executor에 따라 확장이 가능하다. - Metastore : Airflow에 있는 DAG, Task, 실행 log등을 저장하는 곳 - Executor : 스케쥴러와 같이 동작하는 구성요..

Docker 2023.04.12