

Introduction
오늘의집 Data & Discovery 조직 내 데이터 플랫폼 팀은 Airflow를 통해 매일 20,000개 이상 제출되는 Apache Spark Application 작업을 실행하며, 오늘의집 프로덕션 서비스에서 수집한 데이터를 분석계로 가공, 처리, 적재(ETL) 하는 전 과정을 담당하고 있습니다.
각 팀에서 정의한 Spark Application은 여러 서비스에서 긴밀하게 사용되기 때문에, 정해진 주기마다 안정적으로 빌드되는 것이 중요합니다. 하지만 전사 비즈니스가 성장하고, Data & Discovery 팀의 규모가 커짐에 따라 Spark Application 개수는 점점 증가하기 시작했고, 이에 따라 더 효율적이고 빠른 배치 컴퓨팅 환경을 구축할 필요성도 점차 높아졌는데요. 데이터 플랫폼 팀은 이러한 배경 속에서 기존 레거시 환경에서 활용하던 EMR on EC2 의 한계를 극복하고자 EMR on EKS 로의 이전을 시작으로, Spark Application을 Kubernetes 환경으로 제출하고 실행하는 여러 방법을 시도해 보며 운영 경험을 쌓아오고 있습니다.
이 글에서는 오늘의집 데이터 플랫폼 팀이 EMR on EKS를 시작으로 Spark on Kubernetes 환경으로 전환하는 과정에서 겪은 운영 경험과, 각 과정에서 시도한 다양한 방법의 장단점을 공유해 드리고자 합니다.
오늘의집 데이터 인프라 훑어보기
우선 오늘의집 데이터 분석 인프라를 소개해 드리겠습니다. 프로덕션 데이터베이스 데이터를 비롯하여 유저 행동 로그, 그리고 서드 파티 Application 등으로부터 받아온 로그는 일차적으로 AWS S3로 저장됩니다. 이렇게 저장된 AWS S3 데이터들은 각 팀에서 요구하는 분석 목표에 사용하기 위해 추출, 전환, 적재(ETL) 됩니다. 이러한 ETL 과정은 대부분 Spark Application 형태로 구현되어 있고 Airflow를 통해 매시간, 매일, 매주 등 다양한 시간 간격으로 실행됩니다.
문제 정의 : EMR on EC2의 한계
Amazon EMR은 Apache Hadoop 및 Apache Spark와 같은 빅 데이터 프레임워크 배포와 실행을 도와주는 AWS 관리형 클러스터 플랫폼입니다. Hadoop YARN 클러스터를 EC2 클러스터에 배포해 주는 방식(이하 EMR on EC2), EKS 상에 가상 클러스터를 배포해 주는 방식(이하 EMR on EKS)부터 EMR Serverless 까지 여러가지 배포 모델을 지원하고 있습니다.

오늘의집 EMR on EC2 환경에서는, 사용자가 Airflow를 통해 작업을 제출하면 EMR API를 통해 각 작업 전용 YARN 클러스터가 새로 생성되고, 그 위에서 작업이 실행되는 구조(이하 transient 클러스터) 를 사용하고 있었습니다. 그러나 EMR on EC2 클러스터를 생성 - 부트스트랩 하는 과정에만 약 10분 내외의 시간이 소요되어, 3분 내에 끝나는 간단한 작업조차도 불필요한 대기 시간이 발생하는 문제가 있었습니다. 게다가 이 대기 시간 동안에도 EC2 instance 요금은 고스란히 청구되기에, 매일 1000+개의 클러스터를 생성 / 삭제하는 상황에서 비용 낭비가 반복되고 있었어요.
이런 현상을 개선하기 위해 상시로 운영되는 EMR on EC2 YARN 클러스터를 생성하여 사용하기도 했습니다. 하지만 상시로 유지되는 EMR 클러스터를 효율적으로 운영하기 위해서는 YARN 및 EMR 기반의 오토스케일링 정책이나 인스턴스 그룹 등 transient 클러스터 방식 대비 더 복잡한 관리가 필요했는데요, 이러한 YARN 클러스터 또한 전사적으로 사용하는 Kubernetes와는 별도의 자원 오토스케일링을 사용하고 있기 때문에 사내 인프라 전문가분들을 통한 도움을 받기 어려웠습니다. 또한 데이터 플랫폼 팀 내에서 관리하기에도 통합된 전사 Datadog 대시보드, 모니터와 같이 연동하여 통합된 뷰로 자원 모니터링을 하기 어려워 별도의 모니터링 방식을 강구해야만 했습니다.


