[Netflix TechBlog] Data Pipeline: Asset Management (Data Engineer, 2023. 6. 4.)
Release management
*Source: https://netflixtechblog.com/data-pipeline-asset-management-with-dataflow-86525b3e21ca
https://netflixtechblog.com/ready-to-go-sample-data-pipelines-with-dataflow-17440a9e141d
- Asset: any business logic code, in a raw or compiled form, to be executed as part of the user-defined data pipeline.
- Data Pipeline: a set of tasks to be executed in a predefined order (a DAG) for the purpose of transforming data with that business logic.
The problem
A backup job runs every Monday.
But what if:
( 1 ) there is no backup file at the given path, or
( 2 ) the file exists, but directory access was granted to another administrator and it got deleted by accident, or
( 3 ) the existing code got broken while it was being updated to add a new backup feature?
This post starts from scenarios like these and sets out to solve them.
Requirements for delivering this to Netflix data engineers, or anyone who would like to schedule a workflow with some external assets in it.
An asset here is an executable carrying the actual business logic of the job,
e.g. a JAR compiled from Scala, a Python package, or a SQL file.
1. Versioning
: we want both the workflow definition and its assets to be versioned, and we want the versions to be tied together in a clear way.
2. Transparency
: we want to know which version of an asset is running along with every workflow instance, so that if there are any issues we can easily identify which version caused the problem and which one we could revert to, if necessary.
3. ACID deployment
: we want all the workflow assets to be bundled in an atomic, durable, isolated and consistent manner. This way, if necessary, all we need to know is which version of the workflow to roll back to, and the rest is taken care of for us.
Previous approach
S3: a practically infinite cloud storage system, where users manually uploaded their assets.
(Figure: an illustration of a typical deployment pipeline manually constructed by a user)
- Doesn't consider branch/PR deployment
- Can't roll back to previous workflow versions
- Runtime dependency on user-managed cloud storage locations
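Before Dataflow, a deployment like this was typically a hand-rolled script per team. A minimal sketch of what that might look like, assuming boto3 and a made-up bucket and key (both hypothetical, not from the article); the hard-coded, user-managed S3 path is exactly the runtime dependency listed above.

# Hand-rolled deployment step of the kind Dataflow replaces (illustrative sketch only).
# Assumes the JAR was already built, e.g. with `./gradlew shadowJar`; bucket and key are made up.
import boto3

BUCKET = "my-team-artifacts"                          # user-managed bucket (hypothetical)
KEY = "jars/scala-workflow/scala-workflow-all.jar"    # fixed key, so every deploy overwrites the last one

s3 = boto3.client("s3")
s3.upload_file("scala-workflow/build/libs/scala-workflow-all.jar", BUCKET, KEY)

# The workflow definition then references s3://my-team-artifacts/jars/... directly,
# which is why there is no versioning, no branch/PR isolation, and no clean rollback.
print(f"uploaded to s3://{BUCKET}/{KEY}")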
Deployment of an asset
stranger-data
.
├── dataflow.yaml
├── pyspark-workflow
│   ├── main.sch.yaml
│   └── hello_world
│       ├── ...
│       └── setup.py
└── scala-workflow
    ├── build.gradle
    ├── main.sch.yaml
    └── src
        ├── main
        │   └── ...
        └── test
            └── ...
stranger-data$ dataflow project list

Python Assets:
-> ./pyspark-workflow/hello_world/setup.py
Summary: 1 found.

Gradle Assets:
-> ./scala-workflow/build.gradle
Summary: 1 found.

Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml
-> ./pyspark-workflow/main.sch.yaml
Summary: 2 found.
stranger-data$ dataflow project test

Testing Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... PASSED
Summary: 1 successful, 0 failed.

Testing Gradle Assets:
-> ./scala-workflow/build.gradle... PASSED
Summary: 1 successful, 0 failed.

Building Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
-> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
Summary: 2 successful, 0 failed.

Testing Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... PASSED
-> ./pyspark-workflow/main.sch.yaml... PASSED
Summary: 2 successful, 0 failed.
stranger-data$ dataflow project deploy

Building Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... CREATED ./pyspark-workflow/hello_world/dist/hello_world-0.0.1-py3.7.egg
Summary: 1 successful, 0 failed.

Deploying Python Assets:
-> ./pyspark-workflow/hello_world/setup.py... DEPLOYED AS dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
Summary: 1 successful, 0 failed.

Building Gradle Assets:
-> ./scala-workflow/build.gradle... CREATED ./scala-workflow/build/libs/scala-workflow-all.jar
Summary: 1 successful, 0 failed.

Deploying Gradle Assets:
-> ./scala-workflow/build.gradle... DEPLOYED AS dataflow.jar.scala-workflow.user.stranger-data.master.39206ee8.11
Summary: 1 successful, 0 failed.
...
- created a new version of the workflow assets
- assigned the asset a “UUID” (consisting of the “dataflow” string, asset type, asset namespace, git repo owner, git repo name, git branch name, commit hash and a consecutive build number; see the sketch after this list)
- and deployed them to a Dataflow managed S3 location.
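To make that naming scheme concrete, here is a small sketch that assembles such an identifier from its parts; the asset_uuid helper is hypothetical and not part of the Dataflow CLI, it just mirrors the egg/jar names shown in the listings above.

# Hypothetical helper illustrating the asset "UUID" layout used in the examples:
# dataflow.<type>.<namespace>.<repo owner>.<repo name>.<branch>.<commit>.<build>
def asset_uuid(asset_type, namespace, repo_owner, repo_name, branch, commit_hash, build):
    return ".".join([
        "dataflow", asset_type, namespace,
        repo_owner, repo_name, branch,
        commit_hash[:8],   # short commit hash, as in the listings above
        str(build),
    ])

# Reproduces the egg name from the deploy output above:
print(asset_uuid("egg", "hello_world", "user", "stranger-data", "master", "39206ee8", 3))
# dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3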
stranger-data$ dataflow project list eggs --namespace hello_world --deployed

Project namespaces with deployed EGGS:
hello_world
 -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.3
 -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.2
 -> dataflow.egg.hello_world.user.stranger-data.master.39206ee8.1
The Dataflow command above lists the deployed assets for a given namespace.
The entries under hello_world are the older deployed versions of the asset from the same branch and commit hash.
stranger-data$ dataflow project list workflows

Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml
-> ./pyspark-workflow/main.sch.yaml
Summary: 2 found.

# Part of the content of one of these workflows:
stranger-data$ cat ./scala-workflow/main.sch.yaml
...
dag:
  - ddl -> write
  - write -> audit
  - audit -> publish
jobs:
  - ddl: ...
  - write:
      spark:
        script: ${dataflow.jar.scala-workflow}
        class: com.netflix.spark.ExampleApp
        conf: ...
        params: ...
  - audit: ...
  - publish: ...
...
A workflow defined in a YAML file does not require any compilation as far as the Scheduler API is concerned.
However, Dataflow performs a special step called rendering, which resolves the Dataflow variables and builds the final version of the workflow definition.
The ${dataflow.jar.scala-workflow} variable above means that the workflow will be rendered and deployed with the latest version of the scala-workflow JAR available at deployment time. The JAR may be built as part of the same repository as the workflow, in which case a new build of the JAR and a new version of the workflow can be deployed together; but it may also come from a completely different project, in which case testing and deployment of the new workflow version can be decoupled from the asset build.
stranger-data$ dataflow project deploy
...
Building Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... CREATED ./.workflows/scala-workflow.main.sch.rendered.yaml
-> ./pyspark-workflow/main.sch.yaml... CREATED ./.workflows/pyspark-workflow.main.sch.rendered.yaml
Summary: 2 successful, 0 failed.

Deploying Scheduler Workflows:
-> ./scala-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.scala-workflow
-> ./pyspark-workflow/main.sch.yaml... DEPLOYED AS https://hawkins.com/scheduler/sandbox:user.stranger-data.pyspark-workflow
Summary: 2 successful, 0 failed.
stranger-data$ cat ./scala-workflow/main.sch.yaml
...
dag:
  - ddl -> write
  - write -> audit
  - audit -> publish
jobs:
  - ddl: ...
  - write:
      spark:
        script: s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/1.jar
        class: com.netflix.spark.ExampleApp
        conf: ...
        params: ...
  - audit: ...
  - publish: ...
...
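Conceptually, rendering is a substitution step. Below is a minimal sketch under the assumption (mine, not the article's) that it amounts to replacing each ${dataflow....} variable with the newest deployed asset location; the LATEST_ASSETS registry and the render function are hypothetical stand-ins for whatever Dataflow does internally.

# Hypothetical illustration of the "rendering" step: replacing Dataflow variables
# such as ${dataflow.jar.scala-workflow} with the newest deployed asset location.
import re

# Pretend registry of the latest deployed assets (made up for this sketch).
LATEST_ASSETS = {
    "dataflow.jar.scala-workflow":
        "s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/1.jar",
}

def render(workflow_yaml):
    """Substitute every ${...} variable with the latest deployed asset path."""
    def resolve(match):
        return LATEST_ASSETS[match.group(1)]
    return re.sub(r"\$\{([^}]+)\}", resolve, workflow_yaml)

raw = "script: ${dataflow.jar.scala-workflow}"
print(render(raw))
# script: s3://dataflow/jars/scala-workflow/user/stranger-data/master/39206ee8/1.jar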
Dataflow handles this deployment work, but shall we actually build something with it?
Batch data pipelines at Netflix (bootstrapping, standardization, automation)
Dataflow
A command-line tool that streamlines data pipeline deployment and improves the (user) experience.
$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.
- The Dataflow CLI is divided into four command groups (project, migration, mock, sample).
- The most commonly used is dataflow project: it helps manage a data pipeline repository, covering tasks such as creation, testing and deployment.
- dataflow migration: a special feature built to automate the tracking of table changes in the data warehouse and the communication around them.
Thanks to Netflix's internal lineage system, Dataflow migration can identify the downstream usage of a table.
-> This lets it notify the people who depend on the table; once a migration is started, Dataflow keeps track of the changes and keeps communicating with the downstream users.
- dataflow mock: lets you generate a YAML file with formatted mock data taken from a selected table, columns and a few rows of the Netflix data warehouse.
Sample workflows
A workflow generated by dataflow sample includes:
- clean DDL code,
- proper table metadata settings,
- transformation job (in a language of choice) wrapped in an optional WAP (Write, Audit, Publish) pattern,
- sample set of data audits for the generated data,
- and a fully functional unit test for your transformation logic (a rough sketch of such a test follows below).
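The post does not show the generated unit test itself, so the following is only a rough, hypothetical sketch of what a test for the ranking step could look like, assuming pytest and a local SparkSession; the mock rows are inlined here instead of being loaded from a Dataflow mock YAML, whose exact format is not shown.

# Hypothetical pytest sketch for the "top titles per country" ranking logic.
# Assumes pyspark is installed; the inlined rows stand in for a Dataflow mock dataset.
from pyspark.sql import SparkSession, functions as F, Window


def top_titles(df, limit=100):
    """Rank titles by view_hours within each country and keep the top `limit`."""
    w = Window.partitionBy("country_code").orderBy(F.col("view_hours").desc())
    return df.withColumn("title_rank", F.rank().over(w)).filter(F.col("title_rank") <= limit)


def test_top_titles_ranks_within_country():
    spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()
    mock = spark.createDataFrame(
        [(11111111, "US", 123.0), (22222222, "US", 11.0), (33333333, "PL", 98.0)],
        ["title_id", "country_code", "view_hours"],
    )
    result = {(r.title_id, r.country_code): r.title_rank for r in top_titles(mock).collect()}
    assert result[(11111111, "US")] == 1   # most-watched US title ranks first
    assert result[(22222222, "US")] == 2
    assert result[(33333333, "PL")] == 1   # ranking restarts per country
    spark.stop()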
Business Logic
Produce, on a daily basis, the Top 100 movies/shows in every country where Netflix operates.
Step 1. On a daily basis, incrementally, sum up all viewing time of all movies and shows in every country.
WITH STEP_1 AS (
    SELECT
        title_id
        , country_code
        , SUM(view_hours) AS view_hours
    FROM some_db.source_table
    WHERE playback_date = CURRENT_DATE
    GROUP BY
        title_id
        , country_code
)
Step 2. Rank all titles from most-watched to least-watched in every country.
WITH STEP_2 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , RANK() OVER (
            PARTITION BY country_code
            ORDER BY view_hours DESC
        ) AS title_rank
    FROM STEP_1
)
Step 3. Filter all titles down to the top 100.
WITH STEP_3 AS (
    SELECT
        title_id
        , country_code
        , view_hours
        , title_rank
    FROM STEP_2
    WHERE title_rank <= 100
)
Through these three steps, the data is produced and stored in an Iceberg table.
CREATE TABLE IF NOT EXISTS ${TARGET_DB}.dataflow_sample_results (
    title_id INT COMMENT "Title ID of the movie or show."
    , country_code STRING COMMENT "Country code of the playback session."
    , title_rank INT COMMENT "Rank of a given title in a given country."
    , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT "Example dataset brought to you by Dataflow. For more information on this and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
    date DATE COMMENT "Playback date."
)
STORED AS ICEBERG;
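To tie the three steps and the DDL together, here is a minimal PySpark sketch (my own sketch, not the generated sample job) that runs the combined query and appends today's partition to the Iceberg table. It assumes a SparkSession already configured with an Iceberg catalog, folds Step 3 into the final SELECT, uses a placeholder TARGET_DB, and adds CURRENT_DATE as the partition column.

# Sketch of the transformation job: chain the steps and append to the Iceberg table.
# Assumes the SparkSession is configured with an Iceberg catalog and the table above exists.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
TARGET_DB = "foo"  # placeholder; would normally come from workflow parameters

query = """
WITH STEP_1 AS (
    SELECT title_id, country_code, SUM(view_hours) AS view_hours
    FROM some_db.source_table
    WHERE playback_date = CURRENT_DATE
    GROUP BY title_id, country_code
),
STEP_2 AS (
    SELECT title_id, country_code, view_hours,
           RANK() OVER (PARTITION BY country_code ORDER BY view_hours DESC) AS title_rank
    FROM STEP_1
)
SELECT title_id, country_code, title_rank, view_hours, CURRENT_DATE AS date
FROM STEP_2
WHERE title_rank <= 100
"""

# Append today's partition to the table created by the DDL above.
spark.sql(query).writeTo(f"{TARGET_DB}.dataflow_sample_results").append()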
sql> SELECT * FROM foo.dataflow_sample_results
     WHERE date = 20220101 and country_code = 'US'
     ORDER BY title_rank
     LIMIT 5;

 title_id | country_code | title_rank | view_hours |   date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |        123 | 20220101
 44444444 | US           |          2 |        111 | 20220101
 33333333 | US           |          3 |         98 | 20220101
 55555555 | US           |          4 |         55 | 20220101
 22222222 | US           |          5 |         11 | 20220101
(5 rows)