Airflow 2 버전으로 Migration 한 후기
프로그래밍/데이터 엔지니어링

Airflow 2 버전으로 Migration 한 후기

 

migration하면서 화나서 귀요미 로고로 바꿔봤다

데이터 엔지니어 포지션으로 회사에 들어간 지 3개월이 되었습니다. 취준을 하면서 데이터 파이프라인에 중요한 역할을 하는 Airflow를 처음 알게 되었고, execution_date와 start_date가 계속 헷갈렸었는데... 어느덧 회사에서 성공적으로 Airflow 1.10.14 버전에서 2.1.0 버전으로 migration을 완료하였습니다. 본 글에서는 Airflow 2로 migration의 레슨 런을 정리해봤습니다. 잘 정제된 글은 아닙니다만, 해당 주제로 조만간 회사 기술 블로그에서 더 상세하게 다루도록 하겠습니다.

 

Airflow v2

  • Airflow란?
    • Airflow는 Python으로 개발된 워크플로우 작성 + 스케줄링 관리 플랫폼입니다. 간단한 스케줄링 툴은 많지만, airflow의 매력은 태스크를 바탕으로 다양한 플로우 작성이 가능하고 실패한 작업들에 대해 BackFill을 잘 지원해주기 때문이라고 생각합니다.
  • Airflow v2의 특징
    • 스케줄링 퍼포먼스 개선
      참고: 에어플로우 공식 문서 
      • Airflow 1버전 대의 가장 큰 문제는 Dag들이 늘어났을 때 Scheduler가 Dag run과 Task instance를 바로바로 실행시키지 못한다는 점입니다.
      • Airflow 2버전대에서는 DAG Serialization과 Fast-Follow를 도입하여 Scheduler의 반복적인 DAG 파싱 작업을 줄이고 Task Scheduling 과정이 개선되었습니다. astronomer 블로그에서 벤치마크 테스트를 했을 때 10배 이상의 성능 개선이 있었다고 합니다(확실히 연속된 Task instance 간의 Lag가 많이 줄긴 했습니다)
      • Scheduler HA를 지원해서 Scale Out이 가능하도록 지원합니다. Active-Active 클러스터 형태로 Scheduler의 HA를 지원합니다. (Meta DB에서는 SELECT ... FOR UPDATE로 row-level rock이 걸리게 됩니다. 그런데 테스트를 해보니 Scheduler들은 race 형식으로 스케줄링을 하게 되어 DB에 부담이 가지 않을까 하는 생각이 들었습니다)
    • Web 서버 사용성 개선
      • (Airflow 1.10 버전 대에도 있긴 하지만) Dag Serialization을 통해 더 빠르게 웹 UI에서 더 빠르게 DAG 정보를 불러올 수 있습니다.
      • Auto Refresh 기능이 추가되었습니다.
      • UI/UX가 모-던해졌습니다.
    • Airflow Core Component에서 Provider 분리, Task Group, Smart Sensor, full rest API 도입 등등 꽤 많은 변화가 있습니다. 더 궁금하신 분들은 해당 글을 읽어보시면 도움이 될 것 같습니다.

migration 하는 과정

  • Airflow 공식 문서에 2 버전 대로 마이그레이션 하는 가이드가 있습니다. apache-airflow-upgrade-check 패키지를 설치하여 검사하고 Airflow db upgrade 명령어로 2버전 대에 맞게 업데이트하는 과정이죠.
  • 하지만 메이저 버전이 올라갔는데 script로만 처리하기엔 운영 환경에서 불안정하다고 생각이 들었습니다 (통수를 몇 번 맞고 나니.. 애초에 안 믿었던 걸 수도) 그래서 저희 쪽에선 Airflow 2 버전을 위한 별도의 환경을 구축하고 천천히 migration을 했습니다.
    1. Meta DB(Mysql), Log Storage(Google Cloud Storage) 등 Airflow 2를 위한 별도의 환경을 구축합니다.
    2. airflow 2 버전의 DAG들이 정상적으로 파싱되는지, 로깅이 잘 되는지 등 버그들이 있는지 확인합니다.
      • 대부분 하위호환이 잘 되긴 했지만 몇 가지 부분에서는 DAG 코드 수정이 필요했습니다.
        • Custom Operator를 생성할 때 kwargs에 custom 인자를 넣는 것이 금지되었습니다. 아래와 같이 별도의 파라미터를 넣어줘야 합니다.
          •  
          • def **init**(self, prev_delta=0, *args, **kwargs):
        • 기존의 import 경로들은 Deprecated warning과 함께 하위호환이 되지만 안 되는 녀석(Sensor 같은 녀석)들도 몇 개 존재했습니다. 그럴 경우 Import Error를 뱉어 Dag 파싱 자체가 되지 않았습니다(만약 import 경로에 관해 하위 호환을 주고 싶다면 DAG Code 레벨에서 별도로 분기 처리를 해줘야 할 것 같다는 생각이 듭니다)
          • (전) from airflow.sensors.external_task_sensor import ExternalTaskSensor
            (후) from airflow.sensors.external_task import ExternalTaskSensor
    3. 기본적으로 특정 그룹 DAG들의 catchup을 False로 준 후, Airflow 1에서 OFF, Airflow 2에서 ON을 하면서 정상적으로 동작하는지 확인합니다. 2개 이상의 DAG RUN이 성공적으로 실행되었다면 무사히 잘 동작한다고 판단하였습니다.
    4. 정상적으로 migration이 완료한 후 1주일 정도 경과를 지켜보고 문제가 없다고 판단되면, Airflow 1 버전을 내립니다.

 

