최근 Airflow 이 3.0으로 판올림 되었습니다. Web UI도 세부 탭으로 변경되고 유틸적인 부분이 더 늘어났습니다. docker를 이용하여 airlfow 3 설치도 해보고 실습 예제도 구성해볼겸 다음날 온도데이터를 supabase(PostgresSQL) 에 저장하는 예제를 작성해보겠습니다. airflow, 기상 API, discord, supabase 연결을 각각 테스트 해보고 종합적인 파이프라인도 정리해보겠습니다.
1. 파이프라인 소개
3일간 단기 특보를 이용하여 구로구의 내일 정오 날씨를 알려주는 앱을 만든다고 생각해봅시다. 이를 반영하기 위하여 이 파이프라인은 매일 오후 11시에 실행되어 그다음날 구로구의 정오 온도(12시)를 저장하고 알림을 주며, 데이터에 저장합니다.

본 글에서는 (1) 기상 API 데이터 처리 (2) Airflow 설치 및 실행 (3) Supabase 저장 (4) Discord 알림 각 단계를 간단하게 세팅하는 것을 3단원에서 소개합니다. 마지막으로 최종 파이프라인을 위한 환경세팅과 코드를 4단원에서 기술합니다. 데이터는 기상청 API 허브의 예특보 - 4. 동네예보 - 3. 단기예보 조회를 활용할 예정입니다. 3일간의 기상 상황에 대한 정보로 특정 위치(x,y)좌표를 기준으로 기온, 풍속 하늘 상태 등에 대한 정보를 전달해 줍니다.


2. Pre-requisite
2.1. 기술 스택과 지식
기술 스택
- Python 3.10
- Airflow 3.1.0
- Visaul studio Code
- Docker (Desktop)
- Discord
필요한 지식
- Python
- Linux & Docker 가상환경 설정
- Airflow DAG 구성
- [윈도우] WSL
2.2. Airflow 3.1.0. 설치: Docker compose 이용
출처: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
- 윈도우의 경우 wsl 설치 후 운영체제 진입하여 실행
#윈도우 Command
#배포판 조회
wsl -l -v
#배포판 진입
wsl -d {배포판명}
- 맥북의 경우 작업 디렉토리 생성 후 Terminal에서 실행
#터미널
mkdir airflow3 && cd airflow3
#docker-compose.yaml 로드
#docker로 airflow 설정시 정보가 담김
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.1.0/docker-compose.yaml'
#기본 디렉토리 설정 및 uid 설정
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
#airflow.cfg 설정파일 초기화
docker compose run airflow-cli airflow config list
#데이터베이스 초기화
docker compose up airflow-init
#도커 실행
docker compose up
#터미널 새로 열어 서비스 확인
docker ps
- docker ps 출력 내용

- [선택] Docker Desktop 에서 contanier 확인

[Airflow3 UI 변경 점]
이제 바로 Dags 탭이 나오지 않고 Home 화면이 보이며, Dag 성공,실패 유무가 먼저 확인 가능. 각 컨테이너(MetaDatabase, Scheduler, Triggerer, Dag Processor)의 health check 가 상단에 나옵니다. 그 밖에도 Browse - Xcom 에서 Xcom 데이터가 보이며, Admin에서 환경변수(Variable)설정, 연결(Connections) 관리가 가능합니다.

개인적으로 Xcom은 WebUI로 보는 것보다 DBeaver 같이 어플리케이션으로 조회하는 것이 사용성 측면에서는 더 좋았습니다. 이 경우 airflow.cfg 파일에서 MetaDatabase의 포트를 외부(5432)로 열어줘야 연결 가능합니다.
#도커 컴포즈 내리기
docker compose down
# airflow.cfg파일변경
postgres:
# ... 기존 설정
ports:
- "5432:5432" # 이 줄 추가
# 도커 재시작
docker compose up

