데이터 리터러시를 위한 발자취

[Airflow] Naver API 기반 Airflow 파이프라인 구축하기 본문

데이터 분석/Airflow

[Airflow] Naver API 기반 Airflow 파이프라인 구축하기

wosole 2023. 12. 5. 16:27

 

안녕하세요-! Airflow 공부 중에 파이프라인 구축 관련 테스트하기 좋은 블로그가 있어서요.

아래 블로그 글을 참고하여 Naver api 기반 Airflow 데이터 파이프라인 구축까지 테스트 겸 실행해 보았습니다. 

 

참고 : Airflow 엄청 자세한 튜토리얼 #왕초심자용
 

Airflow 엄청 자세한 튜토리얼 #왕초심자용

에어플로우에 대한 정보를 제공하는 정확하고 전문적인 글들이 많고, 여기서는 나와 같은 초보분들을 위해 설치부터 아주 간단한 실습을 다룰 것이기 때문에 이론은 직관적으로만 설명해서 느

velog.io

처음 Airflow를 접해보는 분들도 잘 따라할 수 있게 정리해 주신 블로그라 아래 내용 보기 전에 먼저 보신다면 조금 더 이해하기 쉬울 것 같아요~! 👍 👍 👍

 

전체적인 Dag skelton과 파이프라인은 유사하지만, Airflow 버전 업데이트로 인해 더이상 사용되지 않는 operator도 있어 일부 수정한 코드로 진행한 점 참고해 주세요-! 

 

원본 글과 수정 또는 차이나는 부분은 '*️⃣00000' 이런 식으로 표기할 예정이니 같이 참고 해주세요~

재구성한 네이버 API 기반 파이프라인 구축 개념

 

재구성한 데이터 파이프라인 구축 개념은 위와 같으며, 상기 순서 순으로 정리하도록 하겠습니다. 😊

 

그리고 사용한 Tool별 버전은 아래와 같습니다.

- MySQL :  Ver 8.0.35-0ubuntu0.22.04.1 for Linux on x86_64 ((Ubuntu))

- Python : Python 3.10.12

- Airlfow : v2.7.3

# 네이버 API 기반 Airflow 파이프라인 구축하기
0. Dag Skelton 생성하기 : naver-search-pipeline
1. SQLExecuteQueryOperator 기반 테이블 생성 : creating_table
2. HttpSensor 기반 API 확인 : is_api_available
3. PythonOperator 기반 데이터 수집 : crawl_naver
4. PythonOperator 기반 데이터 전처리 및 MySQL DB 저장 : preprocess_result
5. naver-search-pipeline 파이프라인 실행하기 

0. Dag skelton 생성하기 

➡️ 해당 과정의 경우, 참고 블로그와 동일하게 작성되었음

➡️ 기본 라이브러리는 아래와 같이 추가함


  
from datetime import datetime
from airflow import DAG
from pandas import json_normalize
import json
import pandas as pd
import requests
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.http.operators.http import HttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

 

1. SQLExecuteQueryOperator 기반 테이블 생성 : creating_table

- 원본 글에는 Sqliteopeartor 기반 작성되었지만, Airflow webserver 메인화면에서 MySQL 또는 PostgreSQL을 권장하고 있어 SQLExecuteQueryOperator를 이용해서 테이블을 생성해 줌

 

 *️⃣Sqliteopeartor vs SQLExecuteQueryOperator 차이점

      ➡️ conn_id 파라미터가 다름

      -  Sqliteopeartor  → sqlite_conn_id

      -  SQLExecuteQueryOperator → conn_id


  
# 1. SQLExecuteQueryOperator 기반 테이블 생성하기
creating_table = SQLExecuteQueryOperator(
task_id = 'creating_table',
conn_id = 'db_sql', # admin - connections 등록해줘야 함
sql = '''
CREATE TABLE IF NOT EXISTS naver_search_result(
title TEXT,
address TEXT,
category TEXT,
description TEXT,
link TEXT,
mapx INT,
mapy INT
)
'''
)

 