옮기면서 겪었던 문제점들

  • Dag Serialization 과정이 포함된 Dag Parsing 과정이 생각보다 느립니다.
    • 저희 팀에서는 누구나 Airflow를 테스트해볼 수 있도록 개발 환경에서 격리된 Airlfow application을 띄워줍니다. 이때 Dag가 전부 파싱되는데 20분 이상이 걸리는 문제가 있습니다. 이에 .airflowignore 를 활용하여 특정 directory에 넣은 dag들만 파싱할 수 있도록 환경을 개선하였습니다. 이는 스케줄러의 CPU, Memory 자원 절감으로도 이어졌습니다.
  • Scheduler의 작업이 CPU, Memory 성능을 많이 먹습니다. 초반에 Dag Serialization 작업이 들어가면서 (Dag 파싱과 스케줄링 프로세스가 분리되었고 더 많은 리소스를 먹지 않았나라는 추측을 해봅니다) 이를 최소화하기 위해서 대표적으로 몇 가지 configuration을 수정했습니다.
    • airflow_scheduler_min_file_process_interval 높이기
    • airflow_scheduler_parsing_processes 낮추기
    • airflow_core_min_serialized_dag_update_interval 높이기
  • 자잘한 버그들이 계속 있기에 minor version 체크를 계속하고 코드 커스터마이징이 필요합니다.
    • Log Line Break이 안 되는 이슈
    • Auto Refresh 이슈
    • 등등
  • Airflow Code를 Clone 한 후 Dockerfile을 이용해 Container build를 하는 방식으로 커스터마이징을 진행했는데, 버그 픽스를 위해 버전업을 하려고 할 때 Local Build가 정상적으로 동작하지 않습니다(고쳐줘!!😭)

 

느낀 점

에어플로우야 다시 보지 말자~!

  • 기존 v1에서는 특정 시간 대에 Dag run이 수십 개가 동시에 실행될 때 Task Instance 스케줄링 Latency가 굉장히 길었습니다. 하지만 Airflow 2 넘어오면서 확실히 latency가 많이 줄었고 더 안정적으로 파이프라인 운영이 가능해졌습니다.
  • Kuberetes 환경에서 운영하면 Helm chart도 꽤 신경을 써야 했습니다. 어떤 차트를 쓰냐에 따라 kubernetes에서 failover가 잘 안될 수도 있었습니다. 저희 같은 경우 User Community Helm chart를 사용하였으나, Airflow에서 공식으로 Helm Chart를 지원해준다고 하여 나중에 한 번 POC를 해볼 생각입니다.
  • 뭔가 결과적으로 봤을 때 많은 코드 변화가 있는 것은 아닌데, stable 하게 팀 내에 도입하기까지 시간이 오래 걸렸습니다. 제가 airflow, kubernetes에 대한 배경지식과 히스토리가 부족한 상태에서 시작하다 보니 시행착오가 더 많았지만 Airflow의 복잡한 configuration이 또 한몫을 하는 것 같습니다(남 탓)

 

처음에 Kubernetes도 처음 써보고 Airflow를 다뤄본 경험도 적다 보니 정말 막막했습니다. 동료들이 도움을 주시지 않았다면 이곳에 작고 소중한 경험 공유마저 하지 못했을 것 같습니다. 본 주제에 대한 더 상세한 내용은 조만간 회사 기술 블로그에서 더 상세하게 다뤄보겠습니다. 감사합니다.