3. 단위 테스트 실습
3.1 Airflow DAG 실행 테스트
- 다음코드를 /dags 에 저장, 30초 정도 뒤에 Web UI Dag탭에서 조회가능
- [꿀팁] Scheduler가 dags 폴더를 스캔하는 시간이 기다리기 싫다면 apiserver 컨테이너로 진입하여 강제 스캔
docker exec -it {컨테이너명} bash
airflow dags reseriealize
- [꿀팁] Scheduler가 dags 폴더를 스캔하는 시간이 기다리기 싫다면 apiserver 컨테이너로 진입하여 강제 스캔
- Dags 파이썬 코드
from datetime import datetime, timedelta
# 내장함수 -> 펜듈럼
from airflow import DAG
from airflow.operators.python import PythonOperator
# Operator 종류(Bash, Python, Email)
def print_hello():
print("Hello World from Airflow!") #로그 출력
return "Hello World!" #Xcom: aiflow 저장하는 변수 -> 다른 dag/task 받아서쓸수잇음
with DAG(
dag_id='hello_world_dag', # GUI에 출력되는 이름명
start_date=datetime(2025, 1, 1), #시작
schedule=timedelta(days=1), #실행주기
catchup=False, #최초 실행했을때 이전날짜부터 실행여부
tags=['example', 'hello'], #태그(검색용이))
) as dag:
hello_task = PythonOperator( # t1,t2 선언도 가능
task_id='print_hello_world',
python_callable=print_hello, #() 소괄호가 없음
)
Dag 탭에 들어가 hello_world_dag를 검색하여 우측 상단 Trigger를 눌러줍니다. 여기서 Airflow3.0 변경점으로 Single Run과 Backfill 선택할 수 있는 팝업이 뜹니다. Single Run을 클릭합니다.

정상적으로 실행되었다면 Task에서의 print는 로그에 Return은 xcom에서 조회할 수 있습니다.


3.2. 기상 API 가져오기 및 전처리 테스트
기상 API를 가져오는 파이썬 스크립트 입니다.
- get_weather: request하는 함수
- parser_weather_xml: xml 자료를 정리해주는 함수
- convert_to_dateframe: Pandas DataFrame으로 변환시켜주는 함수
import requests
import xml.etree.ElementTree as ET
import pandas as pd
import datetime
import json
import os
from dotenv import load_dotenv
def get_weather(api_key, today):
'''
기상청 동네예보: 단기예보를 조회하는 함수
'''
base_url = "https://apihub.kma.go.kr/api/typ02/openApi/VilageFcstInfoService_2.0/getVilageFcst"
params = {
"pageNo": 1,
"numOfRows": 1000,
"dataType": "XML",
"base_date": today,
"base_time": "0500",
"nx": 58,
"ny": 125,
"authKey": api_key
}
response = requests.get(base_url, params=params)
return response.text
def parse_weather_xml(xml_string):
"""XML 문자열을 파싱하여 <item> 요소의 데이터를 리스트(딕셔너리)로 반환"""
data_list = []
try:
root = ET.fromstring(xml_string)
items_element = root.find('./body/items')
if items_element is None:
return data_list
for item in items_element.findall('item'):
item_data = {}
for child in item:
item_data[child.tag] = child.text
data_list.append(item_data)
except ET.ParseError:
print("XML 파싱 오류: 수신된 데이터가 유효한 XML 형식이 아닙니다.")
except Exception as e:
print(f"데이터 파싱 중 예상치 못한 오류 발생: {e}")
return data_list
def convert_to_dataframe(xml_data, today):
'''
XML 데이터를 불러와 전처리를 함께하는 함수
'''
weather_data_list = parse_weather_xml(xml_data)
if weather_data_list:
df = pd.DataFrame(weather_data_list)
if 'fcstValue' in df.columns:
df['fcstValue'] = pd.to_numeric(df['fcstValue'], errors='coerce')
cat_map = {
"POP": "강수확률",
"UUU": "동서바람성분",
"VVV": "남북바람성분",
"VEC": "풍향",
"WSD": "풍속",
"SKY": "하늘상태",
"PTY": "강수형태",
"WAV": "파고",
"PCP": "강수량",
"REH": "습도",
"SNO": "적설",
"TMX": "최고기온",
"TMN": "최저기온",
"TMP": "기온",
}
df['category'] = df['category'].map(cat_map)
df['loc'] = '구로구'
target_categories = ['기온']
cond1 = df['category'].isin(target_categories)
cond2 = df['fcstTime'] == '1200'
cond3 = df['fcstDate'] == today
value = df[cond1 & cond2 & cond3]['fcstValue']
return value
def main():
"""메인 실행 함수"""
# 환경변수 로드
load_dotenv()
# 환경변수에서 API 키와 Webhook URL 가져오기
api_key = os.getenv('WEATHER_API_KEY')
webhook_url = os.getenv('DISCORD_WEBHOOK_URL')
# 환경변수 확인
if not api_key:
raise ValueError("WEATHER_API_KEY 환경변수가 설정되지 않았습니다.")
if not webhook_url:
raise ValueError("DISCORD_WEBHOOK_URL 환경변수가 설정되지 않았습니다.")
# 오늘 날짜
today = datetime.datetime.now().strftime('%Y%m%d')
# 날씨 데이터 가져오기
print(f"{today} 날씨 데이터를 가져오는 중...")
xml_data = get_weather(api_key, today)
# 데이터 변환
value = convert_to_dataframe(xml_data, today)
print(f"오늘 구로구의 기온은 {value.values[0]}도 입니다.")
if __name__ == "__main__":
main()
```
**사용 방법:**
1. **`.env` 파일 생성** (프로젝트 루트 디렉토리에):
```
WEATHER_API_KEY=your_api_key_here
DISCORD_WEBHOOK_URL=your_webhook_url_here
3.3. Python -> Discord: Webhook 설정 테스트
- 채널 생성 - 일반 채널 오른쪽 톱니바퀴 클릭
- 연동 - 웹후크 - 새 웹후크 - 봇 생성 - 웹 후크 URL 복사