*️⃣Admin - Connections - db_sql 생성하는 방법

- Arirflow webserver 접속 후, 상단 메뉴 'Admin - Connections - ➕' - 아래 이미지와 같이 입력

- Connection Type에 MySQL이 없을 시, 아래 명령어 실행하여 추가하기

➡️Airflow 공식 문서 가이드 - Setting up a MySQL Database 참고


  
# 1. MySQl 드라이버 설치
pip install apache-airflow-providers-mysql
pip3 install mysqlclient
# 2. 데이터베이스 생성
CREATE DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 3. 계정 생성 및 권한 부여
CREATE USER 'airflow' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
# airflow.cfg의'sql_alchemy_conn' 수정하기
cd airflow # airflow 경로로 이동
ls # airflow.cfg 유무 확인
nano airflow.cfg # airflow.cfg 확인
## ctrl+w로 sql_alchemy_conn 검색/이동 → 예시와 같이 수정하기
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# 예시 : mysql+mysqldb://airflow:password@<host>:3306/airflow
# sql_engine_collation_for_ids 수정하기
# 최초 수정이면 해당 값 주석 처리 되어있음, 위와 동일하게 검색/이동 후 주석 처리 제외, 수정하기
sql_engine_collation_for_ids = utf8mb3_bin

Ariflow webserver 내 MySQL connection 추가하기


2. HttpSensor 기반 API 확인 : is_api_available

- 원본 글과 동일하게 작성되었으나, query(검색)을 스타벅스로 수정함

- 1번과 동일하게 http_conn_id를 위해 'Admin - Connections - ➕'로 naver_search_api라는 커넥션 생성 필요

➡️세부 정보는 원본 블로그 참고


  
# 2. HTTP operator 기반 API 가져오기
is_api_available = HttpSensor(
task_id = 'is_api_available',
http_conn_id= 'naver_search_api',
endpoint= 'v1/search/local.json',
# 요청 헤더
headers= {
'X-Naver-Client-Id' : f'{NAVER_CLI_ID}',
'X-Naver-Client-Secret': f'{NAVER_CLI_SECRET}',
},
request_params= {
'query' : '스타벅스',
'display' : 5
},
response_check = lambda response: response.json() # 응답확인
)

3. PythonOperator 기반 데이터 수집 : crawl_naver

 

*️⃣ 네이버 검색 Open API 문서에 따라, 스타벅스 관련 검색 결과를 50개 수집하도록 코드 수정

- display는 최대 5개씩만 검색 결과 값 출력함(기본값 : 1)