이렇게 여러 접근을 시도해 오는 과정에서 어느 한 방식이 모든 요구사항을 완전히 충족시키지는 못했습니다. 그 결과 Spark 작업을 실행하기 위한 구현체가 너무나도 많아진 상태로, 개발자 간의 Spark 작업 방식이 파편화되는 문제가 발생했어요.
이러한 문제점들을 정리하고 개선하기 위해, 데이터 플랫폼 팀은 다음과 같은 운영 목표를 정의했습니다.
- 작업 제출 오버헤드 개선 : 작업 제출부터 실행까지의 대기 시간을 단축합니다.
- 모니터링 환경 개선 : 사내 다른 Application과 동일한 방식으로 실시간 리소스 모니터링이 가능하도록 합니다.
- IaC를 통한 권한 관리 : 신규입사자나 팀 변경 등 조직 변화에 맞춰 사용자 권한을 매번 수동으로 부여하지 않고, IaC로 일관되게 관리합니다.
- 작업 제출 레이어 구현체의 파편화 개선 : EMR on EC2 사용 방식이 레거시 코드 내에서 파편화된 EMR on EC2 제출 방식을 하나의 통합된 인터페이스로 일원화합니다.
- 컴퓨팅 자원 오토스케일링 한계점 개선 : 기존에 전사적으로 사용하고 있던 Karpenter를 활용해 자동으로 유동적인 horizontal scaling이 가능하도록 합니다.
- 자원 최적화 운영 구조 정비 : EC2 운영 및 스케줄러, 오토스케일러 등 자원 최적화 문제에 있어 인프라 전문가의 도움을 받기 힘든 구조를 개선합니다.
Spark on Kubernetes 일대기
Why Spark on Kubernetes?
결론적으로 현재 오늘의집 데이터 플랫폼 팀은 Spark on Kubernetes 환경으로 전환한 이후, 위 목표들을 모두 달성하여 안정적으로 매일 20,000개 이상의 Spark app을 실행하고 있는데요, Spark on Kubernetes 환경으로 이전하면서 위에서 제기된 문제점들이 구체적으로는 각각 어떻게 해결되었을까요?
먼저 오늘의집 Spark on Kubernetes 작업 제출 방식에 관한 전체적인 도식을 참고해봅시다.

