ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Airflow 사용해보기
    Data Engineer 2023. 8. 21. 01:57
    728x90

    Docker와 Docker-compose(v2 이상) 설치를 진행 후, 진행해보았다.

    우선 docker 환경에서 에어플로우를 실행하기 위해서는 docker-compose.yaml 파일이 필요하다.

    필요한 기본적인 파일은 에어플로우 공식 사이트(https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html)에서 확인이 가능하며, 터미널에서 다음과 같은 명령어를 실행해주면 된다.

    curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    #
    
    # Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
    #
    # WARNING: This configuration is for local development. Do not use it in a production deployment.
    #
    # This configuration supports basic configuration using environment variables or an .env file
    # The following variables are supported:
    #
    # AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
    #                                Default: apache/airflow:2.7.0
    # AIRFLOW_UID                  - User ID in Airflow containers
    #                                Default: 50000
    # AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
    #                                Default: .
    # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
    #
    # _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
    #                                Default: airflow
    # _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
    #                                Default: airflow
    # _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
    #                                Use this option ONLY for quick checks. Installing requirements at container
    #                                startup is done EVERY TIME the service is started.
    #                                A better way is to build a custom image or extend the official image
    #                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
    #                                Default: ''
    #
    # Feel free to modify this file to suit your needs.
    ---
    version: '3.8'
    x-airflow-common:
      &airflow-common
      # In order to add custom dependencies or upgrade provider packages you can use your extended image.
      # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
      # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.0}
      # build: .
      environment:
        &airflow-common-env
        AIRFLOW__CORE__EXECUTOR: CeleryExecutor
        AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
        # For backward compatibility, with Airflow <2.3
        AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
        AIRFLOW__CORE__FERNET_KEY: ''
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
        AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
        AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
        # yamllint disable rule:line-length
        # Use simple http server on scheduler for health checks
        # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
        # yamllint enable rule:line-length
        AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
        # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
        # for other purpose (development, test and especially production usage) build/extend Airflow image.
        _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
      volumes:
        - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
        - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
        - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
        - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
      user: "${AIRFLOW_UID:-50000}:0"
      depends_on:
        &airflow-common-depends-on
        redis:
          condition: service_healthy
        postgres:
          condition: service_healthy
    
    services:
      postgres:
        image: postgres:13
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        volumes:
          - postgres-db-volume:/var/lib/postgresql/data
        healthcheck:
          test: ["CMD", "pg_isready", "-U", "airflow"]
          interval: 10s
          retries: 5
          start_period: 5s
        restart: always
    
      redis:
        image: redis:latest
        expose:
          - 6379
        healthcheck:
          test: ["CMD", "redis-cli", "ping"]
          interval: 10s
          timeout: 30s
          retries: 50
          start_period: 30s
        restart: always
    
      airflow-webserver:
        <<: *airflow-common
        command: webserver
        ports:
          - "8080:8080"
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-scheduler:
        <<: *airflow-common
        command: scheduler
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-worker:
        <<: *airflow-common
        command: celery worker
        healthcheck:
          # yamllint disable rule:line-length
          test:
            - "CMD-SHELL"
            - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        environment:
          <<: *airflow-common-env
          # Required to handle warm shutdown of the celery workers properly
          # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
          DUMB_INIT_SETSID: "0"
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-triggerer:
        <<: *airflow-common
        command: triggerer
        healthcheck:
          test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-init:
        <<: *airflow-common
        entrypoint: /bin/bash
        # yamllint disable rule:line-length
        command:
          - -c
          - |
            function ver() {
              printf "%04d%04d%04d%04d" $${1//./ }
            }
            airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
            airflow_version_comparable=$$(ver $${airflow_version})
            min_airflow_version=2.2.0
            min_airflow_version_comparable=$$(ver $${min_airflow_version})
            if (( airflow_version_comparable < min_airflow_version_comparable )); then
              echo
              echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
              echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
              echo
              exit 1
            fi
            if [[ -z "${AIRFLOW_UID}" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
              echo "If you are on Linux, you SHOULD follow the instructions below to set "
              echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
              echo "For other operating systems you can get rid of the warning with manually created .env file:"
              echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
              echo
            fi
            one_meg=1048576
            mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
            cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
            disk_available=$$(df / | tail -1 | awk '{print $$4}')
            warning_resources="false"
            if (( mem_available < 4000 )) ; then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
              echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
              echo
              warning_resources="true"
            fi
            if (( cpus_available < 2 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
              echo "At least 2 CPUs recommended. You have $${cpus_available}"
              echo
              warning_resources="true"
            fi
            if (( disk_available < one_meg * 10 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
              echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
              echo
              warning_resources="true"
            fi
            if [[ $${warning_resources} == "true" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
              echo "Please follow the instructions to increase amount of resources available:"
              echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
              echo
            fi
            mkdir -p /sources/logs /sources/dags /sources/plugins
            chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
            exec /entrypoint airflow version
        # yamllint enable rule:line-length
        environment:
          <<: *airflow-common-env
          _AIRFLOW_DB_MIGRATE: 'true'
          _AIRFLOW_WWW_USER_CREATE: 'true'
          _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
          _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
          _PIP_ADDITIONAL_REQUIREMENTS: ''
          
        user: "0:0"
        volumes:
          - ${AIRFLOW_PROJ_DIR:-.}:/sources
    
      airflow-cli:
        <<: *airflow-common
        profiles:
          - debug
        environment:
          <<: *airflow-common-env
          CONNECTION_CHECK_MAX_COUNT: "0"
        # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
        command:
          - bash
          - -c
          - airflow
    
      # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
      # or by explicitly targeted on the command line e.g. docker-compose up flower.
      # See: https://docs.docker.com/compose/profiles/
      flower:
        <<: *airflow-common
        command: celery flower
        profiles:
          - flower
        ports:
          - "5555:5555"
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
    volumes:
      postgres-db-volume:

    위에서 주요 환경변수들에 대해 알아보자.

     

    image

    도커 이미지를 정의하는 변수로, 최신의 이미지를 사용하는 것이 동기화에 좋다. 

    동기화를 해야 하는 이유는, 협업 시에도 일어날 수 있는 오류를 최소화할 수 있기 때문이다.

    environment

    airflow 인스턴스를 커스터마이징 하기 위한 변수다.

    AIRFLOW-CORE-EXECUTOR executor가 정의된다
    executor : 작업 인스턴스가 실행되는 메커니즘으로, 하나의 에어플로우엔 하나의 executor만 가질 수 있다. 예제에서 사용하는 CeleryExecutor의 경우 Celery Backend(Redis, RabbitMQ, Redis Sentinnel 등) 기반으로 실행되는 executor이다.
    AIRFLOW-CORE-SQL-ALCHEMY-CONN 메타 데이터베이스와 연결을 구체화한다
    AIRFLOW-CELERY-RESULT-BACKEND server-executor를 사용한다
    AIRFLOW-CELERY-BROKER-URL 스케쥴러와 워커 간에 태스크들과 메시지들을 교환하는 역할이다
    AIRFLOW-CORE-FERNET-KEY 암호화하는 역할
    AIRFLOW-CORE-DAGS-ARE-PAUSED-AT-CREATION 시작 시 DAG들을 중지시킬지 말지 결정하는 변수다
    AIRFLOW-LOAD-EXAMPLES 에어플로우 실행시 예제를 띄울지 말지 결정하는 변수다.
    AIRFLOW__API__AUTH_BACKENDS:  'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'

    volume

    도커에서 중요한 부분으로, 모든 폴더들과, DAG, 로그, 플러그인들이 volume을 통해 컨테이너들과 호스트 사이에서 동기화된다.

    user

    컨테이너에 대한 접근 권한을 확실하게 하기 위해 만들어진 변수

    service

    에어플로우가 필요로 하는 구성요소들이 나열된 변수

    airflow-init

    에어플로우 실행 시 인스턴스를 처음으로 실행하는 역할이다.

     

    에어플로우를 컨테이너 상에 띄우려면 터미널에 다음과 같은 명령어를 실행한다.

    docker-compose up airflow-init
    docker-compose up

    실행이 성공적이라면, 'docker ps' 명령어를 실행했을 때 다음과 같은 화면이 뜨고, 

    docker-compose.yaml에 정의된 webserver 컨테이너 주소 http://localhost:8080에 접속하면 에어플로우 실행창이 뜬다.

    터미널 상에서 컨테이너에 직접 명령을 하고 싶다면

    docker exec [CONTAINER_ID] airflow ~

    라고 입력하면 된다.

    api로 정보를 보고싶다면

    curl -X GET "http://localhost:8080/api/v1/dags"

    로 실행하면 된다.

    권한이 없기에, 401 에러가 뜬다. login 옵션이 필요하다

    DAG 작성

    내 airflow 폴더의 구성은 다음과 같다.

    from airflow import DAG
    from airflow.operators.python import PythonOperator, BranchPythonOperator
    from airflow.operators.bash import BashOperator
    
    from random import randint
    
    from datetime import datetime
    
    def _choose_best_model(ti):
        accuracies = ti.xcom_pull(task_ids=[
            'training_model_A', 
            'training_model_B', 
            'training_model_C'
            ])
        best_accuracy = max(accuracies)
        
        if (best_accuracy > 8):
            return 'accurate'
        return 'inaccurate'
    
    def _training_model():
        return randint(1, 10)
    
    with DAG("my_dag", start_date=datetime(2023, 8, 1),
             schedule_interval="@daily", catchup=False) as dag:
        
        training_model_A = PythonOperator(
            task_id="training_model_A",
            python_callable=_training_model
        )
    
        training_model_B = PythonOperator(
            task_id="training_model_B",
            python_callable=_training_model
        )
    
        training_model_C = PythonOperator(
            task_id="training_model_C",
            python_callable=_training_model
        )
    
        choose_best_model = BranchPythonOperator(
            task_id="choose_best_model",
            python_callable=_choose_best_model
        )
    
        accurate = BashOperator(
            task_id="accurate",
            bash_command="echo 'accurate'"
        )
    
        inaccurate = BashOperator(
            task_id="inaccurate",
            bash_command="echo 'inaccurate'"
        )
    
        # define the dependencies in airflow
        [training_model_A, training_model_B, training_model_C] >> choose_best_model >> [accurate, inaccurate]

    훈련 모델 A, B, C 가 있고, 0부터 10까지 난수를 생성하는 데 셋 중 가장 높은 모델의 accuracy가 8을 넘어가면 accurate, 그렇지 않으면 inaccurate를 반환하는 DAG이다. 

    Airflow에 성공적으로 실행하게 되면 다음과 같은 화면이 나온다.

    728x90

    'Data Engineer' 카테고리의 다른 글

    데이터 엔지니어링 수명 주기 전체에 걸친 기술 선택  (0) 2023.09.17
    DMBOK  (0) 2023.09.03
    <Kafka 발제 자료>  (0) 2023.08.20
    Kafka 실습  (0) 2023.08.15
    Spring 입문 강의 정리 노트  (0) 2023.07.31
Designed by Tistory.