- Python으로 메시지 보내기
import requests
import json
def send_discord_message(webhook_url, message):
"""
docstring
디스코드 봇으로 메시지를 보내는 함수입니다.
discord 채널에서 생성한 webhook_url과 메세지를 전달받습니다.
"""
data = {
"content": message,
"username" :"디스코드 봇"
}
response = requests.post(
webhook_url,
data = json.dumps(data),
headers = {"Content-Type":"application/json"}
)
if response.status_code == 204:
print('메시지 전달 완료!')
print(response.text)
else:
print('문제발생!')
print(response.status_code)
if __name__ == "__main__":
webhook_url = 'https://discord.com/api/webhooks/1430385186058801214/g45PEv-CE790IOOLnmNAKtT4m0VnW-BpMXaAz4kAiILcJsOZLceGWNwjoMYchNnR3mcO'
# message = "테스트!!"
message = input()
send_discord_message(webhook_url,message)

3.4. Python -> Supabase 테스트
1. Supabase 프로젝트 생성
- 샘플데이터 삽입: SQL Editor: DDL, DML 실행
- 예제링크: https://www.postgresql.org/docs/current/dml-insert.html
CREATE TABLE products (
product_no integer,
name text,
price numeric
);
-- 위치 전달인자처럼 컬럼의 순서를 맞춰 넣기
INSERT INTO products VALUES (1, 'Cheese', 9.99);
-- 데이터 여러줄 넣기
INSERT INTO products (product_no, name, price) VALUES
(1, 'Cheese', 9.99),
(2, 'Bread', 1.99),
(3, 'Milk', 2.99);
2. Python - supabase 연결을 위한 환경 변수 가져오기
- Data API 키, Public API 설정
- 왼쪽 하단 Project Setting - Data API - URL 복사: supabase_url 로 저장
- 왼쪽 탭 API Keys - anon public 키 복사: supabase_key 로 저장

- 로컬 .env 파일 저장
# .env
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-key
- Python 코드 실행
from dotenv import load_dotenv
import os
from supabase import create_client
import pandas as pd
load_dotenv()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# print(SUPABASE_URL,'\n', SUPABASE_KEY)
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
type(supabase)
def get_data():
result = supabase.table("products").select('*').execute()
return result
get_data()
def insert_data():
data = {"product_no":4, 'name': "치킨", 'price': 20.00}
result = supabase.table("products").insert(data).execute()
print(result.data)
insert_data()