사용자가 작업을 제출하면 새로운 Spark cluster를 생성할 필요 없이, 이미 실행되고 있는 Kubernetes cluster 상에 Spark pod들이 생성됩니다. Spark cluster를 새로 생성할 필요가 없기 때문에 작업 제출 후 실행까지의 오버헤드가 혁신적으로 줄어들었습니다.
또한 이렇게 생성된 Spark pod들은 일반적인 웹서버, DB pod들과 동일한 방식으로 Kubernetes node들에 배치되는데요, Spark Application의 특성상 작업 시작 시 많은 노드 할당을 요구하게 되고, 종료 후에는 해당 노드들이 더이상 사용되지 않게 됩니다. 따라서 Spark on Kubernetes 환경의 원활한 운영을 위해서는 클러스터 오토스케일러의 존재가 필수적인데요, 마침 오늘의집에서는 Karpenter를 도입하여 노드 생애주기를 효율적으로 관리해오고 있었습니다.
Karpenter는 만약 노드가 부족하여 pod가 할당될 수 없는 경우 수 분 내로 pod의 node selector에 맞는 node를 신규로 할당해주며, 일부 작업이 종료되어 노드가 비효율적으로 사용되고 있는 경우 유휴 노드를 회수하여 탄력적인 자원 관리를 가능하게 해줍니다.
Spark on Kubernetes로의 이관 시작: EMR on EKS
EMR on EC2 환경으로부터 Spark on Kubernetes를 사용하기 위해서는 아래와 같은 작업이 필요했습니다.
- Kubernetes cluster 구축
- Spark application이 실행될 Spark docker image 빌드
- Spark application을 제출하는 Airflow Operator 구현
- Kubernetes RBAC 설정 및 AWS IAM 자원과의 연동
- (EMR 및 YARN에서 제공하는 모니터링을 사용하지 못하게 되므로) Kubernetes monitor 설정
이중 1, 5번은 기존 Kubernetes cluster에서 이미 갖춰져 있었기에 2, 3, 4번의 구현 방식이 중요했는데요, 이러한 문제를 고민하던 중 마침 GA 상태로 릴리즈된 EMR on EKS 서비스로 이전을 결심하게 되었습니다. EMR on EKS 서비스는 Amazon EKS 클러스터 내에 virtual cluster를 생성하고, 해당 클러스터에 Spark 작업을 제출하여 실행하고 관리할 수 있도록 해주는 관리형 서비스로, 위 문제들을 해결하기에 알맞은 선택지였습니다.
이 중 2, 3번을 구현할 때 기본적으로 emr-containers API를 통한 작업 제출 및 작업 모니터링을 제공해 주기 때문에, Spark 작업을 제출하고 실행하고 작업 상태를 관리하는 구현을 동반하지 않고도 Spark on Kubernetes로 빠르게 이전할 수 있는 장점이 있었습니다. 더군다나 EMR on EKS 내에서 Spark 작업을 실행하기 위한 베이스 이미지를 제공해 주기 때문에 기존에 EMR on EC2에서 제공해 주고 있던 여러 Spark 최적화 패치나, 기존에 사용하던 glue data catalog 설정을 쉽게 호환시킬 수 있다는 장점이 있었어요.
그 외에 4번의 경우 terraform 을 통해 필요한 AWS IAM 리소스를 생성하고, Kubernetes 자원과 연동하는 과정을 거쳤습니다.
효과는 대단했다! 하지만…
성공적으로 EMR on EKS으로 대부분의 작업을 이전하고 나니, 아래와 같은 효용을 볼 수 있었습니다.
- Karpenter에 의한 오토스케일링을 통해 피크 시간대에 3200+ cores의 spot 자원을 활용할 수 있게 되었으며, instance family 설정을 다양화하여 spot 가용률도 높일 수 있었습니다.
- 병렬적인 작업 제출이 훨씬 간편해져, 각 파이프라인의 리드 타임이 최대 70% 줄어들고, 기존에는 수 일 이상의 시간이 필요했던 연 단위의 backfill 작업도 수십 분 내에 가능해졌습니다.
- 단일 작업의 리드 타임이 cluster provisioning으로 인해 발생하던 만큼 줄어들었습니다(약 10분).
- 파편화된 제출 구현체를 새로운 SparkSubmitOperator로 합쳐낼 수 있었어요.
- 또한 기존에 팀별로 Spark 작업 시 사용하는 권한이나, 각 팀 및 프로젝트별 리소스 사용량을 트래킹하고 싶은 니즈가 있었는데요, EMR on EKS로 전환하는 과정에서 팀 별로 AWS 리소스에 대한 접근 제한과 리소스 트래킹을 위한 태그를 세분화하여 설정할 수 있었습니다.

- Kubernetes 클러스터 내에서 작업을 실행할 수 있게 되면서, 기존에 연동되어 있던 Datadog 을 통한 리소스 모니터링이 용이해졌습니다. 특히 리소스 태그를 통해 Datadog dashboard 내에서 자원을 세분화하여 효율성을 모니터링하기 용이했어요.

