우아한테크코스 레벨 4 팀 프로젝트 festabook에서 학습한 내용을 정리한 글입니다.
💭 들어가며
이번 작업은 미션 요구사항이기도 했고, 실제 사용자 유입이 시작되면서 더 이상 미룰 수 없었던 N+1 문제를 해결하는 과정에서 시작되었다. 동시에 DB 인덱스도 반드시 고려해야 한다고 판단했다. 하지만 단순한 추측만으로 어디에 인덱스를 걸어야 할지, N+1 문제가 얼마나 심각한지 판단할 수는 없었다.
그래서 지금까지 쌓인 사용자 데이터를 토대로 앞으로 누적될 데이터를 예측해 넣어본 뒤, 실제로 성능이 얼마나 저하되는지 지표를 뽑아보기로 했다. 이 과정에서 대량 데이터를 직접 삽입해 본 것은 처음이었고, 이를 기록 차원에서 정리해 두고자 한다.
✅ 전체적인 설계
본격적으로 진행하기 전에, 대량의 데이터를 한 번에 밀어 넣었다가 DB가 장애를 일으켰다는 사례들을 접했다. 따라서 안전하게 데이터를 삽입하기 위해 배치 처리 방식을 선택했다.
전체적인 흐름은 다음과 같다.
- 로컬에서 대량의 데이터를 CSV 파일로 생성한다.
- SSH 터널을 통해 DB에 연결한다.
- 생성해 둔 CSV 파일을 배치 단위로 업로드한다.
✅ CSV 생성 스크립트 (Python)
CSV를 배치 단위로 생성하는 스크립트이다. Python으로 작성했으며, 예시에서는 약 54,000개를 생성했다. 이후 다른 테이블용 스크립트에서는 필요한 행 수에 맞게 숫자를 변경해 100~1300만 개 수준까지 생성해 활용했다.
book.txt에는 저작권이 만료된 책 한 권을 넣어 두었다. 이는 임의의 랜덤 문자열이 필요할 때 해당 텍스트에서 랜덤 길이로 잘라 추출하기 위함이다.
🔽 예시 코드
#!/usr/bin/env python3
import csv
import random
import datetime
import os
import uuid
# ===== 설정 =====
OUTPUT_DIR = "mock_lineup"
TOTAL_ROWS = 54_000
FILES = 5
ROWS_PER_FILE = TOTAL_ROWS // FILES
FESTIVAL_IDS = range(1, 5401)
# 출력 디렉터리 생성
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ===== book.txt 로드 =====
with open("book.txt", "r", encoding="utf-8") as f:
book_text = f.read().replace("\n", " ").strip()
if not book_text:
raise ValueError("book.txt 가 비어있습니다!")
# ===== book.txt에서 랜덤 길이의 문자열 추출 =====
def random_text(min_len: int, max_len: int) -> str:
length = random.randint(min_len, max_len)
start = random.randint(0, max(0, len(book_text) - length - 1))
return book_text[start:start + length].strip()
# ===== 주어진 연도 범위 내에서 랜덤 datetime 반환 =====
def random_datetime(start_year=2026, end_year=2028) -> datetime.datetime:
start = datetime.datetime(start_year, 1, 1)
end = datetime.datetime(end_year, 12, 31, 23, 59, 59)
delta_seconds = int((end - start).total_seconds())
return start + datetime.timedelta(seconds=random.randint(0, delta_seconds))
# ===== datetime 객체를 문자열로 변환, \N은 그대로 반환 =====
def format_datetime(dt: datetime.datetime | str) -> str:
if dt == "\\N":
return dt
return dt.strftime("%Y-%m-%d %H:%M:%S.%f")
# ===== 데이터 생성 =====
row_id = 1
for file_idx in range(1, FILES + 1):
filename = os.path.join(OUTPUT_DIR, f"lineup_{file_idx}.csv")
with open(filename, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
# CSV 헤더 작성
writer.writerow([
"id", "created_at", "updated_at", "deleted", "deleted_at",
"festival_id", "name", "image_url", "performance_at"
])
for _ in range(ROWS_PER_FILE):
created_at = random_datetime()
updated_at = created_at + datetime.timedelta(minutes=random.randint(1, 5000))
performance_at = created_at + datetime.timedelta(days=random.randint(1, 100))
deleted = random.choice([0, 1])
deleted_at = "\\N" if not deleted else updated_at + datetime.timedelta(minutes=10)
row = [
row_id,
format_datetime(created_at),
format_datetime(updated_at),
deleted,
format_datetime(deleted_at),
random.choice(FESTIVAL_IDS),
random_text(1, 50),
f"/images/{uuid.uuid4()}",
format_datetime(performance_at),
]
writer.writerow(row)
row_id += 1
print(f"✅ {filename} 생성 완료 ({ROWS_PER_FILE} rows)")
print(f"🎉 총 {TOTAL_ROWS:,} rows를 {FILES}개 파일로 생성 완료 (폴더: {OUTPUT_DIR})")
✅ paramiko & pymysql 설치
- Paramiko는 Python에서 SSH 연결을 다룰 수 있게 해주는 라이브러리다. 원격 서버에 SSH 접속하기 위해 사용했다.
- PyMySQL은 Python 코드에서 MySQL 데이터베이스에 접속하고 쿼리를 실행할 수 있게 해주는 라이브러리다. CSV에서 읽은 데이터를 MySQL 테이블에 삽입하기 위해 사용했다.
pip install paramiko pymysql
참고로 sshtunnel 라이브러리도 고려했으나, 2022년 이후 업데이트가 중단되어 최신 PyMySQL과의 호환성 문제가 발생하여 사용하지 않았다.
✅ SSH 터널링 + CSV 배치 업로드 스크립트 (Python)
현재 Dev 아키텍처에서 Database는 Private Subnet 내부에 있어 직접 접근할 수 없다. 따라서 Database에 접속하기 위해서는 반드시 WAS를 거쳐야 한다. 이를 위해 WAS에 접속한 뒤 DB Private IP를 통해 접근할 수 있도록 스크립트를 작성했다.
🔽 예시 코드
#!/usr/bin/env python3
import csv
import glob
import socket
import select
import threading
import paramiko
import pymysql
# ===== Bastion 설정 =====
BASTION_HOST = "민감정보" # Bastion Public IP
BASTION_PORT = 민감정보 # Bastion Port
BASTION_USER = "민감정보" # Bastion 접속 계정
BASTION_KEY = "민감정보" # Bastion 키 파일 경로
# ===== DB 설정 (Private IP) =====
DB_HOST = "민감정보" # DB Private IP
DB_PORT = 민감정보 # DB Port
DB_USER = "민감정보" # DB Username
DB_PASSWORD = "민감정보" # DB Password
DB_NAME = "민감정보" # DB Name
# ===== CSV =====
csv_files = sorted(glob.glob("mock_lineup/*.csv"))
# ===== 포트 포워딩 핸들러 =====
def forward_tunnel(local_port, remote_host, remote_port, transport):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", local_port))
server.listen(100)
while True:
client, addr = server.accept()
chan = transport.open_channel("direct-tcpip", (remote_host, remote_port), addr)
threading.Thread(target=handler, args=(client, chan), daemon=True).start()
def handler(client, chan):
while True:
r, _, _ = select.select([client, chan], [], [])
if client in r:
data = client.recv(1024)
if not data:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if not data:
break
client.send(data)
client.close()
chan.close()
# ===== SSH 연결 =====
key = paramiko.RSAKey.from_private_key_file(BASTION_KEY)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(BASTION_HOST, port=BASTION_PORT, username=BASTION_USER, pkey=key)
transport = ssh.get_transport()
local_port = 3307
threading.Thread(
target=forward_tunnel,
args=(local_port, DB_HOST, DB_PORT, transport),
daemon=True
).start()
# ===== DB 연결 =====
conn = pymysql.connect(
host="127.0.0.1",
port=local_port,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
charset="utf8mb4",
local_infile=1
)
print("✅ DB 연결 성공")
# ===== CSV → DB 삽입 =====
with conn.cursor() as cursor:
for file in csv_files:
print(f"📂 Inserting {file}")
with open(file, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader) # 헤더 스킵
batch = []
for row in reader:
batch.append((
row[1], # created_at
row[2], # updated_at
int(row[3]),
None if row[4] == "\\N" else row[4],
int(row[5]),
row[6],
row[7],
row[8]
))
if len(batch) >= 1000: # 1000건 단위 insert
cursor.executemany("""
INSERT INTO lineup
(created_at, updated_at, deleted, deleted_at,
festival_id, name, image_url, performance_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", batch)
conn.commit()
batch.clear()
if batch: # 마지막 남은 데이터 처리
cursor.executemany("""
INSERT INTO lineup
(created_at, updated_at, deleted, deleted_at,
festival_id, name, image_url, performance_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", batch)
conn.commit()
print(f"✅ {file} 삽입 완료")
conn.close()
ssh.close()
print("🎉 모든 CSV 삽입 완료")
✅ 결과

데이터가 배치 단위로 정상적으로 삽입되는 것을 확인할 수 있다.
📍 참고 자료
- 스크립트는 GPT를 활용해 작성
'Backend > Database' 카테고리의 다른 글
| [Database] MySQL(InnoDB) 락(Lock) (2) (2) | 2025.10.06 |
|---|---|
| [Database] 락(Lock) (1) (0) | 2025.10.04 |
| [Database] 동시성 문제(Read 계열), 트랜잭션 격리 수준 (1) | 2025.10.01 |
| [Database] 데이터베이스 무중단 마이그레이션 (2) | 2025.08.27 |
| [Database] SQL 기본 문법 정리 (0) | 2024.07.16 |