ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Boaz] Data Pipeline 발제자료
    Data Engineer 2023. 6. 7. 12:40
    728x90

    현대 데이터 인프라의 현실

    • 데이터가 매우 다양한 소스에서 수집된다는 것!
    • 이 때문에 소스 시스템에서 스키마 및 비즈니스 로직 변경을 처리하는 일률적인 방법을 찾기 힘들다.

    1. 추상화 도입

    가능하면 소스 시스템과 수집 프로세스 사이에 추상화 계층을 도입하는 것이 좋음

    소스 시스템의 소유자가 추상화 방법을 유지/인식 하는 것이 Best!

    • 예시

    상황 : Postgres 데이터베이스에서 직접 데이터를 수집하는 대신 데이터베이스 소유자와 협력하여 데이터베이스에서 데이터 추출을 위해 쿼리할 수 있는 REST API를 구축하는 것을 고려

    API가 단순히 거쳐 지나가는 단계라고 할지라도 그것이 소스 시스템 소유자가 유지 관리하는 코드에 존재한다는 것은 시스템 소유자가 어떤 데이터가 추출되고 있는지 알고 있다는 것을 의미!

    → 애플리케이션 데이터베이스의 내부 구조 변경에 대해 걱정할 필요가 없다는 뜻

    • ) 소스 시스템 변경으로 인해 API 엔드포인트가 포함된 필드가 제거되는 경우 수행할 작업에 대한 마음의 결정을 내릴 수 있고, 단계적으로 제거되거나 히스토리 데이터로 지원되겠지만 앞으로 NULL 값을 가질 것

    소스 시스템과 이벤트를 구독하는 시스템의 세부사항을 서로 완전히 분리된 상태로 유지

    2.데이터 계약 유지 관리

    데이터 계약

    소스 시스템의 소유자와 데이터 파이프라인에서 사용하기 위해 해당 시스템에서 데이터를 수집하는 팀 간의 서면 계약

    • 데이터 추출이 어떤 방법으로 얼마나 자주 이루어지며, 소스 시스템과 수집 모두에 대한 연락처가 누구인지 명시해야
    • 깃허브 저장소 또는 내부 문서 사이트와 같이 잘 알려져 있고 찾기 쉬운 위치에 저장
    {
    ingestion_jobid: "orders_postgres",
    source_host: "my_host.com",
    source_db: "ecommerce",
    ingestion_type: "full",
    ingestion_frequency_minutes: "60",
    source_owner: "dev-team@mycompany.com",
    ingestion_owner: "data-eng@mycompany.com"
    };
    

    소스 시스템의 데이터베이스에서 직접 데이터를 수집하거나 추출을 위해 명시적으로 설계되지 않은 방법을 통해 데이터를 수집하는 경우

    • PR이 제출되거나 코드가 브랜치에 커밋될 때 데이터 계약에서 source_table로 나열된 테이블에 대한 변경사항을 찾는 Git 후크를 빌드한다. 테이블이 데이터 수집에 사용되는 테이블의 변경 사항을 조정하기 위해 누구에게 연락해야 하는지를 기고자에게 자동으로 알린다
    • 데이터 계약 자체가 Git 레포지토리에 있는 경우 Git 후크를 추가하여 계약 변경 사항을 확인
    • git hook
    • Git Hooks 는 Git 과 관련한 어떤 이벤트가 발생했을 때 특정 스크립트를 실행할 수 있도록 하는 기능이다. 크게 클라이언트 훅 과 서버 훅 으로 나뉘는데 클라이언트 훅 은 커밋, Merge 가 발생하거나 push 가 발생하기 전 클라이언트에서 실행하는 훅이다. 반면 서버 훅 은 Git repository 로 push 가 발생했을 때 서버에서 실행하는 훅이다.
    • 회사의 중앙 문서 사이트에 모든 데이터 계약을 읽을 수 있는 형태로 게시하고 검색 가능하게 만든다
    • 6개월 동안 업데이트되지 않은 데이터 계약에 대해 소스 시스템 및 수집 소유자에게 알리고 검토 및 업데이트하도록 요청하는 스크립트를 작성하고 예약

    Schema-on-Read의 고려사항

    데이터를 쓸 때 스키마를 정의하는 방식에서 데이터를 읽을 때 스키마를 정의하는 방식으로 설계를 이동하는 것이다.

    Schema-on-write

    • 소스에서 데이터를 추출할 때 구조가 정의되고 데이터가 데이터 레이크 또는 S3 버킷에 기록

    Schema-on-read

    • 스키마에 대한 엄격한 정의 없이 데이터가 데이터 레이크, S3 버킷 또는 기타 스토리지 시스템에 기록되는 패턴
      • 시스템에 배치된 주문을 정의하는 이벤트는 JSON 객체로 정의, 해당 객체의 속성은 시간이 지남에 따라 새 속성이 추가되거나 기존 속성이 제거됨에 따라 변경될 수 있다.

    Data Catalog

    • 데이터 레이크 및 웨어하우스의 데이터에 대한 메타데이터를 저장
      • 기술 메타데이터
        • 스키마, 테이블, 열, 파일 이름, 보고서 이름
      • 비즈니스 메타데이터
        • 사용자가 조직의 자산에 대해 가지고 있는 비즈니스 지식
        • 비즈니스 설명, 견해, 주석, 분류, 사용 적합성, 등급
      • 운영 메타데이터
        • 객체 업데이트 내용, ETL 작업, 사용자의 테이블 액세스 횟수
    • 데이터세트의 구조와 정의를 모두 저장
    • 로드 단계의 논리는 더욱 복잡화
      • 스키마 변경을 동적으로 처리!

     💡 수집 중에 새 필드가 탐지되면 웨어하우스의 테이블에 새 열을 동적으로 추가할 것인가?

    💡 데이터 파이프라인의 변환 단계에 있는 데이터나 소스 테이블의 변경 사항을 모델에 적용하는 데이터 분석가에게 어떻게 알릴 것인가?

     

    데이터 카탈로그에 저장된 메타데이터를 사용하여 조직 데이터를 관리하고, 도움이 된다! 다음과 같은 질문에 대한 해답 → Data Catalog

    • 지리적 위치 데이터를 어디서 찾고 탐색할 수 있는가?
    • 데이터 레이크의 데이터에 어떻게 쉽게 액세스할 수 있는가?
    • 실제로 운영 데이터의 품질을 개선하고 있는가?
    • 중요한 핵심 데이터 요소에 대한 표준을 정의했는가?
    • 고객의 개인 정보에 접근할 수 있는 사람은 누구인가?
    • 모든 데이터에 대해 정의된 보존 정책이 있는가?

    확장 복잡성

    소스 시스템과 다운스트림 데이터 모델이 제한적일 때 데이터 파이프라인을 구축하는 것은 충분히 어려운 일이다.

    → 상대적으로 작은 조직에서도 이런 숫자가 커질 경우 증가하는 복잡성을 처리하기 위해 파이프라인을 확장해야

    그렇다면, 확장하는 방식은?

    데이터 수집 표준화

    • 복잡성과 관련해, 수집하는 시스템의 개수보다 각 시스템이 완전히 동일하지 않다는 것이 문제점!

    수집

    • 다양한 소스 시스템 유형(Postgres, Kafka 등)을 처리할 수 있게 작성
    • 소스 시스템 유형이 많을수록 소스 코드가 커지고 유지 및 관리할 양도 증대
    • 동일 소스 시스템 유형에 대한 수집 작업은 쉽게 표준화되지 않는다.
      • REST API
        • 해당 API에 페이징, 증분 데이터 액세스 및 기타 기능에 대해 표준화된 방법이 없는 경우 데디터 엔지니어가 코드를 재사용 및 공유하지 않고 일회성으로 수집 작업을 구축(이 경우 해당 코드를 중앙에서 유지 관리 불가)

    조직에 따라 사용자가 수집하는 시스템을 거의 제어하지 못할 수도 있다.

    • 수집 시스템이 타사 플랫폼일 때
    • 내부 시스템이지만 조직 계층이 서로 다른 엔지니어링 팀에서 구축한 시스템일 때

    해결책은?

    1. 비기술적 요인
      1. 수집하는 시스템이 내부적으로 구축되었지만 제대로 표준화 되지 않은 경우
        1. 데이터 조직 파이프라인에 미치는 영향에 대한 인식을 높이면 시스템 소유자가 동의할 수 있다.

    대기업의 경우, 각 시스템을 구축하는 소프트웨어 엔지니어는 조직 내의 다른 팀과 완전히 같지 않은 시스템을 구축하고 있다는 사실을 인식하지 못할 수도 있다. 그렇기에 표준화 작업의 효율성과 유지 관리 측면에서의 이점을 확보하려면, 인내심과 올바른 접근 방법이 필요하다.

    [사용자가 제어할 수 있는 몇 가지 기술적 접근 방식]

    • 가능한 모든 코드 표준화 및 재사용
      • 이는 소프트웨어 엔지니어링의 일반적인 모범 사례지만, 데이터 수집 작업을 생성할 때 간과하는 경우가 잦다.
    • 구성 기반 데이터 수집 노력
      • 여러 Postgres 데이터베이스와 테이블에서 데이터를 수집 시 별도의 작업을 작성하지 말고 수집할 테이블과 스키마를 정의하는 구성 파일(DB 테이블의 레코드)을 통해 반복하는 단일 작업 작성
    • 자신의 추상화를 고려
      • 소스 시스템 소유자가 시스템과 수집 사이 표준화된 추상화를 구축하게 할 수 없다면 직접 수행하거나 해당 시스템과 파트너 관계를 맺고 대부분의 개발 작업을 수행하는 것을 고려
      • 예로, Postgres 또는 MySQL 데이터베이스에서 데이터를 수집해야 하는 경우 다른 수집 작업을 작성하는 대신 소스 시스템 팀의 허가를 받아 스트리밍 CDC를 구현할 수 있다.

    DAG

    : 방향성 비순환 그래프

    • 파이프라인 단계는 항상 지시에 따른다.
      • 실행 경로를 보장하기 위해, 작업 또는 여러 작업으로 시작하여 특정 작업으로 끝난다.
      • 모든 종속 작업이 성공적으로 완료되기 전에 작업이 실행되지 않게 한다.
    • 파이프라인 그래프는 순환적이어야 한다.
      • 작업은 이전에 완료된 작업을 가리킬 수 없다.(순환 불가)

    Airflow의 작업은 SQL 문 실행에서 파이썬 스크랩트 실행에 이르기까지 모든 것을 나타낼 수 있다. 파이썬을 사용하면 데이터 파이프라인에서 작업을 정의, 예약 및 실행할 수 있으며 적절한 순서로 실행되게 할 수 있다.

    • 데이터 모델링 로직의 재사용
      • 각 모델을 구축하는 SQL에서 로직을 반복
      • 모델을 서로 분리하여 모델 간에 수많은 종속성을 생성
      코드 재사용은 데이터 수집에서 이상적이듯 데이터 모델링에도 이상적이다. 단일 소스가 존재하도록 하고 버그나 비즈니스 로직 변경 시 변경해야 하는 코드의 양을 줄인다 (다만 파이프라인에서는 좀 더 복잡한 종속성 그래프를 가지게 된다).
    • 복잡성은 파이프라인 아래, 파이프라인의 변환 단계에서 데이터 모델링 중에 발생할 수 있다. 분석가가 데이터 모델을 많이 구축할 수 있도록 다음 두 가지 중 하나를 수행하는 경향이 있다

    서로 관련 없는 데이터 모델이라면 서로 영향을 주지 않기에 문제가 되지 않지만 일부 로직을 공유한다면 다음과 같이 리팩터링을 거친다.

    실습 예제

    # 테이블 생성 및 삽입
    CREATE TABLE Orders(
    	OrderId int,
    	OrderStatus varchar(30),
    	OrderDate timestamp,
    	CustomerId int,
    	OrderTotal numeric
    );
    
    INSERT INTO Orders
    	VALUES(1, 'Shipped', '2022-06-09', 100, 50.05)
    
    INSERT INTO Orders
    	VALUES(2, 'Shipped', '2022-07-09', 101, 57.45)
    
    INSERT INTO Orders
    	VALUES(3, 'Shipped', '2022-08-12', 102, 135.99)
    
    INSERT INTO Orders
    	VALUES(4, 'Shipped', '2022-08-12', 100, 43.00)
    
    # model_1.sql
    CREATE TABLE IF NOT EXISTS orders_by_day AS
    SELECT
    	CAST(OrderDate AS DATE) AS order_date,
    	COUNT(*) AS order_count
    FROM Orders
    GROUP BY CAST(OrderDate AS DATE);
    

    DAG에서 데이터 모델은 매번 재계산하지 않고 일별 주문 개수가 필요할 때 참조 가능.

    WHERE 절이나 조인에 더 복잡한 계산이나 쿼리를 사용하여 일별 주문 수를 얻는 것이 더 간단할 수도 있으나, 로직을 한 번 작성하여 재사용하는 것이 더 중요하다!

    <aside> 💡 재사용을 통해 단일 소스의 정보를 보장하고 단일 모델을 유지, 관리할 수 있으며, 데이터 웨어하우스가 복잡한 로직을 한 번만 실행하고 결과를 나중에 참조할 수 있도록 저장하기만 하게끔 하여 런타임도 절약할 수 있다!

    </aside>

    [재사용 반영 코드]

    # model_2.sql 
    SELECT
    	obd.order_date,
    	ot.order_count
    FROM orders_by_day obd
    LEFT JOIN other_table ot
    	ON ot.some_date = obd.order_date
    

    본 코드는 일별 주문 개수를 다시 계산하는 대신 **order_by_day 모델(=model_1)**을 사용한다.

    이렇게 하면 모델의 로직에서 버그가 발견되고 이를 연결된 여러 모델에서 함께 수정해야 하는 경우 로직을 단일 모델에 적용하고 그로부터 다른 모델이 파생되는 기회를 만들 수 있다. 결과적으로는 모델 간 종속성이 복잡해지지만 올바르게 처리되면 파이프라인의 데이터 모델링에 있는 논리가 더 안정적이고 데이터 정보가 여러 버전으로 발생될 가능성을 줄여준다.

    • 종속성 무결성 보장앞서 DAG는 종속성이 적절하게 정의가 되었지만, 팀이 더 많은 모델을 빌드함에 따라 종속성을 추적하는 것이 상당히 번거롭고 오류가 발생하기 쉽다.
      1. 개발 프로세스에 몇 가지 로직을 구축하여 SQL 스크립트의 종속성을 식별하고 스크립트가 의존하는 모든 테이블이 DAG의 위 단계에서 실행되게끔 한다.
        1. 이 방법은 간단하지 않음
          1. SQL 스크립트에서 테이블 이름을 구문 분석
          2. 더 일반적으로 데이터 분석가가 새 모델을 제출할때나 기존 모델에 수정 수행 시 구성 파일에 수동으로 종속성 목록을 제공하도록 요구하여 수행
          → 미리 해야하는 작업이 존재하고, 개발 프로세스에 약간의 마찰을 추가
      2. dbt와 같은 데이터 모델 프레임워크 사용
        1. 분석가가 모델 정의를 위해 작성한 SQL에서 바로 모델 간의 참조를 정의할 수 있는 메커니즘
      # model_2.sql
      SELECT
      	obt.order_date,
      	ot.order_count
      FROM {{ref('model_1')}} obd
      LEFT JOIN other_table ot
      	ON ot.some_date = obd.order_date;
      
      dbt는 model_2가 model_1에 의존한다는 것을 알고 적절한 순서로 실행되게 한다. dbt는 에어플로우와 같은 도구에서 강제로 빌드하는 대신 DAG를 동적으로 빌드한다. 데이터 모델이 실행 전에 dbt에 의해 컴파일되면 model_1에 대한 참조가 테이블 이름으로 채워진다.이렇게 dbt를 실행되면 각 테이블이 서로 참조되는 방식에 따라 각 모델을 나타내는 SQL 스크립트가 적절한 순서로 실행
    • $ dbt run
    • 데이터 모델 간의 종속성을 정의하고 검증하는 프로그래밍 방식
    • 데이터 모델 로직을 재사용하는 것이 모든 이점과 함께 트레이드 오프 또한 있다. 어떤 모델이 서로 의존하는지 추적하고 이러한 종속성이 오케스트레이션을 위해 DAG에 올바르게 정의되어 있는지 확인해야 한다.

    데이터 파이프라인 실습

    Workflow vs Dataflow

    Workflow

    • 프로세스를 완료하거나 특정 목표를 달성하는 데 필요한 작업, 동작 또는 단계의 순서를 정의
    • 프로세스에 관련된 개인 또는 시스템 간의 작업 조정 및 흐름에 중점
      • 작업, 종속성, 결정 지점, 역할 및 책임, 입력/출력의 순서를 보장
    • 프로젝트 관리, 비즈니스 프로세스 및 구조화된 활동 조정이 필요한 영역에서 사용

    Workflow 관리에는 효율적이고 효과적인 프로세스 실행을 보장하기 위한 Workflow 설계, 구현 및 모니터링이 포함된다.

    Dataflow

    • 시스템 또는 애플리케이션 내에서 데이터의 이동, 변환 및 처리
    • 정보 시스템 내에서 데이터가 입력, 조작 및 출력되는 방법을 설명
    • 작업 조정보다는 데이터의 흐름과 변환에 중점

    Dataflow는 데이터 처리, 정보 시스템 및 데이터 흐름 프로그래밍과 같은 프로그래밍 패러다임에서 사용된다. 데이터가 시스템을 통해 이동하는 방식을 시각화하고, 병목 현상을 식별하고, 데이터 처리 파이프라인을 최적화하는데 도움이 된다 .

    Workflow → 작업의 순서와 조정을 정의(작업 프로세스 관리)

    Dataflow → 시스템 또는 애플리케이션 내에서 데이터의 이동 및 변환(데이터 처리 관리)

    고려사항

    • 데이터 원천이 무엇인가?
      • 어떤 특징을 가진 데이터에 대한 파이프라인인가?
    • 목적이 무엇인가?
      • 최종 사용자가 누구이며, 어떤 용도로 이용할 데이터에 대한 파이프라인인가?
    • ETL? ELT? 데이터 웨어하우스? 데이터 레이크?

     

    • 1.8억 이상의 구독자들이 이용하는 거대 플랫폼
    • AWS와 Open Connect 두 클라우드를 사용
      • 이용자들에게 최선의 영상을 제공하기 위해 필요한 중추 역할
    • Client : 컨텐츠를 감상할 수 있는 모든 디바이스들 (TV, XBOX, laptap …)
    • OC or CDN
      • CDN : 지리적으로 다른 장소에서 사용되는 분산 서버 네트워크
        • 정적 컨텐츠(이미지, Video, CSS, Javascript…) 캐시가 가능
        • 비디오 스트리밍과 같은 모든 것을 다룰 수 있어서 버튼만 누른다면 원하는 컨텐츠를 제공받을 수 있다
    • BackEnd(Database)
      • 비디오 스트리밍을 포함하지 않는 모든 것을 다룸
        • 비디오 처리
        • 세계 각국에 위치한 서버들에 비디오들을 배급
        • 네트워크 트래픽 처리…
      → 대부분 AWS에서 수행
    • ELB?
      • Elastic Load Balancer
      • Front-end Service에서 트래픽을 라우팅하는 데 사용
      • 2개의 로드 밸런싱을 사용
        • Z - Zone / I - Instance(Server)
          • LB for Zone
            • Round-Robin 밸런싱 기반 DNS로 구성Data Processing in Netflix Using Kafka & Apache Chukwe
              • Netflix : 500B 데이터 이벤트들을 제공한다.
                • 하루당 1.3 PB(페타바이트)를 소비하는 5000억 개의 이벤트
                • 피크타임에 초당 25GB를 소비하는 8백만개의 이벤트
                  • Error logs
                  • UI Activities
                  • Performance events
                  • Video viewing activities
                  • Troubleshooting and diagnostic events
            • 넷플릭스 앱에서 비디오를 클릭하면, 넷플릭스는 데이터를 다양한 방면에서 나노초보다 적은 시간으로 처리하기 시작한다. 이를 구현하는데 Kafka와 Apache Chukwa를 사용한다.

    Apache Chukwa

    *공식문서 : https://chukwa.apache.org/

     

    Chukwa - Welcome to Apache Chukwa

    About Apache Chukwa Apache Chukwa is an open source data collection system for monitoring large distributed systems. Apache Chukwa is built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop’s scalability and

    chukwa.apache.org

    • 오픈 소스 데이터 수집 시스템
    • 분산 시스템으로부터 로그들과 이벤트들을 수집
    • HDFS와 맵리듀스 프레임워크 기반
      • 하둡의 확장성과 견고함을 반영함과 동시에, 수집한 데이터에 대한 많은 강력하고 유연한 툴킷들을 제공
    1. Chukwa는 시스템의 다른 부분들에서 이벤트들(대부분 로그 데이터)을 모으고 감시 또는 분석을 진행하거나, 대시보드를 작성할 수 있다.
    2. S3(Hadoop file Sequence) 상에 이벤트를 작성하여 차후 빅데이터 팀이 저장된 파일을 처리하고 Hive에 작성할 수 있다.

    → 일/시간 단위로 전 데이터를 스캔할 수 있는 기본적인 Batch Processing 과정

    • When to use Batch Processing?
      • Data Warehousing : 많은 양의 데이터를 데이터 웨어하우스에 옮겨야 할 때
      • ETL : ETL 작업의 효율을 높이고 오버헤드를 줄이기 위해 배치 프로세싱 적용
      • Analytical Processing : 복잡한 알고리즘이나 대규모 데이터 변환 과정 또는 상태 보고서를 작성할 시 → 자원 활용 최적화, 병렬 계산, 전반적인 분석 업무 향상 기대
      • Data Updates and Backfilling : 업데이트 대상이 큰 데이터셋이거나 , 변경 사항을 소급 적용해야 할 때.

    EMR/S3에 실시간 이벤트들을 업로드하기 위해 Chukwa는 카프카에게 트래픽을 제공한다.

    Kafka

    • Kafka로 가져오는 것부터 다양한 싱크들(S3, ElasticSearch, Secondary Kafka)로 데이터를 옮기는 일을 담당
    • 메시지들을 라우팅하는 것은 Apache Samja 프레임워크를 통해 진행
    • Chukwa로부터 받은 트래픽은 일정하지 않아 Kafka Cluster에서 추가적인 정제가 필요할 수도 있다.
      • 특정 카프카 토픽을 다른 카프카 토픽으로 옮기기 위해 라우터를 사용하는 이유

    Elastic Search

    • 최근 들어 넷플릭스 내에서 ElasticSearch의 사용량은 급격히 증가
    • 대략 150개의 엘라스틱서치 클러스터와 3500개의 호스트들과 함께 구동되고 있음

    넷플릭스는 데이터 시각화와 고객 지원, 그리고 시스템 에러 탐지에 elasticsearch를 사용하고 있다.

    [예시]

    고객이 비디오를 재생할 수 없을 때 고객 관리 담당자는 elasticsearch를 통해 문제를 해결한다

    → playback 팀은 elasticsearch에 접속하여 해당 유저에 대해 검색, 왜 기기에서 비디오가 재생되지 않는지 확인.

    • ElasticSearch : 해당 유저에게 일어난 모든 정보와 이벤트들이 기록되어 있음
      • 정보를 기록하는 역할로 사용(자원 사용량, 혹은 로그인 문제들)

    Apache Spark For Movie Recommendation

    넷플릭스에 접속하면, 많은 다양한 영화들의 목록이 로드된다. 넷플릭스는 이를 데이터들을 통해 개인화하여 어떤 종류의 영화들이 특정 유저들에게 보여질 것인지 결정한다. 이 모든 과정은 유저의 과거 데이터들과 선호도에 기반하고 있다.

    • 특정 유저에 대해 모든 영화들을 정렬하고 추천 순위를 매겨 영화를 추천
      • 이를 구현하기 위해 다양한 ML pipeline들이 있는 Spark Cluster들을 사용한다.
        • Row selection, sorting, title relevance ranking, artwork personalization(Thumbnail)
    728x90
Designed by Tistory.