하지만 아직 개선할 여지는 있었습니다.
- 위 그림은 emr-containers API를 사용해서 작업을 제출할 때 구성되는 컴포넌트 구조입니다. 작업 제출 시 필수적으로 생성되는 Spark Submitter Pod는 구조적으로 커스터마이징이 어렵게 설계되어 있어, 원하는 리소스 사용량이나 toleration등의 배치 관련 설정을 세부적으로 제어하기 어렵다는 한계가 있었습니다.
- 작업 제출 과정이 Spark submitter pod 생성 > Spark driver pod 생성 > Spark executor pod 생성까지 단계적으로 이루어지며, 각 단계에서 Karpenter nodepool의 scale-out이 발생하는 경우 적지 않은 오버헤드가 발생했습니다.
- 오늘의집 파이프라인은 최대 500개 이상의 작업이 같은 시점에 제출되기도 하는데요, 이 경우 AWS API의 rate limit에 걸려 추가적인 조치가 필요해졌습니다. 제출 작업의 수가 점진적으로 늘어날 것이 예상되는 상황에서 더욱 탄력적인 제출 방식이 필요하다고 생각했어요.
- 또한 이전 과정에서 EMR on EKS에서 제공되는 Spark image를 사용했는데, 기존에 데이터 파이프라인 개발 환경에서 사용하던 jupyterhub에서는 해당 이미지를 그대로 활용하여 Spark session을 생성하긴 어려운 문제가 있었습니다.
작업 제출 방식을 개선하자! (spark-operator, spark-submit)
제출 단계의 문제점(1, 2) 를 개선하기 위해, Spark submitter pod를 별도로 두지 않고, Airflow worker pod 내에서 Spark 작업을 직접 제출할 수 있는 방향을 검토했습니다. 첫 번째 시도로 채택된 건 kubeflow/spark-operator의 사용이었습니다. spark-operator를 사용하면 사용자는 Kubernetes CRD 형태로 간단하게 작업 제출을 정의할 수 있고, 단일 서비스가 모든 Spark 작업의 생애주기를 책임지기에 관리성이 개선될 것이라 기대했습니다. 하지만 본격적인 도입에 앞서 저희가 실제로 사용하고 있고 늘어날 작업 제출량에 대한 stress test를 진행해 보았을 때 성능 저하가 발생하는 문제로 인해 수 분가량의 작업 제출 지연이 발생하는 것을 확인할 수 있었습니다(관련 github 링크). 때문에 현재는 대안으로 spark-submit 프로세스를 직접 Airflow worker pod 내에서 실행할 수 있도록 Airflow Spark provider의Airflow.providers.apache.spark.operators.spark_submit 구현을 일부 수정하여 사용하고 있습니다.
emr-containers API를 통해 Spark 작업을 실행하는 경우, 각 Spark pod의 sidecar로 fluentd container를 실행하여 같은 동작을 지원합니다. spark-submit 자체는 driver / executor log를 AWS S3로 저장하는 로직을 지원하지 않기 때문에, 별도로 fluent-bit을 daemonset으로 구성하여 S3로 작업을 저장하는 구성이 필요했습니다. 결론적으로 Spark submit pod 생성을 생략함으로써 scale-out 오버헤드가 1분가량 감소하였고, spark-submit 및 fluent-bit의 자원 사용량을 합쳐도 기존 emr-containers submitter pod 대비 1/10 수준으로 줄일 수 있었습니다.
이미지 파편화를 개선하자!
데이터 플랫폼 팀은 최근 이 문제를 집중하여 풀어나가는 마지막 단계에 있는데요, 글 마지막 단락에서 더 자세한 설명을 드리고 여기서는 넘어가 보겠습니다.
유연한 마이그레이션을 위한 구현체 디자인
지금까지의 마이그레이션 과정은 적지 않은 단계를 거쳐야 했고, 무엇보다 각 옵션을 빠르게 시도해 보며 효용성을 검증하는 것이 중요했는데요. 동시에 아래와 같은 목표들을 같이 달성해야 했습니다.
- 기존에 파편화된 작업 제출 구현체들을 호환하며 통합할 수 있는 구현체를 만들고, 모든 구현체를 해당 구현체로 통합하여 파편화를 해소한다.
- 마이그레이션 과정에서 충격을 최소화한다: 새로운 작업 제출 방식의 호환성을 충분히 검증하며 마이그레이션을 진행하여 기존 작업의 SLI에 영향을 미치지 않도록 한다.
- 팀 간의 관심사를 구분하여, 각 팀이 효율적으로 작업할 수 있도록 한다:
마이그레이션을 빠르게 진행해 보고 효과를 검증하는 동시에, 위와 같은 문제들을 어떻게 풀었을까요? 같이 살펴봅시다.
SparkSubmitOperator, SparkSubmitter 그리고 SparkResourceConfig

