ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark Structured Streaming_1
    Data Engineer 2023. 10. 29. 16:53
    728x90

    공식 문서에 따르면 Spark Streaming은 더 이상 버전 업데이트가 되지 않는 구식(?)이 되어버려서, Spark Structured Streaming 툴을 사용해야 한다. 오늘은 Spark Structured Streaming에 대해 알아보기로 한다.

    *공식문서 : https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

     

    Structured Streaming Programming Guide - Spark 3.5.0 Documentation

    Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

    spark.apache.org

     

    기본 실습

    가장 쉬운 wordcount.py 파일을 생성하여 실습을 진행해보자.

    코드 설명 : wordcount

    # wordcount.py
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode
    from pyspark.sql.functions import split
    
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    
    # Create DataFrame representing the stream of input lines from connection to localhost:9999
    # 스트리밍 텍스트 데이터를 포함하는 무제한 테이블
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    
    # Split the lines into words
    words = lines.select(
       explode(
           split(lines.value, " ")
       ).alias("word")
    )
    
    # Generate running word count
    wordCounts = words.groupBy("word").count()
    
    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
    
    query.awaitTermination()

    데이터프레임 lines는 스트리밍 텍스트 데이터를 가지고 있는 테이블을 지칭한다. 이 테이블 내에는 value라는 문자열 형태의 칼럼이 하나 포함되어 있으며 스트리밍 텍스트 데이터의 각 줄은 테이블의 행이 된다. 아직 구동하기 전이므로 코드상으로 현재 데이터가 수신되진 않는다. 이후  words에서 쿼리를 실행하는데, 두 개의 내장 SQL  함수 split, explode를 사용하여 각 행을 각각 단어가 포함된 여러 행으로 분할한다. words에서 alias를 지정하지 않으면 칼럼명은 자동으로 col이 된다.

    wordCounts는 데이터셋의 고유한 값을 그룹화하고 개수를 계산하여 DataFrame을 정의한다. 이는 스트림의 실행 단어 수를 나타내는 스트리밍 DataFrame이다.

    query는 데이터 수신과 개수 계산을 시작하는 것으로, 업데이트할 때마다 전체 개수 세트를 콘솔에 인쇄하도록 설정했다. start()를 실행함으로써 스트리밍 계산을 시작하도록 한다.

    실행

    실행 파일의 위치에서 터미널을 켜서 다음과 같은 명령어를 실행한다.

    $ nc -lk 9999

    해당 명령어는 netcat을 이용하여 로컬 머신의 9999 포트에서 연결을 수신하고, 해당 포트로 들어오는 데이터를 기다리며, 연결이 끊겨도 다시 연결을 수신한다는 의미이다. 이 명령어를 실행했다면, 해당 터미널은 wordcount를 위해 단어를 입력하는 창이 된다.

    다른 터미널 창을 실행하여 다음과 같은 명령어를 실행한다.

    $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

    공식문서에는 위와 같이 나와있지만, 자신의 환경에 맞게 경로를 바꿔 설정하여 진행한다.

    본인의 경우, 해당 파일이 있는 경로상에 터미널을 실행해 wordcount.py를 spark.py로 저장했기 때문에 다음과 같이 입력하였다.

    입력창에 apache spark, apache hadoop, 공백, steph curry를 입력하였다.

    다른 터미널 창에 들어가보면 ,batch 단계별로, 업데이트되면서 실시간으로 입력한 단어를 반영하여 count해준다. 

    728x90

    'Data Engineer' 카테고리의 다른 글

    동기 작업과 비동기 작업  (1) 2023.12.03
    Airflow 연결 오류  (0) 2023.11.08
    데이터 엔지니어링 수명 주기 전체에 걸친 기술 선택  (0) 2023.09.17
    DMBOK  (0) 2023.09.03
    Airflow 사용해보기  (0) 2023.08.21
Designed by Tistory.