- SimpleHttpOperator 대신 def 함수와 PythonOperator을 사용하여 데이터 가져옴


  
# 3-1. 크롤러 api 수정코드(추가)
def fetch_naver_search_results(ti):
base_url = "https://openapi.naver.com/v1/search/local.json"
headers = {
'X-Naver-Client-Id': NAVER_CLI_ID,
'X-Naver-Client-Secret': NAVER_CLI_SECRET,
}
params = {
'query': '스타벅스',
'display': 5,
'start': 1
}
total_results = []
for _ in range(10): # 총 50개의 결과 가져오기
response = requests.get(base_url, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
total_results.extend(data['items'])
params['start'] += params['display']
else:
break # 요청이 실패하면 반복 중단
# 결과를 XCom에 저장
ti.xcom_push(key='naver_search_results', value=total_results)
# 3-2. 기존 SimpleHttpOperator 대신 PythonOperator 사용
crawl_naver = PythonOperator(
task_id='crawl_naver',
python_callable=fetch_naver_search_results,
dag=dag
)

4. PythonOperator 기반 데이터 전처리 및 MySQL DB 저장 : preprocess_result

*️⃣원본 글과 같이 CSV 파일 저장은 정상 동작했지만, BashOperator 기반 데이터 저장 과정에서 실패함

  ➡️원본 글은 Sqlite 기반 BashOperator 동작, SQLExecuteQueryOperator로 변경 시 bash_command 에러 지속 발생

  ➡️로그 조회 결과, 따옴표의 문제라고 했지만 아무리 수정해도 동일한 로그 오류 반복

  ➡️문제 해결을 위해 CSV 파일 저장 과정 제외, API 데이터를 바로 MySQL에 저장하는 과정으로 변경함

  ➡️ 수집된 데이터 확인 결과, title 값에서 <b>스타벅스</b>0000점 이런 식으로 결과값이 저장되어 replace 함수 사용

- MysqlHook : mysql_conn_id 식별자를 사용하여 Airflow의 connections에서 설정된 MySQL 데이터베이스 연결 설정 참조

- 4번 과정이 종료된 후, Dag별로 의존성을 설정하여 파이프라인 구축 완료함


  
# 4-1. 수집한 데이터 전처리하기
__all__ = ['preprocessing']
def preprocessing(ti):
search_result = ti.xcom_pull(task_ids='crawl_naver', key='naver_search_results')
print(f"search_result: {search_result}") # 로깅을 추가하여 search_result의 내용 확인
#items = search_result[0]['items']
normalized_items = pd.json_normalize([
{ 'title': item['title'].replace('<b>', '').replace('</b>', ''),
'address': item['address'],
'category': item['category'],
'description': item['description'].replace('<b>', '').replace('</b>', ''),
'link': item['link'],
'mapx' : item['mapx'],
'mapy' : item['mapy']
} for item in search_result
])
mysql_hook = MySqlHook(mysql_conn_id='db_sql')
for index, item in normalized_items.iterrows():
sql = """
INSERT INTO naver_search_result (title, address, category, description, link, mapx, mapy)
VALUES (%s, %s, %s, %s, %s, %s, %s);
"""
params = (item['title'], item['address'], item['category'], item['description'], item['link'], item['mapx'], item['mapy'])
mysql_hook.run(sql, parameters=params)
# 4-2. 검색 결과 전처리, MySQL 데이터베이스에 저장
preprocess_result = PythonOperator(
task_id="preprocess_result",
python_callable=preprocessing # 전처리 함수를 호출
)
# 파이프라인 구성하기
creating_table >> is_api_available >> crawl_naver >> preprocess_result

5. naver-search-pipeline 파이프라인 실행하기

- 상기 과정까지 모두 완료하고 나면 아래처럼 Airflow 내 파이프라인이 연결된 것을 확인할 수 있음

Airflow webserver 내 연결된 파이프라인

 

- 해당 Dag 내 우측 상단 Trigger Dag(▶️) 실행하기

➡️정상 동작까지 아래와 같은 트러블 슈팅 발생함

# 트러블 슈팅
🔸반복적으로 failed이 발생한 prepeocess_result의 로그를 살펴본 결과,
Creating_table 과정에서 생성된 컬럼명과 Insert 할 컬럼명이 오타로 인해 서로 불일치하면서 발생한 에러였음
🔸
is_api_available 에러의 경우, HTTP Connection이 초기화되면서 발생한 에러, 재연결하면서 에러 해결

정상 동작할 때까지 로그 반복 검토 및 Dag 재실행 시도

 

- MySQL 테이블 조회 결과, 아래와 같이 정상적으로 데이터가 Insert 된 것을 확인함

 

✅파이프라인 소스코드(Full)

더보기

  
# 기본 라이브러리
from datetime import datetime
from airflow import DAG
from pandas import json_normalize
import json
import pandas as pd
import requests
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.http.operators.http import HttpOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# default 생성
default_args = {
'start_date': datetime(2023, 12, 4)
}
NAVER_CLI_ID = '발급받은 CLIENT_ID'
NAVER_CLI_SECRET = '발급받은 CLIENT_SECRET_ID'
# DAG 생성
with DAG(
dag_id = 'naver-search-pipeline',
schedule= '@daily',
default_args=default_args,
tags= ['naver','search','local','api','pipeline'],
catchup= False) as dag:
# 1. SQLExecuteQueryOperator 기반 테이블 생성하기
creating_table = SQLExecuteQueryOperator(
task_id = 'creating_table',
conn_id = 'db_sql',
sql = '''
CREATE TABLE IF NOT EXISTS naver_search_result(
title TEXT,
address TEXT,
category TEXT,
description TEXT,
link TEXT,
mapx INT,
mapy INT
)
'''
)
# 2. HTTP operator 기반 API 가져오기
is_api_available = HttpSensor(
task_id = 'is_api_available',
http_conn_id= 'naver_search_api',
endpoint= 'v1/search/local.json',
# 요청 헤더
headers= {
'X-Naver-Client-Id' : f'{NAVER_CLI_ID}',
'X-Naver-Client-Secret': f'{NAVER_CLI_SECRET}',
},
request_params= {
'query' : '스타벅스',
'display' : 5
},
response_check = lambda response: response.json() # 응답확인
)
# 3-1. 크롤러 api 수정코드(추가)
def fetch_naver_search_results(ti):
base_url = "https://openapi.naver.com/v1/search/local.json"
headers = {
'X-Naver-Client-Id': NAVER_CLI_ID,
'X-Naver-Client-Secret': NAVER_CLI_SECRET,
}
params = {
'query': '스타벅스',
'display': 5,
'start': 1
}
total_results = []
for _ in range(10): #총 50개의 결과 가져오기
response = requests.get(base_url, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
total_results.extend(data['items'])
params['start'] += params['display']
else:
break # 요청이 실패하면 반복 중단
# 결과를 XCom에 저장
ti.xcom_push(key='naver_search_results', value=total_results)
# 3-2. 기존 SimpleHttpOperator 대신 PythonOperator 사용
crawl_naver = PythonOperator(
task_id='crawl_naver',
python_callable=fetch_naver_search_results,
dag=dag
)
# 4-1. 수집한 데이터 전처리하기
__all__ = ['preprocessing']
def preprocessing(ti):
search_result = ti.xcom_pull(task_ids='crawl_naver', key='naver_search_results')
print(f"search_result: {search_result}") # 로깅을 추가하여 search_result의 내용 확인
#items = search_result[0]['items']
normalized_items = pd.json_normalize([
{ 'title': item['title'].replace('<b>', '').replace('</b>', ''),
'address': item['address'],
'category': item['category'],
'description': item['description'].replace('<b>', '').replace('</b>', ''),
'link': item['link'],
'mapx' : item['mapx'],
'mapy' : item['mapy']
} for item in search_result
])
mysql_hook = MySqlHook(mysql_conn_id='db_sql')
for index, item in normalized_items.iterrows():
sql = """
INSERT INTO naver_search_result (title, address, category, description, link, mapx, mapy)
VALUES (%s, %s, %s, %s, %s, %s, %s);
"""
params = (item['title'], item['address'], item['category'], item['description'], item['link'], item['mapx'], item['mapy'])
mysql_hook.run(sql, parameters=params)
# 4-2. 검색 결과 전처리, MySQL 데이터베이스에 저장
preprocess_result = PythonOperator(
task_id="preprocess_result",
python_callable=preprocessing # 수정된 전처리 함수를 호출
)
# 파이프라인 구성하기
creating_table >> is_api_available >> crawl_naver >> preprocess_result

에러 해결 과정에서 로그와 코드를 봐도 해결이 안 돼서 몇 시간 동안 GPT와 씨름했지만 결국 파이프라인 동작까지 성공했습니다 -! 😭 

 

금방 할 줄 알았는데 생각보다 오래 걸렸네요..ㅎㅎ

 

여기까지 만드는 과정에서 MySQL Inset 된 데이터가 dbeaver에서 보이지 않아 Connection setting에서도 문제가 많았는데요..ㅎㅎ

이건 다음 포스팅에서 간단하게 다루도록 하겠습니다-! 

 

긴 글 읽어주셔서 감사합니다.

 

 

 

Comments