cdc_table_task = PysparkOperator(
task_id=cdc_table.task_id, # eg. mongo_sourc.goods_cdc
# Spark submit 실행체
submitter=SparkOnKubernetesSubmitter(),
# ohouse-spark 이미지, "여전히 파편화되어 있는 Spark Image 통합하기" 참고해주세요.
runtime_release=("pyspark", "spark35", None),
# Spark 리소스 템플릿 및 override
resource=SparkResourceConfig.small().with_executor_disk(100),
# PySpark 작업
job="jobs.datapl.mongo_source.load_binlog_from_kafka",
# PySpark 작업 인자
arguments=dict(
broker=config.artifact.kafka.get_data_platform_broker(),
cdc_table_uri=cdc_table.uri,
start_datetime="{{ kst_datehour | add_duration(hours=1 - %d) }}" % interval_hours,
end_datetime="{{ kst_datehour | add_duration(hours=1) }}",
),
)
add_partition_to_sdt_ohs_log_task = SparkSqlOperator(
task_id="add_partition_to_sdt_ohs_log",
submitter=EksSubmitterProxy(),
runtime_release=("pyspark", "spark33"),
resource=SparkResourceConfig.x2small(),
query="""ALTER TABLE {{ config.database.hive.get_sdt() }}.sdt_ohs_log ADD IF NOT EXISTS PARTITION(date='{{ next_kst_date }}');"""
)
SparkSubmitOperator : 작업 인자와 Spark configuration을 간편하게 설정
SparkSubmitOperator는 Spark 작업 제출 시 넘겨줄 인자들을 전달해 주고, 이러한 인자들과 공통 Spark config을 resolve하여 spark_submitter로 전달해 주는 레이어입니다.
오늘의집 데이터 파이프라인은 Spark 작업을 선언하는 목적에 따라 여러 유형으로 나누어 두었는데요. 각 유형의 작업을 쉽게 제출할 수 있도록 SparkSubmitOperator를 확장한 여러 subclass를 같이 선언해 두었습니다. 예를 들어 간단한 Spark sql을 실행하고 싶은 경우 SparkSqlOperator instance를 쿼리문과 같이 선언하여 작업을 쉽게 정의할 수 있습니다.
SparkSubmitter : 작업 제출과 모니터링 방식을 간편하게 전환
이러한 SparkSubmitOperator 들은 공통적으로 SparkSubmitter subclass를 spark_submitter 인자로 넘겨받습니다. SparkSubmitter class는 실제로 작업을 제출하고, 해당 작업의 진행 상황과 driver log등을 모니터링 하는 역할을 맡아요.
유저가 의도적으로 spark_submitter 인자를 override하지 않는 경우, 기본적으로 데이터 플랫폼 팀이 설정한 가장 개선된 형태의 spark_submitter 를 기본 인자로 사용하여 작업이 실행됩니다. 따라서 실제 마이그레이션 과정에서 데이터 플랫폼 엔지니어는 새로운 SparkSubmitter subclass를 선언하여 테스트하고, 안정성이 검증되면 기본 인자를 변경하는 식으로 작업 제출 방식을 빠르게 변경해보면서 테스트가 가능했어요.
이러한 SparkSubmitter의 subclass들로는 기존 YARN 클러스터로의 작업 제출을 호환하는 EmrOnEC2SparkSubmitter, emr-containers API를 통해 작업을 제출하는 EmrOnEksSparkSubmitter, 그 외에 kubeflow/spark-operator를 사용하는 Submitter, spark-submit을 직접 실행하는 submitter 등이 있답니다. 더 나아가 production 배포 시에는 가용성을 확보하기 위해, 마이그레이션 타겟 작업 제출 방식이 실패하는 경우 fallback으로 기존에 안정적으로 동작하던 SparkSubmitter를 사용할 수 있도록 하는 fallback plugin을 구현하여 사용하기도 하였습니다.
SparkResourceConfig: Spark 리소스를 간편하게 설정
기존 구현체들이 공통적으로 안고 있었던 문제는 개별 사용자가 리소스 설정 방식을 열심히 이해하고, 세밀하게 설정해야만 했다는 점이었습니다. 이를 개선하기 위해 정의한 SparkResourceConfig 클래스는 AWS EC2 instance size와 대응하여 resource 사용량을 프리셋으로써 설정할 수 있도록 해줍니다. 예를 들어 SparkResourceConfig.large() 와 같이 설정하는 경우 executor instance를 16개 사용하고, SparkResourceConfig.xlarge() 를 설정하는 경우 32개 사용하는 식입니다.
이러한 프리셋들은 작업을 실행할 때 추천되는 옵션들로 구성되어 있습니다. 하지만 작업의 특성에 따라 특정 작업은 driver memory 용량이 32GB씩 필요할 수도 있고, executor의 disk size가 200GB씩 필요할 수도 있는데요, 이런 경우에는 SparkResourceConfig.large(memory_per_driver_core_gb=32, executor_disk_gb=200) 과 같이 override를 통해 간편하게 설정할 수 있습니다.
무사고 마이그레이션을 위한 장치
마이그레이션 구조를 단순화하면서 필요할 때 빠르게 실행할 수 있도록 기반이 마련되었습니다. 그러나 여전히 기존 작업들에 영향을 주지 않으면서 새로운 제출 방식을 안정적으로 적용하기 위해서는 추가적인 장치가 필요했습니다.
제출 방식의 변경은 제출 이후의 모든 단계 (Spark Application의 제출 -> 실행 -> 적재) 에 영향을 미치는데요. 이로 인해 데이터 파이프라인 운영의 SLI에 악영향을 미치지 않도록 새로 도입한 Spark application 제출 방식이 대체로 잘 동작하는지 확인하는 통합 테스트 (Integration Test), 각 작업 이전 시 달라진 제출 방식에서 동일한 argument가 전달되는지 확인하는 유닛 테스트, 변경된 제출 방식으로 빌드한 데이터가 기존과 일치하는지 확인하는 회귀 테스트 (Regression Test) 등을 구현하고 활용했습니다.
Spark Integration Test
새로운 유형의 SparkSubmitter 혹은 SparkSubmitOperator subclass를 정의하고 활용하기 전, 대표적인 데이터 파이프라인 유형이 실행 가능해야 할 기본적인 기능을 end-to-end로 실행하여 모든 컴포넌트가 다같이 잘 동작하는지 확인하는 테스트입니다.