4. Python -> Discord: Webhook 설정 테스트
4.1. Airflow 환경 변수 설정
Airflow 에서 환경변수 설정에는 3가지 방법이 있습니다.
- Airflow 프로젝트 루트에 .env 파일 생성
- docker-compose.yaml 에 작성
- Web UI - Admin - Variable 에서 설정
3번이 제일 쉽고 편합니다.

이렇게 설정한 환경변수는 Variable.get 메소드를 이용해 불러올 수 있습니다.
[여기서 팁: 전역변수 설정하면 안되나요?] : 파이썬 스크립트에서 전역변수로 사용하면 될 것 같지만 Dags 하부의 task 는 Airflow 아키텍쳐에서 각각 다른 worker에게 할당될 수 있습니다. 따라서 전역변수로 설정하기보다 환경변수 설정을 해야합니다.
- 환경 변수 불러오기 예제 Dags
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.models import Variable
def print_variable():
my_value = Variable.get("SUPABASE_URL", default_var="url 로드 안됌!")
print(f"Supabase URL 값은 : {my_value}")
with DAG(
dag_id="get_variable",
start_date=datetime(2025, 10, 23),
schedule=None,
catchup=False
) as dag:
t1 = PythonOperator(
task_id="print_variable_task",
python_callable=print_variable
)
4.2. Supabase 테이블 생성
Supabase 내에서 온도 정보를 담을 테이블을 선언해줍니다.
-- SQL 코드 시작
-- weather_forecast 테이블 생성
CREATE TABLE weather_forecast (
id BIGSERIAL PRIMARY KEY,
forecast_date VARCHAR(8) NOT NULL,
forecast_time VARCHAR(4) NOT NULL,
temperature DECIMAL(5,2) NOT NULL,
location VARCHAR(50) NOT NULL,
nx INTEGER NOT NULL,
ny INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
CONSTRAINT unique_forecast UNIQUE (forecast_date, forecast_time, location)
);
-- 인덱스 생성 (검색 성능 향상)
CREATE INDEX idx_forecast_date ON weather_forecast(forecast_date);
CREATE INDEX idx_location ON weather_forecast(location);
CREATE INDEX idx_created_at ON weather_forecast(created_at);
-- 코멘트 추가
COMMENT ON TABLE weather_forecast IS '날씨 예보 데이터';
COMMENT ON COLUMN weather_forecast.forecast_date IS '예보 날짜 (YYYYMMDD)';
COMMENT ON COLUMN weather_forecast.forecast_time IS '예보 시간 (HHMM)';
COMMENT ON COLUMN weather_forecast.temperature IS '기온 (섭씨)';
COMMENT ON COLUMN weather_forecast.location IS '위치명';
COMMENT ON COLUMN weather_forecast.nx IS '격자 X 좌표';
COMMENT ON COLUMN weather_forecast.ny IS '격자 Y 좌표';
COMMENT ON COLUMN weather_forecast.created_at IS '데이터 생성 시간';
4.3. Airflow 내 Supabase 모듈 설치
Supabase는 airflow에 등록되어있지 않는 패키지 이기 때문에 Dockerfile과 docker-compose.yaml 파일 수정이 필요합니다.
- requirments.txt 작성
- Python -> Supabase 실행한 가상환경에서 pip freeze > requirements.txt 실행
certifi==2025.10.5
charset-normalizer==3.4.4
idna==3.11
numpy==2.3.4
pandas==2.3.3
python-dateutil==2.9.0.post0
pytz==2025.2
requests==2.32.5
six==1.17.0
tzdata==2025.2
urllib3==2.5.0
supabase==2.11.0
- Dockerfile 작성
FROM apache/airflow:3.1.0
# requirements.txt 복사 및 설치
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
- docker-compose.yaml 내 build 명령어 추가
airflow-apiserver:
<<: *airflow-common
build: .
command: api-server
ports:
4.4. Dags 작성
태스크 정의
- API 불러오기(get_weather)
- xml 데이터 처리(parse_weather_xml)
- 데이터 전처리(convert_to_dataframe)
- supabase에 데이터 저장(insert_to_supabase)
- 디스코드 완료 메시지 전송(send_discord_message)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.models import Variable
import requests
import json
import xml.etree.ElementTree as ET
import pandas as pd
from supabase import create_client, Client
import pendulum
kst = pendulum.timezone('Asia/Seoul') # start_date에 timezone 설정용
'''
t1 -> t2 -> t3 -> t4 -> t5
api 코드 -> xml 파싱 -> 데이터프레임 변환 & 정보 -> Supabase 적재 -> 디스코드 전송
'''
def get_weather(**context):
'''기상청 동네예보 단기예보 조회'''
api_key = Variable.get("API_KEY")
today = datetime.now().strftime('%Y%m%d')
tmr = (datetime.now() + timedelta(days=1)).strftime('%Y%m%d') # 내일 날짜 계산
base_url = "https://apihub.kma.go.kr/api/typ02/openApi/VilageFcstInfoService_2.0/getVilageFcst"
params = {
"pageNo": 1,
"numOfRows": 1000,
"dataType": "XML",
"base_date": today,
"base_time": "0500",
"nx": 58,
"ny": 125,
"authKey": api_key
}
response = requests.get(base_url, params=params)
# XCom에 데이터 저장
context['ti'].xcom_push(key='xml_data', value=response.text)
context['ti'].xcom_push(key='today', value=today)
context['ti'].xcom_push(key='tmr', value=tmr)
print(f"날씨 데이터 조회 완료: {today}, 내일: {tmr}")
def parse_weather_xml(**context):
'''XML 파싱하여 리스트로 반환'''
xml_string = context['ti'].xcom_pull(key='xml_data', task_ids='get_weather')
data_list = []
root = ET.fromstring(xml_string)
items_element = root.find('./body/items')
if items_element is None:
raise ValueError("XML에 items 요소가 없습니다.")
for item in items_element.findall('item'):
item_data = {}
for child in item:
item_data[child.tag] = child.text
data_list.append(item_data)
context['ti'].xcom_push(key='parsed_data', value=data_list)
print(f"XML 파싱 완료: {len(data_list)}개 항목")
def convert_to_dataframe(**context):
'''데이터 전처리 및 온도 추출'''
weather_data_list = context['ti'].xcom_pull(key='parsed_data', task_ids='parse_weather_xml')
tmr = context['ti'].xcom_pull(key='tmr', task_ids='get_weather')
if not weather_data_list:
raise ValueError("날씨 데이터가 없습니다.")
df = pd.DataFrame(weather_data_list)
df['fcstValue'] = pd.to_numeric(df['fcstValue'], errors='coerce')
# 카테고리 매핑
cat_map = {
"POP": "강수확률",
"UUU": "동서바람성분",
"VVV": "남북바람성분",
"VEC": "풍향",
"WSD": "풍속",
"SKY": "하늘상태",
"PTY": "강수형태",
"WAV": "파고",
"PCP": "강수량",
"REH": "습도",
"SNO": "적설",
"TMX": "최고기온",
"TMN": "최저기온",
"TMP": "기온",
}
df['category'] = df['category'].map(cat_map)
# 필터링: 내일 12시 기온
filtered_df = df[
(df['category'] == '기온') &
(df['fcstTime'] == '1200') &
(df['fcstDate'] == tmr)
]
if filtered_df.empty:
raise ValueError("기온 데이터를 찾을 수 없습니다.")
temperature = float(filtered_df['fcstValue'].values[0])
context['ti'].xcom_push(key='temperature', value=temperature)
context['ti'].xcom_push(key='forecast_date', value=tmr)
print(f"온도 데이터 추출 완료: {temperature}도")
def insert_to_supabase(**context): #xcom이랑 주고받는 태스크는 아무튼 context 받아야함
'''Supabase에 날씨 데이터 적재'''
supabase_url = Variable.get("SUPABASE_URL_WEATHER")
supabase_key = Variable.get("SUPABASE_KEY_WEATHER")
temperature = context['ti'].xcom_pull(key='temperature', task_ids='convert_to_dataframe')
forecast_date = context['ti'].xcom_pull(key='forecast_date', task_ids='convert_to_dataframe')
# Supabase 클라이언트 생성
supabase: Client = create_client(supabase_url, supabase_key) #type hint
# 삽입할 데이터 준비
insert_data = {
"forecast_date": forecast_date,#20251023
"forecast_time": "1200",
"temperature": temperature,
"location": "구로구",
"nx": 58,
"ny": 125
}
try:
# 데이터 삽입 (upsert 사용 - UNIQUE 제약조건 처리)
response = supabase.table("weather_forecast").upsert( #update + insert
insert_data,
on_conflict="forecast_date,forecast_time,location"
).execute()
print(f"Supabase 데이터 적재 완료: {insert_data}")
# 적재된 데이터를 Discord로 전달하기 위해 XCom에 저장
context['ti'].xcom_push(key='db_insert_success', value=True)
return response
except Exception as e:
raise Exception(f"Supabase 데이터 적재 실패: {str(e)}")
def send_discord_message(**context):
'''Discord로 메시지 전송 (DB 적재 후)'''
webhook_url = Variable.get("DISCORD_WEBHOOK_URL")
temperature = context['ti'].xcom_pull(key='temperature', task_ids='convert_to_dataframe')
db_success = context['ti'].xcom_pull(key='db_insert_success', task_ids='insert_to_supabase')
if not db_success:
raise Exception("DB 적재가 완료되지 않았습니다.")
data = {
"content": f"✅ DB 적재 완료!\n내일 구로구의 기온은 {temperature}도 입니다.",
"username": "디스코드 봇"
}
response = requests.post(
webhook_url,
data=json.dumps(data),
headers={"Content-Type": "application/json"}
)
if response.status_code == 204:
print('메시지 전달 완료!')
else:
raise Exception(f"Discord 메시지 전송 실패: {response.status_code}")
# DAG 정의
with DAG(
'discord_weather_supa',
description='구로구 기온 정보를 Supabase에 저장하고 Discord로 전송하는 DAG',
schedule='0 23 * * *',
start_date=datetime(2025, 10, 21, tzinfo=kst),
catchup=False
) as dag:
t1 = PythonOperator(
task_id='get_weather',
python_callable=get_weather
)
t2 = PythonOperator(
task_id='parse_weather_xml',
python_callable=parse_weather_xml
)
t3 = PythonOperator(
task_id='convert_to_dataframe',
python_callable=convert_to_dataframe
)
t4 = PythonOperator(
task_id='insert_to_supabase',
python_callable=insert_to_supabase
)
t5 = PythonOperator(
task_id='send_discord_message',
python_callable=send_discord_message
)
# 태스크 의존성: t1 -> t2 -> t3 -> t4 -> t5
t1 >> t2 >> t3 >> t4 >> t5

5. 소감
- Slack 봇 보다 Discord 봇이 설정이 훨씬 쉬워서 간단한 알림 만들기에 강점
- Supbase 설정도 너무 편하다.
- Airflow 3.0 Web UI 변한것 쓰다보면 편하긴하다. 하지만 개발자 감성이 물씬드는 투박한 UI
- Docker는 신이다.
이렇게 데이터 출처로부터 데이터베이스 저장과 알람까지의 파이프라인을 구축해보았습니다. 많은 도움되길 바랍니다!
'Data Science > Engineering' 카테고리의 다른 글
| n8n을 이용한 장보기 지원 AI Agent 개발과 운영기 (1) | 2025.09.27 |
|---|---|
| Claude Code를 이용한 바이브 코딩 짧은 후기 - 방통대 수업 트래커 만들기 (3) | 2025.09.05 |
| [월간 데이터 노트] n8n 으로 농수산물 데이터(kamis api) 수집하기 (5) | 2025.08.01 |
| Cloud Run과 Streamlit으로 완성한 자동 채점 시스템: 개발부터 운영까지 (5) | 2025.06.23 |
| 클라우드별 서비스 무료 크레딧 정리, AWS GCP Azure NCP (0) | 2025.03.24 |














































