Airflow와 DB 연결하기
이번 포스팅은 Airflow DAG에서 MariaDB를 연결하여 DB에 있는 데이터들을 출력해보려 한다.
Airflow 자체 기능으로 Connection 메뉴가 있지만 그냥 파이썬 코드에서 DB를 연결시켜보려한다.
우선 DB를 설치해야했는데 Docker위에 MariaDB 컨테이너를 생성하기와 그냥 로컬환경에 DB 설치하기 두가지 방식이 있었다.
처음에는 Docker위에 컨테이너를 올리는 방식으로 하려고 했는데 외부접속 세팅이나 Docker 네트워크 세팅을 하는 것 보다 그냥 로컬에 설치하는게 더 빠를 것 같아서 로컬에 설치했다.
그 다음 연결에 필요한 모듈을 불러온다.
이제 pymysql 커넥션을 만들면되는데
한가지 문제가 생겼다.
conn = pymysql.connect(host = 'localhost', user = 'sy , passwd='****', db = 'DB_name', charset='utf8', port = 3307)
분명 local 주피터에서 접속할때는 host를 localhost로 지정해도 정상적으로 연결이 됐는데 Docker 환경에서는 접속이 안되는 것이었다..!
구글링을 통해 찾아보니
그래서 결국 내 IP주소를 입력해야 접속이 가능했다.. 이 코드들이 Git에 올라가고 있다보니 숨겨야해서 yaml파일을 만들어 접속정보를 담아두기로 했다. yaml파일은 gitignore를 이용해서 감춰뒀다!
이렇게 계층 구조를 만들어 정보를 입력했고 compose.yaml파일에 이 파일을 마운트 할 수 있게 설정을 변경해줬다.
이제 컨테이너에서 이 파일이 들어가 있고 읽기만 하면된다.
# 필요 모듈 import
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pymysql
import yaml
with open('yamls/sql_info.yaml') as f:
info = yaml.load(f, Loader=yaml.FullLoader)
# DAG 정의
dag = DAG(
'db_connect_test',
default_args={
'retries': 1,
},
schedule_interval='@once',
start_date=datetime(2023,4, 3)
)
# 함수 설정
def connect_test():
conn = pymysql.connect(host = info['MARIADB']['IP'], user = info['MARIADB']['USER'] , passwd=info['MARIADB']['PASSWD'], db = info['MARIADB']['DB'], charset='utf8', port = info['MARIADB']['PORT'])
cur = conn.cursor()
sql = 'select * from player_info'
cur.execute(sql)
result = cur.fetchall()
print(result)
conn.close()
# task 설정
t1 = PythonOperator(
task_id = 'connect_test', #task 이름 설정
python_callable=connect_test, # 불러올 함수 설정
dag=dag #dag 정보
)
# task 진행
t1
간단하게 DB에 접속해서 테이블을 select하는 쿼리로 구성해봤는데 정상적으로 작동했다.
마치며
찾아보니 실제 IP를 사용하지 않는 방식이 있었는데 docker run을 할 때 network를 'host'로 잡아두는 방법이었다.
이렇게 하면 도커 네트워크와 호스트 네트워크를 완벽하게 일치시켜서 바로 접근이 가능하다.
연결을 시키는게 목적이라 내 IP주소를 사용했지만 다음포스팅에서는 docker의 네트워크 설정을 변경하는 방법을 다뤄봐야겠다.