위와 같이 Airflow DAG 형태로 구현되어 있는데요. 데이터 플랫폼 팀의 작업자가 SparkSubmitter / SparkSubmitOperator 의 구현을 변경하고 전체 데이터 파이프라인에 적용하기 전에 간단하게 이 DAG을 실행하는 식으로 통합 테스트를 완수할 수 있습니다.
각 테스트 케이스는 SparkResourceConfig.x2small() 등의 매우 적은 리소스를 사용하여 기본적인 동작을 작은 데이터셋에 대해 실행하기 때문에, 적은 리소스로 end to end 동작을 테스트해볼 수 있도록 설계하였습니다. 이를 통해 하루 20,000건 이상 실행되는 Spark Application에 영향을 줄 수 있는 변경 사항도 자신 있게 배포할 수 있었어요.
Spark-submit Argument Unit Test
EMR on EC2에서 Spark on Kubernetes 환경으로 전환하면서, 작업 제출 인터페이스를 SparkSubmitOperator 기반으로 통일하는 과정에서 작업 실행 인자 또한 영향을 받는 문제가 있었습니다. 특히 일부 작업은 복잡한 쿼리 등을 인자로 전달하는 경우도 있었기에, 작업 제출 시 기존과 동일한 인자가 정확히 전달되는지 보장하는 것이 매우 중요했습니다.
때문에 이러한 작업 인자를 테스트할 수 있는 방식을 구현하고, 필요한 utility를 작성하였습니다. 아래와 같은 방식으로 기존 방식과 신규 방식 간에 작업 인자가 동일함을 확인할 수 있도록 구성했는데요.
def test_adv_h_003(airflow_environment, prod_environment):
from batch.dags.etc.mart.adv import adv_h_003
query = render_template(adv_h_003.TASK1, "2023-12-01T00:00:00+09:00")["query"]
assert query == """INSERT OVERWRITE ..."""
def test_count_and_update_num_products_in_category_dag(airflow_environment, prod_environment):
from batch.dags.commerce.product import count_and_update_num_products_in_category_dag
assert extract_spark_submit_cmd(
count_and_update_num_products_in_category_dag.count_products_in_category, "2024-01-04T06:10:00"
) == [
'spark-submit',
'--class',
...
]
Airflow를 통해 제출되는 Spark Application은 각 Task Instance마다 동적으로 렌더링되는 Templated Field를 실행 인자로 입력받습니다. 이를 이용해 기존 작업에서 사용되고 있던 렌더링 된 인자들을 비교값으로 정의하고, 변경된 작업 제출 방식을 통해 빌드되는 인자들을 비교하는 방식의 테스트를 간단하게 정의할 수 있도록 하였습니다.
또한 Fix-it Week 한 주 동안 백여 개가 넘는 Airflow DAG들이 레거시 제출 방식에서 SparkSubmitOperator 를 통한 작업 제출을 하도록 안전하게 이전할 수 있었습니다.
Hive Table Regression Test
데이터 플랫폼 팀을 비롯해 오늘의집 Data & Discovery 팀은 비즈니스/서비스 내에서 중요하게 사용되는 테이블의 빌드 로직을 변경하는 경우, 두 개의 서로 다른 Hive Table 및 Partition 간의 차이점을 알기 쉽게 비교해 주는 툴 (이하 Hive Table Diff Tool) 을 만들어 사용하고 있습니다.

