    *Source: https://netflixtechblog.com/data-pipeline-asset-management-with-dataflow-86525b3e21ca



    Data pipeline asset management with Dataflow

    by Sam Redai, Jai Balani, Olek Gorajek


    - Asset : Any business logic code in a raw or compiled form to be executed as part of the user defined data pipeline.

    - Data Pipeline : A set of tasks to be executed in a predefined order (a.k.a. DAG) for the purpose of transforming data using some business logic.

    The problem

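    - A backup job runs every Monday.


    ( 1 ) What if the backup file does not exist at the given path,

    ( 2 ) or it exists, but access to its directory was given to another administrator, or it was deleted by accident,

    ( 3 ) or existing code was broken while the job was being updated to add a new backup feature?

    The article assumes situations like these and sets out to solve them.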


    Requirements: what to deliver to Netflix data engineers, or to anyone who would like to schedule a workflow with some external assets in it.

    External asset: an executable carrying the actual business logic of the job,

    e.g. a JAR compiled from Scala, Python code, or a SQL file.

    1. Versioning

    : Both the workflow definition and its assets should be versioned, and the versions should be tied together in a clear way.

    2. Transparency

    : We want to know which version of an asset is running with every workflow instance, so that if there are any issues we can easily identify which version caused the problem and which one to revert to, if necessary.

    3. ACID deployment

    : All the workflow assets should be bundled in an atomic, durable, isolated and consistent manner. This way, if necessary, all we need to know is which version of the workflow to roll back to, and the rest is taken care of for us.



    S3 : an effectively unlimited ("infinite") cloud storage system

    An illustration of a typical deployment pipeline manually constructed by a user

    Problems with such a pipeline:

    - Doesn't consider branch/PR deployments

    - Can't roll back to previous workflow versions

    - Runtime dependency on user-managed cloud storage locations

    Deployment of an asset


    ├── dataflow.yaml
    ├── pyspark-workflow
    │   ├── main.sch.yaml
    │   └── hello_world
    │       ├── ...
    │       └── setup.py
    └── scala-workflow
        ├── build.gradle
        ├── main.sch.yaml
        └── src
            ├── main
            │   └── ...
            └── test
                └── ...


    stranger-data$ dataflow project list
    Python Assets:
     -> ./pyspark-workflow/hello_world/setup.py
    Summary: 1 found.
    Gradle Assets:
     -> ./scala-workflow/build.gradle
    Summary: 1 found.
    Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml
     -> ./pyspark-workflow/main.sch.yaml
    Summary: 2 found.
    stranger-data$ dataflow project test
    Testing Python Assets:
     -> ./pyspark-workflow/hello_world/setup.py... PASSED
    Summary: 1 successful, 0 failed.
    Testing Gradle Assets:
     -> ./scala-workflow/build.gradle... PASSED
    Summary: 1 successful, 0 failed.
    Building Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
     -> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
    Summary: 2 successful, 0 failed.
    Testing Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml... PASSED
     -> ./pyspark-workflow/main.sch.yaml... PASSED
    Summary: 2 successful, 0 failed.
    stranger-data$ dataflow project deploy
    Building Python Assets:
     -> ./pyspark-workflow/hello_world/setup.py... CREATED ./pyspark-workflow/hello_world/dist/hello_world-0.0.1-py3.7.egg
    Summary: 1 successful, 0 failed.
    Deploying Python Assets:
     -> ./pyspark-workflow/hello_world/setup.py... DEPLOYED AS dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
    Summary: 1 successful, 0 failed.
    Building Gradle Assets:
     -> ./scala-workflow/build.gradle... CREATED ./scala-workflow/build/libs/scala-workflow-all.jar
    Summary: 1 successful, 0 failed.
    Deploying Gradle Assets:
     -> ./scala-workflow/build.gradle... DEPLOYED AS dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.11
    Summary: 1 successful, 0 failed.
    After the deploy command above ran, Dataflow:
    • created a new version of the workflow assets,
    • assigned each asset a “UUID” (consisting of the “dataflow” string, asset type, asset namespace, git repo owner, git repo name, git branch name, commit hash and consecutive build number),
    • and deployed them to a Dataflow managed S3 location.
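
    The identifier layout listed above can be sketched in Python. This is a minimal illustration, not Dataflow's implementation: the `AssetId` class and its field names are hypothetical, and the parser assumes that no component (for example the branch name) contains a dot.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssetId:
    """Hypothetical model of the identifier layout described in the notes."""
    asset_type: str    # e.g. "egg" or "jar"
    namespace: str     # e.g. "hello_world"
    repo_owner: str
    repo_name: str
    branch: str
    commit_hash: str
    build_number: int

    def __str__(self) -> str:
        return ".".join([
            "dataflow", self.asset_type, self.namespace, self.repo_owner,
            self.repo_name, self.branch, self.commit_hash, str(self.build_number),
        ])

    @classmethod
    def parse(cls, text: str) -> "AssetId":
        # Assumption: components never contain a dot, so a plain split is enough.
        parts = text.split(".")
        if len(parts) != 8 or parts[0] != "dataflow":
            raise ValueError(f"not a Dataflow asset identifier: {text!r}")
        return cls(*parts[1:7], int(parts[7]))


asset = AssetId.parse("dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3")
print(asset.namespace, asset.commit_hash, asset.build_number)
```

    Keeping the build number as an integer makes it easy to compare builds of the same commit numerically rather than as strings.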
    stranger-data$ dataflow project list eggs --namespace hello_world --deployed
    Project namespaces with deployed EGGS:
     -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
     -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.2
     -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.1

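    The Dataflow command above lists the assets of a given type deployed to a given namespace.

    The list for hello_world shows the deployed versions of the asset, including older ones, for a given branch and commit hash.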
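
    As a rough illustration of how such a list could be used to locate an older build, the sketch below filters identifiers by branch and commit hash and orders them newest first. `find_versions` is a hypothetical helper, not a Dataflow command, and it assumes the last three dot-separated fields are branch, commit hash and build number.

```python
def find_versions(deployed, branch, commit_hash):
    """Return identifiers for the given branch and commit, newest build first."""
    matches = []
    for identifier in deployed:
        # Assumption: branch, commit hash and build number contain no dots.
        _, id_branch, id_hash, build = identifier.rsplit(".", 3)
        if id_branch == branch and id_hash == commit_hash:
            matches.append((int(build), identifier))
    return [identifier for _, identifier in sorted(matches, reverse=True)]


deployed = [
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.1",
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3",
    "dataflow.egg.hello_world.user.stranger-data.master.39206ee8.2",
]
print(find_versions(deployed, "master", "39206ee8"))
```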

    stranger-data$ dataflow project list workflows
    Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml
     -> ./pyspark-workflow/main.sch.yaml
    Summary: 2 found.
    # Part of the content of one of these workflows:
    stranger-data$ cat ./scala-workflow/main.sch.yaml
     - ddl -> write
     - write -> audit
     - audit -> publish
     - ddl: ...
     - write:
           script: ${dataflow.jar.scala-workflow}
           class: com.netflix.spark.ExampleApp
           conf: ...
           params: ...
     - audit: ...
     - publish: ...

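    A workflow defined in a YAML file needs no compilation before it is sent to the Scheduler API.

    However, Dataflow adds a special step called rendering, which substitutes all Dataflow variables and builds the final version of the workflow.

    The ${dataflow.jar.scala-workflow} expression above means that the workflow will be rendered and deployed with the latest version of the scala-workflow JAR available at deployment time. The JAR may be built in the same repository as the workflow, in which case the new JAR build and the new workflow version can come from the same deployment. The JAR may also be built in a completely different project, in which case testing and deploying the new workflow version can be fully decoupled from building the JAR.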
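
    The rendering step described above can be approximated with a small substitution routine. This is only a sketch under assumptions: `render`, `VARIABLE` and the `LATEST_ASSETS` lookup table are hypothetical stand-ins for whatever Dataflow uses internally to resolve the latest deployed asset, and the S3 location is the one shown in the rendered workflow in these notes.

```python
import re

# Hypothetical lookup: (asset type, namespace) -> location of the latest deployed build.
LATEST_ASSETS = {
    ("jar", "scala-workflow"):
        "s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/1.jar",
}

# Matches variables of the form ${dataflow.<type>.<namespace>}.
VARIABLE = re.compile(r"\$\{dataflow\.(\w+)\.([\w-]+)\}")


def render(workflow_text, latest_assets=LATEST_ASSETS):
    """Replace every Dataflow asset variable with the location of the latest build."""
    def substitute(match):
        key = (match.group(1), match.group(2))
        if key not in latest_assets:
            raise KeyError(f"no deployed asset for {match.group(0)}")
        return latest_assets[key]

    return VARIABLE.sub(substitute, workflow_text)


print(render("script: ${dataflow.jar.scala-workflow}"))
```

    Failing loudly on an unknown variable, instead of leaving it in place, keeps a half-rendered workflow from ever reaching the scheduler.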

    stranger-data$ dataflow project deploy
    Building Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
     -> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
    Summary: 2 successful, 0 failed.
    Deploying Scheduler Workflows:
     -> ./scala-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.scala-workflow
     -> ./pyspark-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.pyspark-workflow
    Summary: 2 successful, 0 failed.
    stranger-data$ cat ./.workflows/scala-workflow.main.sch.rendered.yaml
     - ddl -> write
     - write -> audit
     - audit -> publish
     - ddl: ...
     - write:
           script: s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/1.jar
           class: com.netflix.spark.ExampleApp
           conf: ...
           params: ...
     - audit: ...
     - publish: ...
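
    The `a -> b` lines in the workflow files above declare which job must finish before another starts. As a rough illustration of how such edges determine an execution order, the sketch below sorts them topologically with Python's standard `graphlib` module. `execution_order` is a hypothetical helper; nothing here reflects how the Netflix scheduler is actually implemented.

```python
from graphlib import TopologicalSorter


def execution_order(edges):
    """Turn 'upstream -> downstream' strings into a valid run order."""
    sorter = TopologicalSorter()
    for edge in edges:
        upstream, downstream = (name.strip() for name in edge.split("->"))
        sorter.add(downstream, upstream)  # downstream depends on upstream
    return list(sorter.static_order())


print(execution_order(["ddl -> write", "write -> audit", "audit -> publish"]))
# ['ddl', 'write', 'audit', 'publish']
```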

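    Deployment example using Jenkins

    Dataflow takes care of deployment work like this, but shall we actually build something with it?

    Batch data pipelines at Netflix (bootstrapping, standardization, automation)


    Dataflow: a command line tool that streamlines data pipeline deployment and improves the (user) experience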

    $ dataflow --help
    Usage: dataflow [OPTIONS] COMMAND [ARGS]...

    Options:
      --docker-image TEXT  Url of the docker image to run in.
      --run-in-docker      Run dataflow in a docker container.
      -v, --verbose        Enables verbose mode.
      --version            Show the version and exit.
      --help               Show this message and exit.

    Commands:
      migration  Manage schema migration.
      mock       Generate or validate mock datasets.
      project    Manage a Dataflow project.
      sample     Generate fully functional sample workflows.

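    - The Dataflow CLI is divided into 4 categories (commands).

    - The most commonly used one is dataflow project : it helps people manage their data pipeline repositories

        - through activities such as creation, testing, and deployment

    - dataflow migration : a special feature built to automate the communication and tracking of table changes in the data warehouse.

    Thanks to Netflix's internal lineage system, Dataflow migration lets you identify downstream usage of the table in question.

    -> It helps you write a message to the owners of these dependencies; once the migration has started, Dataflow keeps track of its progress and helps you communicate with the downstream users.


    Dataflow mock : lets you create YAML files with structured mock data based on selected tables, columns, and a few rows from the Netflix data warehouse.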

    Sample workflows

    • clean DDL code,
    • proper table metadata settings,
    • transformation job (in a language of choice) wrapped in an optional WAP (Write, Audit, Publish) pattern,
    • sample set of data audits for the generated data,
    • and a fully functional unit test for your transformation logic.
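
    The Write, Audit, Publish pattern mentioned in the list above can be illustrated in plain Python. This is a toy sketch, not the generated sample workflow: the in-memory lists and the `audit` rule are assumptions chosen only to show the order of operations.

```python
def audit(rows):
    """Toy audit: the batch must be non-empty and contain no negative view hours."""
    return bool(rows) and all(row["view_hours"] >= 0 for row in rows)


def write_audit_publish(new_rows, published):
    staging = list(new_rows)      # Write: land the data where consumers cannot see it
    if not audit(staging):        # Audit: validate the staged data
        raise ValueError("audit failed; nothing was published")
    published.extend(staging)     # Publish: expose the data only after a passing audit
    return published


table = []
write_audit_publish([{"title_id": 11111111, "view_hours": 123.0}], table)
print(len(table))
```

    The point of the pattern is that a failed audit leaves the published data untouched, so downstream readers never see a bad batch.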


    Business Logic

    Top 100 movies/shows in every country where Netflix operates on a daily basis.

    Step 1. On a daily basis, incrementally, sum up all viewing time of all movies and shows in every country.

    WITH STEP_1 AS (
        SELECT
            title_id
            , country_code
            , SUM(view_hours) AS view_hours
        FROM some_db.source_table
        WHERE playback_date = CURRENT_DATE
        GROUP BY
            title_id
            , country_code
    )

    Step 2. Rank all titles from most watched to least watched in every country.

    WITH STEP_2 AS (
        SELECT
            title_id
            , country_code
            , view_hours
            , RANK() OVER (
                PARTITION BY country_code
                ORDER BY view_hours DESC
            ) AS title_rank
        FROM STEP_1
    )

    Step 3. Filter all titles to the top 100.

    WITH STEP_3 AS (
        SELECT
            title_id
            , country_code
            , view_hours
            , title_rank
        FROM STEP_2
        WHERE title_rank <= 100
    )
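
    The three steps can be mirrored in plain Python to check the logic on a small in-memory sample. This is a sketch for illustration only: `top_titles` is a hypothetical function, the input is assumed to be already filtered to a single playback date, and the real job runs as SQL against the warehouse.

```python
from collections import defaultdict


def top_titles(rows, limit=100):
    """rows: iterable of (title_id, country_code, view_hours) for one playback date."""
    # Step 1: sum view hours per (title, country).
    totals = defaultdict(float)
    for title_id, country_code, view_hours in rows:
        totals[(title_id, country_code)] += view_hours

    # Group the totals by country so ranking happens within each country.
    by_country = defaultdict(list)
    for (title_id, country_code), hours in totals.items():
        by_country[country_code].append((title_id, hours))

    result = []
    for country_code, titles in by_country.items():
        titles.sort(key=lambda item: item[1], reverse=True)
        for title_id, hours in titles:
            # Step 2: same semantics as SQL RANK(): ties share a rank, then a gap follows.
            rank = 1 + sum(1 for _, other in titles if other > hours)
            # Step 3: keep only titles ranked within the limit.
            if rank <= limit:
                result.append((title_id, country_code, hours, rank))
    return result


sample = [(1, "US", 2.0), (1, "US", 3.0), (2, "US", 4.0), (3, "PL", 1.0)]
for row in top_titles(sample, limit=1):
    print(row)
```

    Computing the rank by counting strictly larger totals is quadratic per country, which is fine for a sanity check but not how a query engine would do it.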

    Using these three steps, we produce data that is written to an Iceberg table.

    CREATE TABLE IF NOT EXISTS ${TARGET_DB}.dataflow_sample_results (
        title_id INT COMMENT "Title ID of the movie or show."
        , country_code STRING COMMENT "Country code of the playback session."
        , title_rank INT COMMENT "Rank of a given title in a given country."
        , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
    )
    COMMENT
        "Example dataset brought to you by Dataflow. For more information on this
        and other examples please visit the Dataflow documentation page."
    PARTITIONED BY (
        date DATE COMMENT "Playback date."
    )
    STORED AS ICEBERG;

     sql> SELECT * FROM foo.dataflow_sample_results 
          WHERE date = 20220101 and country_code = 'US' 
          ORDER BY title_rank LIMIT 5;
     title_id | country_code | title_rank | view_hours | date
     11111111 | US           |          1 |   123      | 20220101
     44444444 | US           |          2 |   111      | 20220101
     33333333 | US           |          3 |   98       | 20220101
     55555555 | US           |          4 |   55       | 20220101
     22222222 | US           |          5 |   11       | 20220101
    (5 rows)