이 툴은 작업자가 테이블 빌드 방식을 변경한 후 해당 툴을 원하는 인자와 함께 실행하면, 입력한 두 테이블 간의 데이터를 비교하고 알기 쉽게 보여주는데요, 이를 통해 직접 AWS Athena 등의 쿼리 툴이나, 개발 목적의 Spark Session을 열어 데이터를 확인할 필요 없이 적은 스텝으로 회귀 테스트가 가능합니다. 물론 실제 데이터의 정합성을 체크하는 툴인 만큼 적지 않은 자원이 필요하다 보니, 중요도가 높은 테이블의 작업 방식 마이그레이션 시에만 선택적으로 적용하여 안정성을 높였습니다.
Fallback & Gradual Ramp-up
테스트 체계를 충분히 마련하더라도 데이터 파이프라인의 특성상 테스트 환경에서는 문제없이 실행되던 작업이 프로덕션 환경에서는 여러 이유 (ex: 정합성이 맞지 않는 데이터, 포맷에서 벗어난 데이터, 다른 데이터셋 크기) 로 인해 정상 동작하지 않는 경우가 종종 발생합니다. 이러한 경우를 대비하기 위해 아래 두 가지 방법을 적용했어요.
- 신규 작업 제출 방식의 Fallback으로서 기존 작업 제출 방식이 동작할 수 있도록 한다: 신규 방식으로 작업 실행에 실패할 경우, 에러 알림과 함께 기존 작업 제출 방식으로 자동 재수행되도록 하여 SLI에는 약간의 지연 정도의 영향만 주도록 방어했습니다.
- 점진적인 적용 범위 확대: 전체 작업을 한 번에 마이그레이션하기 보단, 전체 중 설정된 비율만큼의 작업만 신규 작업 제출 방식으로 이전되도록 설정했어요. 만약 전체적인 범위에서 문제가 발생하더라도 일부 데이터 파이프라인만 영향이 제한되며, 또 설정을 통해 제어했기 때문에 빠르게 롤백 할 수 있도록 설계했습니다.
성과
지금까지 긴 글을 통해 설명했던 일련의 작업을 통해, 저희 데이터 플랫폼 팀은 데이터 파이프라인에 아래와 같은 개선사항들을 이뤄낼 수 있었습니다.
작업 실행 성능 개선
- EMR on EC2에서 존재했던 약 10분의 오버헤드를 일차적으로 덜어냈고, 더 나아가 spark-submit 제출 방식으로 변경하면서 추가적인 오버헤드 감소를 이뤄냈습니다.
- 기존에 순차적으로 실행하던 작업들을 이관 과정에서 병렬적으로 제출할 수 있게 개선하며, 최대 70% 리드 타임 감소를 이뤄냈습니다.
개선된 리소스 사용 효율
- EMR on EKS 로 마이그레이션하면서 EMR on EKS에서 제공하는 Spark 자체의 성능 개선 효과를 얻을 수 있었습니다.
- 한 개의 Spark 클러스터에 하나의 작업만 실행되면서 유휴 자원이 생기기 쉬웠던 구조에서 Karpenter를 활용하며 탄력적인 오토스케일링을 적용하여 자원 탄력성을 개선하였습니다.
- 개별 사용자가 리소스를 지정할 때도 비슷한 작업을 참고하여 동일한 프리셋을 사용하는 식으로 알맞은 리소스 사용량을 지정하기 쉬운 환경이 제시되었습니다.
간편해진 모니터링
- 전체 작업 내역을 단일 Spark history server를 통해 열람할 수 있게 되며, 작업 실패 시 문제를 특정하기 위한 과정도 간단해졌습니다.
- 작업 실행 중 사용한 리소스 사용량을 Spark history server 내의 metric 뿐만 아니라 Datadog dashboard 내에서도 breakdown해서 볼 수 있었습니다.
높아진 협업 효율성
- 전사적으로 사용하는 EKS 환경을 같이 사용하게 되며, EC2 instance의 부분적 장애가 발생하는 등 인프라 전문 인력이 필요한 장애 상황에 대해 더욱 민첩하고 효율적으로 대응할 수 있게 되었습니다.
- 그 외에 리소스 효율화를 위해 필요한 작업이 있을 경우, 인프라팀과 더욱 긴밀하게 협업할 수 있게 되었습니다.
개선된 코드 구조
- 기존에 파편화된 5+개의 Spark 작업 제출 구현체들을 하나로 통합하며 인지 부하를 줄일 수 있었어요. 이를 통해 개별 사용자가 훨씬 간편하게 Spark 작업을 선언하고 실행할 수 있게 되었습니다.
- 추후에도 Spark 작업 제출 환경을 개선할 경우, 높은 안정성으로 빠르게 마이그레이션을 시도해 볼 수 있는 패턴을 확보하였습니다.
오늘의집 데이터 플랫폼 팀은 아직 열일중
지금까지 많은 개선 사항을 이뤄냈지만, 오늘의집 데이터 플랫폼 팀은 아직 배고픕니다. 저희는 아래와 같은 재밌는 일들을 풀어나가고 있어요.
여전히 파편화되어 있는 Spark Image 통합하기
- Airflow 환경에서 사용하는 Spark image는 EMR on EKS 용으로 제공된 이미지이다 보니, 앞서 언급하였듯 jupyterhub등의 로컬 / 개발 환경에서는 사용이 어렵다는 문제가 있어요.
- 이 때문에 개별 사용자는 jupyterhub에서 완결된 개발을 마쳐도 Airflow 환경에서 동일하게 실행되는 것을 보장받지 못하기에 이로 인한 혼란이 있는 상태입니다.
- 오늘의집 데이터 플랫폼 팀은 이를 개선하기 위해 ohouse-spark image 빌드 파이프라인을 정의하고, 모든 환경에서 동일한 도커 이미지를 사용하여 E2E 개발을 보다 쉽게 만들고자 여러 작업을 진행 중입니다. (여담이지만, 앞서 설명한 SparkSubmitOperator & SparkSubmitter를 통해 해당 환경으로의 이전도 매끄럽게 진행하고 있어요!)
- 더 자세한 내용에 관심이 있으시다면, 다음 포스트로 이어질 “Ohouse Spark - 오늘의집의 통합 Spark 환경”에서 관련된 내용을 읽어보는 것을 추천드립니다!
FinOps: 자원 사용량 기반으로 긴 시계열에서의 비용 증가를 높은 정확도로 측정/모니터링할 수 있는 아키텍처 설계
- Datadog Kubernetes dashboard를 사용할 수 있게 되며 리소스 효율성을 매우 직관적으로 볼 수 있게 되었지만, AWS측 데이터와는 다른 probe와 timeSeries 데이터를 사용하다 보니 Datadog에서 예상한 비용이 실제와는 괴리가 있는 문제가 있었어요.
- Datadog을 장기 시계열에 대한 비용 증가 측정 및 모니터링 목적으로 사용하는 것이 정확하지 않은 접근으로 판단하고, 해당 목적으로 사용할 수 있는 정확도가 높은 데이터를 찾는 것으로부터 개선을 시작했습니다.
- 이때 팀별 RBAC 설정을 통해 얻기 쉬워진 팀별 사용 데이터와, AWS 측에서 제공하는 Split cost allocation data 를 종합하여, 보다 직관적으로 각 팀 및 프로젝트별 컴퓨팅 비용을 모니터링하고 예상할 수 있는 아키텍처를 고민하고 만들어 나가고 있습니다.
오늘의집 데이터 플랫폼 팀은 지금과 같은 성과에서 그치지 않고 더욱 복잡한 문제를 구조적으로 풀어내고, 더 나은 방향을 훌륭한 동료분들과 함께 끊임없이 고민하며 나아가고 있어요. 매번 보다 훌륭한 데이터 인프라를 구축하기 위해 훌륭한 동료분들과 협업하고 있는 오늘의집 데이터 플랫폼 팀은 현재 다음 챕터를 만들어갈 동료분들을 기다리고 있습니다.
지금까지 긴 글 읽어주셔서 감사합니다!